RACSignal常用方法深入分析(终)

本篇文章接着上篇继续分析常用的 RACSignal 方法的第四部分进行分析。

switchToLatest

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
- (void)testSwitchToLast {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@2];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@3];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@6];
[subscriber sendCompleted];
return nil;
}];

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:signal0];
[subscriber sendNext:signal1];
[subscriber sendNext:signal2];
[subscriber sendCompleted];
return nil;
}];

[[sourceSignal switchToLatest] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
value = 0
value = 1
value = 3
value = 6

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (RACSignal *)switchToLatest {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACMulticastConnection *connection = [self publish];

RACDisposable *subscriptionDisposable = [[connection.signal
flattenMap:^(RACSignal *x) {
NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);

// -concat:[RACSignal never] prevents completion of the receiver from
// prematurely terminating the inner signal.
return [x takeUntil:[connection.signal concat:[RACSignal never]]];
}]
subscribe:subscriber];

RACDisposable *connectionDisposable = [connection connect];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[connectionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -switchToLatest", self.name];
}
  1. -switchToLatest 内部原信号调用 -publish 转化为热信号,返回的 connection 会持有热信号,也就是其 signal 属性
  2. 对热信号执行 -flattenMap: 方法,通过断言判断热信号发送的信号值是否也是 RACSignal,也就是说 -switchToLatest 方法是处理高阶信号的
  3. flattenMap 内部会对热信号执行 concat: 方法,这里的传入参数为 [RACSignal never],RACSignal x 执行 takeUntil:, 作用是为了防止热信号发送的 RACSignal 过早结束导致整个订阅都被结束

image-20190301161002720

switch: cases: default:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
- (void)testswitchCasesDefault {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"00"];
return nil;
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"11"];
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"22"];
[subscriber sendCompleted];
return nil;
}];

RACSignal *defaultSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1024];
[subscriber sendCompleted];
return nil;
}];

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"0"];
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendCompleted];
return nil;
}];

NSDictionary *dict = @{@"0" : signal0,
@"1" : signal1,
@"2" : signal2
};

[[RACSignal switch:sourceSignal cases:dict default:defaultSignal] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
value = 00
value = 11
value = 22

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
+ (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
NSCParameterAssert(signal != nil);
NSCParameterAssert(cases != nil);

for (id key in cases) {
id value __attribute__((unused)) = cases[key];
NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
}

NSDictionary *copy = [cases copy];

return [[[signal
map:^(id key) {
if (key == nil) key = RACTupleNil.tupleNil;

RACSignal *signal = copy[key] ?: defaultSignal;
if (signal == nil) {
NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
}

return signal;
}]
switchToLatest]
setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
}
  1. -switch:case:default: 首先对参数 signal、case 做非空判断,然后遍历字典 case 并判断每一个 value 是否为 RACSignal 对象
  2. 订阅原信号进行 map 操作,根据 key 来从 case 里面取出对应的 RACSignal 并返回,如果取出来的是 nil,则取 defaultSignal,若 defaultSignal 为nil,则返回 error signal,这样原信号就被转化高阶信号,然后再进行 switchToLatest 操作,把最终的信号值发给订阅者

if: then: else:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- (void)testIfThenElse {
RACSignal *signalTrue = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"true"];
return nil;
}];

RACSignal *signalFalse = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"false"];
return nil;
}];

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@(NO)];
[subscriber sendNext:@(YES)];
[subscriber sendCompleted];
return nil;
}];

[[RACSignal if:sourceSignal then:signalTrue else:signalFalse] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];;
}

输出:

1
2
value = false
value = true

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
NSCParameterAssert(boolSignal != nil);
NSCParameterAssert(trueSignal != nil);
NSCParameterAssert(falseSignal != nil);

return [[[boolSignal
map:^(NSNumber *value) {
NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);

return (value.boolValue ? trueSignal : falseSignal);
}]
switchToLatest]
setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
}

+if: then: else:+switch: cases: default: 原理类似,把原信号发送的布尔值通过 map 操作转化对应的 RACSignal 返回。YES->trueSignal, NO->falseSignal,原信号被转化成高阶信号,再执行 switchToLatest ,后面逻辑与 +switch: cases: default: 一样

catch:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (void)testCatch {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSError *error = [NSError errorWithDomain:@"" code:-1 userInfo:nil];
[subscriber sendError:error];
return nil;
}];

RACSignal *catchSignal = [sourceSignal catch:^RACSignal * _Nonnull(NSError * _Nonnull error) {
NSLog(@"excute catch block, error = %@", error);
return [RACSignal return:@"error text"];
}];

[catchSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
excute catch block, error = Error Domain= Code=-1 "(null)"
value = error text

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
NSCParameterAssert(catchBlock != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];

RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
RACSignal *signal = catchBlock(error);
NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
catchDisposable.disposable = [signal subscribe:subscriber];
} completed:^{
[subscriber sendCompleted];
}];

