RACSignal常用方法深入分析(1)

ReactiveCocoa 框架 RACSignal+Operations.h 定义了 RACSignal 常规操作方法,接下来对一些常用的方法进行分析并解析其作用。

doNext

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (void)testDoNext {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"source signal"];

[subscriber sendCompleted];
return nil;
}];

[[sourceSignal doNext:^(id _Nullable x) {
NSLog(@"sourceSignal doNext will execute before sendNext");
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
sourceSignal doNext will execute before sendNext
value = source signal

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (RACSignal *)doNext:(void (^)(id x))block {
NSCParameterAssert(block != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
block(x);
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -doNext:", self.name];
}

doNext 传入block 闭包,该闭包的参数就是原信号发送的值,当给订阅者发送 sendNext 之前会执行 block 闭包

类似的 doError,doCompleted 也是在给订阅者发送事件之前就执行相关 block

throttle:valuesPassingTest:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (void)testThrottleValuesPassingTest {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{

[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *throttleSignal = [sourceSignal throttle:1 valuesPassingTest:^BOOL(id _Nullable next) {
return [next integerValue] <= 2;
}];

[throttleSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
value = 1
value = 3

throttle:valuesPassingTest: 方法有2个参数:

  • 时间间隔 interval
  • 判断条件闭包 predicate

该方法大概作用是:从原信号发出第一个信号(@0)发出开始计时,在 interval 秒内如果第二个信号(@1) 符合 predicate 的判断条件,则该前一个信号会被忽略;如果2个信号间隔时间超过 interval 秒或者不满足 predicate 判断,则前一个信号会发给订阅者。

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
NSCParameterAssert(interval >= 0);
NSCParameterAssert(predicate != nil);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

// We may never use this scheduler, but we need to set it up ahead of
// time so that our scheduled blocks are run serially if we do.
RACScheduler *scheduler = [RACScheduler scheduler];

// Information about any currently-buffered `next` event.
__block id nextValue = nil;
__block BOOL hasNextValue = NO;
RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];

void (^flushNext)(BOOL send) = ^(BOOL send) {
@synchronized (compoundDisposable) {
[nextDisposable.disposable dispose];

if (!hasNextValue) return;
if (send) [subscriber sendNext:nextValue];

nextValue = nil;
hasNextValue = NO;
}
};

RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
BOOL shouldThrottle = predicate(x);

@synchronized (compoundDisposable) {
flushNext(NO);
if (!shouldThrottle) {
[subscriber sendNext:x];
return;
}

nextValue = x;
hasNextValue = YES;
nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
flushNext(YES);
}];
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
flushNext(YES);
[subscriber sendCompleted];
}];

[compoundDisposable addDisposable:subscriptionDisposable];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
}

throttle:valuesPassingTest: 内部主要通过判断 nextValuehasNextValue 2个变量的状态来判断是否给订阅者发送信号。内部先订阅原信号,然后触发订阅者的 didSubscribe,结合测试代码,具体流程:

  1. 收到原信号发出的信号 @0,传入到条件闭包 predicate,返回 YES
  2. RACCompoundDisposable 作为线程间互斥信号量,用 @synchronized 加锁保证 nextValuehasNextValue 操作是原子性
  3. 执行 flushNext(NO) ,把 nextDisposable 进行 dispose,delayScheduler 之前存放的延迟任务如果未被执行会被取消;hasNextValue == NO,直接 return,没有给订阅者发送0
  4. 判断 !shouldThrottle,跳过 if 内部代码,给 nextValue 和 hasNextValue 赋值,nextValue=@0,hasNextValue = YES
  5. 把 flushNext(YES) 加入到延迟队列中,1秒后执行
  6. 原信号发送 @1,此时时间间隔不到 1秒,从步骤3开始重复上述步骤;flushNext(NO) 中 延迟任务被取消, nextValuehasNextValue 会赋值为对应 零值。shouldThrottle 符合,nextValuehasNextValue 又被赋值到 @1 和 YES,保存新的 flushNext(YES) 到延迟任务队列中
  7. 1秒后还没有收到原信号发送的信号,执行步骤6保存的 flushNext(YES),把 @1 发给订阅者;2秒后收到原信号的信号 @2,从步骤3开始重复上述步骤,@2会被保存到 hasNextValue 中,等待下一次延迟任务可以触发的时候发给订阅者,反之被忽略
  8. 步骤7完之后马上收到新的信号@3,从步骤3开始重复上述步骤,步骤7保存的 flushNext(YES) 被取消,因为此时不符合 predicate 判断条件,@3发给订阅者

delay:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (void)testDelay {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

[[sourceSignal delay:1] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];

NSLog(@"%@", [NSDate date]);
}

输出:

1
2
3
4
5
2019-01-31 09:34:33.286371+0800 AppTest[38331:4774309] Thu Jan 31 09:34:33 2019
2019-01-31 09:34:34.380736+0800 AppTest[38331:4774309] value = 0
2019-01-31 09:34:34.380881+0800 AppTest[38331:4774309] value = 1
2019-01-31 09:34:34.380986+0800 AppTest[38331:4774309] value = 2
2019-01-31 09:34:34.381076+0800 AppTest[38331:4774309] value = 3

delay: 方法是将原信号的信号事件延迟发送给订阅者

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
- (RACSignal *)delay:(NSTimeInterval)interval {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

// We may never use this scheduler, but we need to set it up ahead of
// time so that our scheduled blocks are run serially if we do.
RACScheduler *scheduler = [RACScheduler scheduler];

void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
[disposable addDisposable:schedulerDisposable];
};

RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
schedule(^{
[subscriber sendNext:x];
});
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
schedule(^{
[subscriber sendCompleted];
});
}];

[disposable addDisposable:subscriptionDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
}

delay: 内部首先先定义 schedule ,其参数是 dispatch_block_t,执行 schedule 的时候,会把参数 block 放到 RACScheduler 延迟 interval 秒触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block {
return [self after:[NSDate dateWithTimeIntervalSinceNow:delay] schedule:block];
}

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);

