RACSignal常用方法深入分析(2)

本篇文章接着上篇继续分析常用的 RACSignal 的方法

flatten

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
- (void)testFlatten {
RACSignal *sourceSignal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendCompleted];
return nil;
}];
sourceSignal1.name = @"sourceSignal1";

RACSignal *sourceSignal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];
sourceSignal2.name = @"sourceSignal2";

RACSignal *sourceSignal3 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:sourceSignal1];
[subscriber sendNext:sourceSignal2];
[subscriber sendCompleted];
return nil;
}];
sourceSignal3.name = @"sourceSignal3";

[[sourceSignal3 flatten] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
value = 0
value = 1
value = 2
value = 3

flatten 从字面看是压扁、扁平的意思,而对于 RACSignal 来说,可以理解为 解包、降阶 的意思。

底层实现:

1
2
3
4
5
- (__kindof RACStream *)flatten {
return [[self flattenMap:^(id value) {
return value;
}] setNameWithFormat:@"[%@] -flatten", self.name];
}

flatten 方法内部是通过封装 flattenMap 实现,flattenMap 的实现在之前的文章中做过分析,可以看得出,执行 flatten 方法的原信号发送的信号必须是 RACStream 类型,否则会崩溃。原型号每次发送 sendNext,发送的信号(这里是RACSignal)会被订阅,然后又会触发发送的信号的sendNext,将信号值发送给最终订阅者,最终对原信号发送的信号进行解包、降阶。

image-20190201093344155

flatten:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
- (void)testFlatten {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@0];
[subscriber sendCompleted];
return nil;
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@1];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@2];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *signal3 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:signal0];
[subscriber sendNext:signal1];
[subscriber sendNext:signal2];
[subscriber sendNext:signal3];
[subscriber sendCompleted];
return nil;
}];

[[sourceSignal flatten:2] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
value = 0
value = 1
value = 3
value = 2

-flatten:-flatten 类似,都是对高阶信号进行降阶、解包操作,但是前者多一个入参 maxConcurrent 来控制同一时间对高阶信号发出的子信号的订阅动作。

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
- (RACSignal *)flatten:(NSUInteger)maxConcurrent {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];

// Contains disposables for the currently active subscriptions.
//
// This should only be used while synchronized on `subscriber`.
NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];

// Whether the signal-of-signals has completed yet.
//
// This should only be used while synchronized on `subscriber`.
__block BOOL selfCompleted = NO;

// Subscribes to the given signal.
__block void (^subscribeToSignal)(RACSignal *);

// Weak reference to the above, to avoid a leak.
__weak __block void (^recur)(RACSignal *);

// Sends completed to the subscriber if all signals are finished.
//
// This should only be used while synchronized on `subscriber`.
void (^completeIfAllowed)(void) = ^{
if (selfCompleted && activeDisposables.count == 0) {
[subscriber sendCompleted];
}
};

// The signals waiting to be started.
//
// This array should only be used while synchronized on `subscriber`.
NSMutableArray *queuedSignals = [NSMutableArray array];

recur = subscribeToSignal = ^(RACSignal *signal) {
/// STEP 3
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

@synchronized (subscriber) {
[compoundDisposable addDisposable:serialDisposable];
[activeDisposables addObject:serialDisposable];
}

serialDisposable.disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
__strong void (^subscribeToSignal)(RACSignal *) = recur;
RACSignal *nextSignal;

@synchronized (subscriber) {
[compoundDisposable removeDisposable:serialDisposable];
[activeDisposables removeObjectIdenticalTo:serialDisposable];

if (queuedSignals.count == 0) {
completeIfAllowed();
return;
}

nextSignal = queuedSignals[0];
[queuedSignals removeObjectAtIndex:0];
}

subscribeToSignal(nextSignal);
}];
};

[compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
/// STEP 1
if (signal == nil) return;

NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);

@synchronized (subscriber) {
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
[queuedSignals addObject:signal];

// If we need to wait, skip subscribing to this
// signal.
return;
}
}

subscribeToSignal(signal);
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
/// STEP 2
@synchronized (subscriber) {
selfCompleted = YES;
completeIfAllowed();
}
}]];

[compoundDisposable addDisposable:[RACDisposable disposableWithBlock:^{
// A strong reference is held to `subscribeToSignal` until we're
// done, preventing it from deallocating early.
subscribeToSignal = nil;
}]];

return compoundDisposable;
}] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
}

