0%

ReactiveCocoa的冷信号与热信号

ReactiveX 中将信号分为冷信号和热信号,本文结合 ReactiveCocoa 分析其中的冷、热信号的异同。

冷、热信号的区别

ReactiveX 文档中是这样描述冷、热信号的:

When does an Observable begin emitting its sequence of items? It depends on the Observable. A “hot” Observable may begin emitting items as soon as it is created, and so any observer who later subscribes to that Observable may start observing the sequence somewhere in the middle. A “cold” Observable, on the other hand, waits until an observer subscribes to it before it begins to emit items, and so such an observer is guaranteed to see the whole sequence from the beginning.

简单归纳两者主要区别:

  1. 冷信号的发送是被动触发的,只有被订阅之后才会发送信号;热信号的发送是主动的,不受订阅动作的时间点影响
  2. 每次订阅冷信号,订阅者都会收到完整且相同的信号序列;订阅热信号,订阅者只会收到订阅动作时候发送的信号序列

冷信号订阅示意图:

Cold

热信号订阅示意图:

Hot

RACSubject

在 ReactiveCocoa 中, RACSignal 冷信号,当订阅者对其进行订阅后都会接受到;RACSubject 代表热信号,订阅者接收到多少值取决于它订阅的时间与 RACSubject 发送信号的时机。

RACSubject 是继承于 RACSignal,同事它有3个子类分别是:

  1. RACReplaySubject
  2. RACGroupedSignal
  3. RACBehaviorSubject

RACSubject 底层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

// STEP 1
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

// STEP 2
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}

// STEP 3
[disposable addDisposable:[RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];

if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}]];

return disposable;
}

RACSubject 重写了父类的 -subscribe: 方法:

  1. 利用 RACPassthroughSubscriber 将原始订阅者 subscriber 和自身的 disposable 关联,包装成新的 subscriber,当原始订阅者订阅取消之后会触发 disposable 的 dispose 动作
  2. 将所有 subscriber 存在可变数组 subscribers 中
  3. 创建一个 RACDisposable 对象加入到步骤1中创建的 disposable ,在当前 subscriber 销毁时,disposable 会触发 dispose 动作,然后把对应的 subscriber 从数组中移除

RACSubject 能够发送信号是因为其实现了 RACSubscriber 协议:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}

- (void)sendError:(NSError *)error {
[self.disposable dispose];

[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendError:error];
}];
}

- (void)sendCompleted {
[self.disposable dispose];

[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendCompleted];
}];
}

sendNext/sendError/sendCompleted 都会对数组 subscribers 进行遍历,然后将信号事件透传给订阅者

1
2
3
4
5
6
7
8
9
10
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}

for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}

-enumerateSubscribersUsingBlock: 中是通过 @synchronized 包装数据的线程安全

RACBehaviorSubject

RACBehaviorSubject 每次被订阅的时候会向订阅者发送最新的信号

初始化的时候,可以设置第一个最新信号

1
2
3
4
5
+ (instancetype)behaviorSubjectWithDefaultValue:(id)value {
RACBehaviorSubject *subject = [self subject];
subject.currentValue = value;
return subject;
}

currentValue 属性保存着最新的信号值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];

RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
[subscriber sendNext:self.currentValue];
}
}];

return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[schedulingDisposable dispose];
}];
}

可以看到 RACBehaviorSubject 被订阅的时候,先执行了父类的方法,然后给订阅者发送 currentValue

RACBehaviorSubject

RACReplaySubject

RACReplaySubject 可以看做是 RACBehaviorSubject 的升级版,不过细节上略有差别

1
2
3
4
5
6
7
8
9
10
11
@interface RACReplaySubject ()

@property (nonatomic, assign, readonly) NSUInteger capacity;

// These properties should only be modified while synchronized on self.
@property (nonatomic, strong, readonly) NSMutableArray *valuesReceived;
@property (nonatomic, assign) BOOL hasCompleted;
@property (nonatomic, assign) BOOL hasError;
@property (nonatomic, strong) NSError *error;

@end
  • capacity: 确定保存最近发送过的信号值个数
  • valuesReceived: 保存最近发送过的信号值
  • hasCompleted: 是否发送过 sendCompleted
  • hasError: 是否发送过 sendError
  • error: 保存之前发送的 NSError
