本篇文章接着上篇 继续分析常用的 RACSignal 方法的第四部分进行分析。
switchToLatest 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 - (void )testSwitchToLast { RACSignal *signal0 = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@0 ]; [subscriber sendNext:@1 ]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC )), dispatch_get_main_queue(), ^{ [subscriber sendNext:@2 ]; [subscriber sendCompleted]; }); return nil ; }]; RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@3 ]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC )), dispatch_get_main_queue(), ^{ [subscriber sendNext:@4 ]; [subscriber sendNext:@5 ]; [subscriber sendCompleted]; }); return nil ; }]; RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@6 ]; [subscriber sendCompleted]; return nil ; }]; RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:signal0]; [subscriber sendNext:signal1]; [subscriber sendNext:signal2]; [subscriber sendCompleted]; return nil ; }]; [[sourceSignal switchToLatest] subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 3 4 value = 0 value = 1 value = 3 value = 6
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 - (RACSignal *)switchToLatest { return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { RACMulticastConnection *connection = [self publish]; RACDisposable *subscriptionDisposable = [[connection.signal flattenMap:^(RACSignal *x) { NSCAssert (x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@" , self , x); return [x takeUntil:[connection.signal concat:[RACSignal never]]]; }] subscribe:subscriber]; RACDisposable *connectionDisposable = [connection connect]; return [RACDisposable disposableWithBlock:^{ [subscriptionDisposable dispose]; [connectionDisposable dispose]; }]; }] setNameWithFormat:@"[%@] -switchToLatest" , self .name]; }
-switchToLatest
内部原信号调用 -publish
转化为热信号,返回的 connection 会持有热信号,也就是其 signal 属性
对热信号执行 -flattenMap:
方法,通过断言判断热信号发送的信号值是否也是 RACSignal,也就是说 -switchToLatest
方法是处理高阶信号的
flattenMap 内部会对热信号执行 concat: 方法,这里的传入参数为 [RACSignal never],RACSignal x 执行 takeUntil:
, 作用是为了防止热信号发送的 RACSignal 过早结束导致整个订阅都被结束
switch: cases: default: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 - (void )testswitchCasesDefault { RACSignal *signal0 = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@"00" ]; return nil ; }]; RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@"11" ]; return nil ; }]; RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@"22" ]; [subscriber sendCompleted]; return nil ; }]; RACSignal *defaultSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@1024 ]; [subscriber sendCompleted]; return nil ; }]; RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@"0" ]; [subscriber sendNext:@"1" ]; [subscriber sendNext:@"2" ]; [subscriber sendCompleted]; return nil ; }]; NSDictionary *dict = @{@"0" : signal0, @"1" : signal1, @"2" : signal2 }; [[RACSignal switch :sourceSignal cases:dict default :defaultSignal] subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 3 value = 00 value = 11 value = 22
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 + (RACSignal *)switch :(RACSignal *)signal cases:(NSDictionary *)cases default :(RACSignal *)defaultSignal { NSCParameterAssert (signal != nil ); NSCParameterAssert (cases != nil ); for (id key in cases) { id value __attribute__((unused)) = cases[key]; NSCAssert ([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't" , value); } NSDictionary *copy = [cases copy ]; return [[[signal map:^(id key) { if (key == nil ) key = RACTupleNil.tupleNil; RACSignal *signal = copy [key] ?: defaultSignal; if (signal == nil ) { NSString *description = [NSString stringWithFormat:NSLocalizedString (@"No matching signal found for value %@" , @"" ), key]; return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey : description }]]; } return signal; }] switchToLatest] setNameWithFormat:@"+switch: %@ cases: %@ default: %@" , signal, cases, defaultSignal]; }
-switch:case:default:
首先对参数 signal、case 做非空判断,然后遍历字典 case 并判断每一个 value 是否为 RACSignal 对象
订阅原信号进行 map 操作,根据 key 来从 case 里面取出对应的 RACSignal 并返回,如果取出来的是 nil,则取 defaultSignal,若 defaultSignal 为nil,则返回 error signal,这样原信号就被转化高阶信号,然后再进行 switchToLatest 操作,把最终的信号值发给订阅者
if: then: else: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 - (void )testIfThenElse { RACSignal *signalTrue = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@"true" ]; return nil ; }]; RACSignal *signalFalse = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@"false" ]; return nil ; }]; RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@(NO )]; [subscriber sendNext:@(YES )]; [subscriber sendCompleted]; return nil ; }]; [[RACSignal if :sourceSignal then:signalTrue else :signalFalse] subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }];; }
输出:
1 2 value = false value = true
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 + (RACSignal *)if :(RACSignal *)boolSignal then:(RACSignal *)trueSignal else :(RACSignal *)falseSignal { NSCParameterAssert (boolSignal != nil ); NSCParameterAssert (trueSignal != nil ); NSCParameterAssert (falseSignal != nil ); return [[[boolSignal map:^(NSNumber *value) { NSCAssert ([value isKindOfClass:NSNumber .class], @"Expected %@ to send BOOLs, not %@" , boolSignal, value); return (value.boolValue ? trueSignal : falseSignal); }] switchToLatest] setNameWithFormat:@"+if: %@ then: %@ else: %@" , boolSignal, trueSignal, falseSignal]; }
+if: then: else:
与 +switch: cases: default:
原理类似,把原信号发送的布尔值通过 map 操作转化对应的 RACSignal 返回。YES->trueSignal, NO->falseSignal,原信号被转化成高阶信号,再执行 switchToLatest ,后面逻辑与 +switch: cases: default:
一样
catch: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - (void )testCatch { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { NSError *error = [NSError errorWithDomain:@"" code:-1 userInfo:nil ]; [subscriber sendError:error]; return nil ; }]; RACSignal *catchSignal = [sourceSignal catch:^RACSignal * _Nonnull(NSError * _Nonnull error) { NSLog (@"excute catch block, error = %@" , error); return [RACSignal return :@"error text" ]; }]; [catchSignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 excute catch block, error = Error Domain= Code=-1 "(null)" value = error text
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 - (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock { NSCParameterAssert (catchBlock != NULL ); return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init]; RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { RACSignal *signal = catchBlock(error); NSCAssert (signal != nil , @"Expected non-nil signal from catch block on %@" , self ); catchDisposable.disposable = [signal subscribe:subscriber]; } completed:^{ [subscriber sendCompleted]; }]; return [RACDisposable disposableWithBlock:^{ [catchDisposable dispose]; [subscriptionDisposable dispose]; }]; }] setNameWithFormat:@"[%@] -catch:" , self .name]; }
当原信号发送 sendError 的时候把 error 传入 catchBlock 闭包并返回 RACSignal,通过断言判断返回的 signal 是否为空,然后再订阅 signal,把 signal 的信号发给最终的订阅者
try: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 - (void )testTry { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@0 ]; [subscriber sendNext:@1 ]; [subscriber sendNext:@2 ]; [subscriber sendNext:@3 ]; [subscriber sendCompleted]; return [RACDisposable disposableWithBlock:^{ NSLog (@"source signal dispose" ); }]; }]; RACSignal *trySignal = [sourceSignal try:^BOOL (id _Nullable value, NSError * _Nullable __autoreleasing * _Nullable errorPtr) { NSInteger i = [value integerValue]; if (i > 2 ) { *errorPtr = [NSError errorWithDomain:@"" code:-1 userInfo:nil ]; return NO ; } return YES ; }]; [trySignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; [trySignal subscribeError:^(NSError * _Nullable error) { NSLog (@"error = %@" , error); }]; }
输出:
1 2 3 4 5 6 value = 0 value = 1 value = 2 source signal dispose error = Error Domain= Code=-1 "(null)" source signal dispose
底层实现:
1 2 3 4 5 6 7 8 9 - (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock { NSCParameterAssert (tryBlock != NULL ); return [[self flattenMap:^(id value) { NSError *error = nil ; BOOL passed = tryBlock(value, &error); return (passed ? [RACSignal return :value] : [RACSignal error:error]); }] setNameWithFormat:@"[%@] -try:" , self .name]; }
tryBlock 根据原信号发送的信号值返回 passed 和 error(error 是以指针地址的形式传入,tryBlock 内部对其进行赋值)
如果 passed 为 YES,则将原信号发送的 value 包装成 RACSignal;若 passed 为 NO,则返回 RACErrorSignal ,通过这2包装,原信号会被包装成一个高阶信号
firstOrDefault: success: error: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - (void )testFirstOrDefaultSuccessError { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC )), dispatch_get_global_queue(0 , 0 ), ^{ [subscriber sendNext:@0 ]; [subscriber sendNext:@1 ]; [subscriber sendCompleted]; }); return nil ; }]; BOOL success; NSError *error; id value = [sourceSignal firstOrDefault:@10 success:&success error:&error]; NSLog (@"value = %@|thread=%@" , value, [NSThread currentThread]); }
输出:
1 value = 0|thread=<NSThread: 0x608000075600>{number = 1, name = main}
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 - (id )firstOrDefault:(id )defaultValue success:(BOOL *)success error:(NSError **)error { NSCondition *condition = [[NSCondition alloc] init]; condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:" , self .name, defaultValue]; __block id value = defaultValue; __block BOOL done = NO ; __block NSError *localError; __block BOOL localSuccess; [[self take:1 ] subscribeNext:^(id x) { [condition lock]; value = x; localSuccess = YES ; done = YES ; [condition broadcast]; [condition unlock]; } error:^(NSError *e) { [condition lock]; if (!done) { localSuccess = NO ; localError = e; done = YES ; [condition broadcast]; } [condition unlock]; } completed:^{ [condition lock]; localSuccess = YES ; done = YES ; [condition broadcast]; [condition unlock]; }]; [condition lock]; while (!done) { [condition wait]; } if (success != NULL ) *success = localSuccess; if (error != NULL ) *error = localError; [condition unlock]; return value; }
定义好状态相关的变量
value: 记录原信号发送的值
done:判断原信号是否发送了 sendCompleted / sendError / sendNext
localError:记录原信号发送的 NSError
localSuccess:记录原信号是否发送过 sendCompleted / sendNext
condition :同步用的锁
订阅原信号并通过 take: 方法只获取第一个信号值
sendNext:condition 先加锁,记录 value,如果为nil,则保持为 defaultValue,condition 调用 broadcast 通知所有等待的线程
sendCompleted:和 sendNext 类似
sendError:如果之前没有发送过 sendNext,则会保存 error,然后 condition 调用 broadcast 通知所有等待的线程
订阅完原信号之后,会马上对 condition 进行加锁,一直等待原信号发送信号值并会阻塞当前线程
-firstOrDefault: success: error:
用法有点类似 python 中 await
,有点像协程的概念
要注意的是如果执行该方法的线程和原信号发送 sendCompleted / sendError / sendNext 是统一线程,很容易会造成死锁,因为函数内部会先对 condition 加锁一次(步骤3),然后收到原信号发送的信号值的时候幽会对 condition 加锁一次,这时候如果在统一线程会造成死锁
waitUntilCompleted: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 - (void )testWaitUntilCompleted { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC )), dispatch_get_global_queue(0 , 0 ), ^{ [subscriber sendNext:@0 ]; [subscriber sendNext:@1 ]; [subscriber sendCompleted]; }); return nil ; }]; NSError *error; BOOL success = [sourceSignal waitUntilCompleted:&error]; NSLog (@"success = %@" , @(success)); }
输出:
底层实现:
1 2 3 4 5 6 7 8 9 10 - (BOOL )waitUntilCompleted:(NSError **)error { BOOL success = NO ; [[[self ignoreValues] setNameWithFormat:@"[%@] -waitUntilCompleted:" , self .name] firstOrDefault:nil success:&success error:error]; return success; }
先对原信号执行 ignoreValues 方法,过滤掉所有 sendNext 事件
执行 firstOrDefault: success: error: ,若原信号发送 sendCompleted,则返回 YES,若发送 sendError 则返回 NO
defer: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 - (void )testDefer { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@0 ]; [subscriber sendCompleted]; return nil ; }]; [[RACSignal defer:^RACSignal * _Nonnull{ NSLog (@"execute defer block" ); return sourceSignal; }] subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 execute defer block value = 0
底层实现:
1 2 3 4 5 6 7 + (RACSignal *)defer:(RACSignal<id > * (^)(void ))block { NSCParameterAssert (block != NULL ); return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { return [block() subscribe:subscriber]; }] setNameWithFormat:@"+defer:" ]; }
+defer:
首先创建返回新的信号,然后执行 block,并订阅 block 返回的信号;+defer:
主要作用是延迟订阅,也就是在订阅前进行相关自定义的操作
initially: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 - (void )testInitially { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { [subscriber sendNext:@0 ]; [subscriber sendCompleted]; return nil ; }]; RACSignal *initialSignal = [sourceSignal initially:^{ NSLog (@"execute initial block" ); }]; [initialSignal subscribeNext:^(id _Nullable x) { NSLog (@"value = %@" , x); }]; }
输出:
1 2 execute initial block value = 0
底层实现:
1 2 3 4 5 6 7 8 - (RACSignal *)initially:(void (^)(void ))block { NSCParameterAssert (block != NULL ); return [[RACSignal defer:^{ block(); return self ; }] setNameWithFormat:@"[%@] -initially:" , self .name]; }
-initially:
是通过封装 defer 实现的,在 defer 的 block 中先执行block,在返回自己,也就是订阅原信号之前先进行 block 中的相关操作
deliverOn: 测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 - (void )testDeliverOn { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id <RACSubscriber> subscriber) { dispatch_async (dispatch_get_global_queue(0 , 0 ), ^{ [subscriber sendNext:@0 ]; [subscriber sendCompleted]; }); return nil ; }]; [[sourceSignal deliverOn:[RACScheduler mainThreadScheduler]] subscribeNext:^(id _Nullable x) { NSLog (@"value = x|thread = %@" , [NSThread currentThread]); }]; [sourceSignal subscribeNext:^(id _Nullable x) { NSLog (@"value = x|thread = %@" , [NSThread currentThread]); }]; }
输出:
1 2 value = 0|thread = <NSThread: 0x600000a77240>{number = 3, name = (null)} value = 0|thread = <NSThread: 0x60c0000718c0>{number = 1, name = main}
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 - (RACSignal *)deliverOn:(RACScheduler *)scheduler { return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { return [self subscribeNext:^(id x) { [scheduler schedule:^{ [subscriber sendNext:x]; }]; } error:^(NSError *error) { [scheduler schedule:^{ [subscriber sendError:error]; }]; } completed:^{ [scheduler schedule:^{ [subscriber sendCompleted]; }]; }]; }] setNameWithFormat:@"[%@] -deliverOn: %@" , self .name, scheduler]; }
-deliverOn:
当原信号发送 sendNext/sendError/sendCompleted 的时候,会根据传入的调度器 scheduler ,在对应的线程中把信号发给订阅者;换句话来说,就是通过传入 scheduler 指定对应线程接受信号
subscribeOn: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 - (RACSignal *)subscribeOn:(RACScheduler *)scheduler { return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; RACDisposable *schedulingDisposable = [scheduler schedule:^{ RACDisposable *subscriptionDisposable = [self subscribe:subscriber]; [disposable addDisposable:subscriptionDisposable]; }]; [disposable addDisposable:schedulingDisposable]; return disposable; }] setNameWithFormat:@"[%@] -subscribeOn: %@" , self .name, scheduler]; }
-subscribeOn:
和 -deliverOn:
类似,主要区别在于 -subscribeOn:
是指定订阅动作所在的线程,也就是执行 didSubscribe 闭包的线程被指定,但是 sendNext/sendError/sendCompleted 所在线程是不确定的
deliverOnMainThread 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 - (RACSignal *)deliverOnMainThread { return [[RACSignal createSignal:^(id <RACSubscriber> subscriber) { __block volatile int32_t queueLength = 0 ; void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) { int32_t queued = OSAtomicIncrement32(&queueLength); if (NSThread .isMainThread && queued == 1 ) { block(); OSAtomicDecrement32(&queueLength); } else { dispatch_async (dispatch_get_main_queue(), ^{ block(); OSAtomicDecrement32(&queueLength); }); } }; return [self subscribeNext:^(id x) { performOnMainThread(^{ [subscriber sendNext:x]; }); } error:^(NSError *error) { performOnMainThread(^{ [subscriber sendError:error]; }); } completed:^{ performOnMainThread(^{ [subscriber sendCompleted]; }); }]; }] setNameWithFormat:@"[%@] -deliverOnMainThread" , self .name]; }
-deliverOnMainThread
和 -deliverOn
作用类似,前者是将 sendNext/sendError/sendCompleted 都放在主线程上执行
OSAtomicIncrement32 方法是对某个值进行自增计算并且是线程安全,如果当前线程是主线程并且 OSAtomicIncrement32 函数返回值是 1,说明当前主线程没有待执行的 sendNext/sendError/sendCompleted 事件,这时候会直接运行block,反之则把任务放在主线程异步执行