RACDisposable *disposable = [[RACDisposable alloc] init];

dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});

return disposable;
}

根据测试代码,最终会调用 RACQueueScheduler 中的 -after:schedule:,在该方法里面,通过 dispatch_after 来触发入参 block,触发先前会先判断该任务是否被 disposed ,如果是则直接 return。

归纳来说,delay: 方法就是将原信号的 sendNext 和 sendCompleted 事件延迟 interval 秒发送给订阅者。

bufferWithTime:onScheduler:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- (void)testBufferWithTime {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]];
[bufferSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
5
6
value = <RACTuple: 0x600000003200> (
0,
1,
2,
3
)

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
NSMutableArray *values = [NSMutableArray array];

void (^flushValues)() = ^{
@synchronized (values) {
[timerDisposable.disposable dispose];

if (values.count == 0) return;

RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
[values removeAllObjects];
[subscriber sendNext:tuple];
}
};

RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (values) {
if (values.count == 0) {
timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
}

[values addObject:x ?: RACTupleNil.tupleNil];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
flushValues();
[subscriber sendCompleted];
}];

return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[timerDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
}

同样 bufferWithTime:onScheduler: 内部还会创建新的信号,当新的信号被订阅的时候会创建可变数组 values 。然后定义闭包 flushValues,当收到原信号的 sendNext 和 sendCompleted 的时候会触发该闭包。

之后对原信号进行订阅,首先收到原信号的 sendNext 事件的时候,先判断 values 的元素个数,如果个数为 0,则把 flushValues 延迟 interval 秒触发。然后把信号值加入 values 数组中,如果信号值为空,则把 RACTupleNil.tupleNil 加入数组中。

