RACSignal-Bind操作过程解析

在 RACSignals.h 头文件中定义 RACSignal 几个基本操作方法:

  • bind
  • concat
  • zipWith

在分析一下 bind 操作流程,首先看一下 bind 函数的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/* 代码 1 */

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
// BLOCK 1

/// 1.保存这个block,signal1.didSubscribe = block

/// 5.sendNext,把value传给第3步的nextBlock,并执行nextBlock(value)
[subscriber sendNext:@(1)];
[subscriber sendNext:@(2)];
[subscriber sendNext:@(3)];

[subscriber sendCompleted];
return nil;
}];

RACSignal *bindSignal = [sourceSignal bind:^RACSignalBindBlock _Nonnull{
// BLOCK 2
return ^RACSignal *(id value, BOOL *stop) {
// BLOCK 3
value = @([value integerValue] + 1);
return [RACSignal return:value];
};
}];

[bindSignal subscribeNext:^(id _Nullable x) {
// BLOCK 4

/// 2.内部调用RACDynamicSignal subscribe
/// 3.创建subscriber, 保存subscriber->nextBlock
/// 4.执行didSubscribe, subscriber执行sendNext
NSLog(@"%@", x);
}];
}

执行代码,在控制台会依次输出 1, 2, 3

从输出结果可以猜测 bind 方法作用大概是将原信号 sourceSignal 发送的信号事件经过 bind 方法的参数 block 的加工之后,给 bind 函数返回的 bindSignal 的订阅者发送新的信号事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
/* 代码 2 */

/*
* -bind: should:
*
* 1. Subscribe to the original signal of values.
* 2. Any time the original signal sends a value, transform it using the binding block.
* 3. If the binding block returns a signal, subscribe to it, and pass all of its values through to the subscriber as they're received.
* 4. If the binding block asks the bind to terminate, complete the _original_ signal.
* 5. When _all_ signals complete, send completed to the subscriber.
*
* If any signal sends an error at any point, send that to the subscriber.
*/

从函数内部的注释中,可以初步地看到 bind 函数的作用:

  1. 订阅原始信号
  2. 当原信号发送任何值的时候,都会通过 bind 方法参数 block 来进行转换
  3. 如果 block 返回一个 signal,则马上对其进行订阅,然后把值发送给订阅者 subscriber
  4. 如果 block 终止绑定,结束原始信号
  5. 所有信号都结束后,给订阅者 subscriber 发送 completed 信号
  6. 在以上过程中,其中一个 signal 发送 error,则会马上将 error 转发给订阅者 subscriber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/* 代码 3 */

- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
// BLOCK 5

NSCParameterAssert(block != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
// BLOCK 6

RACSignalBindBlock bindingBlock = block();

__block volatile int32_t signalCount = 1; // indicates self

RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

void (^completeSignal)(RACDisposable *) = ^(RACDisposable *finishedDisposable) {
// BLOCK 7

if (OSAtomicDecrement32Barrier(&signalCount) == 0) {
[subscriber sendCompleted];
[compoundDisposable dispose];
} else {
[compoundDisposable removeDisposable:finishedDisposable];
}
};

void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
// BLOCK 8

OSAtomicIncrement32Barrier(&signalCount);

RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];

RACDisposable *disposable = [signal subscribeNext:^(id x) {
// BLOCK 9

[subscriber sendNext:x];
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(selfDisposable);
}
}];

selfDisposable.disposable = disposable;
};

@autoreleasepool {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];

RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
// BLOCK 10

// Manually check disposal to handle synchronous errors.
if (compoundDisposable.disposed) return;

BOOL stop = NO;
id signal = bindingBlock(x, &stop);

@autoreleasepool {
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(selfDisposable);
}
}];

selfDisposable.disposable = bindingDisposable;
}

return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:", self.name];
}

从代码 1开始看,主要流程可以分为以下几步:

  1. 先创建原始信号 sourceSignal,执行 createSignal: 方法的时候 , 会将 BLOCK 1 保存起来
  2. 执行 bind: 方法,进入 BLOCK 5 先判断参数是否为空
  3. 创建并返回新的信号,也就是 代码 1 中的 bindSignal,bindSignal 被订阅时,会触发 BLOCK 6
  4. 执行 bind:方法的入参 block,返回 RACSignalBindBlock 类型对象 bindBlock,也就是执行 BLOCK 2
  5. 定义好 completeSignaladdSignal 后,开始订阅自己,也就是 sourceSignal 本身,这时候会触发步骤1保存的 BLOCK 1 ,sourceSignal 保存的 didSubscribe 会被执行,执行 sendNext: 方法,触发 subscriber 的 nextBlock ,也就是执行 BLOCK 10
  6. 先判断绑定是否被 dispose,然后执行步骤4中的 bindBlock,入参是 BLOCK 1 发送的值 value ,分别是 @(1)@(2)@(3),然后按照 BLOCK 3 逻辑进行转换,也就是 代码 1 中将 value 值加1之后,调用 retrun: 返回将 value 包装成 RACReturnSignal 对象返回,这里暂且将其称为 signal_t
  7. 调用 addSignal 闭包,将步骤6返回的 signal_t 作为闭包参数,通过 OSAtomicDecrement32Barrier 方法,将 signalCount + 1,然后对 signal_t 进行订阅,因为 signal_t 是 RACReturnSignal 类型,被订阅之后马上会执行 BLOCK 9,这里会执行 subscriber sendNext 方法,subscriber是 BLOCK 6 入参,最终会执行 BLOCK 4
  8. 如果绑定被终止或者 signal_t 为空,则执行 completeSignal 闭包,也就是执行 BLOCK 7,给订阅者 subscriber 发送 completed 信号
  9. 执行完 BLOCK 7 或 源信号发送 complete/error 之后,就完成一次 bind 的基本流程

bind 操作的基本流程大概可以用下图来表示:

image-20190112202901811