在订阅原信号的之前,先定义了几个相关变量:

  • activeDisposables:可变数组,保存 原信号发送的 RACSignal 信号 被订阅时候返回的 RACDisposable 对象
  • selfCompleted: 判断原信号是否已经发送过 sendCompleted
  • subscribeToSignal闭包:订阅 RACSignal 信号,并对RACSignal进行相关处理。
  • resur闭包:对 subscribeToSignal 闭包的一个弱引用,避免之前定义 subscribeToSignal 的内部引起循环引用
  • completeIfAllowed闭包:当所有高阶信号都发送了 sendCompleted 或原信号发送了 sendCompleted 会执行,目的是给订阅者发送 sendCompleted
  • queuedSignals:可变数组,缓存原信号发出的且未被订阅的 RACSignal 信号

了解完 flatten: 内部变量的含义之后,接下来具体分析一下此方法的流程:

  1. 订阅原信号,然后收到原信号的 sendNext 事件,如果信号值为nil,直接return;如果不为nil,则需要判断是否属于 RACSignal。如果当前最大可订阅的信号数量 maxConcurrent > 0,而且正在被订阅且订阅未完成的子信号数量 activeDisposables.count >= maxConcurren,则把子信号加入缓存数组 queuedSignals 中。如果数量条件不满足,传入子信号 signal 执行 subscribeToSignal 闭包。
  2. 若原信号发送了 sendCompleted,selfCompleted = YES,执行 completeIfAllowed 闭包,如果当前没有正在出于订阅过程的子信号,则给订阅者发送 sendCompleted
  3. 当执行 subscribeToSignal 闭包的时候订阅闭包传入的参数 signal,然后进行一系列处理:
    • 把订阅 signal 的 Disposable 加入 activeDisposables 数组中
    • 当 signal 发送 sendCompleted 时,先定义内部闭包变量 subscribeToSignal,需要注意的是此闭包变量和订阅原信号之前定义的 subscribeToSignal 闭包是同一个,但是前者时通过指向 recur(__weak修饰) 来避循环引用,基本原理和我们平时经常使用的 weak-strong-dance 基本一致。然后通过对 subscriber 加锁进行原子性操作,先从 activeDisposables 移除 signal 对应的 Disposable。然后判断缓存队列中是否还有未被订阅的子信号,若没有,执行completeIfAllowed;若有,取出队列中第一个元素,重复步骤3,也就是重新触发 subscribeToSignal。
  4. 当整个原信号的订阅流程结束,也就是给订阅者发送 sendCompleted 之后,原信号对应的 RACDisposable 中会将 subscribeToSignal 闭包赋值为nil,避免循环引用

总结来说,原信号每次发送一个子信号,先判断 activeDisposables 保存的元素个数是否超过 maxConcurrent,若超过则把子信号保存在 queuedSignals 数组中进行缓存;反之调用 subscribeToSignal 闭包,并传入当前的子信号。

每当子信号发送 sendCompleted,就会从 queuedSignals 数组中取出缓存的子信号然后进行订阅;如果 queuedSignals 元素数目为0,则结束原信号的订阅。

flatten: 函数主要是围绕的参数 maxConcurrent 做处理,在订阅原信号的可以看到其中的判断 maxConcurrent > 0 && activeDisposables.count >= maxConcurrent ,不同 maxConcurrent 值决定了 flatten: 的不同表现:

  1. maxConcurrent <= 0,表现和上面分析的 flatten 相同。

  2. maxConcurrent == 1,具体表现和之前文章分析的 concat 一致。

    image-20190204172830751

  3. maxConcurrent > 1,当 activeDisposables.count < maxConcurrent,activeDisposables.count > 1表现和 -flatten 类似,activeDisposables.count == 1 则跟 -concat 类似;当 activeDisposables.count >= maxConcurrent,子信号被保存在 queuedSignals 数组中;若

+merge:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
- (void)testMerge {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@0];
[subscriber sendCompleted];
return nil;
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@1];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@2];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *signal3 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

RACSignal *mergeSignal = [RACSignal merge:@[signal0, signal1, signal2, signal3]];

[mergeSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
value = 0
value = 3
value = 1
value = 2

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
+ (RACSignal *)merge:(id<NSFastEnumeration>)signals {
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
for (RACSignal *signal in signals) {
[copiedSignals addObject:signal];
}

return [[[RACSignal
createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
for (RACSignal *signal in copiedSignals) {
[subscriber sendNext:signal];
}

[subscriber sendCompleted];
return nil;
}]
flatten]
setNameWithFormat:@"+merge: %@", copiedSignals];
}