因为测试代码中,@0、@1、@2、@3 四个信号是连续串行发出,所以之前被加入延迟队列中执行的 flushValues 还没有触发的时候,当原信号发送 sendCompleted 的时候,flushValues 会被触发,这时候 values 四个元素被一次性包装成 RACTuple 发送给订阅者。

如果把测试代码修改成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- (void)testBufferWithTime2 {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@2];
[subscriber sendNext:@3];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendCompleted];
});
});
return nil;
}];

RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]];
[bufferSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
5
6
7
8
value = <RACTuple: 0x604000204300> (
0,
1
)
value = <RACTuple: 0x600000019300> (
2,
3
)

订阅者此时会受到2个信号,都是 RACTuple 类型,回到之前的实现代码,

  1. 收到 @0,flushValues() 加入延迟队列1秒之后执行,@0 再加入到数组 values 中
  2. 步骤1之后马上收到 @1(1秒内),重复步骤1,这时候延迟队列中有2个任务
  3. 1秒后,第一个加入延迟队列的 flushValues() 执行,首先执行了 [timerDisposable.disposable dispose]; ,这样会将延迟队列中下一个 flushValues() 任务被取消(步骤2加入的)。然后 values 元素包装成 RACTuple 发送给订阅者。
  4. 基于步骤3,再过一秒,也就是从一开始算,2秒过后。收到@2,然后重复步骤,逻辑相似
  5. 最后再过2秒,原信号发送 sendCompleted,执行 flushValues() ,values 元素个数为0,直接返回。

总结:bufferWithTime:onScheduler: 作用是将规定时间内将原信号所发送的全部信号包装成 RACTupe 发送给订阅者

timeout:onScheduler:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (void)testTimeout {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@3];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *timeoutSig = [sourceSignal timeout:1 onScheduler:[RACScheduler currentScheduler]];
[timeoutSig subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];

[timeoutSig subscribeError:^(NSError * _Nullable error) {
NSLog(@"error = %@", error);
}];
}

输出:

1
2
3
4
value = 0
value = 1
value = 2
error = Error Domain=RACSignalErrorDomain Code=1 "(null)"

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
[disposable dispose];
[subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
}];

[disposable addDisposable:timeoutDisposable];

RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[disposable dispose];
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];

[disposable addDisposable:subscriptionDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
}

timeout:onScheduler: 判断从被订阅开始计时, interval 时间内如果原信号没有把所有信号发送完毕会给订阅者发送 error。

首先创建新的信号,新信号被订阅的时候在延迟队列里面加入任务, interval 后该任务会将 NSError 发送给订阅者。如果在 interval 内原型号发送了 sendCompleted/sendError,会执行 [disposable dispose];,延迟队列中的任务会被取消。同样如果延迟任务被触发,原信号的订阅也被终止。

map:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* 代码1 */

- (void)testMap {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];

[subscriber sendCompleted];
return nil;
}];

