RACSignal-concat与zipWith过程解析

bind、concat、zipWith 都属于 RACSignal 基本操作方法,前一篇文章已经介绍过 bind,接下来再来分析 concat 和 zipWith

concat

首先来看 concat 方法,同样先测试 concat 执行效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/* 代码 1 */

RACSignal *signal1 = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
// BLOCK 1
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal1 dispose");
}];
}];


RACSignal *signal2 = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
// BLOCK 2
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendNext:@6];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal2 dispose");
}];
}];

RACSignal *concatSignal = [signal1 concat:signal2];

[concatSignal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
}];

执行代码,可以从看到控制输出大概如下:

1
2
3
4
5
6
7
8
subscribe value = 1
subscribe value = 2
subscribe value = 3
subscribe value = 4
subscribe value = 5
subscribe value = 6
signal2 dispose
signal1 dispose

从输出接口,可以大致猜测 concat 是将2个信号进行合并,然后依次发送给最终的的订阅者

concat 的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* 代码 2 */

- (RACSignal *)concat:(RACSignal *)signal {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];

// STEP 1
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
// STEP 2
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
// STEP 3
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
[compoundDisposable addDisposable:concattedDisposable];
}];

[compoundDisposable addDisposable:sourceDisposable];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}

分析源码,concat 方法主要分为以下几个步骤:

  1. 先订阅自己,也就是 代码 1 中的 signal1,
  2. signal1 被订阅之后就执行 signal1 的 didSubscribe,给最终的订阅者 subscriber 发送 sendNext、sendError
  3. signal1 发送完sendCompleted 信号之后,开始订阅 signal2,signal2 发送信号的逻辑与步骤1-2一样

从上面的分析,注意到:

  • signal2 需要在 signal1 发送 sendCompleted 之后才开始被订阅,所以如果在 代码 1 中 signal1 如果不发送 sendCompleted,控制台就不会输出 4,5,6
  • compoundDisposable 是先把第二个信号订阅返回的 concattedDisposable 加入数组中,然后再把 signal1 订阅返回的 sourceDisposable 加入数组,所以在 代码 1 中 signal2先结束,然后 signal1 再结束。当2个信号都结束之后,concat之后的新信号也就结束

concat 流程可以用下图概括:

image-20190113190846381

zipWith

同样先测试 zipWith 方法的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/* 代码 3 */

RACSignal *signal1 = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal1 dispose");
}];
}];


RACSignal *signal2 = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendNext:@6];
[subscriber sendNext:@7];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal2 dispose");
}];
}];

RACSignal *zipSignal = [signal1 zipWith:signal2];

[zipSignal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
}];

执行并输出

1
2
3
4
5
6
7
8
9
10
11
12
subscribe value = <RACTwoTuple: 0x60800001b5f0> (
1,
4
)
subscribe value = <RACTwoTuple: 0x60800001b680> (
2,
5
)
subscribe value = <RACTwoTuple: 0x60800001b660> (
3,
6
)

看起来 zipWith 是把2个信号发送值合并成 tuple 类型并发送给订阅者

具体源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/* 代码 4 */

- (RACSignal *)zipWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
// BLOCK 1
__block BOOL selfCompleted = NO;
NSMutableArray *selfValues = [NSMutableArray array];

__block BOOL otherCompleted = NO;
NSMutableArray *otherValues = [NSMutableArray array];

void (^sendCompletedIfNecessary)(void) = ^{
// BLOCK 2
@synchronized (selfValues) {
// 判断是否有其中一个信号已经完成并且数组元素数量为空,则整个信号就算完成
BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
if (selfEmpty || otherEmpty) [subscriber sendCompleted];
}
};

void (^sendNext)(void) = ^{
// BLOCK 3
@synchronized (selfValues) {
// 其中任意一个信号发送sendNext就会进入执行这里的代码
// 先判断2个数组是否不为空
// 取出2个数组的头元素,包装成 RACTuple 发送给订阅者
if (selfValues.count == 0) return;
if (otherValues.count == 0) return;

RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
[selfValues removeObjectAtIndex:0];
[otherValues removeObjectAtIndex:0];

[subscriber sendNext:tuple];
sendCompletedIfNecessary();
}
};

RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
// BLOCK 4
@synchronized (selfValues) {
// 把信号事件加到对应的数组里
[selfValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
// 结束的时候判断整个信号是否结束
selfCompleted = YES;
sendCompletedIfNecessary();
}
}];

RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
// BLOCK 5
@synchronized (selfValues) {
// 把信号事件加到对应的数组里
[otherValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
// 结束的时候判断整个信号是否结束
otherCompleted = YES;
sendCompletedIfNecessary();
}
}];