return [RACDisposable disposableWithBlock:^{
[catchDisposable dispose];
[subscriptionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -catch:", self.name];
}

当原信号发送 sendError 的时候把 error 传入 catchBlock 闭包并返回 RACSignal,通过断言判断返回的 signal 是否为空,然后再订阅 signal,把 signal 的信号发给最终的订阅者

try:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
- (void)testTry {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"source signal dispose");
}];
}];

RACSignal *trySignal = [sourceSignal try:^BOOL(id _Nullable value, NSError * _Nullable __autoreleasing * _Nullable errorPtr) {
NSInteger i = [value integerValue];
if (i > 2) {
*errorPtr = [NSError errorWithDomain:@"" code:-1 userInfo:nil];
return NO;
}

return YES;
}];

[trySignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];

[trySignal subscribeError:^(NSError * _Nullable error) {
NSLog(@"error = %@", error);
}];
}

输出:

1
2
3
4
5
6
value = 0
value = 1
value = 2
source signal dispose
error = Error Domain= Code=-1 "(null)"
source signal dispose

底层实现:

1
2
3
4
5
6
7
8
9
- (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
NSCParameterAssert(tryBlock != NULL);

return [[self flattenMap:^(id value) {
NSError *error = nil;
BOOL passed = tryBlock(value, &error);
return (passed ? [RACSignal return:value] : [RACSignal error:error]);
}] setNameWithFormat:@"[%@] -try:", self.name];
}
  1. tryBlock 根据原信号发送的信号值返回 passed 和 error(error 是以指针地址的形式传入,tryBlock 内部对其进行赋值)
  2. 如果 passed 为 YES,则将原信号发送的 value 包装成 RACSignal;若 passed 为 NO,则返回 RACErrorSignal ,通过这2包装,原信号会被包装成一个高阶信号

firstOrDefault: success: error:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (void)testFirstOrDefaultSuccessError {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_global_queue(0, 0), ^{
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendCompleted];
});
return nil;
}];

BOOL success;
NSError *error;
id value = [sourceSignal firstOrDefault:@10 success:&success error:&error];

NSLog(@"value = %@|thread=%@", value, [NSThread currentThread]);
}

输出:

1
value = 0|thread=<NSThread: 0x608000075600>{number = 1, name = main}

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
- (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
NSCondition *condition = [[NSCondition alloc] init];
condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];

__block id value = defaultValue;
__block BOOL done = NO;

// Ensures that we don't pass values across thread boundaries by reference.
__block NSError *localError;
__block BOOL localSuccess;

[[self take:1] subscribeNext:^(id x) {
[condition lock];

value = x;
localSuccess = YES;

done = YES;
[condition broadcast];
[condition unlock];
} error:^(NSError *e) {
[condition lock];

if (!done) {
localSuccess = NO;
localError = e;

done = YES;
[condition broadcast];
}

[condition unlock];
} completed:^{
[condition lock];

localSuccess = YES;

done = YES;
[condition broadcast];
[condition unlock];
}];

[condition lock];
while (!done) {
[condition wait];
}

if (success != NULL) *success = localSuccess;
if (error != NULL) *error = localError;

[condition unlock];
return value;
}
  1. 定义好状态相关的变量
    • value: 记录原信号发送的值
    • done:判断原信号是否发送了 sendCompleted / sendError / sendNext
    • localError:记录原信号发送的 NSError
    • localSuccess:记录原信号是否发送过 sendCompleted / sendNext
    • condition :同步用的锁
  2. 订阅原信号并通过 take: 方法只获取第一个信号值
    • sendNext:condition 先加锁,记录 value,如果为nil,则保持为 defaultValue,condition 调用 broadcast 通知所有等待的线程
    • sendCompleted:和 sendNext 类似
    • sendError:如果之前没有发送过 sendNext,则会保存 error,然后 condition 调用 broadcast 通知所有等待的线程
  3. 订阅完原信号之后,会马上对 condition 进行加锁,一直等待原信号发送信号值并会阻塞当前线程

-firstOrDefault: success: error: 用法有点类似 python 中 await,有点像协程的概念

要注意的是如果执行该方法的线程和原信号发送 sendCompleted / sendError / sendNext 是统一线程,很容易会造成死锁,因为函数内部会先对 condition 加锁一次(步骤3),然后收到原信号发送的信号值的时候幽会对 condition 加锁一次,这时候如果在统一线程会造成死锁

waitUntilCompleted:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (void)testWaitUntilCompleted {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_global_queue(0, 0), ^{
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendCompleted];
});
return nil;
}];

NSError *error;
BOOL success = [sourceSignal waitUntilCompleted:&error];
NSLog(@"success = %@", @(success));
}