+merge: 方法的入参时一个实现 NSFastEnumeration 协议的容器对象,NSFastEnumeration 协议主要作用是使对象快速枚举的能力。

  1. 函数内部会将容器内部所有的 RACSignal 对象加入到 copiedSignals 数组中,然后返回新的被flatten解包过的信号
  2. 当新信号被订阅的时候,就给会将保存在 copiedSignals 数组中 RACSignal 逐一发送给订阅者,之后的流程就进入 -flatten 函数的流程,这里不再重复

zip:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- (void)testZip {
RACSignal *signal0 = [RACSignal return:@0];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendCompleted];
return nil;
}];

RACSignal *zigSignal = [RACStream zip:@[signal0, signal1, signal2]];

[zigSignal subscribeNext:^(id x) {
NSLog(@"value = %@\n----------\n",x);
}];

[zigSignal subscribeCompleted:^{
NSLog(@"zigSignal sendCompleted");
}];
}

输出:

1
2
3
4
5
6
7
8
value = <RACTuple: 0x600000a0c810> (
0,
1,
3
)
----------

zigSignal sendCompleted

底层实现:

1
2
3
4
5
+ (__kindof RACStream *)zip:(id<NSFastEnumeration>)streams {
return [[self join:streams block:^(RACStream *left, RACStream *right) {
return [left zipWith:right];
}] setNameWithFormat:@"+zip: %@", streams];
}

zip: 方法是通过封装 +join:block: 来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
+ (__kindof RACStream *)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
RACStream *current = nil;

// Creates streams of successively larger tuples by combining the input
// streams one-by-one.
for (RACStream *stream in streams) {
// For the first stream, just wrap its values in a RACTuple. That way,
// if only one stream is given, the result is still a stream of tuples.
if (current == nil) {
current = [stream map:^(id x) {
return RACTuplePack(x);
}];

continue;
}

current = block(current, stream);
}

if (current == nil) return [self empty];

return [current map:^(RACTuple *xs) {
// Right now, each value is contained in its own tuple, sorta like:
//
// (((1), 2), 3)
//
// We need to unwrap all the layers and create a tuple out of the result.
NSMutableArray *values = [[NSMutableArray alloc] init];

while (xs != nil) {
[values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
xs = (xs.count > 1 ? xs.first : nil);
}

return [RACTuple tupleWithObjectsFromArray:values];
}];
}

+join:block: 第一个参数是一个可以进行枚举 RACStream 容器,也就是测试代码中传入的数组 @[signal0, signal1, signal2],然后一次进行下面的处理:

  1. 首先对传入数组进行遍历并执行对应信号的 -map 方法返回新的信号赋值给 current 变量,旨在把信号值转化 RACTuple 类型。第一个信号流打包成一个元组,这个元组里面就一个信号,然后把第一个元组和第二个信号通过参数 block 进行处理,block 内部是通过 -zipWith: 打包成新的元祖’,里面装着是第一个元组和第二个信号,然后循环此操作直至退出遍历

  2. 如果 current == nil 则直接返回空信号

  3. 对 current 进行执行 -map: 方法,主要作用是将步骤1中嵌套的元祖’(类似于(((1), 2), 3)) 取消嵌套层次,如(1, 2, 3)。需要注意的是 current 当前的结果是 RACTuple(RACTuple, value),也就是说在 -map: 参数闭包触发的时候,闭包入参 xs 是一个 RACTwoTuple 类型对象,第一个元素是 RACTuple,第二个元素是一个值。在 while 循环中,先获取 last 元素,也就是值类型的元素,如果为nil,则以 RACTupleNil.tupleNil 替代插入数组的第0位,然后 xs 指向自己的 first 元素(RACTuple类型),然后进入下一次循环,从而消除嵌套,并不改变 value 值得顺序保持到 values 数组中。

  4. 最后执行 [RACTuple tupleWithObjectsFromArray:values] 将数组转换成 RACTuple 类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    + (instancetype)tupleWithObjectsFromArray:(NSArray *)array {
    return [self tupleWithObjectsFromArray:array convertNullsToNils:NO];
    }

    + (instancetype)tupleWithObjectsFromArray:(NSArray *)array convertNullsToNils:(BOOL)convert {
    if (!convert) {
    return [[self alloc] initWithBackingArray:array];
    }

    NSMutableArray *newArray = [NSMutableArray arrayWithCapacity:array.count];
    for (id object in array) {
    [newArray addObject:(object == NSNull.null ? RACTupleNil.tupleNil : object)];
    }

    return [[self alloc] initWithBackingArray:newArray];
    }

