彻底搞懂 RxJava — 高级篇
前言
本文献给所有想要深入了解 RxJava 的人.
如果你还没阅读过基础部分, 请先补习一下功课. 此外, 强烈建议你彻底理解中级部分再来阅读本文.
本文将带你理解 `.subscribeOn(Schedulers.io)` 和 `.observeOn(Schedulers.computation)` 用法背后的原理.
先看一下基本使用
我们来拆解一下 `.subscribeOn` 和 `.observeOn` 的作用范围:
- `subscribeOn` 将作用于 `create` 中的 `OnSubscribe.call` 方法.
- `observeOn` 作用于其语法中下一语句的 `Subscriber.onNext` 等函数中.
源码分析
首先分析 `subscribeOn`
与中级篇中的 `map` 一样, 是通过创建了一个 `Observable` 来转发 `OnSubscribe.call` 请求(代码中的 `OperatorSubscribeOn` 继承自 `OnSubscribe`). 来看看具体实现(已略去无关代码):
public void call(final Subscriber super T> subscriber) { final Worker inner = scheduler.createWorker; inner.schedule(new Action0 { @Override public void call { source.unsafeSubscribe(s); } }); }
可见, 该函数中做了如下两件事:
- 创建一个用于在不同线程执行的 `Worker` 对象(代码中的 inner)
- 使用上述 `inner` 在该对象所代表的线程中执行 `Observable.onSubscribe.call` 方法(代码中的 `source.unsafeSubscribe(s);`
梳理一下 `subscribeOn` 的实现方法
再来分析稍微复杂一点儿的 `observeOn`
回顾一下 `lift`
public final
`lift` 创建创建了一个 `Observable` 和 一个 `OnSubscriber` 对象. 而在 `OnSubscribe` 对象中又创建了一个 `Subscriber` (代码中的 `st`) 对象. 整个 `observeOn` 的重点也就在这个 `st` 对象中. 我们先来看一下 `st` 是如何生成的:
`hook.onLift(operator).call(o);` 实际上调用的就是 `operator.call(o)`. 而 `operator` 是 `OperatorObserveOn` 的实例. 所以看下 `OperatorObserveOn.call` 方法是如何生成 `st` 对象的(已略去无关代码):
public Subscriber super T> call(Subscriber super T> child) { if (scheduler instanceof ImmediateScheduler) { // avoid overhead, execute directly return child; } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { ObserveOnSubscriber
该方法会根据 `scheduler` 的类型决定使用什么方式返回 `Subscriber` 对象. 可见, 如果 child 类型为 `ImmediateScheduler` 或者 `TrampolineScheduler` 等以当前线程为执行环境的类型, 则直接返回 `child` 对象. 本例中, `child` 为 `NewThreadScheduler`, 因此将通过 `ObserveOnSubscriber` 对 `child` 进行包装. 生成一个 proxy subscriber 对象.
至此, 我们可以知道 `observeOn` 是通过以下方法对其后面的 `Subscriber` 进行控制的:
- `lift` -> `OnSubscribe.call` -> `proxy subscriber = new Subscriber(original subscriber)` 创建了一个新的 `Subscriber`(实际上是个代理)
- 在上述 `proxy subscriber` 中对 `original subscriber` 对象的执行进行转发. 转发过程中, `proxy subscriber` 完全可以自由的控制 `original subscriber` 执行的线程.
整体梳理一下 `subscribeOn` + `observeOn` 的实现方法
Refs:
Grokking 带你入门
Bruce 大头鬼
扔物线大神
读源码!
walfud 彻底搞懂 RxJava 系列
Rx 规范