输出:

1
success = 1

底层实现:

1
2
3
4
5
6
7
8
9
10
- (BOOL)waitUntilCompleted:(NSError **)error {
BOOL success = NO;

[[[self
ignoreValues]
setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
firstOrDefault:nil success:&success error:error];

return success;
}
  1. 先对原信号执行 ignoreValues 方法,过滤掉所有 sendNext 事件
  2. 执行 firstOrDefault: success: error: ,若原信号发送 sendCompleted,则返回 YES,若发送 sendError 则返回 NO

defer:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (void)testDefer {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendCompleted];
return nil;
}];

[[RACSignal defer:^RACSignal * _Nonnull{
NSLog(@"execute defer block");
return sourceSignal;
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
execute defer block
value = 0

底层实现:

1
2
3
4
5
6
7
+ (RACSignal *)defer:(RACSignal<id> * (^)(void))block {
NSCParameterAssert(block != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [block() subscribe:subscriber];
}] setNameWithFormat:@"+defer:"];
}

+defer: 首先创建返回新的信号,然后执行 block,并订阅 block 返回的信号;+defer: 主要作用是延迟订阅,也就是在订阅前进行相关自定义的操作

initially:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- (void)testInitially {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendCompleted];
return nil;
}];

RACSignal *initialSignal = [sourceSignal initially:^{
NSLog(@"execute initial block");
}];

[initialSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
execute initial block
value = 0

底层实现:

1
2
3
4
5
6
7
8
- (RACSignal *)initially:(void (^)(void))block {
NSCParameterAssert(block != NULL);

return [[RACSignal defer:^{
block();
return self;
}] setNameWithFormat:@"[%@] -initially:", self.name];
}

-initially: 是通过封装 defer 实现的,在 defer 的 block 中先执行block,在返回自己,也就是订阅原信号之前先进行 block 中的相关操作

deliverOn:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- (void)testDeliverOn {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
[subscriber sendNext:@0];
[subscriber sendCompleted];
});
return nil;
}];

[[sourceSignal deliverOn:[RACScheduler mainThreadScheduler]] subscribeNext:^(id _Nullable x) {
NSLog(@"value = x|thread = %@", [NSThread currentThread]);
}];

[sourceSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = x|thread = %@", [NSThread currentThread]);
}];
}

输出:

1
2
value = 0|thread = <NSThread: 0x600000a77240>{number = 3, name = (null)}
value = 0|thread = <NSThread: 0x60c0000718c0>{number = 1, name = main}

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- (RACSignal *)deliverOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
[scheduler schedule:^{
[subscriber sendNext:x];
}];
} error:^(NSError *error) {
[scheduler schedule:^{
[subscriber sendError:error];
}];
} completed:^{
[scheduler schedule:^{
[subscriber sendCompleted];
}];
}];
}] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
}

-deliverOn: 当原信号发送 sendNext/sendError/sendCompleted 的时候,会根据传入的调度器 scheduler ,在对应的线程中把信号发给订阅者;换句话来说,就是通过传入 scheduler 指定对应线程接受信号

subscribeOn:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

RACDisposable *schedulingDisposable = [scheduler schedule:^{
RACDisposable *subscriptionDisposable = [self subscribe:subscriber];

[disposable addDisposable:subscriptionDisposable];
}];

[disposable addDisposable:schedulingDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
}

-subscribeOn:-deliverOn: 类似,主要区别在于 -subscribeOn: 是指定订阅动作所在的线程,也就是执行 didSubscribe 闭包的线程被指定,但是 sendNext/sendError/sendCompleted 所在线程是不确定的

deliverOnMainThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
- (RACSignal *)deliverOnMainThread {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block volatile int32_t queueLength = 0;

void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
int32_t queued = OSAtomicIncrement32(&queueLength);
if (NSThread.isMainThread && queued == 1) {
block();
OSAtomicDecrement32(&queueLength);
} else {
dispatch_async(dispatch_get_main_queue(), ^{
block();
OSAtomicDecrement32(&queueLength);
});
}
};

return [self subscribeNext:^(id x) {
performOnMainThread(^{
[subscriber sendNext:x];
});
} error:^(NSError *error) {
performOnMainThread(^{
[subscriber sendError:error];
});
} completed:^{
performOnMainThread(^{
[subscriber sendCompleted];
});
}];
}] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
}

-deliverOnMainThread-deliverOn 作用类似,前者是将 sendNext/sendError/sendCompleted 都放在主线程上执行

OSAtomicIncrement32 方法是对某个值进行自增计算并且是线程安全,如果当前线程是主线程并且 OSAtomicIncrement32 函数返回值是 1,说明当前主线程没有待执行的 sendNext/sendError/sendCompleted 事件,这时候会直接运行block,反之则把任务放在主线程异步执行