RACSignal常用方法深入分析(3)

本篇文章接着上篇继续分析常用的 RACSignal 方法的第三部分进行分析。

ignoreValues

-ignoreValues 底层比较简单,直接通过封装 filter 方法,将所有输入信号都忽略掉,所以最终的订阅者无法收到任何信号。

1
2
3
4
5
- (RACSignal *)ignoreValues {
return [[self filter:^(id _) {
return NO;
}] setNameWithFormat:@"[%@] -ignoreValues", self.name];
}

ignore:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- (void)testIgnore {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return nil;
}];

RACSignal *filterSignal = [sourceSignal ignore:@1];

[filterSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
value = 0
value = 2

底层实现:

1
2
3
4
5
- (__kindof RACStream *)ignore:(id)value {
return [[self filter:^ BOOL (id innerValue) {
return innerValue != value && ![innerValue isEqual:value];
}] setNameWithFormat:@"[%@] -ignore: %@", self.name, RACDescription(value)];
}

-ignore: 内部是通过封装 filter 方法实现,主要通过 filter 筛选闭包判断原信号发出的信号值和参数 value 相同,相同则被过滤掉。

take:

-take: 实现思想和 -distinctUntilChanged 相似,同样的是通过封装 -bind: 实现,在 -bind: 闭包中,用变量 taken 来记录原信号发送信号的次数,当 taken 取到 count 个数的时候,就停止给订阅者发送信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- (instancetype)take:(NSUInteger)count {
Class class = self.class;

if (count == 0) return class.empty;

return [[self bind:^{
__block NSUInteger taken = 0;

return ^ id (id value, BOOL *stop) {
if (taken < count) {
++taken;
if (taken == count) *stop = YES;
return [class return:value];
} else {
return nil;
}
};
}] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
}

takeLast:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (void)testTakeLast {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

RACSignal *takeSignal = [sourceSignal takeLast:2];

[takeSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
value = 2
value = 3

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- (RACSignal *)takeLast:(NSUInteger)count {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
return [self subscribeNext:^(id x) {
[valuesTaken addObject:x ? : RACTupleNil.tupleNil];

while (valuesTaken.count > count) {
[valuesTaken removeObjectAtIndex:0];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
for (id value in valuesTaken) {
[subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
}

[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
}

takeLast: 先订阅原信号,根据参数 count, 保存最后 count 个信号值到数组 valuesTaken。当原信号发送 sendCompleted的时候遍历 valuesTaken 将其元素逐个发给订阅者。

takeUntilBlock:

1
2
3
4
5
6
7
8
9
10
11
12
13
- (__kindof RACStream *)takeUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);

Class class = self.class;

return [[self bind:^{
return ^ id (id value, BOOL *stop) {
if (predicate(value)) return nil;

return [class return:value];
};
}] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
}

-takeUntilBlock:-taskLast: 实现思路基本一致,后置是通过信号值个数和参数 count 比较判断是否继续给订阅者发送型号,而 -takeUntilBlock: 是根据参数 predicate 闭包执行结果来判断是否给订阅者发送信号

takeUntil:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
- (void)testUntil {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@3];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *untilSignal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@10];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *takeSignal = [sourceSignal takeUntil:untilSignal];

[takeSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
value = 0
value = 1
value = 2

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
void (^triggerCompletion)(void) = ^{
[disposable dispose];
[subscriber sendCompleted];
};

RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
triggerCompletion();
} completed:^{
triggerCompletion();
}];

[disposable addDisposable:triggerDisposable];

if (!disposable.disposed) {
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];

[disposable addDisposable:selfDisposable];
}

return disposable;
}] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
}

-takeUntil: 会返回新的信号,新信号被订阅的时候触发入参闭包:

  1. 订阅 signalTrigger 信号,如果发送 sendNext/sendCompleted 则结束整个新信号的订阅
  2. 如果 signalTrigger 尚未发送过 sendNext/sendCompleted ,则将原信号的信号事件透传给订阅者

image-takeuntil

takeUntilReplacement:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- (void)testTakeUntilReplacement {
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];

dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@5];
[subscriber sendNext:@6];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *newSignal = [signal1 takeUntilReplacement:signal2];
[newSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
5
value = 0
value = 1
value = 2
value = 5
value = 6

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];

RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
[selfDisposable dispose];
[subscriber sendNext:x];
} error:^(NSError *error) {
[selfDisposable dispose];
[subscriber sendError:error];
} completed:^{
[selfDisposable dispose];
[subscriber sendCompleted];
}];

if (!selfDisposable.disposed) {
selfDisposable.disposable = [[self
concat:[RACSignal never]]
subscribe:subscriber];
}

return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[replacementDisposable dispose];
}];
}];
}

takeUntilReplacement: 内部主要经历以下的步骤:

  1. 首先原信号执行 -concat: 方法并传入 [RACSignal never],[RACSignal never] 返回的信号不会给订阅者发送任何东西,这样能保证收到 replacement 信号前,源信号不会被 dispose

    1
    2
    3
    4
    5
    + (RACSignal *)never {
    return [[self createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
    return nil;
    }] setNameWithFormat:@"+never"];
    }
  2. 订阅 replacement 信号,当收到其发送的 sendNext/sendError/sendCompleted 时,先讲原信号 dispose,然后将对应的信号发给最终的订阅者

主要流程如图所示:

image-20190223154342910

skip:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- (void)testSkip {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendCompleted];
return nil;
}];