[[sourceSignal map:^id _Nullable(id _Nullable value) {
// BLOCK 1
return @([value integerValue] + 1);
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
value = 2
value = 3
value = 4

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
/* 代码2 */

- (__kindof RACStream *)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);

Class class = self.class;

return [[self flattenMap:^(id value) {
// BLOCK 2
return [class return:block(value)];
}] setNameWithFormat:@"[%@] -map:", self.name];
}

map 的参数一个参数类型为id,返回值类型也为id的block,内部实现首先通过断言判断 block 是否为空,然后调用 flattenMap 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* 代码3 */

- (__kindof RACStream *)flattenMap:(__kindof RACStream * (^)(id value))block {
Class class = self.class;

return [[self bind:^{
return ^(id value, BOOL *stop) {
// BLOCK 3

id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

flattenMap 是通过封装 bind 方法实现,bind的入参是 RACStreamBindBlock 类型闭包,flattenMap 的入参是一个入参类型为 id,返回值为 RACStream 的闭包

在 flattenMap 中,先判断 block(value) 返回的信号是否为nil,若是则返对应class 的empty 信号,也就是 RACEmptySignal 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
+ (RACSignal *)empty {
#ifdef DEBUG
// Create multiple instances of this class in DEBUG so users can set custom
// names on each.
return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
static id singleton;
static dispatch_once_t pred;

dispatch_once(&pred, ^{
singleton = [[self alloc] init];
});

return singleton;
#endif
}

从实现代码上来看,RACEmptySignal 是以单例对象的形式返回

RACEmptySignal.m 文件里还重写了 -subscribe:

1
2
3
4
5
6
7
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}

RACEmptySignal 信号被订阅之后会马上给订阅者发送 sendCompleted 事件

回到 flattenMap 的实现逻辑中,如果 block(value) 返回不是nil,它是如何返回 RACStream 对象的呢?

执行 block(value) 实际上就是触发 代码2 中的 BLOCK 2

BLOCK 2 中,又会触发另个 block(value),这时候会触发代码1中的 BLOCK 1,这里 block(value) 会返回相关对象(测试代码中返回NSNumber),BLOCK 2 中将 NSNumber 包装成 RACReturnSignal 返回,此时在 BLOCK 3 stream 对象就是 RACReturnSignal。

最后通过 bind 函数的变换,订阅会收到变换过后的值。

image-20190126201245265

mapReplace

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (void)testMapReplace {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];

[subscriber sendCompleted];
return nil;
}];

[[sourceSignal mapReplace:@(10)] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
value = 10
value = 10
value = 10

底层实现:

1
2
3
4
5
- (__kindof RACStream *)mapReplace:(id)object {
return [[self map:^(id _) {
return object;
}] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, RACDescription(object)];
}

mapReplace 是通过封装 map 方法实现,把原信号每一个事件都变换成参数 object 传递给订阅者

reduceEach

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (void)testReduceEach {
RACSignal *signal1 = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:RACTuplePack(@1,@2)];
[subscriber sendNext:RACTuplePack(@3,@4)];
[subscriber sendNext:RACTuplePack(@5,@6)];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal1 dispose");
}];
}];

RACSignal *signal2 = [signal1 reduceEach:^id (NSNumber *num1 , NSNumber *num2){
return @([num1 intValue] + [num2 intValue]);
}];

[signal2 subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
value = 3
value = 7
value = 11
signal1 dispose

从输出结果看出来,reduceEach 将原信号 signal1 发送的 RACTuple 数据进行解包聚合发送给订阅者

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
/* 代码4 */

- (__kindof RACStream *)reduceEach:(RACReduceBlock)reduceBlock {
NSCParameterAssert(reduceBlock != nil);

__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t) {
// BLOCK 1
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
}] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}

reduceEach 也是通过封装 map 方法实现。reduceEach 方法的入参是 RACReduceBlock 类型的闭包

1
typedef id _Nonnull (^RACReduceBlock)();

首先通过断言判断 reduceBlock 闭包是否为nil,然后调用 map 方法,map 方法的入参也就是 BLOCK 1,在 BLOCK 1 里会先判断原信号 signal 发送的数据 t 是否为 RACTuple 类型,然后返回 RACBlockTrampoline 类型对象

1
2
3
@interface RACBlockTrampoline ()
@property (nonatomic, readonly, copy) id block;
@end

RACBlockTrampoline 内部会保存一个 block 对象,然后根据传进来的参数,动态的构造一个 NSInvocation,通过执行 NSInvocation 返回需要的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
- (id)invokeWithArguments:(RACTuple *)arguments {
//
SEL selector = [self selectorForArgumentCount:arguments.count];
NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]];
invocation.selector = selector;
invocation.target = self;

for (NSUInteger i = 0; i < arguments.count; i++) {
id arg = arguments[i];
NSInteger argIndex = (NSInteger)(i + 2);
[invocation setArgument:&arg atIndex:argIndex];
}

[invocation invoke];