总结:zip: 函数主要是将多个信号发送的值”同步”地压缩打包成 RACTuple 发送给订阅者。这里”同步”是指压缩的时候需要等最迟发送的信号发送 sendNext 之后再发送给最终的订阅者,这里主要是依靠 -zipWith: 函数实现,在之前的文章已经进行过分析,这里就不再重复。

combineLatestWith:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- (void)testConbindLastWith {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return nil;
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendCompleted];
return nil;
}];

RACSignal *combineSig = [signal0 combineLatestWith:signal1];
[combineSig subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
5
6
7
8
value = <RACTwoTuple: 0x600001aad2e0> (
2,
3
)
value = <RACTwoTuple: 0x600001ab6590> (
2,
4
)

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
- (RACSignal *)combineLatestWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

__block id lastSelfValue = nil;
__block BOOL selfCompleted = NO;

__block id lastOtherValue = nil;
__block BOOL otherCompleted = NO;

void (^sendNext)(void) = ^{
@synchronized (disposable) {
if (lastSelfValue == nil || lastOtherValue == nil) return;
[subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
}
};

RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (disposable) {
lastSelfValue = x ?: RACTupleNil.tupleNil;
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (disposable) {
selfCompleted = YES;
if (otherCompleted) [subscriber sendCompleted];
}
}];

[disposable addDisposable:selfDisposable];

RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
@synchronized (disposable) {
lastOtherValue = x ?: RACTupleNil.tupleNil;
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (disposable) {
otherCompleted = YES;
if (selfCompleted) [subscriber sendCompleted];
}
}];

[disposable addDisposable:otherDisposable];

return disposable;
}] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
}

主要流程:

  1. 定义 lastSelfValue 和 selfCompleted,前者判断原信号是否发送过 sendNext 事件,后者判断 原信号是否发送过 sendCompleted;同理 lastOtherValue 和 otherCompleted 是分别用来判断参数信号 signal 的是否发送过 sendNext 和 sendCompleted;
  2. 定义 sendNext 闭包,当原信号或者参数信号 signal 发送 sendNext 事件的时候,会执行这个闭包
  3. 订阅原信号,当原信号发送 sendNext,将信号值赋值给 lastSelfValue,若信号值为空则将 RACTupleNil.tupleNil 赋值给 lastSelfValue。执行 sendNext(),判断原信号和参数信号 signal 是否都执行过 sendNext,如果是则将 lastSelfValue 和 lastOtherValue 包装的 RACTuple 发给订阅者;否则就直接return。
  4. 若原信号发送 sendCompleted,则判断另一个信号(这里相对于原信号是参数信号 signal)是否已经发送过 sendCompleted,若是则给订阅者发送 sendCompleted 并结束整个订阅;如果原信号发送 sendError 则直接给订阅者发送 error 并结束整个订阅。
  5. 订阅参数型号 signal,针对 signal 重复步骤2-4。

总结:combineLatestWith: 作用是收到原信号和参数信号的最新信号(必须是2个信号都收到)值打包成元组数据并发送给订阅者,需要注意的是当分别收到过这俩个信号发送的信号值之后,不管原信号还是参数信号,以后每次发送 sendNext,都会将最新的2个信号值打包成元组数据发送给订阅者。

image-20190207223805751

combineLatest:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- (void)testCombineLatest {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return nil;
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendCompleted];
return nil;
}];