1
2
3
4
5
6
7
8
9
10
- (void)sendNext:(id)value {
@synchronized (self) {
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
[super sendNext:value];

if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}

发送信号值,先用 @synchronized 进行加锁,然后把信号值 value 保存在 valuesReceived 数组中,如果 value == nil,则用 RACTupleNil.tupleNil 替代并加入到数组中。

当 capacity 不等于 RACReplaySubjectUnlimitedCapacity,数组长度大于 capacity 的时候,会弹出数组中前几个元素,类似栈结构的 pop 动作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;

[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}

if (compoundDisposable.disposed) return;

if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];

[compoundDisposable addDisposable:schedulingDisposable];

return compoundDisposable;
}

订阅的逻辑大体上和 RACBehaviorSubject 相似,每次订阅的时候,同样通过 @synchronized 加锁,然后将数组的所有元素逐一发送给订阅者,如果之前发送过 sendCompleted/sendError,则给订阅者发送对应的sendCompleted/sendError

需要注意的是,不指定 RACReplaySubject 的 capacity,订阅者订阅之后都会获取到完整的信号序列(类似订阅冷信号)

RACReplaySubject

RACGroupedSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
@interface RACGroupedSignal ()
@property (nonatomic, copy) id<NSCopying> key;
@end

@implementation RACGroupedSignal

#pragma mark API

+ (instancetype)signalWithKey:(id<NSCopying>)key {
RACGroupedSignal *subject = [self subject];
subject.key = key;
return subject;
}

RACGroupedSignal 相对于 RACSubject 对存储了一个 key 值

在 RACSignal 的 groupBy:transform: 方法中使用了 RACGroupedSignal,具体细节参考文章

冷信号转换成热信号

冷信号和热信号本质区别在于是否保持状态,冷信号本身不保持多次订阅发送信号过程的状态,所以每次订阅冷信号就会收到完整的信号序列;相反热信号维持多次订阅的状态,订阅者订阅热信号只会收到订阅动作之后发送的信号值。

基于上面的区别,很容易会想到一种方法将 ReactiveCocoa 中的冷信号(RACSignal)转化成热信号:订阅冷信号,将收到的信号值通过 RACSubject 转发,订阅者通过订阅 RACSubject 来获取热信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
}];

return nil;
}];

RACSubject *subject = [RACSubject subject];
[coldSignal subscribe:subject];

[subject subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];

输出:

1
2
value = 1
value = 2

这个方法存在一个问题:如果 subject 的订阅者提前终止了订阅,而 subject 并不能终止对 coldSignal 的订阅。

ReactiveCocoa 框架中提供了一下几个方法将冷信号转化成热信号:

  1. - (RACMulticastConnection<ValueType> *)multicast:(RACSubject<ValueType> *)subject
  2. - (RACMulticastConnection *)publish
  3. - (RACSignal<ValueType> *)replay
  4. - (RACSignal<ValueType> *)replayLast
  5. - (RACSignal<ValueType> *)replayLazily

其中 - (RACMulticastConnection<ValueType> *)multicast:(RACSubject<ValueType> *)subject 是另外几个方法的实现基础,所以先分析次方法的实现

multicast:

1
2
3
4
5
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
return connection;
}

-multicast: 是通过初始化一个 RACMulticastConnection 对象实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/// RACMulticastConnection.m

- (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
NSCParameterAssert(source != nil);
NSCParameterAssert(subject != nil);

self = [super init];

_sourceSignal = source;
_serialDisposable = [[RACSerialDisposable alloc] init];
_signal = subject;

return self;
}

#pragma mark Connecting

- (RACDisposable *)connect {
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);

if (shouldConnect) {
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
}

return self.serialDisposable;
}

- (RACSignal *)autoconnect {
__block volatile int32_t subscriberCount = 0;

return [[RACSignal
createSignal:^(id<RACSubscriber> subscriber) {
OSAtomicIncrement32Barrier(&subscriberCount);

RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
RACDisposable *connectionDisposable = [self connect];

return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];

if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
[connectionDisposable dispose];
}
}];
}]
setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}
  1. RACMulticastConnection 初始化的时候将原信号(冷信号)保存在为属性 sourceSignal,signal 属性保存着传入的 RACSubject。serialDisposable 用于需要订阅的清理操作
  2. RACMulticastConnection 提供2个方法进行信号转换,首先是 -connect,改方法中先通过实例变量 hasConnected 判断是否已经执行过 -connect,值得注意的是这里是通过原子性运算 OSAtomicCompareAndSwap32Barrier 来进行判断,如果 hasConnected == 0,则返回 YES,然后将 hasConnected 赋值为 1。如果之前没有执行过,就以 _signal(RACSubject) 作为订阅者对原信号进行订阅
  3. connect 之后 RACMulticastConnection 对象的 _signal 也就是一个热信号,对外部来说 signal 是一个只读属性
  4. 如果要确保热信号 signal 的第一次订阅能收到 sourceSignal 的完整信号序列,可以调用另一个转换方法 -autoConnect。改函数中通过创建返回一个新的 RACSignal,当其被订阅首先会先订阅 signal 属性,然后再执行 -connect,因此第一次对 signal 进行订阅的时候,会收到 sourceSignal 的所有信号值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
