ReactiveCocoa 框架 RACSignal+Operations.h 定义了 RACSignal 常规操作方法,接下来对一些常用的方法进行分析并解析其作用。
doNext 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 - (void )testDoNext { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@"source signal" ]; [subscriber sendCompleted]; return nil ; }]; [[sourceSignal doNext:^(id _Nullable x) { NSLog (@"sourceSignal doNext will execute before sendNext" ); }] subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 sourceSignal doNext will execute before sendNext value = source signal
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 - (RACSignal *)doNext:(void (^)(id x))block { NSCParameterAssert (block != NULL ); return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { return [self subscribeNext:^(id x) { block(x); [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [subscriber sendCompleted]; }]; }] setNameWithFormat:@"[%@] -doNext:" , self .name]; }
doNext 传入block 闭包,该闭包的参数就是原信号发送的值,当给订阅者发送 sendNext 之前会执行 block 闭包
类似的 doError,doCompleted 也是在给订阅者发送事件之前就执行相关 block
throttle:valuesPassingTest: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 - (void )testThrottleValuesPassingTest { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@0 ]; [subscriber sendNext:@1 ]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC )), dispatch_get_main_queue(), ^{ [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; }); return nil ; }]; RACSignal *throttleSignal = [sourceSignal throttle:1 valuesPassingTest:^BOOL (id _Nullable next) { return [next integerValue] <= 2 ; }]; [throttleSignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
throttle:valuesPassingTest: 方法有2个参数:
时间间隔 interval
判断条件闭包 predicate
该方法大概作用是:从原信号发出第一个信号(@0)发出开始计时,在 interval
秒内如果第二个信号(@1) 符合 predicate 的判断条件,则该前一个信号会被忽略;如果2个信号间隔时间超过 interval
秒或者不满足 predicate 判断,则前一个信号会发给订阅者。
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 - (RACSignal *)throttle:(NSTimeInterval )interval valuesPassingTest:(BOOL (^)(id next))predicate { NSCParameterAssert (interval >= 0 ); NSCParameterAssert (predicate != nil ); return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable]; RACScheduler *scheduler = [RACScheduler scheduler]; __block id nextValue = nil ; __block BOOL hasNextValue = NO ; RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init]; void (^flushNext)(BOOL send) = ^(BOOL send) { @synchronized (compoundDisposable) { [nextDisposable.disposable dispose]; if (!hasNextValue) return ; if (send) [subscriber sendNext:nextValue]; nextValue = nil ; hasNextValue = NO ; } }; RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) { RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler; BOOL shouldThrottle = predicate(x); @synchronized (compoundDisposable) { flushNext(NO ); if (!shouldThrottle) { [subscriber sendNext:x]; return ; } nextValue = x; hasNextValue = YES ; nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{ flushNext(YES ); }]; } } error:^(NSError *error) { [compoundDisposable dispose]; [subscriber sendError:error]; } completed:^{ flushNext(YES ); [subscriber sendCompleted]; }]; [compoundDisposable addDisposable:subscriptionDisposable]; return compoundDisposable; }] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:" , self .name, (double )interval]; }
throttle:valuesPassingTest: 内部主要通过判断 nextValue
和 hasNextValue
2个变量的状态来判断是否给订阅者发送信号。内部先订阅原信号,然后触发订阅者的 didSubscribe,结合测试代码,具体流程:
收到原信号发出的信号 @0
,传入到条件闭包 predicate,返回 YES
RACCompoundDisposable 作为线程间互斥信号量,用 @synchronized 加锁保证 nextValue
和 hasNextValue
操作是原子性
执行 flushNext(NO) ,把 nextDisposable 进行 dispose,delayScheduler 之前存放的延迟任务如果未被执行会被取消;hasNextValue == NO,直接 return,没有给订阅者发送0
判断 !shouldThrottle,跳过 if 内部代码,给 nextValue 和 hasNextValue 赋值,nextValue=@0,hasNextValue = YES
把 flushNext(YES) 加入到延迟队列中,1秒后执行
原信号发送 @1,此时时间间隔不到 1秒,从步骤3开始重复上述步骤;flushNext(NO) 中 延迟任务被取消, nextValue
和 hasNextValue
会赋值为对应 零值。shouldThrottle 符合,nextValue
和 hasNextValue
又被赋值到 @1 和 YES,保存新的 flushNext(YES) 到延迟任务队列中
1秒后还没有收到原信号发送的信号,执行步骤6保存的 flushNext(YES),把 @1 发给订阅者;2秒后收到原信号的信号 @2,从步骤3开始重复上述步骤,@2会被保存到 hasNextValue
中,等待下一次延迟任务可以触发的时候发给订阅者,反之被忽略
步骤7完之后马上收到新的信号@3,从步骤3开始重复上述步骤,步骤7保存的 flushNext(YES) 被取消,因为此时不符合 predicate 判断条件,@3发给订阅者
delay: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - (void )testDelay { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@0 ]; [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; return nil ; }]; [[sourceSignal delay:1 ] subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; NSLog (@"%@" , [NSDate date]); }
输出:
1 2 3 4 5 2019 -01 -31 09 :34 :33.286371 +0800 AppTest[38331 :4774309 ] Thu Jan 31 09 :34 :33 2019 2019 -01 -31 09 :34 :34.380736 +0800 AppTest[38331 :4774309 ] value = 0 2019 -01 -31 09 :34 :34.380881 +0800 AppTest[38331 :4774309 ] value = 1 2019 -01 -31 09 :34 :34.380986 +0800 AppTest[38331 :4774309 ] value = 2 2019 -01 -31 09 :34 :34.381076 +0800 AppTest[38331 :4774309 ] value = 3
delay: 方法是将原信号的信号事件延迟发送给订阅者
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 - (RACSignal *)delay:(NSTimeInterval )interval { return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; RACScheduler *scheduler = [RACScheduler scheduler]; void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) { RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler; RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block]; [disposable addDisposable:schedulerDisposable]; }; RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) { schedule(^{ [subscriber sendNext:x]; }); } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ schedule(^{ [subscriber sendCompleted]; }); }]; [disposable addDisposable:subscriptionDisposable]; return disposable; }] setNameWithFormat:@"[%@] -delay: %f" , self .name, (double )interval]; }
delay: 内部首先先定义 schedule ,其参数是 dispatch_block_t,执行 schedule 的时候,会把参数 block 放到 RACScheduler 延迟 interval 秒触发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 - (RACDisposable *)afterDelay:(NSTimeInterval )delay schedule:(void (^)(void ))block { return [self after:[NSDate dateWithTimeIntervalSinceNow:delay] schedule:block]; } - (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void ))block { NSCParameterAssert (date != nil ); NSCParameterAssert (block != NULL ); RACDisposable *disposable = [[RACDisposable alloc] init]; dispatch_after([self .class wallTimeWithDate:date], self .queue, ^{ if (disposable.disposed) return ; [self performAsCurrentScheduler:block]; }); return disposable; }
根据测试代码,最终会调用 RACQueueScheduler 中的 -after:schedule:
,在该方法里面,通过 dispatch_after 来触发入参 block,触发先前会先判断该任务是否被 disposed ,如果是则直接 return。
归纳来说,delay: 方法就是将原信号的 sendNext 和 sendCompleted 事件延迟 interval
秒发送给订阅者。
bufferWithTime:onScheduler: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 - (void )testBufferWithTime { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@0 ]; [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; return nil ; }]; RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]]; [bufferSignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 3 4 5 6 value = <RACTuple: 0x600000003200> ( 0, 1, 2, 3 )
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 - (RACSignal *)bufferWithTime:(NSTimeInterval )interval onScheduler:(RACScheduler *)scheduler { NSCParameterAssert (scheduler != nil ); NSCParameterAssert (scheduler != RACScheduler.immediateScheduler); return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init]; NSMutableArray *values = [NSMutableArray array]; void (^flushValues)() = ^{ @synchronized (values) { [timerDisposable.disposable dispose]; if (values.count == 0 ) return ; RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values]; [values removeAllObjects]; [subscriber sendNext:tuple]; } }; RACDisposable *selfDisposable = [self subscribeNext:^(id x) { @synchronized (values) { if (values.count == 0 ) { timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues]; } [values addObject:x ?: RACTupleNil.tupleNil]; } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ flushValues(); [subscriber sendCompleted]; }]; return [RACDisposable disposableWithBlock:^{ [selfDisposable dispose]; [timerDisposable dispose]; }]; }] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@" , self .name, (double )interval, scheduler]; }
同样 bufferWithTime:onScheduler: 内部还会创建新的信号,当新的信号被订阅的时候会创建可变数组 values 。然后定义闭包 flushValues,当收到原信号的 sendNext 和 sendCompleted 的时候会触发该闭包。
之后对原信号进行订阅,首先收到原信号的 sendNext 事件的时候,先判断 values 的元素个数,如果个数为 0,则把 flushValues 延迟 interval
秒触发。然后把信号值加入 values 数组中,如果信号值为空,则把 RACTupleNil.tupleNil 加入数组中。
因为测试代码中,@0、@1、@2、@3 四个信号是连续串行发出,所以之前被加入延迟队列中执行的 flushValues 还没有触发的时候,当原信号发送 sendCompleted 的时候,flushValues 会被触发,这时候 values 四个元素被一次性包装成 RACTuple 发送给订阅者。
如果把测试代码修改成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 - (void )testBufferWithTime2 { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@0 ]; [subscriber sendNext:@1 ]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC )), dispatch_get_main_queue(), ^{ [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC )), dispatch_get_main_queue(), ^{ [subscriber sendCompleted]; }); }); return nil ; }]; RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]]; [bufferSignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 3 4 5 6 7 8 value = <RACTuple: 0x604000204300 > ( 0 , 1 ) value = <RACTuple: 0x600000019300 > ( 2 , 3 )
订阅者此时会受到2个信号,都是 RACTuple 类型,回到之前的实现代码,
收到 @0,flushValues() 加入延迟队列1秒之后执行,@0 再加入到数组 values 中
步骤1之后马上收到 @1(1秒内),重复步骤1,这时候延迟队列中有2个任务
1秒后,第一个加入延迟队列的 flushValues() 执行,首先执行了 [timerDisposable.disposable dispose];
,这样会将延迟队列中下一个 flushValues() 任务被取消(步骤2加入的)。然后 values 元素包装成 RACTuple 发送给订阅者。
基于步骤3,再过一秒,也就是从一开始算,2秒过后。收到@2,然后重复步骤,逻辑相似
最后再过2秒,原信号发送 sendCompleted,执行 flushValues() ,values 元素个数为0,直接返回。
总结:bufferWithTime:onScheduler: 作用是将规定时间内将原信号所发送的全部信号包装成 RACTupe 发送给订阅者
timeout:onScheduler: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 - (void )testTimeout { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@0 ]; [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC )), dispatch_get_main_queue(), ^{ [subscriber sendNext:@3 ]; [subscriber sendCompleted]; }); return nil ; }]; RACSignal *timeoutSig = [sourceSignal timeout:1 onScheduler:[RACScheduler currentScheduler]]; [timeoutSig subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; [timeoutSig subscribeError:^(NSError * _Nullable error) { NSLog (@"error = %@" , error); }]; }
输出:
1 2 3 4 value = 0 value = 1 value = 2 error = Error Domain=RACSignalErrorDomain Code=1 "(null)"
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 - (RACSignal *)timeout:(NSTimeInterval )interval onScheduler:(RACScheduler *)scheduler { NSCParameterAssert (scheduler != nil ); NSCParameterAssert (scheduler != RACScheduler.immediateScheduler); return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{ [disposable dispose]; [subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil ]]; }]; [disposable addDisposable:timeoutDisposable]; RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [disposable dispose]; [subscriber sendError:error]; } completed:^{ [disposable dispose]; [subscriber sendCompleted]; }]; [disposable addDisposable:subscriptionDisposable]; return disposable; }] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@" , self .name, (double )interval, scheduler]; }
timeout:onScheduler: 判断从被订阅开始计时, interval
时间内如果原信号没有把所有信号发送完毕会给订阅者发送 error。
首先创建新的信号,新信号被订阅的时候在延迟队列里面加入任务, interval
后该任务会将 NSError 发送给订阅者。如果在 interval
内原型号发送了 sendCompleted/sendError,会执行 [disposable dispose];
,延迟队列中的任务会被取消。同样如果延迟任务被触发,原信号的订阅也被终止。
map: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 - (void )testMap { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; return nil ; }]; [[sourceSignal map:^id _Nullable(id _Nullable value) { return @([value integerValue] + 1 ); }] subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 3 value = 2 value = 3 value = 4
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 - (__kindof RACStream *)map:(id (^)(id value))block { NSCParameterAssert (block != nil ); Class class = self .class; return [[self flattenMap:^(id value) { return [class return :block(value)]; }] setNameWithFormat:@"[%@] -map:" , self .name]; }
map 的参数一个参数类型为id,返回值类型也为id的block,内部实现首先通过断言判断 block 是否为空,然后调用 flattenMap 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - (__kindof RACStream *)flattenMap:(__kindof RACStream * (^)(id value))block { Class class = self .class; return [[self bind:^{ return ^(id value, BOOL *stop) { id stream = block(value) ?: [class empty]; NSCAssert ([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@" , stream); return stream; }; }] setNameWithFormat:@"[%@] -flattenMap:" , self .name]; }
flattenMap 是通过封装 bind 方法实现,bind的入参是 RACStreamBindBlock 类型闭包,flattenMap 的入参是一个入参类型为 id,返回值为 RACStream 的闭包
在 flattenMap 中,先判断 block(value) 返回的信号是否为nil,若是则返对应class 的empty 信号,也就是 RACEmptySignal 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 + (RACSignal *)empty { #ifdef DEBUG return [[[self alloc] init] setNameWithFormat:@"+empty" ]; #else static id singleton; static dispatch_once_t pred; dispatch_once (&pred, ^{ singleton = [[self alloc] init]; }); return singleton; #endif }
从实现代码上来看,RACEmptySignal 是以单例对象的形式返回
RACEmptySignal.m 文件里还重写了 -subscribe
:
1 2 3 4 5 6 7 - (RACDisposable *)subscribe:(id <RACSubscriber>)subscriber { NSCParameterAssert (subscriber != nil ); return [RACScheduler.subscriptionScheduler schedule:^{ [subscriber sendCompleted]; }]; }
RACEmptySignal 信号被订阅之后会马上给订阅者发送 sendCompleted 事件
回到 flattenMap 的实现逻辑中,如果 block(value) 返回不是nil,它是如何返回 RACStream 对象的呢?
执行 block(value) 实际上就是触发 代码2 中的 BLOCK 2
在 BLOCK 2
中,又会触发另个 block(value),这时候会触发代码1中的 BLOCK 1
,这里 block(value) 会返回相关对象(测试代码中返回NSNumber),BLOCK 2
中将 NSNumber 包装成 RACReturnSignal 返回,此时在 BLOCK 3
stream 对象就是 RACReturnSignal。
最后通过 bind 函数的变换,订阅会收到变换过后的值。
mapReplace 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 - (void )testMapReplace { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; return nil ; }]; [[sourceSignal mapReplace:@(10 )] subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 3 value = 10 value = 10 value = 10
底层实现:
1 2 3 4 5 - (__kindof RACStream *)mapReplace:(id )object { return [[self map:^(id _) { return object; }] setNameWithFormat:@"[%@] -mapReplace: %@" , self .name, RACDescription(object)]; }
mapReplace 是通过封装 map 方法实现,把原信号每一个事件都变换成参数 object 传递给订阅者
reduceEach 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 - (void )testReduceEach { RACSignal *signal1 = [RACSignal createSignal: ^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:RACTuplePack(@1 ,@2 )]; [subscriber sendNext:RACTuplePack(@3 ,@4 )]; [subscriber sendNext:RACTuplePack(@5 ,@6 )]; [subscriber sendCompleted]; return [RACDisposable disposableWithBlock:^{ NSLog (@"signal1 dispose" ); }]; }]; RACSignal *signal2 = [signal1 reduceEach:^id (NSNumber *num1 , NSNumber *num2){ return @([num1 intValue] + [num2 intValue]); }]; [signal2 subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 3 4 value = 3 value = 7 value = 11 signal1 dispose
从输出结果看出来,reduceEach 将原信号 signal1 发送的 RACTuple 数据进行解包聚合发送给订阅者
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 - (__kindof RACStream *)reduceEach:(RACReduceBlock)reduceBlock { NSCParameterAssert (reduceBlock != nil ); __weak RACStream *stream __attribute__((unused)) = self ; return [[self map:^(RACTuple *t) { NSCAssert ([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@" , stream, t); return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t]; }] setNameWithFormat:@"[%@] -reduceEach:" , self .name]; }
reduceEach 也是通过封装 map 方法实现。reduceEach 方法的入参是 RACReduceBlock 类型的闭包
1 typedef id _Nonnull (^RACReduceBlock)();
首先通过断言判断 reduceBlock 闭包是否为nil,然后调用 map 方法,map 方法的入参也就是 BLOCK 1
,在 BLOCK 1
里会先判断原信号 signal 发送的数据 t 是否为 RACTuple 类型,然后返回 RACBlockTrampoline 类型对象
1 2 3 @interface RACBlockTrampoline ()@property (nonatomic , readonly , copy ) id block;@end
RACBlockTrampoline 内部会保存一个 block 对象,然后根据传进来的参数,动态的构造一个 NSInvocation,通过执行 NSInvocation 返回需要的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 - (id )invokeWithArguments:(RACTuple *)arguments { SEL selector = [self selectorForArgumentCount:arguments.count]; NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]]; invocation.selector = selector; invocation.target = self ; for (NSUInteger i = 0 ; i < arguments.count; i++) { id arg = arguments[i]; NSInteger argIndex = (NSInteger )(i + 2 ); [invocation setArgument:&arg atIndex:argIndex]; } [invocation invoke]; __unsafe_unretained id returnVal; [invocation getReturnValue:&returnVal]; return returnVal; } - (SEL)selectorForArgumentCount:(NSUInteger )count { NSCParameterAssert (count > 0 ); switch (count) { case 0 : return NULL ; case 1 : return @selector (performWith:); case 2 : return @selector (performWith::); case 3 : return @selector (performWith:::); case 4 : return @selector (performWith::::); case 5 : return @selector (performWith:::::); case 6 : return @selector (performWith::::::); case 7 : return @selector (performWith:::::::); case 8 : return @selector (performWith::::::::); case 9 : return @selector (performWith:::::::::); case 10 : return @selector (performWith::::::::::); case 11 : return @selector (performWith:::::::::::); case 12 : return @selector (performWith::::::::::::); case 13 : return @selector (performWith:::::::::::::); case 14 : return @selector (performWith::::::::::::::); case 15 : return @selector (performWith:::::::::::::::); } NSCAssert (NO , @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported." ); return NULL ; }
1 2 3 4 5 6 - (id )performWith:(id )obj1 :(id )obj2 { id (^block)(id , id ) = self .block; return block(obj1, obj2); } ...
-invokeWithArguments
中首先通过判断 RACTuple 元素数量选择对应的 selector,最大能支持 15 个元素。
确定好 NSInvocation 的 target 和 seletor,还需要设置参数
1 2 3 4 5 for (NSUInteger i = 0 ; i < arguments.count; i++) { id arg = arguments[i]; NSInteger argIndex = (NSInteger )(i + 2 ); [invocation setArgument:&arg atIndex:argIndex]; }
这里看到设置 Type Encodings 的时候是从偏移量为 2开始,这是因为偏移量0和 偏移量1的参数分别对应着隐藏参数 self 和 _cmd。
构造好 invocation 之后,执行 [invocation invoke]
调用动态方法,实际上就是执行代码4中的入参 reduceBlock 闭包,最后通过 [invocation getReturnValue:&returnVal]
拿到闭包的返回值,也就是 RACBlockTrampoline 的最终返回值,最终成为 map 闭包里面的返回值。剩下的就是 map 函数流程。
or 测试代码
1 2 3 4 5 6 7 8 9 10 11 12 RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:RACTuplePack(@NO,@YES,@NO)]; [subscriber sendNext:RACTuplePack(@YES,@NO)]; [subscriber sendCompleted]; return nil ; }]; RACSignal *orSignal = [sourceSignal or]; [orSignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }];
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 - (RACSignal *)or { return [[self map:^(RACTuple *tuple) { NSCAssert ([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@" , tuple); NSCAssert (tuple.count > 0 , @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple" ); return @([tuple.rac_sequence any:^(NSNumber *number) { NSCAssert ([number isKindOfClass:NSNumber .class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@" , tuple); return number.boolValue; }]); }] setNameWithFormat:@"[%@] -or" , self .name]; }
这里主要是将 RACTuple 转换成 RACTupleSequence,因为 RACTupleSequence 是继承 RACSequence,这里就会调用 RACSequence 的 - (BOOL)any:(BOOL (^)(id))block
方法
1 2 3 4 5 6 7 8 - (BOOL )any:(BOOL (^)(id ))block { NSCParameterAssert (block != NULL ); return [self objectPassingTest:block] != nil ; }
1 2 3 4 5 6 7 8 - (id )objectPassingTest:(BOOL (^)(id ))block { NSCParameterAssert (block != NULL ); return [self filter:block].head; }
- (BOOL)any:(BOOL (^)(id))block
实现里面最终会执行 RACStream 中的 filter
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - (__kindof RACStream *)filter:(BOOL (^)(id value))block { NSCParameterAssert (block != nil ); Class class = self .class; return [[self flattenMap:^ id (id value) { if (block(value)) { return [class return :value]; } else { return class .empty; } }] setNameWithFormat:@"[%@] -filter:" , self .name]; }
在 flattenMapBlock 中会以 RACTupleSequence 发出的信号值传入到 block中,这些值信号值也就是一开始测试代码中 sourceSignal 发送给订阅者的 RACTuple 中的元素,如果value对应的BOOL值是YES,就转换成一个 RACTupleSequence 信号。如果对应的是NO,则转换成一个empty信号。
上面的 flattenMap
方法实际上最终调用到 RACSequence 的 -bind
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 - (RACSequence *)bind:(RACSequenceBindBlock (^)(void ))block { RACSequenceBindBlock bindBlock = block(); return [[self bind:bindBlock passingThroughValuesFromSequence:nil ] setNameWithFormat:@"[%@] -bind:" , self .name]; }
然后继续调用 - (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence
这里参数 passthroughSequence 为 nil
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 - (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence { __block RACSequence *valuesSeq = self ; __block RACSequence *current = passthroughSequence; __block BOOL stop = NO ; RACSequence *sequence = [RACDynamicSequence sequenceWithLazyDependency:^ id { while (current.head == nil ) { if (stop) return nil ; id value = valuesSeq.head; if (value == nil ) { stop = YES ; return nil ; } current = (id )bindBlock(value, &stop); if (current == nil ) { stop = YES ; return nil ; } valuesSeq = valuesSeq.tail; } NSCAssert ([current isKindOfClass:RACSequence.class], @"-bind: block returned an object that is not a sequence: %@" , current); return nil ; } headBlock:^(id _) { return current.head; } tailBlock:^ id (id _) { if (stop) return nil ; return [valuesSeq bind:bindBlock passingThroughValuesFromSequence:current.tail]; }]; sequence.name = self .name; return sequence; }
上面代码的关键逻辑是调用 RACDynamicSequence 类的 sequenceWithLazyDependency
方法,dependencyBlock 会在之前提及到的函数 -objectPassingTest
中,执行 RACSequence 的 -head
方法中触发
在 方法,dependencyBlock 中,执行current = (id)bindBlock(value, &stop);
根据value的布尔值来产生新的信号,如果为 NO 则返回 RACEmptySequence 类型,此时 current.head 为 nil,进行下一次循环;如过 value 为YES,则返回 RACUnarySequence 类型,current.head 不为 nil,结束循环。
any: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - (void )testAny { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; return nil ; }]; [[sourceSignal any:^BOOL (id _Nullable object) { return [object integerValue] < 2 ; }] subscribeNext:^(NSNumber * _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 3 value = 1 value = 0 value = 0
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 - (RACSignal *)any:(BOOL (^)(id object))predicateBlock { NSCParameterAssert (predicateBlock != NULL ); return [[[self materialize] bind:^{ return ^(RACEvent *event, BOOL *stop) { if (event.finished) { *stop = YES ; return [RACSignal return :@NO]; } if (predicateBlock(event.value)) { *stop = YES ; return [RACSignal return :@YES]; } return [RACSignal empty]; }; }] setNameWithFormat:@"[%@] -any:" , self .name]; }
首先原信号会 通过 -materialize
方法转换成 RACEvent 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 - (RACSignal *)materialize { return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { return [self subscribeNext:^(id x) { [subscriber sendNext:[RACEvent eventWithValue:x]]; } error:^(NSError *error) { [subscriber sendNext:[RACEvent eventWithError:error]]; [subscriber sendCompleted]; } completed:^{ [subscriber sendNext:RACEvent.completedEvent]; [subscriber sendCompleted]; }]; }] setNameWithFormat:@"[%@] -materialize" , self .name]; }
1 2 3 - (BOOL )isFinished { return self .eventType == RACEventTypeCompleted || self .eventType == RACEventTypeError; }
从上面代码发现,RACEvent 如果收到原信号的 sendCompleted / sendError,finished 属性会置为 YES
在 -any
方法中,先判断 RACEvent 的 finished 属性,如果为 YES,stop接下来的信号 则返回 [RACSignal return:@NO];反之,则会根据入参 predicateBlock 闭包,将 RACEvent 的 value 传入 predicateBlock,如果返回值为YES,stop接下来的信号,则返回 [RACSignal return:@YES];如果 predicateBlock 返回值为 NO,则返回 [RACSignal empty]。
简单地总结来说,any 方法根据 predicateBlock 来对原信号的每一个信号进行判断,若遇到返回 YES 的条件,就给订阅者发送 YES 信号,然后发送 sendCompleted;若 predicateBlock 没有返回 YES 的条件,则最后给 订阅者 发送 NO 信号。
all 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 - (RACSignal *)all:(BOOL (^)(id object))predicateBlock { NSCParameterAssert (predicateBlock != NULL ); return [[[self materialize] bind:^{ return ^(RACEvent *event, BOOL *stop) { if (event.eventType == RACEventTypeCompleted) { *stop = YES ; return [RACSignal return :@YES]; } if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) { *stop = YES ; return [RACSignal return :@NO]; } return [RACSignal empty]; }; }] setNameWithFormat:@"[%@] -all:" , self .name]; }
all 方法可以理解为 any 方法的对立面,原信号如果发送 sendError 或者 predicateBlock 返回为 NO,就会结束信号的传递,并会给订阅者发值为 NO 的信号。如果整个订阅过程中都没有出现错误以及都满足 predicateBlock 为真的条件,最后会在 RACEventTypeCompleted 的时候发送 YES。
repeat 测试代码:
1 2 3 4 5 6 7 8 9 10 11 RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@1 ]; [subscriber sendCompleted]; return nil ; }]; RACSignal *orSignal = [sourceSignal repeat]; [orSignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }];
输出:
1 2 3 4 value = 1 value = 1 value = 1 ...
执行之后会不断地打印 value = 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 - (RACSignal *)repeat { return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { return subscribeForever(self , ^(id x) { [subscriber sendNext:x]; }, ^(NSError *error, RACDisposable *disposable) { [disposable dispose]; [subscriber sendError:error]; }, ^(RACDisposable *disposable) { }); }] setNameWithFormat:@"[%@] -repeat" , self .name]; }
执行 -repeat
方法之后,内部会创建新的 RASignal,当新的信号被订阅的时候会执行 subscribeForever
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id ), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) { next = [next copy ]; error = [error copy ]; completed = [completed copy ]; RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable]; RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void )) { RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable]; [compoundDisposable addDisposable:selfDisposable]; __weak RACDisposable *weakSelfDisposable = selfDisposable; RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) { @autoreleasepool { error(e, compoundDisposable); [compoundDisposable removeDisposable:weakSelfDisposable]; } recurse(); } completed:^{ @autoreleasepool { completed(compoundDisposable); [compoundDisposable removeDisposable:weakSelfDisposable]; } recurse(); }]; [selfDisposable addDisposable:subscriptionDisposable]; }; recursiveBlock(^{ RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler]; RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock]; [compoundDisposable addDisposable:schedulingDisposable]; }); return compoundDisposable; }
subscribeForever
方法有4个参数,分别是源信号 signal,next 闭包,error 闭包和 complete 闭包。函数内部先定义好递归的闭包:recursiveBlock,recursiveBlock 中 首先对 signal 进行订阅,如果源信号 signal 发送 sendError 或者 sendCompleted,就会执行对应的 error/complete 闭包,然后就执行 recursiveBlock 的参数闭包 recurse。
subscribeForever
最后是执行 recursiveBlock 并传入具体的 recurse。
recurse 中首先获取当前的递归调度器 recursiveScheduler,然后执行 -scheduleRecursiveBlock
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 - (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock { RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; [self scheduleRecursiveBlock:[recursiveBlock copy ] addingToDisposable:disposable]; return disposable; } - (void )scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable { @autoreleasepool { RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable]; [disposable addDisposable:selfDisposable]; __weak RACDisposable *weakSelfDisposable = selfDisposable; RACDisposable *schedulingDisposable = [self schedule:^{ @autoreleasepool { [disposable removeDisposable:weakSelfDisposable]; } if (disposable.disposed) return ; void (^reallyReschedule)(void ) = ^{ if (disposable.disposed) return ; [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable]; }; __block NSLock *lock = [[NSLock alloc] init]; lock.name = [NSString stringWithFormat:@"%@ %s" , self , sel_getName(_cmd)]; __block NSUInteger rescheduleCount = 0 ; __block BOOL rescheduleImmediately = NO ; @autoreleasepool { recursiveBlock(^{ [lock lock]; BOOL immediate = rescheduleImmediately; if (!immediate) ++rescheduleCount; [lock unlock]; if (immediate) reallyReschedule(); }); } [lock lock]; NSUInteger synchronousCount = rescheduleCount; rescheduleImmediately = YES ; [lock unlock]; for (NSUInteger i = 0 ; i < synchronousCount; i++) { reallyReschedule(); } }]; [selfDisposable addDisposable:schedulingDisposable]; } } - (RACDisposable *)schedule:(void (^)(void ))block { NSCParameterAssert (block != NULL ); RACDisposable *disposable = [[RACDisposable alloc] init]; dispatch_async (self .queue, ^{ if (disposable.disposed) return ; [self performAsCurrentScheduler:block]; }); return disposable; }
可以看到会调用到自己的 -schedule
方法,这里测试代码中是在主线程,获取到对应的是 RACTargetQueueScheduler 。这里的 -schedule
方法先判断原信号有没有disposed,若果没有,则把参数 block 放在对应的队列中触发。
可以看得到上面函数中 scheduleBlock 里不断递归执行 [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable]
,recursiveBlock 不断被触发,对应的 subscribeForever
中的 recurse() 循环执行:
scheduleRecursiveBlock->recursiveBlock->recurse()->reallyReschedule()->scheduleRecursiveBlock
也就是说源信号会循环被订阅触发其给订阅者发送 sendNext 事件,直到源信号发送 error 才结束。
retry: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 - (void )testRetry { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendError:[NSError errorWithDomain:@"domain" code:-1 userInfo:nil ]]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; return nil ; }]; RACSignal *retrySignal = [sourceSignal retry:2 ]; [retrySignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 3 4 5 6 value = 1 value = 2 value = 1 value = 2 value = 1 value = 2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 - (RACSignal *)retry:(NSInteger )retryCount { return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { __block NSInteger currentRetryCount = 0 ; return subscribeForever(self , ^(id x) { [subscriber sendNext:x]; }, ^(NSError *error, RACDisposable *disposable) { if (retryCount == 0 || currentRetryCount < retryCount) { currentRetryCount++; return ; } [disposable dispose]; [subscriber sendError:error]; }, ^(RACDisposable *disposable) { [disposable dispose]; [subscriber sendCompleted]; }); }] setNameWithFormat:@"[%@] -retry: %lu" , self .name, (unsigned long )retryCount]; }
-retry:
实现与 -repeat
实现类似,基于 subscribeForever
。-retry:
是内部维护一个 currentRetryCount 变量,当原始信号发送 sendError 时判断重试次数 currentRetryCount 是否小于 retryCount,若是则重试,如果重试依旧收到 sendError,超过 retryCount 之后就会停止重试。
如果原信号没有发生错误,那么原信号在发送结束,当原信号发送 sendCompleted,subscribeForever
也就接受了,所以 -retry:
操作对于没有任何error的信号 和 直接订阅原信号表现一样。
将测试代码改为:
1 2 3 4 5 6 7 8 9 10 11 12 13 - (void )testRetryNoError { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendCompleted]; return nil ; }]; RACSignal *retrySignal = [sourceSignal retry:2 ]; [retrySignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
原信号发送 sendCompleted ,整个订阅流程就结束了。