__unsafe_unretained id returnVal;
[invocation getReturnValue:&returnVal];
return returnVal;
}

- (SEL)selectorForArgumentCount:(NSUInteger)count {
NSCParameterAssert(count > 0);

switch (count) {
case 0: return NULL;
case 1: return @selector(performWith:);
case 2: return @selector(performWith::);
case 3: return @selector(performWith:::);
case 4: return @selector(performWith::::);
case 5: return @selector(performWith:::::);
case 6: return @selector(performWith::::::);
case 7: return @selector(performWith:::::::);
case 8: return @selector(performWith::::::::);
case 9: return @selector(performWith:::::::::);
case 10: return @selector(performWith::::::::::);
case 11: return @selector(performWith:::::::::::);
case 12: return @selector(performWith::::::::::::);
case 13: return @selector(performWith:::::::::::::);
case 14: return @selector(performWith::::::::::::::);
case 15: return @selector(performWith:::::::::::::::);
}

NSCAssert(NO, @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported.");
return NULL;
}
1
2
3
4
5
6
- (id)performWith:(id)obj1 :(id)obj2 {
id (^block)(id, id) = self.block;
return block(obj1, obj2);
}

...

-invokeWithArguments 中首先通过判断 RACTuple 元素数量选择对应的 selector,最大能支持 15 个元素。

确定好 NSInvocation 的 target 和 seletor,还需要设置参数

1
2
3
4
5
for (NSUInteger i = 0; i < arguments.count; i++) {
id arg = arguments[i];
NSInteger argIndex = (NSInteger)(i + 2);
[invocation setArgument:&arg atIndex:argIndex];
}

这里看到设置 Type Encodings 的时候是从偏移量为 2开始,这是因为偏移量0和 偏移量1的参数分别对应着隐藏参数self 和 _cmd。

构造好 invocation 之后,执行 [invocation invoke] 调用动态方法,实际上就是执行代码4中的入参 reduceBlock 闭包,最后通过 [invocation getReturnValue:&returnVal] 拿到闭包的返回值,也就是 RACBlockTrampoline 的最终返回值,最终成为 map 闭包里面的返回值。剩下的就是 map 函数流程。

or

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:RACTuplePack(@NO,@YES,@NO)];
[subscriber sendNext:RACTuplePack(@YES,@NO)];

[subscriber sendCompleted];
return nil;
}];

RACSignal *orSignal = [sourceSignal or];
[orSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];

输出

1
2
value = 1
value = 1
1
2
3
4
5
6
7
8
9
10
11
12
13
- (RACSignal *)or {
return [[self map:^(RACTuple *tuple) {
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");

return @([tuple.rac_sequence any:^(NSNumber *number) {
/// anyBlock
NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);

return number.boolValue;
}]);
}] setNameWithFormat:@"[%@] -or", self.name];
}

这里主要是将 RACTuple 转换成 RACTupleSequence,因为 RACTupleSequence 是继承 RACSequence,这里就会调用 RACSequence 的 - (BOOL)any:(BOOL (^)(id))block 方法

1
2
3
4
5
6
7
8
//  RACSequence.m

- (BOOL)any:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);

/// block = BOOL ^(NSNumber *number) {return number.boolValue;}
return [self objectPassingTest:block] != nil;
}
1
2
3
4
5
6
7
8
//  RACSequence.m

- (id)objectPassingTest:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);

/// block = BOOL ^(NSNumber *number) {return number.boolValue;}
return [self filter:block].head;
}

- (BOOL)any:(BOOL (^)(id))block 实现里面最终会执行 RACStream 中的 filter 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (__kindof RACStream *)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);

Class class = self.class;

return [[self flattenMap:^ id (id value) {
// flattenMapBlock

/// block = BOOL ^(NSNumber *number) {return number.boolValue;}
if (block(value)) {
return [class return:value];
} else {
return class.empty;
}
}] setNameWithFormat:@"[%@] -filter:", self.name];
}

