RACSignal源码解析

RACStream 是 ReactiveCocoa 中的核心概念:信号;RACStream 中有2个子类:

  1. RACSignal
  2. RACSequence

RACSignal

实际项目中,对 RACSignal 的使用中,经常会看到这样的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* 代码 1 */

RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber){
/// STEP 1
[subscriber sendNext:@(1)];
[subscriber sendNext:@(2)];
[subscriber sendNext:@(3)];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
/// STEP 5
NSLog(@"signal dispose");
}];
}];

RACDisposable *disposable = [signal subscribeNext:^(id x) {
/// STEP 2
NSLog(@"received value = %@", x);
} error:^(NSError *error) {
/// STEP 3
NSLog(@"received error: %@", error);
} completed:^{
/// STEP 4
NSLog(@"received completed");
}];

[disposable dispose];

基于以上的代码,我们可以看看 RACSigal从 被订阅到订阅者受到数据的整个过程,具体经历过那些流程

初始化:

1
2
3
4
5
/* 代码 2 */

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe]; // 返回 RACDynamicSignal 类型的对象
}

上面初始化返回 RACDynamicSignal 对象,这是一个私有类,继承RACSignal,实现 RACSignal 的订阅行为

1
2
3
4
5
6
7
8
9
/* 代码 3 */

// A private `RACSignal` subclasses that implements its subscription behavior
// using a block.
@interface RACDynamicSignal : RACSignal

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe;

@end

在调用 RACDynamicSignal 的 createSignal 方法中,会传入一个名为 didSubscribe ,返回类型为 RACDisposable 的block,在内部会将 didSubscribe 进行一次copy,然后保存在 RACDynamicSignal 对象中。

1
2
3
4
5
6
7
/* 代码 4 */

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy]; // 这里先进行一次copy
return [signal setNameWithFormat:@"+createSignal:"];
}

上面代码初始化返回 signal 调用方法 setNameWithFormat,会执行基类 RACStream 的方法,给 name 属性赋值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* 代码 5 */

- (instancetype)setNameWithFormat:(NSString *)format, ... {
if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;

NSCParameterAssert(format != nil);

va_list args;
va_start(args, format);

NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
va_end(args);

self.name = str;
return self;
}

从 didSubscribe 中还可以看到 RACSubscriber 协议:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* 代码 6 */

@protocol RACSubscriber <NSObject>
@required

/// 给所有订阅者发送 next value
- (void)sendNext:(nullable id)value;

/// 给所有订阅者发送 error
/// 这将终止订阅,并使所有订阅者无法收到后续数据
- (void)sendError:(nullable NSError *)error;

/// 给所有订阅者发送 complete
/// 这将终止订阅,并使所有订阅者无法收到后续数据
- (void)sendCompleted;

/// 用来接收代表某次订阅的 disposable 对象, 用来处理是否释放取消订阅的事件
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;

代码4中可以看到,创建 RACSignal 的时候,signal 会持有保存传入的 didSubscribe 闭包。这个闭包会在 signal 被订阅的时候触发

1
2
3
4
5
6
7
8
9
10
/* 代码 7 */

- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NSCParameterAssert(nextBlock != NULL);
NSCParameterAssert(errorBlock != NULL);
NSCParameterAssert(completedBlock != NULL);

RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
return [self subscribe:o];
}

我们平时对 RACSignal 实例对象调用 - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock 或者 - (RACDisposable *)subscribeError:(void (^)(NSError *error))errorBlock 方法会返回 RACDisposable 对象,内部会创建一个实现了 RACSubscriber 协议的对象 o, 把 next、error、completed 三种block进行 copy 保存

1
2
3
4
5
6
7
8
9
10
11
/* 代码 8 */

+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];

subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];

return subscriber;
}

然后调用 - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber 方法并把刚刚 RACSubscriber 对象传入,如代码2所示,这里实际上是调用了 RACDynamicSignal 的 subscriber 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* 代码 9 */

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];

[disposable addDisposable:schedulingDisposable];
}

return disposable;
}

在代码9中我们可以看到 2 个 类:RACCompoundDisposable、 RACPassthroughSubscriber

RACCompoundDisposable 继承 RACDisposable,可以理解为是一个保存 RACDisposable 类型的容器,当 RACCompoundDisposable 执行 - (void)dispose 方法会将 disposables 数组中的元素一一进行 dispose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/* 代码 10 */

static void disposeEach(const void *value, void *context) {
RACDisposable *disposable = (__bridge id)value;
[disposable dispose];
}

- (void)dispose {
#if RACCompoundDisposableInlineCount
RACDisposable *inlineCopy[RACCompoundDisposableInlineCount];
#endif

CFArrayRef remainingDisposables = NULL;

pthread_mutex_lock(&_mutex);
{
_disposed = YES;

#if RACCompoundDisposableInlineCount
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
inlineCopy[i] = _inlineDisposables[i];
_inlineDisposables[i] = nil;
}
#endif

remainingDisposables = _disposables;
_disposables = NULL;
}
pthread_mutex_unlock(&_mutex);

#if RACCompoundDisposableInlineCount
// Dispose outside of the lock in case the compound disposable is used
// recursively.
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
[inlineCopy[i] dispose];
}
#endif

