0%

ReactiveCocoa-RACScheduler底层实现分析

RACScheduler ReactiveCocoa 框架中的调度器,ReactiveCocoa 中的信号可以在 RACScheduler 上执行任务、发送结果; RACScheduler 的实现主要是基于 GCD 封装,提供了 GCD 不具备的特性。

RACScheduler 有 4 个子类:

  • RACTestScheduler

  • RACSubscriptionScheduler

  • RACImmediateScheduler

  • RACQueueScheduler

接下来会分析平时使用中常接触到的 RACImmediateScheduler、RACQueueScheduler 和 RACSubscriptionScheduler 3个子类

RACScheduler

基类提供了5个构造方法:

1
2
3
4
5
6
7
8
9
10
11
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(nullable NSString *)name;

+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority;

+ (RACScheduler *)scheduler;

+ (nullable RACScheduler *)currentScheduler;

+ (RACScheduler *)immediateScheduler;

+ (RACScheduler *)mainThreadScheduler;

+schedulerWithPriority:name:+schedulerWithPriority:+scheduler 三个方法细线逻辑相差不大

1
2
3
4
5
6
7
8
9
10
11
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority, 0)];
}

+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority {
return [self schedulerWithPriority:priority name:@"org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler"];
}

+ (RACScheduler *)scheduler {
return [self schedulerWithPriority:RACSchedulerPriorityDefault];
}
  1. +schedulerWithPriority:name:: 指定线程的优先级和名称,返回的 RACTargetQueueScheduler 对象,RACTargetQueueScheduler 是 RACQueueScheduler 的子类
  2. +schedulerWithPriority:: 指定线程的优先级,线程名称设置为 org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler
  3. +scheduler: 线程的优先级设置为 RACSchedulerPriorityDefault,线程名称设置为 org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler

+currentScheduler:

1
2
3
4
5
6
7
8
9
10
11
+ (RACScheduler *)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

return nil;
}

+ (BOOL)isOnMainThread {
return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}

该方法是从当前线程的线程字典获取对应的 RACScheduler 对象,如果为 nil 则判断当前线程是否为主线程,如果是在主线程上,就返回 mainThreadScheduler。如果既不在主线程上,则返回 nil。

+immediateScheduler:

1
2
3
4
5
6
7
8
9
+ (RACScheduler *)immediateScheduler {
static dispatch_once_t onceToken;
static RACScheduler *immediateScheduler;
dispatch_once(&onceToken, ^{
immediateScheduler = [[RACImmediateScheduler alloc] init];
});

return immediateScheduler;
}

这是一个单例方法,返回了 RACImmediateScheduler 对象

+mainThreadScheduler

1
2
3
4
5
6
7
8
9
+ (RACScheduler *)mainThreadScheduler {
static dispatch_once_t onceToken;
static RACScheduler *mainThreadScheduler;
dispatch_once(&onceToken, ^{
mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
});

return mainThreadScheduler;
}

同样是一个单例方法,返回的是 RACTargetQueueScheduler 对象,并将其名称赋值为 org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler

+subscriptionScheduler

1
2
3
4
5
6
7
8
9
+ (RACScheduler *)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});

return subscriptionScheduler;
}

也是一个单例方法,返回了 RACSubscriptionScheduler 对象。

RACScheduler 还提供了5个实例方法:

1
2
3
4
5
6
7
8
9
- (nullable RACDisposable *)schedule:(void (^)(void))block;

