RACStream 是 ReactiveCocoa 中的核心概念:信号;RACStream 中有2个子类:
RACSignal
RACSequence
RACSignal 实际项目中,对 RACSignal 的使用中,经常会看到这样的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber){ [subscriber sendNext:@(1 )]; [subscriber sendNext:@(2 )]; [subscriber sendNext:@(3 )]; [subscriber sendCompleted]; return [RACDisposable disposableWithBlock:^{ NSLog (@"signal dispose" ); }]; }]; RACDisposable *disposable = [signal subscribeNext:^(id x) { NSLog (@"received value = %@" , x); } error:^(NSError *error) { NSLog (@"received error: %@" , error); } completed:^{ NSLog (@"received completed" ); }]; [disposable dispose];
基于以上的代码,我们可以看看 RACSigal从 被订阅到订阅者受到数据的整个过程,具体经历过那些流程
初始化:
1 2 3 4 5 + (RACSignal *)createSignal:(RACDisposable * (^)(id <RACSubscriber> subscriber))didSubscribe { return [RACDynamicSignal createSignal:didSubscribe]; }
上面初始化返回 RACDynamicSignal
对象,这是一个私有类,继承RACSignal,实现 RACSignal 的订阅行为
1 2 3 4 5 6 7 8 9 @interface RACDynamicSignal : RACSignal + (RACSignal *)createSignal:(RACDisposable * (^)(id <RACSubscriber> subscriber))didSubscribe; @end
在调用 RACDynamicSignal 的 createSignal
方法中,会传入一个名为 didSubscribe
,返回类型为 RACDisposable
的block,在内部会将 didSubscribe 进行一次copy,然后保存在 RACDynamicSignal 对象中。
1 2 3 4 5 6 7 + (RACSignal *)createSignal:(RACDisposable * (^)(id <RACSubscriber> subscriber))didSubscribe { RACDynamicSignal *signal = [[self alloc] init]; signal->_didSubscribe = [didSubscribe copy ]; return [signal setNameWithFormat:@"+createSignal:" ]; }
上面代码初始化返回 signal 调用方法 setNameWithFormat
,会执行基类 RACStream 的方法,给 name 属性赋值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - (instancetype )setNameWithFormat:(NSString *)format, ... { if (getenv("RAC_DEBUG_SIGNAL_NAMES" ) == NULL ) return self ; NSCParameterAssert (format != nil ); va_list args; va_start(args, format); NSString *str = [[NSString alloc] initWithFormat:format arguments:args]; va_end(args); self .name = str; return self ; }
从 didSubscribe 中还可以看到 RACSubscriber 协议:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @protocol RACSubscriber <NSObject >@required - (void )sendNext:(nullable id )value; - (void )sendError:(nullable NSError *)error; - (void )sendCompleted; - (void )didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;
代码4中可以看到,创建 RACSignal 的时候,signal 会持有保存传入的 didSubscribe 闭包。这个闭包会在 signal 被订阅的时候触发
1 2 3 4 5 6 7 8 9 10 - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void ))completedBlock { NSCParameterAssert (nextBlock != NULL ); NSCParameterAssert (errorBlock != NULL ); NSCParameterAssert (completedBlock != NULL ); RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock]; return [self subscribe:o]; }
我们平时对 RACSignal 实例对象调用 - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock
或者 - (RACDisposable *)subscribeError:(void (^)(NSError *error))errorBlock
方法会返回 RACDisposable 对象,内部会创建一个实现了 RACSubscriber 协议的对象 o, 把 next、error、completed 三种block进行 copy 保存
1 2 3 4 5 6 7 8 9 10 11 + (instancetype )subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void ))completed { RACSubscriber *subscriber = [[self alloc] init]; subscriber->_next = [next copy ]; subscriber->_error = [error copy ]; subscriber->_completed = [completed copy ]; return subscriber; }
然后调用 - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber
方法并把刚刚 RACSubscriber 对象传入,如代码2所示,这里实际上是调用了 RACDynamicSignal 的 subscriber 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 - (RACDisposable *)subscribe:(id <RACSubscriber>)subscriber { NSCParameterAssert (subscriber != nil ); RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable]; if (self .didSubscribe != NULL ) { RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{ RACDisposable *innerDisposable = self .didSubscribe(subscriber); [disposable addDisposable:innerDisposable]; }]; [disposable addDisposable:schedulingDisposable]; } return disposable; }
在代码9中我们可以看到 2 个 类:RACCompoundDisposable、 RACPassthroughSubscriber
RACCompoundDisposable 继承 RACDisposable,可以理解为是一个保存 RACDisposable 类型的容器,当 RACCompoundDisposable 执行 - (void)dispose
方法会将 disposables 数组中的元素一一进行 dispose
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 static void disposeEach(const void *value, void *context) { RACDisposable *disposable = (__bridge id )value; [disposable dispose]; } - (void )dispose { #if RACCompoundDisposableInlineCount RACDisposable *inlineCopy[RACCompoundDisposableInlineCount]; #endif CFArrayRef remainingDisposables = NULL ; pthread_mutex_lock(&_mutex); { _disposed = YES ; #if RACCompoundDisposableInlineCount for (unsigned i = 0 ; i < RACCompoundDisposableInlineCount; i++) { inlineCopy[i] = _inlineDisposables[i]; _inlineDisposables[i] = nil ; } #endif remainingDisposables = _disposables; _disposables = NULL ; } pthread_mutex_unlock(&_mutex); #if RACCompoundDisposableInlineCount for (unsigned i = 0 ; i < RACCompoundDisposableInlineCount; i++) { [inlineCopy[i] dispose]; } #endif if (remainingDisposables == NULL ) return ; CFIndex count = CFArrayGetCount (remainingDisposables); CFArrayApplyFunction (remainingDisposables, CFRangeMake (0 , count), &disposeEach, NULL ); CFRelease (remainingDisposables); }
RACPassthroughSubscriber 是一个私有类,主要的作用是把一个订阅者 subscriber A 的信号事件传递给另一个没有被 dispose 的订阅者 subscriber B。由以下几个步骤实现:
包装真正的订阅者,使自己成为订阅者的替代者
将真正的订阅者与一个订阅时产生的 Disposable 关联起来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @interface RACPassthroughSubscriber ()@property (nonatomic , strong , readonly ) id <RACSubscriber> innerSubscriber;@property (nonatomic , unsafe_unretained , readonly ) RACSignal *signal;@property (nonatomic , strong , readonly ) RACCompoundDisposable *disposable;@end
回到代码9 中,之前已经创建刚刚提到的 RACCompoundDisposable 和 RACPassthroughSubscriber,
1 2 3 4 5 6 7 8 9 10 if (self .didSubscribe != NULL ) { RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{ RACDisposable *innerDisposable = self .didSubscribe(subscriber); [disposable addDisposable:innerDisposable]; }]; [disposable addDisposable:schedulingDisposable]; }
subscriptionScheduler
是一个单例方法,返回 RACSubscriptionScheduler 对象
1 2 3 4 5 6 7 8 9 10 11 + (instancetype )subscriptionScheduler { static dispatch_once_t onceToken; static RACScheduler *subscriptionScheduler; dispatch_once (&onceToken, ^{ subscriptionScheduler = [[RACSubscriptionScheduler alloc] init]; }); return subscriptionScheduler; }
继而调用 RACSubscriptionScheduler 的 schedule
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 - (RACDisposable *)schedule:(void (^)(void ))block { NSCParameterAssert (block != NULL ); if (RACScheduler.currentScheduler == nil ) return [self .backgroundScheduler schedule:block]; block(); return nil ; } + (BOOL )isOnMainThread { return [NSOperationQueue .currentQueue isEqual:NSOperationQueue .mainQueue] || [NSThread isMainThread]; } + (RACScheduler *)currentScheduler { RACScheduler *scheduler = NSThread .currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey]; if (scheduler != nil ) return scheduler; if ([self .class isOnMainThread]) return RACScheduler.mainThreadScheduler; return nil ; }
schedule
方法中主要先去找当前线程对应的 RACScheduler 对象,如果找不到,则去找主线程对应的 RACScheduler 对象,如果还是找不到,则返回 backgroundScheduler 对象。
schedule
方法目的是主要释放参数 block,也就是执行代码12中
1 2 3 4 RACDisposable *innerDisposable = self .didSubscribe(subscriber); [disposable addDisposable:innerDisposable];
代码 4 中保存的闭包 didSubscribe
会在这里执行,进而可能调用闭包入参 subscriber(也就是之前创建的RACPassthroughSubscriber对象) 的 sendNext,sendError,sendCompleted 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 - (void )sendNext:(id )value { if (self .disposable.disposed) return ; if (RACSIGNAL_NEXT_ENABLED()) { RACSIGNAL_NEXT(cleanedSignalDescription(self .signal), cleanedDTraceString(self .innerSubscriber.description), cleanedDTraceString([value description])); } [self .innerSubscriber sendNext:value]; } - (void )sendError:(NSError *)error { if (self .disposable.disposed) return ; if (RACSIGNAL_ERROR_ENABLED()) { RACSIGNAL_ERROR(cleanedSignalDescription(self .signal), cleanedDTraceString(self .innerSubscriber.description), cleanedDTraceString(error.description)); } [self .innerSubscriber sendError:error]; } - (void )sendCompleted { if (self .disposable.disposed) return ; if (RACSIGNAL_COMPLETED_ENABLED()) { RACSIGNAL_COMPLETED(cleanedSignalDescription(self .signal), cleanedDTraceString(self .innerSubscriber.description)); } [self .innerSubscriber sendCompleted]; }
RACPassthroughSubscriber 在以上的方法会将信号事件转发给 innerSubscriber,因为 innerSubscriber 是 RACSubscriber 对象,进而会执行私有类 RACSubscriber 中的方法 sendNext/sendError/sendCompleted
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 - (void )sendNext:(id )value { @synchronized (self ) { void (^nextBlock)(id ) = [self .next copy ]; if (nextBlock == nil ) return ; nextBlock(value); } } - (void )sendError:(NSError *)e { @synchronized (self ) { void (^errorBlock)(NSError *) = [self .error copy ]; [self .disposable dispose]; if (errorBlock == nil ) return ; errorBlock(e); } } - (void )sendCompleted { @synchronized (self ) { void (^completedBlock)(void ) = [self .completed copy ]; [self .disposable dispose]; if (completedBlock == nil ) return ; completedBlock(); } }
执行上面方法前,都会先加锁保证线程安全,然后将对应的block(代码 8 创建 RACSubscriber 保存的 _next
/_error
/_completed
)进行一次 copy,最后执行 block,最终的结果就是执行 代码 1 中的 STEP 2
、STEP 3
、STEP 4
。
基于以上的分析,RACSignal 从创建到给订阅发送事件可以归纳为以下几个步骤:
调用RACSignal createSignal
的方法,返回子类 RACDynamicSignal 对象并保存闭包 didSubscribe – 代码 1
订阅信号,调用RACSignal subscribeNext
方法,在该方法中会创建 RACSubscriber 订阅者对象
创建的 RACSubscriber 订阅者对象会copy nextBlock,errorBlock,completedBlock,保存到相关属性中 – 代码 7
调用步骤1创建的 RACDynamicSignal subscribe
方法
创建 RACCompoundDisposable 和 RACPassthroughSubscriber 对象,RACPassthroughSubscriber 会保存 步骤1返回的 RACDynamicSignal(signal属性),步骤2中的 RACSubscriber(innerSubscriber属性)– 代码 11
执行 RACDynamicSignal 保存的 didSubscribe 闭包,闭包内调用 RACPassthroughSubscriber sendNext
,sendError
,sendCompleted
方法
RACPassthroughSubscriber 执行保存的 innerSubscriber 对应的 sendNext
,sendError
,sendCompleted
方法 — 代码 17