本篇文章接着上篇继续分析常用的 RACSignal 方法的第三部分进行分析。
ignoreValues
-ignoreValues
底层比较简单,直接通过封装 filter
方法,将所有输入信号都忽略掉,所以最终的订阅者无法收到任何信号。
1 2 3 4 5
| - (RACSignal *)ignoreValues { return [[self filter:^(id _) { return NO; }] setNameWithFormat:@"[%@] -ignoreValues", self.name]; }
|
ignore:
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| - (void)testIgnore { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { [subscriber sendNext:@0]; [subscriber sendNext:@1]; [subscriber sendNext:@2]; [subscriber sendCompleted]; return nil; }]; RACSignal *filterSignal = [sourceSignal ignore:@1]; [filterSignal subscribeNext:^(id _Nullable x) { NSLog(@"value = %@", x); }]; }
|
输出:
底层实现:
1 2 3 4 5
| - (__kindof RACStream *)ignore:(id)value { return [[self filter:^ BOOL (id innerValue) { return innerValue != value && ![innerValue isEqual:value]; }] setNameWithFormat:@"[%@] -ignore: %@", self.name, RACDescription(value)]; }
|
-ignore:
内部是通过封装 filter
方法实现,主要通过 filter 筛选闭包判断原信号发出的信号值和参数 value 相同,相同则被过滤掉。
take:
-take:
实现思想和 -distinctUntilChanged
相似,同样的是通过封装 -bind:
实现,在 -bind:
闭包中,用变量 taken 来记录原信号发送信号的次数,当 taken 取到 count 个数的时候,就停止给订阅者发送信号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| - (instancetype)take:(NSUInteger)count { Class class = self.class; if (count == 0) return class.empty; return [[self bind:^{ __block NSUInteger taken = 0; return ^ id (id value, BOOL *stop) { if (taken < count) { ++taken; if (taken == count) *stop = YES; return [class return:value]; } else { return nil; } }; }] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count]; }
|
takeLast:
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| - (void)testTakeLast { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { [subscriber sendNext:@0]; [subscriber sendNext:@1]; [subscriber sendNext:@2]; [subscriber sendNext:@3]; [subscriber sendCompleted]; return nil; }]; RACSignal *takeSignal = [sourceSignal takeLast:2]; [takeSignal subscribeNext:^(id _Nullable x) { NSLog(@"value = %@", x); }]; }
|
输出:
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| - (RACSignal *)takeLast:(NSUInteger)count { return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count]; return [self subscribeNext:^(id x) { [valuesTaken addObject:x ? : RACTupleNil.tupleNil];
while (valuesTaken.count > count) { [valuesTaken removeObjectAtIndex:0]; } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ for (id value in valuesTaken) { [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value]; }
[subscriber sendCompleted]; }]; }] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count]; }
|
takeLast:
先订阅原信号,根据参数 count, 保存最后 count 个信号值到数组 valuesTaken。当原信号发送 sendCompleted的时候遍历 valuesTaken 将其元素逐个发给订阅者。
takeUntilBlock:
1 2 3 4 5 6 7 8 9 10 11 12 13
| - (__kindof RACStream *)takeUntilBlock:(BOOL (^)(id x))predicate { NSCParameterAssert(predicate != nil);
Class class = self.class; return [[self bind:^{ return ^ id (id value, BOOL *stop) { if (predicate(value)) return nil;
return [class return:value]; }; }] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name]; }
|
-takeUntilBlock:
和 -taskLast:
实现思路基本一致,后置是通过信号值个数和参数 count 比较判断是否继续给订阅者发送型号,而 -takeUntilBlock:
是根据参数 predicate 闭包执行结果来判断是否给订阅者发送信号
takeUntil:
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| - (void)testUntil { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { [subscriber sendNext:@0]; [subscriber sendNext:@1]; [subscriber sendNext:@2]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ [subscriber sendNext:@3]; [subscriber sendCompleted]; }); return nil; }]; RACSignal *untilSignal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) { dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ [subscriber sendNext:@10]; [subscriber sendCompleted]; }); return nil; }]; RACSignal *takeSignal = [sourceSignal takeUntil:untilSignal]; [takeSignal subscribeNext:^(id _Nullable x) { NSLog(@"value = %@", x); }]; }
|
输出:
1 2 3
| value = 0 value = 1 value = 2
|
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| - (RACSignal *)takeUntil:(RACSignal *)signalTrigger { return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; void (^triggerCompletion)(void) = ^{ [disposable dispose]; [subscriber sendCompleted]; };
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) { triggerCompletion(); } completed:^{ triggerCompletion(); }];
[disposable addDisposable:triggerDisposable];
if (!disposable.disposed) { RACDisposable *selfDisposable = [self subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [disposable dispose]; [subscriber sendCompleted]; }];
[disposable addDisposable:selfDisposable]; }
return disposable; }] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger]; }
|
-takeUntil:
会返回新的信号,新信号被订阅的时候触发入参闭包:
- 订阅 signalTrigger 信号,如果发送 sendNext/sendCompleted 则结束整个新信号的订阅
- 如果 signalTrigger 尚未发送过 sendNext/sendCompleted ,则将原信号的信号事件透传给订阅者
takeUntilReplacement:
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| - (void)testTakeUntilReplacement { RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { [subscriber sendNext:@0]; [subscriber sendNext:@1]; [subscriber sendNext:@2]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ [subscriber sendNext:@3]; [subscriber sendNext:@4]; [subscriber sendCompleted]; }); return nil; }]; RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ [subscriber sendNext:@5]; [subscriber sendNext:@6]; [subscriber sendCompleted]; }); return nil; }]; RACSignal *newSignal = [signal1 takeUntilReplacement:signal2]; [newSignal subscribeNext:^(id _Nullable x) { NSLog(@"value = %@", x); }]; }
|
输出:
1 2 3 4 5
| value = 0 value = 1 value = 2 value = 5 value = 6
|
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| - (RACSignal *)takeUntilReplacement:(RACSignal *)replacement { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) { [selfDisposable dispose]; [subscriber sendNext:x]; } error:^(NSError *error) { [selfDisposable dispose]; [subscriber sendError:error]; } completed:^{ [selfDisposable dispose]; [subscriber sendCompleted]; }];
if (!selfDisposable.disposed) { selfDisposable.disposable = [[self concat:[RACSignal never]] subscribe:subscriber]; }
return [RACDisposable disposableWithBlock:^{ [selfDisposable dispose]; [replacementDisposable dispose]; }]; }]; }
|
takeUntilReplacement:
内部主要经历以下的步骤:
首先原信号执行 -concat:
方法并传入 [RACSignal never],[RACSignal never] 返回的信号不会给订阅者发送任何东西,这样能保证收到 replacement 信号前,源信号不会被 dispose
1 2 3 4 5
| + (RACSignal *)never { return [[self createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) { return nil; }] setNameWithFormat:@"+never"]; }
|
订阅 replacement 信号,当收到其发送的 sendNext/sendError/sendCompleted 时,先讲原信号 dispose,然后将对应的信号发给最终的订阅者
主要流程如图所示:
skip:
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| - (void)testSkip { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { [subscriber sendNext:@0]; [subscriber sendNext:@1]; [subscriber sendNext:@2]; [subscriber sendNext:@3]; [subscriber sendNext:@4]; [subscriber sendCompleted]; return nil; }]; [[sourceSignal skip:1] subscribeNext:^(id _Nullable x) { NSLog(@"value = %@", x); }]; }
|
输出:
1 2 3 4
| value = 1 value = 2 value = 3 value = 4
|
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| - (__kindof RACStream *)skip:(NSUInteger)skipCount { Class class = self.class; return [[self bind:^{ __block NSUInteger skipped = 0;
return ^(id value, BOOL *stop) { if (skipped >= skipCount) return [class return:value];
skipped++; return class.empty; }; }] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount]; }
|
-skip:
方法内补也是通过 -bind:
方法进行封装,通过变量 skipped 来记录原始信号已经发过几个信号,当skipped >= skipCount 的时候,会将信号事件直接发给最终订阅者。
skipUntilBlock:
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| - (void)testSkipUntilBlock { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { [subscriber sendNext:@0]; [subscriber sendNext:@1]; [subscriber sendNext:@2]; [subscriber sendNext:@3]; [subscriber sendNext:@4]; [subscriber sendNext:@0]; [subscriber sendCompleted]; return nil; }]; [[sourceSignal skipUntilBlock:^BOOL(id _Nullable x) { return [x integerValue] > 3; }] subscribeNext:^(id _Nullable x) { NSLog(@"value = %@", x); }]; }
|
输出:
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| - (__kindof RACStream *)skipUntilBlock:(BOOL (^)(id x))predicate { NSCParameterAssert(predicate != nil);
Class class = self.class; return [[self bind:^{ __block BOOL skipping = YES;
return ^ id (id value, BOOL *stop) { if (skipping) { if (predicate(value)) { skipping = NO; } else { return class.empty; } }
return [class return:value]; }; }] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name]; }
|
-skipUntilBlock:
和 -skip:
原理类似,忽略信号的条件从信号个数变化成从 predicate 闭包来判断,值得注意的是,当 predicate 闭包返回 YES 之后的所有信号就不再会被 skip。
skipWhileBlock:
1 2 3 4 5 6 7
| - (__kindof RACStream *)skipWhileBlock:(BOOL (^)(id x))predicate { NSCParameterAssert(predicate != nil);
return [[self skipUntilBlock:^ BOOL (id x) { return !predicate(x); }] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name]; }
|
和 -skipUntilBlock:
类似,也是通过 predicate 闭包返回结果来判断是否来 skip 信号,区别是
-skipWhileBlock:
只有 predicate 返回 YES 才skip 信号,一旦返回 NO,以后的信号就不再会被 skip
distinctUntilChanged
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| - (void)testDistinctUntilChanged { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { [subscriber sendNext:@0]; [subscriber sendNext:@1]; [subscriber sendNext:@2]; [subscriber sendNext:@2]; [subscriber sendNext:@3]; [subscriber sendNext:@3]; [subscriber sendCompleted]; return nil; }]; RACSignal *filterSignal = [sourceSignal distinctUntilChanged]; [filterSignal subscribeNext:^(id _Nullable x) { NSLog(@"value = %@", x); }]; }
|
输出:
1 2 3 4
| value = 0 value = 1 value = 2 value = 3
|
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| - (__kindof RACStream *)distinctUntilChanged { Class class = self.class;
return [[self bind:^{ __block id lastValue = nil; __block BOOL initial = YES;
return ^(id x, BOOL *stop) { if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];
initial = NO; lastValue = x; return [class return:x]; }; }] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name]; }
|
-distinctUntilChanged
是底层是通过 -bind:
方法来实现
- 定义 lastValue 记录原信号上一次发送的信号值,initial 判断之前2个信号值是否相同
- 如果当前发送的信号值和上一次的相同,直接返回 empty 信号并发送给订阅者
- 如果当前原信号发送的信号值和上一次不相同,调用 return 方法将信号值包装成新的信号发送给订阅者
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| - (void)testGroupByTransform { RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { [subscriber sendNext:@0]; [subscriber sendNext:@1]; [subscriber sendNext:@2]; [subscriber sendNext:@3]; [subscriber sendNext:@4]; [subscriber sendCompleted]; return nil; }]; RACSignal *groupSignal = [sourceSignal groupBy:^id<NSCopying> _Nullable(id _Nullable object) { return [object integerValue] > 2 ? @"send" : @"skip"; } transform:^id _Nullable(id _Nullable object) { return @([object integerValue] * 10); }]; RACSignal *filterSignal = [[groupSignal filter:^BOOL(RACGroupedSignal *value) { return [(NSString *)value.key isEqualToString:@"send"]; }] flatten]; [filterSignal subscribeNext:^(id _Nullable x) { NSLog(@"value = %@", x); }]; }
|
输出:
底层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock { NSCParameterAssert(keyBlock != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { NSMutableDictionary *groups = [NSMutableDictionary dictionary]; NSMutableArray *orderedGroups = [NSMutableArray array];
return [self subscribeNext:^(id x) { id<NSCopying> key = keyBlock(x); RACGroupedSignal *groupSubject = nil; @synchronized(groups) { groupSubject = groups[key]; if (groupSubject == nil) { groupSubject = [RACGroupedSignal signalWithKey:key]; groups[key] = groupSubject; [orderedGroups addObject:groupSubject]; [subscriber sendNext:groupSubject]; } }
[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x]; } error:^(NSError *error) { [subscriber sendError:error];
[orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error]; } completed:^{ [subscriber sendCompleted];
[orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)]; }]; }] setNameWithFormat:@"[%@] -groupBy:transform:", self.name]; }
|
-groupBy:transform:
主要是信号事件转化成类似 map 结构数据发给订阅者,主要逻辑可以概括为:
创建可变字典 groups 和 可变数组 orderedGroups
订阅原信号,当原信号发送 sendNext 的时候,通过闭包 keyBlock 取到对应的 key 值,然后根据 key 从 groups 中取出对应的 groupSubject (RACGroupedSignal 类型),如果为空则新建一个,并且加入到 orderedGroups 数组中,再把 groupSubject 发给最终订阅者
groupSubject 给订阅者发送 transformBlock 闭包的返回结果,如果 transformBlock 是空,则直接把信号发给订阅者
原信号发送 sendCompleted / sendError ,遍历数组 orderedGroups 并对其中的 RACGroupedSignal 元素执行对应的 sendCompleted / sendError
经过上面的分析看得出 -groupBy:transform:
方法返回的方法也是一个高阶信号,我们可以结合之前 -flatten
、-flattenMap:
等方法结合使用,对齐进行降阶
groupBy:
-groupBy:
是通过封装 -groupBy:transform:
来实现的,把后者入参 transformBlock 赋值为 nil,也就是 -groupBy:transform:
主要逻辑步骤3中,groupSubject 会给其订阅者返回原信号发送的信号值
1 2 3
| - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock { return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name]; }
|