return [RACDisposable disposableWithBlock:^{
// 销毁2个信号
[selfDisposable dispose];
[otherDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}

zipWith 操作主要经历以下几步:

  1. 初始化2个可变数组,分别保存 signal1 和 signal2 发送的信号事件
  2. 订阅 signal1,然后会执行 BLOCK 4 把值保存到数组中,再去执行之前初始化好的 sendNext 闭包,然后判断2个数组是否都有值,如都有则把2个数组的头元素组出来,包装成 RACTuple 类型发给订阅者,当然 BLOCK 4 中 signal1 被订阅的时候都没有发出去,因为 otherValues 为空
  3. 订阅 signal2,具体逻辑跟步骤2相似,signal2被订阅之后会执行 BLOCK 5 ,然后发送的值存在对应数组,然后执行 sendNext 闭包,这时候2个数组不为空,给订阅者发送 RACTuple,然后判断 signal1和 signal2 其中是否有一个已经完成并且数组为空,则给订阅者发送 sendCompleted

第一个信号依次发送的1,2,3的值和第二个信号依次发送的4,5,6,7的值,一一合并,当signal2 发送 7 的时候 signal1 已经结束了,所以 7 也就是被丢弃了。

image-20190113194035889

RACDisposable

每次对一个 RACSignal 进行订阅执行 subscribeNext 方法的时候,都会返回 一个 RACDisposable 对象,之前也提到过 RACDisposable 主要对一次订阅结束过后做相关的清理操作,而且它可以随时取消任何一个订阅,这方面类似 NSURLSessionDataTask

RACDisposable 有一个核心方法 -dispose ,功能类似于我们经常接触到的 -dealloc 方法,接下来分析 RACDisposable 实现代码,看其是如何工作的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* 代码 5 */

- (instancetype)init {
self = [super init];

_disposeBlock = (__bridge void *)self;
OSMemoryBarrier();

return self;
}

- (instancetype)initWithBlock:(void (^)(void))block {
NSCParameterAssert(block != nil);

self = [super init];

_disposeBlock = (void *)CFBridgingRetain([block copy]);
OSMemoryBarrier();

return self;
}

+ (instancetype)disposableWithBlock:(void (^)(void))block {
return [[self alloc] initWithBlock:block];
}

这里看到有2个初始化方法 initinitWithBlock,后置是把参数 block 拷贝并以 void * 的形式保存到 _disposeBlock 中;而 init_disposeBlock 指向的是 self,这里指向 self 的作用是什么呢?

1
2
3
- (BOOL)isDisposed {
return _disposeBlock == NULL;
}

我们可以看到 isDisposed 的返回结果是通过判断 _disposeBlock 是否为空,这里 init_disposeBlock 指向的是 self主要为了在不引入其他实例变量、增加对象的基础上避免 isDisposed 方法的误判

基于上面的结论,很可能会产生另外一个疑惑,为什么不把 init 方法禁用,只能通 initWithBlock 方法进行初始化?这是因为对于某些 RACDisposable 的子类如 RACSerialDisposable,执行多余的 _disposeBlock 比较浪费性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (void)dispose {
void (^disposeBlock)(void) = NULL;

while (YES) {
void *blockPtr = _disposeBlock;
if (OSAtomicCompareAndSwapPtrBarrier(blockPtr, NULL, &_disposeBlock)) {
if (blockPtr != (__bridge void *)self) {
disposeBlock = CFBridgingRelease(blockPtr);
}

break;
}
}

if (disposeBlock != nil) disposeBlock();
}

-dispose 主要是将 _disposeBlock 进行一次 release,然后执行这个block

RACSerialDisposable

RACSerialDisposable 是一个容量为1的 RACDisposable 容器,RACDisposable 对象保存在 _disposable 属性中

1
2
3
- (void)setDisposable:(RACDisposable *)disposable {
[self swapInDisposable:disposable];
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- (RACDisposable *)swapInDisposable:(RACDisposable *)newDisposable {
RACDisposable *existingDisposable;
BOOL alreadyDisposed;

pthread_mutex_lock(&_mutex);
alreadyDisposed = _disposed;
if (!alreadyDisposed) {
existingDisposable = _disposable;
_disposable = newDisposable;
}
pthread_mutex_unlock(&_mutex);

if (alreadyDisposed) {
[newDisposable dispose];
return nil;
}

return existingDisposable;
}

设置 _disposable 的时候采用 pthred_mutex_t 互斥锁来保证操作的原子性,先判断是否已经被 dispose,若没有则将 newDisposable 赋值给 _disposable,反之直接对新的 newDisposable 执行 dispose

1
2
3
4
5
6
7
8
9
10
11
12
13
- (void)dispose {
RACDisposable *existingDisposable;

pthread_mutex_lock(&_mutex);
if (!_disposed) {
existingDisposable = _disposable;
_disposed = YES;
_disposable = nil;
}
pthread_mutex_unlock(&_mutex);

[existingDisposable dispose];
}

RACSerialDisposable 的 dispose 方法内部是在线程安全的基础上将 _disposable 换出,然后再进行 dispose

RACCompoundDisposable

RACCompoundDisposable 也是保存 RACDisposable 的容器,和 RACSerialDisposable 不同的是,RACCompoundDisposable可以保存多个 RACDisposable 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@interface RACCompoundDisposable () {
// Used for synchronization.
pthread_mutex_t _mutex;

#if RACCompoundDisposableInlineCount
// A fast array to the first N of the receiver's disposables.
//
// Once this is full, `_disposables` will be created and used for additional
// disposables.
//
// This array should only be manipulated while _mutex is held.
RACDisposable *_inlineDisposables[RACCompoundDisposableInlineCount];
#endif

// Contains the receiver's disposables.
//
// This array should only be manipulated while _mutex is held. If
// `_disposed` is YES, this may be NULL.
CFMutableArrayRef _disposables;

// Whether the receiver has already been disposed.
//
// This ivar should only be accessed while _mutex is held.
BOOL _disposed;
}

RACCompoundDisposable 内部用2个数组来保存 RACDisposable,分别是 _inlineDisposables_disposables

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- (instancetype)initWithDisposables:(NSArray *)otherDisposables {
self = [self init];

#if RACCompoundDisposableInlineCount
[otherDisposables enumerateObjectsUsingBlock:^(RACDisposable *disposable, NSUInteger index, BOOL *stop) {
self->_inlineDisposables[index] = disposable;

// Stop after this iteration if we've reached the end of the inlined
// array.
if (index == RACCompoundDisposableInlineCount - 1) *stop = YES;
}];
#endif

if (otherDisposables.count > RACCompoundDisposableInlineCount) {
_disposables = RACCreateDisposablesArray();

CFRange range = CFRangeMake(RACCompoundDisposableInlineCount, (CFIndex)otherDisposables.count - RACCompoundDisposableInlineCount);
CFArrayAppendArray(_disposables, (__bridge CFArrayRef)otherDisposables, range);
}

return self;
}

当数量少于等于 RACCompoundDisposableInlineCount(默认为2) 的时候会保存在 _inlineDisposables 数组里面,超过这个临界值后,后面的 RACDisposable 保存在 _disposables 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
- (void)addDisposable:(RACDisposable *)disposable {
NSCParameterAssert(disposable != self);
if (disposable == nil || disposable.disposed) return;

BOOL shouldDispose = NO;

pthread_mutex_lock(&_mutex);
{
if (_disposed) {
shouldDispose = YES;
} else {
#if RACCompoundDisposableInlineCount
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
if (_inlineDisposables[i] == nil) {
_inlineDisposables[i] = disposable;
goto foundSlot;
}
}
#endif

if (_disposables == NULL) _disposables = RACCreateDisposablesArray();
CFArrayAppendValue(_disposables, (__bridge void *)disposable);

if (RACCOMPOUNDDISPOSABLE_ADDED_ENABLED()) {
RACCOMPOUNDDISPOSABLE_ADDED(self.description.UTF8String, disposable.description.UTF8String, CFArrayGetCount(_disposables) + RACCompoundDisposableInlineCount);
}

#if RACCompoundDisposableInlineCount
foundSlot:;
#endif
}
}
pthread_mutex_unlock(&_mutex);

// Performed outside of the lock in case the compound disposable is used
// recursively.
if (shouldDispose) [disposable dispose];
}

添加 RACDisposable 的时候同样先判断当前是否已经被 dispose,若是则直接执行参数 RACDisposable 对象的 dispose;反之则先判断 _inlineDisposables 是否有剩余空间,如果没有就加入到 _disposables

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
- (void)dispose {
#if RACCompoundDisposableInlineCount
RACDisposable *inlineCopy[RACCompoundDisposableInlineCount];
#endif

CFArrayRef remainingDisposables = NULL;

pthread_mutex_lock(&_mutex);
{
_disposed = YES;

#if RACCompoundDisposableInlineCount
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
inlineCopy[i] = _inlineDisposables[i];
_inlineDisposables[i] = nil;
}
#endif

remainingDisposables = _disposables;
_disposables = NULL;
}
pthread_mutex_unlock(&_mutex);

#if RACCompoundDisposableInlineCount
// Dispose outside of the lock in case the compound disposable is used
// recursively.
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
[inlineCopy[i] dispose];
}
#endif

if (remainingDisposables == NULL) return;

CFIndex count = CFArrayGetCount(remainingDisposables);
CFArrayApplyFunction(remainingDisposables, CFRangeMake(0, count), &disposeEach, NULL);
CFRelease(remainingDisposables);
}

-dispose 中,_inlineDisposables 和 _disposables 遍历,然后逐一进行 dispose