- (nullable RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block;

- (nullable RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block;

- (nullable RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block;

- (nullable RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock;
  1. schedule:: 传入 block 并根据相关条件来触发 block

  2. after:schedule:: 延迟执行 block,传入的延迟时间是 NSDate 类型

  3. afterDelay:schedule:: 延迟执行 block,传入的延迟时间是 NSTimeInterval 类型

  4. after:repeatingEvery:withLeeway:schedule:: 创建定时任务,循环间隔由参数 interval 决定

  5. scheduleRecursiveBlock:: 递归触发 block

上面5个方法除了 -scheduleRecursiveBlock:addingToDisposable: 都是由子类进行具体实现

-scheduleRecursiveBlock:addingToDisposable:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
@autoreleasepool {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[disposable addDisposable:selfDisposable];

__weak RACDisposable *weakSelfDisposable = selfDisposable;

RACDisposable *schedulingDisposable = [self schedule:^{
@autoreleasepool {
// 已经触发之后,weakSelfDisposable 已经没有作用,故移除
[disposable removeDisposable:weakSelfDisposable];
}

if (disposable.disposed) return;

void (^reallyReschedule)(void) = ^{
if (disposable.disposed) return;
[self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
};

// Protects the variables below.
//
// This doesn't actually need to be __block qualified, but Clang
// complains otherwise. :C
__block NSLock *lock = [[NSLock alloc] init];
lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];

__block NSUInteger rescheduleCount = 0;

// 同步操作执行完后会赋值为YES,然后执行reallyReschedule闭包
__block BOOL rescheduleImmediately = NO;

@autoreleasepool {
recursiveBlock(^{
[lock lock];
BOOL immediate = rescheduleImmediately;
if (!immediate) ++rescheduleCount;
[lock unlock];

if (immediate) reallyReschedule();
});
}

[lock lock];
NSUInteger synchronousCount = rescheduleCount;
rescheduleImmediately = YES;
[lock unlock];

for (NSUInteger i = 0; i < synchronousCount; i++) {
reallyReschedule();
}
}];

[selfDisposable addDisposable:schedulingDisposable];
}
}

上面这段函数实现中有几个关键的变量/参数:

  • reallyReschedule:递归执行函数的闭包

  • recursiveBlock:函数传参,该闭包的参数也是一个闭包(block),recursiveBlock 执行完之后会触发传入的闭包 block

  • rescheduleCount:递归的次数

  • rescheduleImmediately:是否立即执行递归闭包 reallyReschedule

主要流程:

  1. 初始化相关变量之后,执行 recursiveBlock ,第一次 rescheduleImmediately 为 NO,rescheduleCount 递增
  2. recursiveBlock 执行完后 rescheduleImmediately 赋值为 YES
  3. 递归执行 reallyReschedule 闭包 rescheduleCount 次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (void)performAsCurrentScheduler:(void (^)(void))block {
NSCParameterAssert(block != NULL);

// If we're using a concurrent queue, we could end up in here concurrently,
// in which case we *don't* want to clear the current scheduler immediately
// after our block is done executing, but only *after* all our concurrent
// invocations are done.

RACScheduler *previousScheduler = RACScheduler.currentScheduler;
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;

@autoreleasepool {
block();
}

if (previousScheduler != nil) {
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
} else {
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}
  1. 执行 block 之前将当前线程的 scheduler 保存下来为 previousScheduler,然后设置自己为新的 scheduler
  2. 执行完 block 之后恢复现场,如果 previousScheduler 不为空,则重新复制为当前线程的 scheduler,否则从 threadDictionary 移除 RACSchedulerCurrentSchedulerKey 对应的对象,也就是当前 scheduler

RACImmediateScheduler

RACImmediateScheduler 是一个私有类,主要特点是将加入的 block 立即(Immediate)进行调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);

block();
return nil;
}

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);

[NSThread sleepUntilDate:date];
block();

return nil;
}

- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
NSCAssert(NO, @"+[RACScheduler immediateScheduler] does not support %@.", NSStringFromSelector(_cmd));
return nil;
}

- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
for (__block NSUInteger remaining = 1; remaining > 0; remaining--) {
recursiveBlock(^{
remaining++;
});
}

return nil;
}
  • -schedule 方法会立即触发传入的 block;
  • -after:schedule: 方法会将当前线程休眠到指定时间后执行 block;
  • -after:repeatingEvery:withLeeway:schedule: 不支持
  • -scheduleRecursiveBlock: 循环不断执行传入的 block

RACQueueScheduler

1
2
3
4
5
6
7
8
9
10
11
12
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);

RACDisposable *disposable = [[RACDisposable alloc] init];

dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});

return disposable;
}

-schedule 在 self.queue 队列中异步调用了 -performAsCurrentScheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);

RACDisposable *disposable = [[RACDisposable alloc] init];

dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});

return disposable;
}

+ (dispatch_time_t)wallTimeWithDate:(NSDate *)date {
NSCParameterAssert(date != nil);

double seconds = 0;
double frac = modf(date.timeIntervalSince1970, &seconds);

struct timespec walltime = {
.tv_sec = (time_t)fmin(fmax(seconds, LONG_MIN), LONG_MAX),
.tv_nsec = (long)fmin(fmax(frac * NSEC_PER_SEC, LONG_MIN), LONG_MAX)
};

return dispatch_walltime(&walltime, 0);
}
  1. 调用 wallTimeWithDate 方法将 NSDate 转化成 dispatch_time_t
  2. 调用 dispatch_after 将 block 放进队列 self.queue 中延迟执行
  3. block 执行过程中是通过 -performAsCurrentScheduler: 方法触发,触发前判断 disposable 是否被取消,取消则直接返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(interval > 0.0 && interval < INT64_MAX / NSEC_PER_SEC);