if (remainingDisposables == NULL) return;

/// 遍历 RACDisposable 数组
/// 通过函数 disposeEach 来 dispose 每一个数组元素
CFIndex count = CFArrayGetCount(remainingDisposables);
CFArrayApplyFunction(remainingDisposables, CFRangeMake(0, count), &disposeEach, NULL);
CFRelease(remainingDisposables);
}

RACPassthroughSubscriber 是一个私有类,主要的作用是把一个订阅者 subscriber A 的信号事件传递给另一个没有被 dispose 的订阅者 subscriber B。由以下几个步骤实现:

  1. 包装真正的订阅者,使自己成为订阅者的替代者
  2. 将真正的订阅者与一个订阅时产生的 Disposable 关联起来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* 代码 11 */

@interface RACPassthroughSubscriber ()

// 接受转发信号事件的订阅者,也就是上面提到的 subscriber B
@property (nonatomic, strong, readonly) id<RACSubscriber> innerSubscriber;

// 给 RACPassthroughSubscriber 发送事件的 RACSignal 信号
//
// 该属性使用 unsafe_unretained 修饰主要是因为 RACSignal 仅是一个 DTrace probes
// 动态跟踪技术的探针, 如果改用 weak 会造成不必要的性能损耗
@property (nonatomic, unsafe_unretained, readonly) RACSignal *signal;

// disposable 若disposed,信号事件则不再转发给 innerSubscriber
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end

回到代码9中,之前已经创建刚刚提到的 RACCompoundDisposable 和 RACPassthroughSubscriber,

1
2
3
4
5
6
7
8
9
10
/* 代码 12 */

if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];

[disposable addDisposable:schedulingDisposable];
}

subscriptionScheduler 是一个单例方法,返回 RACSubscriptionScheduler 对象

1
2
3
4
5
6
7
8
9
10
11
/* 代码 13 */

+ (instancetype)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});

return subscriptionScheduler;
}

继而调用 RACSubscriptionScheduler 的 schedule 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* 代码 14 */

- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);

if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];

block();
return nil;
}

+ (BOOL)isOnMainThread {
return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}

+ (RACScheduler *)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

return nil;
}

schedule 方法中主要先去找当前线程对应的 RACScheduler 对象,如果找不到,则去找主线程对应的 RACScheduler 对象,如果还是找不到,则返回 backgroundScheduler 对象。

schedule 方法目的是主要释放参数 block,也就是执行代码12中

1
2
3
4
/* 代码 15 */

RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];

代码 4 中保存的闭包 didSubscribe 会在这里执行,进而可能调用闭包入参 subscriber(也就是之前创建的RACPassthroughSubscriber对象) 的 sendNext,sendError,sendCompleted 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/* 代码 16 */

- (void)sendNext:(id)value {
if (self.disposable.disposed) return;

if (RACSIGNAL_NEXT_ENABLED()) {
RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
}

[self.innerSubscriber sendNext:value];
}

- (void)sendError:(NSError *)error {
if (self.disposable.disposed) return;

if (RACSIGNAL_ERROR_ENABLED()) {
RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
}

[self.innerSubscriber sendError:error];
}

- (void)sendCompleted {
if (self.disposable.disposed) return;

if (RACSIGNAL_COMPLETED_ENABLED()) {
RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
}

[self.innerSubscriber sendCompleted];
}

RACPassthroughSubscriber 在以上的方法会将信号事件转发给 innerSubscriber,因为 innerSubscriber 是 RACSubscriber 对象,进而会执行私有类 RACSubscriber 中的方法 sendNext/sendError/sendCompleted

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* 代码 17 */

- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;

nextBlock(value);
}
}

- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];

if (errorBlock == nil) return;
errorBlock(e);
}
}

- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];

if (completedBlock == nil) return;
completedBlock();
}
}

执行上面方法前,都会先加锁保证线程安全,然后将对应的block(代码 8 创建 RACSubscriber 保存的 _next/_error/_completed)进行一次 copy,最后执行 block,最终的结果就是执行 代码 1 中的 STEP 2STEP 3STEP 4

基于以上的分析,RACSignal 从创建到给订阅发送事件可以归纳为以下几个步骤:

  1. 调用RACSignal createSignal 的方法,返回子类 RACDynamicSignal 对象并保存闭包 didSubscribe – 代码 1
  2. 订阅信号,调用RACSignal subscribeNext 方法,在该方法中会创建 RACSubscriber 订阅者对象
  3. 创建的 RACSubscriber 订阅者对象会copy nextBlock,errorBlock,completedBlock,保存到相关属性中 – 代码 7
  4. 调用步骤1创建的 RACDynamicSignal subscribe 方法
  5. 创建 RACCompoundDisposable 和 RACPassthroughSubscriber 对象,RACPassthroughSubscriber 会保存 步骤1返回的 RACDynamicSignal(signal属性),步骤2中的 RACSubscriber(innerSubscriber属性)– 代码 11
  6. 执行 RACDynamicSignal 保存的 didSubscribe 闭包,闭包内调用 RACPassthroughSubscriber sendNextsendErrorsendCompleted 方法
  7. RACPassthroughSubscriber 执行保存的 innerSubscriber 对应的 sendNextsendErrorsendCompleted 方法 — 代码 17