[[sourceSignal skip:1] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
value = 1
value = 2
value = 3
value = 4

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (__kindof RACStream *)skip:(NSUInteger)skipCount {
Class class = self.class;

return [[self bind:^{
__block NSUInteger skipped = 0;

return ^(id value, BOOL *stop) {
if (skipped >= skipCount) return [class return:value];

skipped++;
return class.empty;
};
}] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
}

-skip: 方法内补也是通过 -bind: 方法进行封装,通过变量 skipped 来记录原始信号已经发过几个信号,当skipped >= skipCount 的时候,会将信号事件直接发给最终订阅者。

image-20190223161533779

skipUntilBlock:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- (void)testSkipUntilBlock {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendNext:@0];
[subscriber sendCompleted];
return nil;
}];

[[sourceSignal skipUntilBlock:^BOOL(id _Nullable x) {
return [x integerValue] > 3;
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
value = 4
value = 0

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (__kindof RACStream *)skipUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);

Class class = self.class;

return [[self bind:^{
__block BOOL skipping = YES;

return ^ id (id value, BOOL *stop) {
if (skipping) {
if (predicate(value)) {
skipping = NO;
} else {
return class.empty;
}
}

return [class return:value];
};
}] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
}

-skipUntilBlock:-skip: 原理类似,忽略信号的条件从信号个数变化成从 predicate 闭包来判断,值得注意的是,当 predicate 闭包返回 YES 之后的所有信号就不再会被 skip。

skipWhileBlock:

1
2
3
4
5
6
7
- (__kindof RACStream *)skipWhileBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);

return [[self skipUntilBlock:^ BOOL (id x) {
return !predicate(x);
}] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
}

-skipUntilBlock: 类似,也是通过 predicate 闭包返回结果来判断是否来 skip 信号,区别是

-skipWhileBlock: 只有 predicate 返回 YES 才skip 信号,一旦返回 NO,以后的信号就不再会被 skip

###

distinctUntilChanged

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- (void)testDistinctUntilChanged {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

RACSignal *filterSignal = [sourceSignal distinctUntilChanged];

[filterSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
3
4
value = 0
value = 1
value = 2
value = 3

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- (__kindof RACStream *)distinctUntilChanged {
Class class = self.class;

return [[self bind:^{
__block id lastValue = nil;
__block BOOL initial = YES;

return ^(id x, BOOL *stop) {
if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];

initial = NO;
lastValue = x;
return [class return:x];
};
}] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
}

-distinctUntilChanged 是底层是通过 -bind: 方法来实现

  1. 定义 lastValue 记录原信号上一次发送的信号值,initial 判断之前2个信号值是否相同
  2. 如果当前发送的信号值和上一次的相同,直接返回 empty 信号并发送给订阅者
  3. 如果当前原信号发送的信号值和上一次不相同,调用 return 方法将信号值包装成新的信号发送给订阅者

groupBy:transform:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
- (void)testGroupByTransform {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendCompleted];
return nil;
}];

RACSignal *groupSignal = [sourceSignal groupBy:^id<NSCopying> _Nullable(id _Nullable object) {
return [object integerValue] > 2 ? @"send" : @"skip";
} transform:^id _Nullable(id _Nullable object) {
return @([object integerValue] * 10);
}];

RACSignal *filterSignal = [[groupSignal filter:^BOOL(RACGroupedSignal *value) {
return [(NSString *)value.key isEqualToString:@"send"];
}] flatten];

[filterSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}

输出:

1
2
value = 30
value = 40

底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
NSCParameterAssert(keyBlock != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
NSMutableDictionary *groups = [NSMutableDictionary dictionary];
NSMutableArray *orderedGroups = [NSMutableArray array];

return [self subscribeNext:^(id x) {
id<NSCopying> key = keyBlock(x);
RACGroupedSignal *groupSubject = nil;
@synchronized(groups) {
groupSubject = groups[key];
if (groupSubject == nil) {
groupSubject = [RACGroupedSignal signalWithKey:key];
groups[key] = groupSubject;
[orderedGroups addObject:groupSubject];
[subscriber sendNext:groupSubject];
}
}

[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
} error:^(NSError *error) {
[subscriber sendError:error];

[orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
} completed:^{
[subscriber sendCompleted];

[orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
}];
}] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
}

-groupBy:transform: 主要是信号事件转化成类似 map 结构数据发给订阅者,主要逻辑可以概括为:

  1. 创建可变字典 groups 和 可变数组 orderedGroups

  2. 订阅原信号,当原信号发送 sendNext 的时候,通过闭包 keyBlock 取到对应的 key 值,然后根据 key 从 groups 中取出对应的 groupSubject (RACGroupedSignal 类型),如果为空则新建一个,并且加入到 orderedGroups 数组中,再把 groupSubject 发给最终订阅者

  3. groupSubject 给订阅者发送 transformBlock 闭包的返回结果,如果 transformBlock 是空,则直接把信号发给订阅者

  4. 原信号发送 sendCompleted / sendError ,遍历数组 orderedGroups 并对其中的 RACGroupedSignal 元素执行对应的 sendCompleted / sendError

    经过上面的分析看得出 -groupBy:transform: 方法返回的方法也是一个高阶信号,我们可以结合之前 -flatten-flattenMap: 等方法结合使用,对齐进行降阶

groupBy:

-groupBy: 是通过封装 -groupBy:transform: 来实现的,把后者入参 transformBlock 赋值为 nil,也就是 -groupBy:transform: 主要逻辑步骤3中,groupSubject 会给其订阅者返回原信号发送的信号值

1
2
3
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock {
return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
}