- (void)testConnection {
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];

[[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
[subscriber sendNext:@2];
}];

[[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
[subscriber sendCompleted];
}];
return nil;
}];

RACSubject *subject = [RACSubject subject];
RACMulticastConnection *multicastConnection = [coldSignal multicast:subject];
RACSignal *hotSignal = multicastConnection.signal;

[hotSignal subscribeNext:^(id x) {
NSLog(@"Subscribe 1 recieve value:%@.", x);
}];

[multicastConnection connect];

[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[hotSignal subscribeNext:^(id x) {
NSLog(@"Subscribe 2 recieve value:%@.", x);
}];
}];
}

输出:

1
2
3
Subscribe 1 recieve value:1.
Subscribe 1 recieve value:2.
Subscribe 2 recieve value:2.

通过输出结果可以看得出来,multicastConnection.signal 是一个热信号,订阅者只收到订阅动作之后发送的信号值。

同样可以通过 -autoConnect 实现一样的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
- (void)testAutoConnection {
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];

[[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
[subscriber sendNext:@2];
}];

[[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
[subscriber sendCompleted];
}];
return nil;
}];

RACSubject *subject = [RACSubject subject];
RACMulticastConnection *multicastConnection = [coldSignal multicast:subject];
RACSignal *hotSignal = multicastConnection.autoconnect;

[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[hotSignal subscribeNext:^(id x) {
NSLog(@"Subscribe 1 recieve value:%@.", x);
}];
}];

[[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
[hotSignal subscribeNext:^(id x) {
NSLog(@"Subscribe 2 recieve value:%@.", x);
}];
}];
}

publish

1
2
3
4
5
- (RACMulticastConnection *)publish {
RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
RACMulticastConnection *connection = [self multicast:subject];
return connection;
}

-publish 本质上就是调用了 -multicast 并返回了对应的 RACMulticastConnection 对象

replay

1
2
3
4
5
6
7
8
- (RACSignal *)replay {
RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];

RACMulticastConnection *connection = [self multicast:subject];
[connection connect];

return connection.signal;
}

-replay 是用 RACReplaySubject 替代 RACSubject,并且还执行了 -connect 方法,订阅者订阅热信号之后会收到最后发送的信号值序列

replayLast

1
2
3
4
5
6
7
8
- (RACSignal *)replayLast {
RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];

RACMulticastConnection *connection = [self multicast:subject];
[connection connect];

return connection.signal;
}

-replayLast-replay 基础上用把 RACReplaySubject 的 capacity 赋值为1,订阅者订阅热信号之后之前发送过最新的一个信号值

replayLazily

1
2
3
4
5
6
7
8
9
- (RACSignal *)replayLazily {
RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
return [[RACSignal
defer:^{
[connection connect];
return connection.signal;
}]
setNameWithFormat:@"[%@] -replayLazily", self.name];
}

-replayLazily 返回的信号只有被订阅的时候才会进行 connect 操作,也就是这时候才去订阅 sourceSignal,效果跟直接订阅冷信号相似

为什么需要热信号

先来看一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
- (void)testSideEffect {
AFHTTPSessionManager *mgr = [AFHTTPSessionManager manager];
mgr.requestSerializer = [AFJSONRequestSerializer serializer];
mgr.responseSerializer = [AFJSONResponseSerializer serializer];

NSString *url = @"https://apis.map.qq.com/ws/place/v1/search";
NSDictionary *params = @{
@"boundary":@"nearby(22.54046907,113.93455082,1000)",
@"key":@"XXXXX-XXXXX-XXXXX-XXXXX-XXXXX-XXXXX",
@"keyword":@"深大",
@"page_index":@1,
@"page_size":@10,
};

RACSignal *fetchWebservice = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
NSLog(@"fetch begin");
NSURLSessionDataTask *task = [mgr GET:url parameters:params progress:nil success:^(NSURLSessionDataTask * _Nonnull task, id _Nullable responseObject) {
NSLog(@"fetch success");
[subscriber sendNext:responseObject];
[subscriber sendCompleted];
} failure:^(NSURLSessionDataTask * _Nullable task, NSError * _Nonnull error) {
NSLog(@"fetch error");
[subscriber sendError:error];
}];
return [RACDisposable disposableWithBlock:^{
if (task.state != NSURLSessionTaskStateCompleted) {
[task cancel];
}
}];
}];