RACSignal *combineSignal = [RACSignal combineLatest:@[signal0, signal1]];
[combineSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
5
6
7
8
value = <RACTuple: 0x600001aae1a0> (
2,
3
)
value = <RACTuple: 0x600001ab9760> (
2,
4
)

底层实现:

1
2
3
4
5
+ (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {
return [[self join:signals block:^(RACSignal *left, RACSignal *right) {
return [left combineLatestWith:right];
}] setNameWithFormat:@"+combineLatest: %@", signals];
}

combineLatest 是由 -join:block:-combineLatestWith: 组合而成。主要分为以下几个步骤:

  1. 传入 RACStream 信号的容器,遍历元素,调用 -combineLatestWith: ,将最终产生新的信号发送的值 value 包装成嵌套的 RACTuple(RACTuple, value) 类似结构。
  2. 把步骤1的嵌套 RACTuple 进行解嵌套成类似 RACTuple(value0,value1,value2) 的结构发给订阅者

combineLatest: 可以基本步骤和 zip: 方法类似,主要区别可以简单的概括为:

  • combineLatest: 如果传入的所有元素信号都发送过信号,其中一个元素信号发送了新的信号值的时候,就会把每一个元素信号发送过的最新信号值一起打包成 RACTuple 发给订阅者
  • zip: 如果传入的所有元素信号都发送过信号,当所有元素信号发送了新的信号值的时候把最新信号值一起打包成 RACTuple 发给订阅者

combineLatest:reduce:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- (void)testCombineLatestReduce {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendCompleted];
return nil;
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

RACSignal *combineSignal = [RACSignal combineLatest:@[signal0, signal1] reduce:^id _Nonnull (NSNumber *num1, NSNumber *num2) {
return @(num1.integerValue * 10 + num2.integerValue);
}];

[combineSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
value = 12
value = 13

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
+ (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals reduce:(RACGenericReduceBlock)reduceBlock {
NSCParameterAssert(reduceBlock != nil);

RACSignal *result = [self combineLatest:signals];

// Although we assert this condition above, older versions of this method
// supported this argument being nil. Avoid crashing Release builds of
// apps that depended on that.
if (reduceBlock != nil) result = [result reduceEach:reduceBlock];

return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
}

+combineLatest:reduce: 的实现和 +zip:reduce: 相似,先将调用 +combineLastest: 并传入参数 signals 返回新的 RACSignal 对象 result,result 执行 -reduceEach: 方法,将发送的 RACTuple 信号值根据 reduceBlock 闭包进行转换后再转发给最终的订阅者。

scanWithStart:reduceWithIndex:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- (void)testScanWithStartReduceWithIndex {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

RACSignal *scanSignal = [signal0 scanWithStart:@10 reduceWithIndex:^id _Nullable(NSNumber * _Nullable running, NSNumber * _Nullable next, NSUInteger index) {
return @(running.integerValue + next.integerValue + index*10);
}];

[scanSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
value = 11 // 10 + 1 + 0 * 10
value = 23 // 11 + 2 + 1 * 10
value = 46 // 23 + 3 + 2 * 10

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- (__kindof RACStream *)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
NSCParameterAssert(reduceBlock != nil);

Class class = self.class;

return [[self bind:^{
__block id running = startingValue;
__block NSUInteger index = 0;

return ^(id value, BOOL *stop) {
running = reduceBlock(running, value, index++);
return [class return:running];
};
}] setNameWithFormat:@"[%@] -scanWithStart: %@ reduceWithIndex:", self.name, RACDescription(startingValue)];
}

-scanWithStart:reduceWithIndex: 内部是通过 -bind: 方法实现的,主要流程:

  1. 断言判断传入 reduceBlock 是否为空,定义 -bind 入参闭包内部变量 running 和 index,running 的初始化值为函数入参 startingValue,index 的初始化值为0。
  2. 原信号发送信号值 value,执行函数入参 reduceBlock 闭包,并传入 running, value, index,执行完,running 会被重新赋值成 reduceBlock 的返回值且 index 加一。
  3. 调用原信号所属类的 +return 方法,返回新的信号,剩下的流程和之前文章介绍的 -bind: 方法一致,不再重复。

image-20190209201928966

scanWithStart:reduce:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- (void)testScanWithStartReduce {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@10];
[subscriber sendNext:@100];
[subscriber sendCompleted];
return nil;
}];

RACSignal *scanSignal = [signal0 scanWithStart:@100 reduce:^id _Nullable(NSNumber * _Nullable previous, NSNumber * _Nullable current) {
return @(previous.integerValue + current.integerValue);
}];

[scanSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
value = 101
value = 111
value = 211

底层实现:

1
2
3
4
5
6
7
8
9
10
- (__kindof RACStream *)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock {
NSCParameterAssert(reduceBlock != nil);

return [[self
scanWithStart:startingValue
reduceWithIndex:^(id running, id next, NSUInteger index) {
return reduceBlock(running, next);
}]
setNameWithFormat:@"[%@] -scanWithStart: %@ reduce:", self.name, RACDescription(startingValue)];
}

内部是通过封装 -scanWithStart:reduceWithIndex: 来实现,主要区别在于 scanWithStart:reduce 的入参 reduceBlock 不传入 index,其他流程一致。

combinePreviousWithStart:reduce:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- (void)testCombinePreviousWithStartReduceBlock {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

RACSignal *combineSignal = [signal0 combinePreviousWithStart:@100 reduce:^id _Nullable(NSNumber * _Nullable previous, NSNumber * _Nullable current) {
return @(previous.integerValue + current.integerValue);
}];

[combineSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
value = 101
value = 3
value = 5

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
- (__kindof RACStream *)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id next))reduceBlock {
NSCParameterAssert(reduceBlock != NULL);
return [[[self
scanWithStart:RACTuplePack(start)
reduce:^(RACTuple *previousTuple, id next) {
id value = reduceBlock(previousTuple[0], next);
return RACTuplePack(next, value);
}]
map:^(RACTuple *tuple) {
return tuple[1];
}]
setNameWithFormat:@"[%@] -combinePreviousWithStart: %@ reduce:", self.name, RACDescription(start)];
}

该方法是通过封装 -scanWithStart:reduce:-map 实现,基于这个基础,-combinePreviousWithStart:reduce: 大致要经过这样的流程:

  1. 将入参 start 包装成 RACTuple 传入给 -scanWithStart:reduce: 并在其闭包中第一次拿到的 previousTuple 就是 RACTuple(start),此时的 next 就是原信号的第一个信号值(测试代码中的@1)。然后执行 -combinePreviousWithStart:reduce: 入参 reduceBlock,传入 next 和 previousTuple[0],返回值赋值给 value,最后将 next 和 value 包装成 RACTuple 返回(需要注意这里next是保存在RACTuple的第0位元素)。后续原信号每发一次新信号值都会重复这个步骤。
  2. 步骤1返回的新信号进行 map 操作,取出步骤1的 reduceBlock 处理返回的value 发给订阅者

    -combinePreviousWithStart:reduce: 是对俩俩原信号的原始信号值进行处理; -scanWithStart:reduce: 俩俩原始信号值处理结果会和下一个新的原始信号值进行下一轮的处理。

sample:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
- (void)testSample {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendCompleted];
});
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal0 disposed");
}];
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@"A"];
[subscriber sendNext:@"B"];
[subscriber sendNext:@"C"];
[subscriber sendCompleted];
});
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal1 disposed");
}];
}];

