RACScheduler
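RACScheduler is the scheduler of the ReactiveCocoa framework: signals in ReactiveCocoa run their work and deliver their results on a RACScheduler.
RACScheduler is implemented mainly as a wrapper around GCD and adds features that GCD itself lacks, such as cancelling scheduled work through a RACDisposable and tracking the scheduler that is currently running.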
RACScheduler has four subclasses:
RACTestScheduler
RACSubscriptionScheduler
RACImmediateScheduler
RACQueueScheduler
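The rest of this article looks at the three subclasses you meet most often in everyday use: RACImmediateScheduler, RACQueueScheduler and RACSubscriptionScheduler.
The RACScheduler base class exposes six class methods for obtaining a scheduler: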
```objc
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(nullable NSString *)name;

+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority;

+ (RACScheduler *)scheduler;

+ (nullable RACScheduler *)currentScheduler;

+ (RACScheduler *)immediateScheduler;

+ (RACScheduler *)mainThreadScheduler;
```
+schedulerWithPriority:name:, +schedulerWithPriority: and +scheduler have nearly identical implementations:
```objc
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
    return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority, 0)];
}

+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority {
    return [self schedulerWithPriority:priority name:@"org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler"];
}

+ (RACScheduler *)scheduler {
    return [self schedulerWithPriority:RACSchedulerPriorityDefault];
}
```
+schedulerWithPriority:name: takes the priority of the global queue to target and the scheduler's name. It returns a RACTargetQueueScheduler, which is a subclass of RACQueueScheduler.
+schedulerWithPriority: takes only the priority and uses the name org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler.
+scheduler uses the priority RACSchedulerPriorityDefault and the same name, org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler.
+currentScheduler

```objc
+ (RACScheduler *)currentScheduler {
    RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
    if (scheduler != nil) return scheduler;
    if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

    return nil;
}

+ (BOOL)isOnMainThread {
    return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}
```
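This method first looks up the RACScheduler stored in the current thread's thread dictionary. If none is stored there, it checks whether the code is running on the main thread and, if so, returns mainThreadScheduler. Otherwise it returns nil.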
+immediateScheduler

```objc
+ (RACScheduler *)immediateScheduler {
    static dispatch_once_t onceToken;
    static RACScheduler *immediateScheduler;
    dispatch_once(&onceToken, ^{
        immediateScheduler = [[RACImmediateScheduler alloc] init];
    });

    return immediateScheduler;
}
```
This is a singleton accessor that returns a RACImmediateScheduler object.
+mainThreadScheduler

```objc
+ (RACScheduler *)mainThreadScheduler {
    static dispatch_once_t onceToken;
    static RACScheduler *mainThreadScheduler;
    dispatch_once(&onceToken, ^{
        mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
    });

    return mainThreadScheduler;
}
```
This is also a singleton accessor. It returns a RACTargetQueueScheduler that targets the main queue and is named org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler.
+subscriptionScheduler

```objc
+ (RACScheduler *)subscriptionScheduler {
    static dispatch_once_t onceToken;
    static RACScheduler *subscriptionScheduler;
    dispatch_once(&onceToken, ^{
        subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
    });

    return subscriptionScheduler;
}
```
Again a singleton accessor, this time returning a RACSubscriptionScheduler object.
RACScheduler also provides five instance methods:
```objc
- (nullable RACDisposable *)schedule:(void (^)(void))block;

- (nullable RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block;

- (nullable RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block;

- (nullable RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block;

- (nullable RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock;
```
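-schedule: hands the block to the scheduler; when and where it actually runs depends on the concrete subclass.
-after:schedule: runs the block after a delay, with the fire time given as an NSDate.
-afterDelay:schedule: runs the block after a delay given as an NSTimeInterval.
-after:repeatingEvery:withLeeway:schedule: creates a repeating timer. The repeat period is set by interval, and leeway is the timing slack the system is allowed.
-scheduleRecursiveBlock: runs a block that can ask to be scheduled again, recursively.
Of these five methods, -scheduleRecursiveBlock: has a default implementation in the base class, built on the helper -scheduleRecursiveBlock:addingToDisposable:. The concrete scheduling behavior of the others comes from the subclasses.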
-scheduleRecursiveBlock:addingToDisposable:

```objc
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
    @autoreleasepool {
        RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
        [disposable addDisposable:selfDisposable];

        __weak RACDisposable *weakSelfDisposable = selfDisposable;

        RACDisposable *schedulingDisposable = [self schedule:^{
            @autoreleasepool {
                [disposable removeDisposable:weakSelfDisposable];
            }

            if (disposable.disposed) return;

            void (^reallyReschedule)(void) = ^{
                if (disposable.disposed) return;
                [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
            };

            // The lock protects rescheduleCount and rescheduleImmediately.
            __block NSLock *lock = [[NSLock alloc] init];
            lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];

            __block NSUInteger rescheduleCount = 0;
            __block BOOL rescheduleImmediately = NO;

            @autoreleasepool {
                recursiveBlock(^{
                    [lock lock];
                    BOOL immediate = rescheduleImmediately;
                    if (!immediate) ++rescheduleCount;
                    [lock unlock];

                    if (immediate) reallyReschedule();
                });
            }

            [lock lock];
            NSUInteger synchronousCount = rescheduleCount;
            rescheduleImmediately = YES;
            [lock unlock];

            for (NSUInteger i = 0; i < synchronousCount; i++) {
                reallyReschedule();
            }
        }];

        [selfDisposable addDisposable:schedulingDisposable];
    }
}
```
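Several variables and parameters are key to this implementation:
reallyReschedule: the block that performs the actual recursion by calling -scheduleRecursiveBlock:addingToDisposable: again
recursiveBlock: the method argument. It takes another block as its own parameter and calls that block whenever it wants to be run again
rescheduleCount: the number of reschedule requests made while recursiveBlock was still executing
rescheduleImmediately: whether a reschedule request should call reallyReschedule right away
Main flow:
After the variables are initialized, recursiveBlock runs. During this first, synchronous phase rescheduleImmediately is NO, so each reschedule request only increments rescheduleCount.
Once recursiveBlock returns, rescheduleImmediately is set to YES, so any request that arrives later calls reallyReschedule directly.
reallyReschedule is then called rescheduleCount times to replay the requests recorded during the synchronous phase.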
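The flow above can be modelled in plain C without any ReactiveObjC types. The following is a minimal single-threaded sketch, not the library's code: ToyScheduler, ToyPass and every toy_ function are hypothetical names, the pending counter stands in for the dispatch queue, and the NSLock is left out because nothing here runs concurrently.

```c
#include <stdbool.h>
#include <stddef.h>

typedef struct ToyScheduler ToyScheduler;

/* State of one pass; mirrors rescheduleCount and rescheduleImmediately. */
typedef struct {
    ToyScheduler *scheduler;
    size_t reschedule_count;
    bool reschedule_immediately;
} ToyPass;

/* The recursive step. It calls toy_reschedule(pass) to ask for another pass. */
typedef void (*ToyStep)(ToyPass *pass, void *context);

struct ToyScheduler {
    ToyStep step;
    void *context;
    size_t pending; /* passes waiting to run; stands in for the queue */
};

/* Counterpart of reallyReschedule: enqueue one more pass. */
static void toy_really_reschedule(ToyScheduler *s) {
    s->pending++;
}

/* Counterpart of the block that is handed to recursiveBlock. */
void toy_reschedule(ToyPass *pass) {
    if (pass->reschedule_immediately) {
        toy_really_reschedule(pass->scheduler); /* late request: act now */
    } else {
        pass->reschedule_count++;               /* synchronous request: record it */
    }
}

/* Runs one pass of the step and replays the requests it recorded. */
void toy_run_pass(ToyScheduler *s, ToyPass *pass) {
    pass->scheduler = s;
    pass->reschedule_count = 0;
    pass->reschedule_immediately = false;

    s->step(pass, s->context);

    size_t synchronous_count = pass->reschedule_count;
    pass->reschedule_immediately = true;
    for (size_t i = 0; i < synchronous_count; i++) {
        toy_really_reschedule(s);
    }
}

/* Example step: asks for two more passes while it is still running. */
void toy_request_two_more(ToyPass *pass, void *context) {
    (void)context;
    toy_reschedule(pass);
    toy_reschedule(pass);
}
```

Running toy_run_pass with toy_request_two_more leaves two passes pending, and a further toy_reschedule call made after the pass has returned is enqueued at once instead of being counted.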
-performAsCurrentScheduler:

```objc
- (void)performAsCurrentScheduler:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    // Remember the scheduler that was current before this one takes over.
    RACScheduler *previousScheduler = RACScheduler.currentScheduler;
    NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;

    @autoreleasepool {
        block();
    }

    if (previousScheduler != nil) {
        NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
    } else {
        [NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
    }
}
```
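Before running the block, the method saves the current thread's scheduler as previousScheduler and installs itself as the new current scheduler.
After the block has run, it restores the previous state: if previousScheduler is not nil, it is written back as the current thread's scheduler. Otherwise the entry for RACSchedulerCurrentSchedulerKey, which is the scheduler itself, is removed from threadDictionary.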
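The save, install and restore pattern can be illustrated in standalone C. This is only a sketch under stated assumptions: Scheduler, Probe and the functions below are hypothetical, a C11 _Thread_local pointer plays the role of the threadDictionary entry, and the main-thread fallback of +currentScheduler is omitted.

```c
#include <stddef.h>

/* Hypothetical stand-in for a scheduler object. */
typedef struct {
    const char *name;
} Scheduler;

typedef void (*Work)(void *context);

/* Per-thread slot, playing the role of
   threadDictionary[RACSchedulerCurrentSchedulerKey]. */
static _Thread_local const Scheduler *current_scheduler = NULL;

const Scheduler *scheduler_current(void) {
    return current_scheduler;
}

void scheduler_perform_as_current(const Scheduler *self, Work work, void *context) {
    const Scheduler *previous = current_scheduler; /* save */
    current_scheduler = self;                      /* install */
    work(context);
    current_scheduler = previous;                  /* restore; NULL clears the slot */
}

/* Records which scheduler is visible at three points of a nested call. */
typedef struct {
    const Scheduler *inner;
    const Scheduler *seen_outer;
    const Scheduler *seen_inner;
    const Scheduler *seen_after_inner;
} Probe;

void probe_record_inner(void *context) {
    Probe *probe = context;
    probe->seen_inner = scheduler_current();
}

void probe_record_outer(void *context) {
    Probe *probe = context;
    probe->seen_outer = scheduler_current();
    scheduler_perform_as_current(probe->inner, probe_record_inner, probe);
    probe->seen_after_inner = scheduler_current();
}
```

When probe_record_outer runs under one scheduler and nests a second one, the outer scheduler is visible again as soon as the inner call returns, and the slot is empty once the outermost call has finished.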
RACImmediateScheduler is a private class. Its defining trait is that it runs every block it is given immediately, on the calling thread, instead of queuing it.
```objc
- (RACDisposable *)schedule:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    block();
    return nil;
}

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
    NSCParameterAssert(date != nil);
    NSCParameterAssert(block != NULL);

    [NSThread sleepUntilDate:date];
    block();

    return nil;
}

- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
    NSCAssert(NO, @"+[RACScheduler immediateScheduler] does not support %@.", NSStringFromSelector(_cmd));
    return nil;
}

- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
    for (__block NSUInteger remaining = 1; remaining > 0; remaining--) {
        recursiveBlock(^{
            remaining++;
        });
    }

    return nil;
}
```
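-schedule: invokes the block right away.
-after:schedule: puts the current thread to sleep until the given date and then runs the block.
-after:repeatingEvery:withLeeway:schedule: is not supported and triggers an assertion.
-scheduleRecursiveBlock: runs the block in a loop, once initially and once more for every reschedule request it makes, so no real recursion takes place.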
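The loop in -scheduleRecursiveBlock: is a small trampoline: each reschedule request bumps a counter, and the for loop keeps going until the counter is used up. A minimal C sketch of the same idea follows; the function-pointer types and names are illustrative assumptions, since plain C has no blocks.

```c
#include <stddef.h>

/* The callback a step uses to ask for one more pass. */
typedef void (*Reschedule)(void *token);

/* A step receives the reschedule callback, its token and user context. */
typedef void (*RecursiveStep)(Reschedule reschedule, void *token, void *context);

/* Mirrors `remaining++` inside the block passed to recursiveBlock. */
static void bump_remaining(void *token) {
    ++*(size_t *)token;
}

/* Runs the step once, plus once for every reschedule request,
   using a loop instead of nested calls. Returns the number of passes. */
size_t run_recursive_immediately(RecursiveStep step, void *context) {
    size_t passes = 0;
    for (size_t remaining = 1; remaining > 0; remaining--) {
        step(bump_remaining, &remaining, context);
        passes++;
    }
    return passes;
}

/* Example step: counts *context down and reschedules until it reaches zero. */
void countdown_step(Reschedule reschedule, void *token, void *context) {
    int *counter = context;
    if (*counter > 0) {
        (*counter)--;
        reschedule(token);
    }
}
```

Starting countdown_step with a counter of 3 takes four passes: three that decrement and reschedule, and a final one that finds zero and stops. The call stack never grows beyond a single step.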
RACQueueScheduler

```objc
- (RACDisposable *)schedule:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    RACDisposable *disposable = [[RACDisposable alloc] init];

    dispatch_async(self.queue, ^{
        if (disposable.disposed) return;
        [self performAsCurrentScheduler:block];
    });

    return disposable;
}
```
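-schedule: dispatches asynchronously onto self.queue and runs the block through -performAsCurrentScheduler:, unless the returned disposable has been disposed in the meantime.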
```objc
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
    NSCParameterAssert(date != nil);
    NSCParameterAssert(block != NULL);

    RACDisposable *disposable = [[RACDisposable alloc] init];

    dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
        if (disposable.disposed) return;
        [self performAsCurrentScheduler:block];
    });

    return disposable;
}

+ (dispatch_time_t)wallTimeWithDate:(NSDate *)date {
    NSCParameterAssert(date != nil);

    double seconds = 0;
    double frac = modf(date.timeIntervalSince1970, &seconds);

    struct timespec walltime = {
        .tv_sec = (time_t)fmin(fmax(seconds, LONG_MIN), LONG_MAX),
        .tv_nsec = (long)fmin(fmax(frac * NSEC_PER_SEC, LONG_MIN), LONG_MAX)
    };

    return dispatch_walltime(&walltime, 0);
}
```
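+wallTimeWithDate: converts the NSDate into a dispatch_time_t.
dispatch_after then places the block on self.queue for delayed execution.
When the delay expires, the block is run through -performAsCurrentScheduler:. Before that, the disposable is checked, and if it has already been disposed the block is skipped.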
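The date conversion boils down to splitting a fractional number of seconds since the epoch into whole seconds and nanoseconds. Here is a hedged C sketch of that step alone; timespec_from_epoch_seconds is a hypothetical helper, it assumes the value fits into time_t, and it leaves out both the clamping and the dispatch_walltime call of the original.

```c
#include <math.h>
#include <time.h>

#define NANOS_PER_SECOND 1000000000.0

/* Splits seconds-since-epoch into the two fields of a struct timespec.
   Assumes epoch_seconds is within the range of time_t. */
struct timespec timespec_from_epoch_seconds(double epoch_seconds) {
    double whole = 0.0;
    /* modf returns the fractional part and stores the integral part in whole. */
    double frac = modf(epoch_seconds, &whole);

    struct timespec ts;
    ts.tv_sec = (time_t)whole;
    ts.tv_nsec = (long)(frac * NANOS_PER_SECOND);
    return ts;
}
```

For an input of 1.5 the result holds one whole second and 500000000 nanoseconds.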
```objc
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
    NSCParameterAssert(date != nil);
    NSCParameterAssert(interval > 0.0 && interval < INT64_MAX / NSEC_PER_SEC);
    NSCParameterAssert(leeway >= 0.0 && leeway < INT64_MAX / NSEC_PER_SEC);
    NSCParameterAssert(block != NULL);

    uint64_t intervalInNanoSecs = (uint64_t)(interval * NSEC_PER_SEC);
    uint64_t leewayInNanoSecs = (uint64_t)(leeway * NSEC_PER_SEC);

    dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue);
    dispatch_source_set_timer(timer, [self.class wallTimeWithDate:date], intervalInNanoSecs, leewayInNanoSecs);
    dispatch_source_set_event_handler(timer, block);
    dispatch_resume(timer);

    return [RACDisposable disposableWithBlock:^{
        dispatch_source_cancel(timer);
    }];
}
```
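This method creates a GCD timer source on self.queue, attaches the block to it with dispatch_source_set_event_handler, and returns a disposable that cancels the timer when the task is disposed.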
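The parameter checks and the conversion to nanoseconds at the top of the method can be tried out in isolation. The helper below is a hypothetical sketch, not part of ReactiveObjC: it returns false where the original would assert, and the allow_zero flag covers the difference between interval (must be positive) and leeway (may be zero).

```c
#include <stdbool.h>
#include <stdint.h>

#define NANOS_PER_SECOND UINT64_C(1000000000)

/* Converts seconds to nanoseconds. Returns false, leaving *out untouched,
   when the value is outside the range the original method asserts on. */
bool seconds_to_nanoseconds(double seconds, bool allow_zero, uint64_t *out) {
    /* Same bound as INT64_MAX / NSEC_PER_SEC in the original. */
    double limit = (double)(INT64_MAX / (int64_t)NANOS_PER_SECOND);
    bool low_ok = allow_zero ? seconds >= 0.0 : seconds > 0.0;

    /* Written as !(x < limit) so that NaN is rejected as well. */
    if (!low_ok || !(seconds < limit)) return false;

    *out = (uint64_t)(seconds * (double)NANOS_PER_SECOND);
    return true;
}
```

An interval would be checked with allow_zero set to false and a leeway with allow_zero set to true.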
RACTargetQueueScheduler
RACTargetQueueScheduler inherits from RACQueueScheduler and adds one new initializer:
```objc
- (instancetype)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
    NSCParameterAssert(targetQueue != NULL);

    if (name == nil) {
        name = [NSString stringWithFormat:@"org.reactivecocoa.ReactiveObjC.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
    }

    dispatch_queue_t queue = dispatch_queue_create(name.UTF8String, DISPATCH_QUEUE_SERIAL);
    if (queue == NULL) return nil;

    dispatch_set_target_queue(queue, targetQueue);

    return [super initWithName:name queue:queue];
}
```
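The initializer creates a serial queue named queue and then links it to targetQueue with dispatch_set_target_queue.
dispatch_set_target_queue serves two purposes:
Priority: inside the initializer, queue is created with dispatch_queue_create, which gives no way to choose a priority here. dispatch_set_target_queue makes queue take on the priority of targetQueue.
Queue hierarchy: the work submitted to queue is ultimately handed to targetQueue. For example, if targetQueue is a DISPATCH_QUEUE_SERIAL queue, the tasks of queue also run serially. If several queues share the same serial targetQueue, the tasks of all of them are serialized, so they run one at a time rather than in parallel.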
```objc
- (void)testSetTarget {
    dispatch_queue_t targetQueue = dispatch_queue_create("targetQueue", DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);

    dispatch_set_target_queue(queue1, targetQueue);
    dispatch_set_target_queue(queue2, targetQueue);

    dispatch_async(queue1, ^{
        NSLog(@"1. queue1 excute");
    });
    dispatch_async(queue1, ^{
        [NSThread sleepForTimeInterval:1.f];
        NSLog(@"2. queue1 excute");
    });
    dispatch_async(queue2, ^{
        NSLog(@"1. queue2 excute");
    });
    dispatch_async(queue2, ^{
        NSLog(@"2. queue2 excute");
    });
    dispatch_async(targetQueue, ^{
        NSLog(@"target queue");
    });
}
```
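Output (all five blocks run one after another on the same thread, even though queue2 is a concurrent queue):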
```
2019-03-31 19:47:32.441322+0800 AppTest[42843:1630586] 1. queue1 excute
2019-03-31 19:47:33.446694+0800 AppTest[42843:1630586] 2. queue1 excute
2019-03-31 19:47:33.446917+0800 AppTest[42843:1630586] 1. queue2 excute
2019-03-31 19:47:33.447003+0800 AppTest[42843:1630586] 2. queue2 excute
2019-03-31 19:47:33.447095+0800 AppTest[42843:1630586] target queue
```
RACSubscriptionScheduler
Compared with its parent class, RACSubscriptionScheduler mainly adds one private property:
```objc
@property (nonatomic, strong, readonly) RACScheduler *backgroundScheduler;
```
backgroundScheduler is created during initialization:
```objc
- (instancetype)init {
    self = [super initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.subscriptionScheduler"];

    _backgroundScheduler = [RACScheduler scheduler];

    return self;
}
```
```objc
- (RACDisposable *)schedule:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];

    block();
    return nil;
}

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
    RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
    return [scheduler after:date schedule:block];
}

- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
    RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
    return [scheduler after:date repeatingEvery:interval withLeeway:leeway schedule:block];
}
```
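These methods all follow the same logic: check whether the current thread has a RACScheduler. If it does, -schedule: runs the block right away and the delayed variants forward the work to that scheduler. If it does not, the work is handed to backgroundScheduler.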
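The decision rule can be condensed into a few lines of C. The sketch below is a simplified model rather than the library's code: Scheduler only counts what is handed to it, the current scheduler is passed in as a parameter instead of being looked up per thread, and all names are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>

typedef void (*Work)(void *context);

/* Hypothetical scheduler that only counts the work handed to it. */
typedef struct {
    const char *name;
    size_t queued;
} Scheduler;

static void scheduler_enqueue(Scheduler *s) {
    s->queued++;
}

/* Mirrors -schedule:. Returns true when the work ran inline. */
bool subscription_schedule(Scheduler *current, Scheduler *background,
                           Work work, void *context) {
    if (current == NULL) {
        scheduler_enqueue(background); /* no current scheduler: defer */
        return false;
    }
    work(context);                     /* already on a scheduler: run now */
    return true;
}

/* Mirrors `currentScheduler ?: backgroundScheduler` in the delayed variants. */
Scheduler *subscription_pick_for_delay(Scheduler *current, Scheduler *background) {
    return current != NULL ? current : background;
}

/* Example work: increments the int that context points to. */
void count_call(void *context) {
    ++*(int *)context;
}
```

Calling subscription_schedule with a NULL current scheduler only queues the work on the background scheduler, while passing a non-NULL one runs count_call immediately.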