bind、concat、zipWith 都属于 RACSignal 基本操作方法,前一篇文章 已经介绍过 bind,接下来再来分析 concat 和 zipWith
concat 首先来看 concat 方法,同样先测试 concat 执行效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 RACSignal *signal1 = [RACSignal createSignal: ^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; return [RACDisposable disposableWithBlock:^{ NSLog (@"signal1 dispose" ); }]; }]; RACSignal *signal2 = [RACSignal createSignal: ^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@4 ]; [subscriber sendNext:@5 ]; [subscriber sendNext:@6 ]; [subscriber sendCompleted]; return [RACDisposable disposableWithBlock:^{ NSLog (@"signal2 dispose" ); }]; }]; RACSignal *concatSignal = [signal1 concat:signal2]; [concatSignal subscribeNext:^(id x) { NSLog (@"subscribe value = %@" , x); }];
执行代码,可以从看到控制输出大概如下:
1 2 3 4 5 6 7 8 subscribe value = 1 subscribe value = 2 subscribe value = 3 subscribe value = 4 subscribe value = 5 subscribe value = 6 signal2 dispose signal1 dispose
从输出接口,可以大致猜测 concat 是将2个信号进行合并,然后依次发送给最终的的订阅者
concat 的实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 - (RACSignal *)concat:(RACSignal *)signal { return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init]; RACDisposable *sourceDisposable = [self subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ RACDisposable *concattedDisposable = [signal subscribe:subscriber]; [compoundDisposable addDisposable:concattedDisposable]; }]; [compoundDisposable addDisposable:sourceDisposable]; return compoundDisposable; }] setNameWithFormat:@"[%@] -concat: %@" , self .name, signal]; }
分析源码,concat 方法主要分为以下几个步骤:
先订阅自己,也就是 代码 1
中的 signal1,
signal1 被订阅之后就执行 signal1 的 didSubscribe,给最终的订阅者 subscriber 发送 sendNext、sendError
signal1 发送完sendCompleted 信号之后,开始订阅 signal2,signal2 发送信号的逻辑与步骤1-2一样
从上面的分析,注意到:
signal2 需要在 signal1 发送 sendCompleted 之后才开始被订阅,所以如果在 代码 1
中 signal1 如果不发送 sendCompleted,控制台就不会输出 4,5,6
compoundDisposable 是先把第二个信号订阅返回的 concattedDisposable 加入数组中,然后再把 signal1 订阅返回的 sourceDisposable 加入数组,所以在 代码 1
中 signal2先结束,然后 signal1 再结束。当2个信号都结束之后,concat之后的新信号也就结束
concat 流程可以用下图概括:
zipWith 同样先测试 zipWith
方法的效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 RACSignal *signal1 = [RACSignal createSignal: ^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; return [RACDisposable disposableWithBlock:^{ NSLog (@"signal1 dispose" ); }]; }]; RACSignal *signal2 = [RACSignal createSignal: ^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@4 ]; [subscriber sendNext:@5 ]; [subscriber sendNext:@6 ]; [subscriber sendNext:@7 ]; [subscriber sendCompleted]; return [RACDisposable disposableWithBlock:^{ NSLog (@"signal2 dispose" ); }]; }]; RACSignal *zipSignal = [signal1 zipWith:signal2]; [zipSignal subscribeNext:^(id x) { NSLog (@"subscribe value = %@" , x); }];
执行并输出
1 2 3 4 5 6 7 8 9 10 11 12 subscribe value = <RACTwoTuple: 0x60800001b5f0> ( 1, 4 ) subscribe value = <RACTwoTuple: 0x60800001b680> ( 2, 5 ) subscribe value = <RACTwoTuple: 0x60800001b660> ( 3, 6 )
看起来 zipWith
是把2个信号发送值合并成 tuple 类型并发送给订阅者
具体源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 - (RACSignal *)zipWith:(RACSignal *)signal { NSCParameterAssert (signal != nil ); return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { __block BOOL selfCompleted = NO ; NSMutableArray *selfValues = [NSMutableArray array]; __block BOOL otherCompleted = NO ; NSMutableArray *otherValues = [NSMutableArray array]; void (^sendCompletedIfNecessary)(void ) = ^{ @synchronized (selfValues) { BOOL selfEmpty = (selfCompleted && selfValues.count == 0 ); BOOL otherEmpty = (otherCompleted && otherValues.count == 0 ); if (selfEmpty || otherEmpty) [subscriber sendCompleted]; } }; void (^sendNext)(void ) = ^{ @synchronized (selfValues) { if (selfValues.count == 0 ) return ; if (otherValues.count == 0 ) return ; RACTuple *tuple = RACTuplePack(selfValues[0 ], otherValues[0 ]); [selfValues removeObjectAtIndex:0 ]; [otherValues removeObjectAtIndex:0 ]; [subscriber sendNext:tuple]; sendCompletedIfNecessary(); } }; RACDisposable *selfDisposable = [self subscribeNext:^(id x) { @synchronized (selfValues) { [selfValues addObject:x ?: RACTupleNil.tupleNil]; sendNext(); } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ @synchronized (selfValues) { selfCompleted = YES ; sendCompletedIfNecessary(); } }]; RACDisposable *otherDisposable = [signal subscribeNext:^(id x) { @synchronized (selfValues) { [otherValues addObject:x ?: RACTupleNil.tupleNil]; sendNext(); } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ @synchronized (selfValues) { otherCompleted = YES ; sendCompletedIfNecessary(); } }]; return [RACDisposable disposableWithBlock:^{ [selfDisposable dispose]; [otherDisposable dispose]; }]; }] setNameWithFormat:@"[%@] -zipWith: %@" , self .name, signal]; }
zipWith 操作主要经历以下几步:
初始化2个可变数组,分别保存 signal1 和 signal2 发送的信号事件
订阅 signal1,然后会执行 BLOCK 4
把值保存到数组中,再去执行之前初始化好的 sendNext
闭包,然后判断2个数组是否都有值,如都有则把2个数组的头元素组出来,包装成 RACTuple 类型发给订阅者,当然 BLOCK 4
中 signal1 被订阅的时候都没有发出去,因为 otherValues 为空
订阅 signal2,具体逻辑跟步骤2相似,signal2被订阅之后会执行 BLOCK 5
,然后发送的值存在对应数组,然后执行 sendNext
闭包,这时候2个数组不为空,给订阅者发送 RACTuple,然后判断 signal1和 signal2 其中是否有一个已经完成并且数组为空,则给订阅者发送 sendCompleted
第一个信号依次发送的1,2,3的值和第二个信号依次发送的4,5,6,7的值,一一合并,当signal2 发送 7 的时候 signal1 已经结束了,所以 7 也就是被丢弃了。
RACDisposable 每次对一个 RACSignal 进行订阅执行 subscribeNext
方法的时候,都会返回 一个 RACDisposable 对象,之前也提到过 RACDisposable 主要对一次订阅结束过后做相关的清理操作,而且它可以随时取消任何一个订阅,这方面类似 NSURLSessionDataTask
。
RACDisposable 有一个核心方法 -dispose
,功能类似于我们经常接触到的 -dealloc
方法,接下来分析 RACDisposable 实现代码,看其是如何工作的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 - (instancetype )init { self = [super init]; _disposeBlock = (__bridge void *)self ; OSMemoryBarrier(); return self ; } - (instancetype )initWithBlock:(void (^)(void ))block { NSCParameterAssert (block != nil ); self = [super init]; _disposeBlock = (void *)CFBridgingRetain ([block copy ]); OSMemoryBarrier(); return self ; } + (instancetype )disposableWithBlock:(void (^)(void ))block { return [[self alloc] initWithBlock:block]; }
这里看到有2个初始化方法 init
和 initWithBlock
,后置是把参数 block 拷贝并以 void *
的形式保存到 _disposeBlock
中;而 init
中 _disposeBlock
指向的是 self,这里指向 self 的作用是什么呢?
1 2 3 - (BOOL )isDisposed { return _disposeBlock == NULL ; }
我们可以看到 isDisposed
的返回结果是通过判断 _disposeBlock
是否为空,这里 init
中 _disposeBlock
指向的是 self主要为了在不引入其他实例变量、增加对象的基础上避免 isDisposed
方法的误判
基于上面的结论,很可能会产生另外一个疑惑,为什么不把 init
方法禁用,只能通 initWithBlock
方法进行初始化?这是因为对于某些 RACDisposable 的子类如 RACSerialDisposable,执行多余的 _disposeBlock
比较浪费性能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - (void )dispose { void (^disposeBlock)(void ) = NULL ; while (YES ) { void *blockPtr = _disposeBlock; if (OSAtomicCompareAndSwapPtrBarrier(blockPtr, NULL , &_disposeBlock)) { if (blockPtr != (__bridge void *)self ) { disposeBlock = CFBridgingRelease (blockPtr); } break ; } } if (disposeBlock != nil ) disposeBlock(); }
-dispose
主要是将 _disposeBlock 进行一次 release,然后执行这个block
RACSerialDisposable RACSerialDisposable 是一个容量为1的 RACDisposable 容器,RACDisposable 对象保存在 _disposable
属性中
1 2 3 - (void )setDisposable:(RACDisposable *)disposable { [self swapInDisposable:disposable]; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 - (RACDisposable *)swapInDisposable:(RACDisposable *)newDisposable { RACDisposable *existingDisposable; BOOL alreadyDisposed; pthread_mutex_lock(&_mutex); alreadyDisposed = _disposed; if (!alreadyDisposed) { existingDisposable = _disposable; _disposable = newDisposable; } pthread_mutex_unlock(&_mutex); if (alreadyDisposed) { [newDisposable dispose]; return nil ; } return existingDisposable; }
设置 _disposable
的时候采用 pthred_mutex_t
互斥锁来保证操作的原子性,先判断是否已经被 dispose,若没有则将 newDisposable 赋值给 _disposable,反之直接对新的 newDisposable 执行 dispose
1 2 3 4 5 6 7 8 9 10 11 12 13 - (void )dispose { RACDisposable *existingDisposable; pthread_mutex_lock(&_mutex); if (!_disposed) { existingDisposable = _disposable; _disposed = YES ; _disposable = nil ; } pthread_mutex_unlock(&_mutex); [existingDisposable dispose]; }
RACSerialDisposable 的 dispose 方法内部是在线程安全的基础上将 _disposable 换出,然后再进行 dispose
RACCompoundDisposable RACCompoundDisposable 也是保存 RACDisposable 的容器,和 RACSerialDisposable 不同的是,RACCompoundDisposable可以保存多个 RACDisposable 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @interface RACCompoundDisposable () { pthread_mutex_t _mutex; #if RACCompoundDisposableInlineCount RACDisposable *_inlineDisposables[RACCompoundDisposableInlineCount]; #endif CFMutableArrayRef _disposables; BOOL _disposed; }
RACCompoundDisposable 内部用2个数组来保存 RACDisposable,分别是 _inlineDisposables
和 _disposables
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 - (instancetype )initWithDisposables:(NSArray *)otherDisposables { self = [self init]; #if RACCompoundDisposableInlineCount [otherDisposables enumerateObjectsUsingBlock:^(RACDisposable *disposable, NSUInteger index, BOOL *stop) { self ->_inlineDisposables[index] = disposable; if (index == RACCompoundDisposableInlineCount - 1 ) *stop = YES ; }]; #endif if (otherDisposables.count > RACCompoundDisposableInlineCount) { _disposables = RACCreateDisposablesArray(); CFRange range = CFRangeMake (RACCompoundDisposableInlineCount, (CFIndex )otherDisposables.count - RACCompoundDisposableInlineCount); CFArrayAppendArray (_disposables, (__bridge CFArrayRef )otherDisposables, range); } return self ; }
当数量少于等于 RACCompoundDisposableInlineCount(默认为2) 的时候会保存在 _inlineDisposables 数组里面,超过这个临界值后,后面的 RACDisposable 保存在 _disposables 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 - (void )addDisposable:(RACDisposable *)disposable { NSCParameterAssert (disposable != self ); if (disposable == nil || disposable.disposed) return ; BOOL shouldDispose = NO ; pthread_mutex_lock(&_mutex); { if (_disposed) { shouldDispose = YES ; } else { #if RACCompoundDisposableInlineCount for (unsigned i = 0 ; i < RACCompoundDisposableInlineCount; i++) { if (_inlineDisposables[i] == nil ) { _inlineDisposables[i] = disposable; goto foundSlot; } } #endif if (_disposables == NULL ) _disposables = RACCreateDisposablesArray(); CFArrayAppendValue (_disposables, (__bridge void *)disposable); if (RACCOMPOUNDDISPOSABLE_ADDED_ENABLED()) { RACCOMPOUNDDISPOSABLE_ADDED(self .description.UTF8String, disposable.description.UTF8String, CFArrayGetCount (_disposables) + RACCompoundDisposableInlineCount); } #if RACCompoundDisposableInlineCount foundSlot:; #endif } } pthread_mutex_unlock(&_mutex); if (shouldDispose) [disposable dispose]; }
添加 RACDisposable 的时候同样先判断当前是否已经被 dispose,若是则直接执行参数 RACDisposable 对象的 dispose;反之则先判断 _inlineDisposables 是否有剩余空间,如果没有就加入到 _disposables
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 - (void )dispose { #if RACCompoundDisposableInlineCount RACDisposable *inlineCopy[RACCompoundDisposableInlineCount]; #endif CFArrayRef remainingDisposables = NULL ; pthread_mutex_lock(&_mutex); { _disposed = YES ; #if RACCompoundDisposableInlineCount for (unsigned i = 0 ; i < RACCompoundDisposableInlineCount; i++) { inlineCopy[i] = _inlineDisposables[i]; _inlineDisposables[i] = nil ; } #endif remainingDisposables = _disposables; _disposables = NULL ; } pthread_mutex_unlock(&_mutex); #if RACCompoundDisposableInlineCount for (unsigned i = 0 ; i < RACCompoundDisposableInlineCount; i++) { [inlineCopy[i] dispose]; } #endif if (remainingDisposables == NULL ) return ; CFIndex count = CFArrayGetCount (remainingDisposables); CFArrayApplyFunction (remainingDisposables, CFRangeMake (0 , count), &disposeEach, NULL ); CFRelease (remainingDisposables); }
在 -dispose
中,_inlineDisposables 和 _disposables 遍历,然后逐一进行 dispose