RACSignal *requestId = [fetchWebservice flattenMap:^__kindof RACSignal * _Nullable(NSDictionary *responseObject) {
id requestId = responseObject[@"request_id"];
if ([requestId isKindOfClass:NSString.class]) {
return [RACSignal return:requestId];
} else {
return [RACSignal error:[NSError errorWithDomain:@"error" code:400 userInfo:@{@"responseObject": responseObject}]];
}
}];

RACSignal *count = [fetchWebservice flattenMap:^__kindof RACSignal * _Nullable(NSDictionary *responseObject) {
id count = responseObject[@"count"];
if ([count isKindOfClass:NSNumber.class]) {
return [RACSignal return:count];
} else {
return [RACSignal error:[NSError errorWithDomain:@"error" code:400 userInfo:@{@"responseObject": responseObject}]];
}
}];

RACSignal *requestIdCatchSig = [[requestId catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."];
[requestIdCatchSig setName:@"requestIdCatchSig"];
RAC(self, requestId) = requestIdCatchSig;

RACSignal *countSig = [[count catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."];
[countSig setName:@"countCatchSig"];
RAC(self, countStr) = countSig;

[[RACSignal merge:@[requestId, count]] subscribeError:^(NSError *error) {
UIAlertView *alertView = [[UIAlertView alloc] initWithTitle:@"Error" message:error.domain delegate:nil cancelButtonTitle:@"OK" otherButtonTitles:nil];
[alertView show];
}];
}

我们经常会遇到请求数据,然后再把数据处理解析返回需要的具体返回,但是以上代码会触发4次的网络请求,在打印的日志可以看到

1
2
3
4
5
fetch begin
fetch begin
fetch begin
fetch begin
...

之前分析 RACSignal 常用方法的时候,发现 ReactiveCocoa 中信号常用的方法是先订阅原信号,然后返回新的 RACSignal,而 RACSignal 被订阅的时候会马上执行一次初始化保存的 didSubscribe 闭包。

首先这里用宏把信号和属性进行绑定 RAC(self, requestId) = requestIdCatchSig,宏 RAC 内部会初始化 RACSubscriptingAssignmentTrampoline

1
2
3
4
5
6
7
#define RAC(TARGET, ...) \
metamacro_if_eq(1, metamacro_argcount(__VA_ARGS__)) \
(RAC_(TARGET, __VA_ARGS__, nil)) \
(RAC_(TARGET, __VA_ARGS__))

#define RAC_(TARGET, KEYPATH, NILVALUE) \
[[RACSubscriptingAssignmentTrampoline alloc] initWithTarget:(TARGET) nilValue:(NILVALUE)][@keypath(TARGET, KEYPATH)]

内部会对 requestIdCatchSig 进行订阅,requestIdCatchSig 被订阅的时候 requestId 也会被订阅一次,requestId 是 fetchWebservice 通过 -flatten: 方法转换,最后 fetchWebservice 也因此被订阅,触发了一次网络请求

同理 RAC(self, countStr) = countSig; 也会触发一次网络请求,到此会引起2次网络请求。

很容易看出剩余的2次是因为 +merge: 方法导致,在改方法中内部会将传入的 requestId,count 都会进行一次 flattenMap 操作,导致 fetchWebservice 被订阅2次。

可以看出,在 ReactiveCocoa 中信号转换即是对原有的信号进行订阅从而产生新的信号,而冷信号 RACSignal 每次被订阅都会执行一次订阅时间,产生了副作用

改进:

1
2
3
RACMulticastConnection *connection = [fetchWebservice publish];
RACSignal *hotFetchWebservice = connection.signal;
[connection connect];

这时候我们利用热信号的特性,将 fetchWebservice 转化成功对应的热信号 hotFetchWebservice,后面的处理都以 hotFetchWebservice 原信号进行转换就能避免冷信号转化过程中产生的副作用。

参考文章:

  1. ReactiveX文档
  2. 『可变』的热信号 RACSubject
  3. 细说ReactiveCocoa的冷信号与热信号(三):怎么处理冷信号与热信号