在 flattenMapBlock 中会以 RACTupleSequence 发出的信号值传入到 block中,这些值信号值也就是一开始测试代码中 sourceSignal 发送给订阅者的 RACTuple 中的元素,如果value对应的BOOL值是YES,就转换成一个 RACTupleSequence 信号。如果对应的是NO,则转换成一个empty信号。

上面的 flattenMap 方法实际上最终调用到 RACSequence 的 -bind 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
block = ^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

return stream;
};

bindBlock = ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

return stream;
};
*/

- (RACSequence *)bind:(RACSequenceBindBlock (^)(void))block {
RACSequenceBindBlock bindBlock = block();
return [[self bind:bindBlock passingThroughValuesFromSequence:nil] setNameWithFormat:@"[%@] -bind:", self.name];
}

然后继续调用 - (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence 这里参数 passthroughSequence 为 nil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
- (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence {

__block RACSequence *valuesSeq = self;
__block RACSequence *current = passthroughSequence;
__block BOOL stop = NO;

RACSequence *sequence = [RACDynamicSequence sequenceWithLazyDependency:^ id {
/// dependencyBlock

while (current.head == nil) {
if (stop) return nil;

// We've exhausted the current sequence, create a sequence from the
// next value.
id value = valuesSeq.head;

if (value == nil) {
// We've exhausted all the sequences.
stop = YES;
return nil;
}

current = (id)bindBlock(value, &stop);
if (current == nil) {
stop = YES;
return nil;
}

valuesSeq = valuesSeq.tail;
}

NSCAssert([current isKindOfClass:RACSequence.class], @"-bind: block returned an object that is not a sequence: %@", current);
return nil;
} headBlock:^(id _) {
return current.head;
} tailBlock:^ id (id _) {
if (stop) return nil;

return [valuesSeq bind:bindBlock passingThroughValuesFromSequence:current.tail];
}];

sequence.name = self.name;
return sequence;
}

上面代码的关键逻辑是调用 RACDynamicSequence 类的 sequenceWithLazyDependency 方法,dependencyBlock 会在之前提及到的函数 -objectPassingTest 中,执行 RACSequence 的 -head 方法中触发

在 方法,dependencyBlock 中,执行current = (id)bindBlock(value, &stop); 根据value的布尔值来产生新的信号,如果为 NO 则返回 RACEmptySequence 类型,此时 current.head 为 nil,进行下一次循环;如过 value 为YES,则返回 RACUnarySequence 类型,current.head 不为 nil,结束循环。

any:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (void)testAny {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];

[subscriber sendCompleted];
return nil;
}];

[[sourceSignal any:^BOOL(id _Nullable object) {
return [object integerValue] < 2;
}] subscribeNext:^(NSNumber * _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
value = 1
value = 0
value = 0

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
NSCParameterAssert(predicateBlock != NULL);

return [[[self materialize] bind:^{
return ^(RACEvent *event, BOOL *stop) {
if (event.finished) {
*stop = YES;
return [RACSignal return:@NO];
}

if (predicateBlock(event.value)) {
*stop = YES;
return [RACSignal return:@YES];
}

return [RACSignal empty];
};
}] setNameWithFormat:@"[%@] -any:", self.name];
}

首先原信号会 通过 -materialize 方法转换成 RACEvent 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
- (RACSignal *)materialize {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
[subscriber sendNext:[RACEvent eventWithValue:x]];
} error:^(NSError *error) {
[subscriber sendNext:[RACEvent eventWithError:error]];
[subscriber sendCompleted];
} completed:^{
[subscriber sendNext:RACEvent.completedEvent];
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -materialize", self.name];
}
1
2
3
- (BOOL)isFinished {
return self.eventType == RACEventTypeCompleted || self.eventType == RACEventTypeError;
}

从上面代码发现,RACEvent 如果收到原信号的 sendCompleted / sendError,finished 属性会置为 YES

-any 方法中,先判断 RACEvent 的 finished 属性,如果为 YES,stop接下来的信号 则返回 [RACSignal return:@NO];反之,则会根据入参 predicateBlock 闭包,将 RACEvent 的 value 传入 predicateBlock,如果返回值为YES,stop接下来的信号,则返回 [RACSignal return:@YES];如果 predicateBlock 返回值为 NO,则返回 [RACSignal empty]。

简单地总结来说,any 方法根据 predicateBlock 来对原信号的每一个信号进行判断,若遇到返回 YES 的条件,就给订阅者发送 YES 信号,然后发送 sendCompleted;若 predicateBlock 没有返回 YES 的条件,则最后给 订阅者 发送 NO 信号。

all

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
NSCParameterAssert(predicateBlock != NULL);

return [[[self materialize] bind:^{
return ^(RACEvent *event, BOOL *stop) {
if (event.eventType == RACEventTypeCompleted) {
*stop = YES;
return [RACSignal return:@YES];
}

if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
*stop = YES;
return [RACSignal return:@NO];
}

return [RACSignal empty];
};
}] setNameWithFormat:@"[%@] -all:", self.name];
}