NSCParameterAssert(leeway >= 0.0 && leeway < INT64_MAX / NSEC_PER_SEC);
NSCParameterAssert(block != NULL);

uint64_t intervalInNanoSecs = (uint64_t)(interval * NSEC_PER_SEC);
uint64_t leewayInNanoSecs = (uint64_t)(leeway * NSEC_PER_SEC);

dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue);
dispatch_source_set_timer(timer, [self.class wallTimeWithDate:date], intervalInNanoSecs, leewayInNanoSecs);
dispatch_source_set_event_handler(timer, block);
dispatch_resume(timer);

return [RACDisposable disposableWithBlock:^{
dispatch_source_cancel(timer);
}];
}

该方法是通过 GCD 创建定时任务,然后通过 dispatch_source_set_event_handler 把参数 block 和 计时器关联起来,任务被取消的时候取消对应的定时器

RACTargetQueueScheduler

RACTargetQueueScheduler 继承于 RACQueueScheduler,提供了一新的初始化方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (instancetype)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
NSCParameterAssert(targetQueue != NULL);

if (name == nil) {
name = [NSString stringWithFormat:@"org.reactivecocoa.ReactiveObjC.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
}

dispatch_queue_t queue = dispatch_queue_create(name.UTF8String, DISPATCH_QUEUE_SERIAL);
if (queue == NULL) return nil;

dispatch_set_target_queue(queue, targetQueue);

return [super initWithName:name queue:queue];
}

在初始化方法创建了串行队列 queue,然后通过 dispatch_set_target_queue 把 targetQueue 和 queue 关联起来

dispatch_set_target_queue 有2个作用:

  1. 初始化方法内 queue 是通过 dispatch_queue_create 创建,无法设置优先级,dispatch_set_target_queue 可以将 queue 优先级设置为 targetQueue 的优先级

  2. 设置队列的层次体系,可以理解为 queue 中的任务会派发给 targetQueue;比如如果 targetQueue 是 DISPATCH_QUEUE_SERIAL 串行队列,则 queue 中的任务也是串行执行;如果多个 queue 都指定同一个 targetQueue 串行队列,那么多个 queue 的任务是同步执行的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    - (void)testSetTarget {
    dispatch_queue_t targetQueue = dispatch_queue_create("targetQueue", DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);

    dispatch_set_target_queue(queue1, targetQueue);
    dispatch_set_target_queue(queue2, targetQueue);

    dispatch_async(queue1, ^{
    NSLog(@"1. queue1 excute");
    });
    dispatch_async(queue1, ^{
    [NSThread sleepForTimeInterval:1.f];
    NSLog(@"2. queue1 excute");
    });
    dispatch_async(queue2, ^{
    NSLog(@"1. queue2 excute");
    });
    dispatch_async(queue2, ^{
    NSLog(@"2. queue2 excute");
    });
    dispatch_async(targetQueue, ^{
    NSLog(@"target queue");
    });
    }

    输出:

    1
    2
    3
    4
    5
    2019-03-31 19:47:32.441322+0800 AppTest[42843:1630586] 1. queue1 excute
    2019-03-31 19:47:33.446694+0800 AppTest[42843:1630586] 2. queue1 excute
    2019-03-31 19:47:33.446917+0800 AppTest[42843:1630586] 1. queue2 excute
    2019-03-31 19:47:33.447003+0800 AppTest[42843:1630586] 2. queue2 excute
    2019-03-31 19:47:33.447095+0800 AppTest[42843:1630586] target queue

RACSubscriptionScheduler

相对于父类,主要添加了一个私有的属性:

1
@property (nonatomic, strong, readonly) RACScheduler *backgroundScheduler;

在其初始化的时候会创建 backgroundScheduler

1
2
3
4
5
6
7
- (instancetype)init {
self = [super initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.subscriptionScheduler"];

_backgroundScheduler = [RACScheduler scheduler];

return self;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);

if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];

block();
return nil;
}

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
return [scheduler after:date schedule:block];
}

- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
return [scheduler after:date repeatingEvery:interval withLeeway:leeway schedule:block];
}

其他的方法大体逻辑都是判断当前线程有没有对应的 RACScheduler,如果有任务则在当前线程对应的 RACScheduler 执行,若没有则在 backgroundScheduler 上执行