RACSignal *sampleSignal = [signal0 sample:signal1];

[sampleSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
5
value = 3
value = 3
value = 3
signal1 disposed
signal0 disposed

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
- (RACSignal *)sample:(RACSignal *)sampler {
NSCParameterAssert(sampler != nil);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
NSLock *lock = [[NSLock alloc] init];
__block id lastValue;
__block BOOL hasValue = NO;

RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[lock lock];
hasValue = YES;
lastValue = x;
[lock unlock];
} error:^(NSError *error) {
[samplerDisposable dispose];
[subscriber sendError:error];
} completed:^{
[samplerDisposable dispose];
[subscriber sendCompleted];
}];

samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
BOOL shouldSend = NO;
id value;
[lock lock];
shouldSend = hasValue;
value = lastValue;
[lock unlock];

if (shouldSend) {
[subscriber sendNext:value];
}
} error:^(NSError *error) {
[sourceDisposable dispose];
[subscriber sendError:error];
} completed:^{
[sourceDisposable dispose];
[subscriber sendCompleted];
}];

return [RACDisposable disposableWithBlock:^{
[samplerDisposable dispose];
[sourceDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
}

实现逻辑和之前的 -combineLatestWith: 有点异曲同工,主要为以下几个步骤:

  1. 定义变量 lastValue 用来记录原信号最后此发送的信号值,hasValue 判断原信号是否已经发送过信号值,lock 用于保证 lastValue/hasValue 的线程安全
  2. 订阅原信号,当其发送 sendNext 的时候,lastValue 赋值为发送的信号值,shouldSend = YES
  3. 当原信号发送 sendCompleted/sendError,先结束 sampler 的订阅再给订阅者发送sendCompleted/sendError
  4. 订阅 sampler 信号,当其发送 sendNext 的时候,如果 shouldSend == YES,则将 lastValue 发送给最终的订阅者
  5. 当 sampler 信号发送 sendCompleted/sendError,先结束原信号的订阅再给订阅者发送sendCompleted/sendError

image-20190209230053730