all 方法可以理解为 any 方法的对立面,原信号如果发送 sendError 或者 predicateBlock 返回为 NO,就会结束信号的传递,并会给订阅者发值为 NO 的信号。如果整个订阅过程中都没有出现错误以及都满足 predicateBlock 为真的条件,最后会在 RACEventTypeCompleted 的时候发送 YES。

repeat

测试代码:

1
2
3
4
5
6
7
8
9
10
11
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];

[subscriber sendCompleted];
return nil;
}];

RACSignal *orSignal = [sourceSignal repeat];
[orSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];

输出:

1
2
3
4
value = 1
value = 1
value = 1
...

执行之后会不断地打印 value = 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- (RACSignal *)repeat {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return subscribeForever(self,
^(id x) {
[subscriber sendNext:x];
},
^(NSError *error, RACDisposable *disposable) {
[disposable dispose];
[subscriber sendError:error];
},
^(RACDisposable *disposable) {
// Resubscribe.
});
}] setNameWithFormat:@"[%@] -repeat", self.name];
}

执行 -repeat 方法之后,内部会创建新的 RASignal,当新的信号被订阅的时候会执行 subscribeForever 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
next = [next copy];
error = [error copy];
completed = [completed copy];

RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[compoundDisposable addDisposable:selfDisposable];

__weak RACDisposable *weakSelfDisposable = selfDisposable;

RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
@autoreleasepool {
error(e, compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}

recurse();
} completed:^{
@autoreleasepool {
completed(compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}

recurse();
}];

[selfDisposable addDisposable:subscriptionDisposable];
};

// Subscribe once immediately, and then use recursive scheduling for any
// further resubscriptions.
recursiveBlock(^{
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];

RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
[compoundDisposable addDisposable:schedulingDisposable];
});

return compoundDisposable;
}

subscribeForever 方法有4个参数,分别是源信号 signal,next 闭包,error 闭包和 complete 闭包。函数内部先定义好递归的闭包:recursiveBlock,recursiveBlock 中 首先对 signal 进行订阅,如果源信号 signal 发送 sendError 或者 sendCompleted,就会执行对应的 error/complete 闭包,然后就执行 recursiveBlock 的参数闭包 recurse。

subscribeForever 最后是执行 recursiveBlock 并传入具体的 recurse。

recurse 中首先获取当前的递归调度器 recursiveScheduler,然后执行 -scheduleRecursiveBlock 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

[self scheduleRecursiveBlock:[recursiveBlock copy] addingToDisposable:disposable];
return disposable;
}

- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
@autoreleasepool {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[disposable addDisposable:selfDisposable];

__weak RACDisposable *weakSelfDisposable = selfDisposable;

// 最终调用 schedule 方法
RACDisposable *schedulingDisposable = [self schedule:^{
/// scheduleBlock

@autoreleasepool {
// At this point, we've been invoked, so our disposable is now useless.
[disposable removeDisposable:weakSelfDisposable];
}

if (disposable.disposed) return;

void (^reallyReschedule)(void) = ^{
if (disposable.disposed) return;
[self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
};

// Protects the variables below.
//
// This doesn't actually need to be __block qualified, but Clang
// complains otherwise. :C
__block NSLock *lock = [[NSLock alloc] init];
lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];

__block NSUInteger rescheduleCount = 0;

// Set to YES once synchronous execution has finished. Further
// rescheduling should occur immediately (rather than being
// flattened).
__block BOOL rescheduleImmediately = NO;

@autoreleasepool {
recursiveBlock(^{
[lock lock];
BOOL immediate = rescheduleImmediately;
if (!immediate) ++rescheduleCount;
[lock unlock];

if (immediate) reallyReschedule();
});
}

[lock lock];
NSUInteger synchronousCount = rescheduleCount;
rescheduleImmediately = YES;
[lock unlock];

for (NSUInteger i = 0; i < synchronousCount; i++) {
reallyReschedule();
}
}];

[selfDisposable addDisposable:schedulingDisposable];
}
}

/// RACTargetQueueScheduler.m
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);

RACDisposable *disposable = [[RACDisposable alloc] init];

dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});

return disposable;
}

可以看到会调用到自己的 -schedule 方法,这里测试代码中是在主线程,获取到对应的是 RACTargetQueueScheduler 。这里的 -schedule 方法先判断原信号有没有disposed,若果没有,则把参数 block 放在对应的队列中触发。

可以看得到上面函数中 scheduleBlock 里不断递归执行 [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable],recursiveBlock 不断被触发,对应的 subscribeForever 中的 recurse() 循环执行:

scheduleRecursiveBlock->recursiveBlock->recurse()->reallyReschedule()->scheduleRecursiveBlock

也就是说源信号会循环被订阅触发其给订阅者发送 sendNext 事件,直到源信号发送 error 才结束。

retry:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- (void)testRetry {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];

[subscriber sendError:[NSError errorWithDomain:@"domain" code:-1 userInfo:nil]];

[subscriber sendNext:@3];

[subscriber sendCompleted];
return nil;
}];

RACSignal *retrySignal = [sourceSignal retry:2];
[retrySignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
5
6
value = 1
value = 2
value = 1
value = 2
value = 1
value = 2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
- (RACSignal *)retry:(NSInteger)retryCount {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block NSInteger currentRetryCount = 0;
return subscribeForever(self,
^(id x) {
[subscriber sendNext:x];
},
^(NSError *error, RACDisposable *disposable) {
// 当原始信号发送 sendError 时
if (retryCount == 0 || currentRetryCount < retryCount) {
// Resubscribe.
currentRetryCount++;
return;
}

[disposable dispose];
[subscriber sendError:error];
},
^(RACDisposable *disposable) {
// 当原信号发送 sendCompleted
[disposable dispose];
[subscriber sendCompleted];
});
}] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
}

-retry: 实现与 -repeat 实现类似,基于 subscribeForever-retry: 是内部维护一个 currentRetryCount 变量,当原始信号发送 sendError 时判断重试次数 currentRetryCount 是否小于 retryCount,若是则重试,如果重试依旧收到 sendError,超过 retryCount 之后就会停止重试。

如果原信号没有发生错误,那么原信号在发送结束,当原信号发送 sendCompleted,subscribeForever 也就接受了,所以 -retry: 操作对于没有任何error的信号 和 直接订阅原信号表现一样。

将测试代码改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
- (void)testRetryNoError {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return nil;
}];

RACSignal *retrySignal = [sourceSignal retry:2];
[retrySignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
value = 1
value = 2

原信号发送 sendCompleted ,整个订阅流程就结束了。