彻底搞懂 RxJava — 高级篇

createh52周前 (04-23)技术教程3

前言

本文献给所有想要深入了解 RxJava 的人.

如果你还没阅读过基础部分, 请先补习一下功课. 此外, 强烈建议你彻底理解中级部分再来阅读本文.

本文将带你理解 `.subscribeOn(Schedulers.io)` 和 `.observeOn(Schedulers.computation)` 用法背后的原理.

先看一下基本使用

我们来拆解一下 `.subscribeOn` 和 `.observeOn` 的作用范围:

  • `subscribeOn` 将作用于 `create` 中的 `OnSubscribe.call` 方法.
  • `observeOn` 作用于其语法中下一语句的 `Subscriber.onNext` 等函数中.

源码分析

首先分析 `subscribeOn`

与中级篇中的 `map` 一样, 是通过创建了一个 `Observable` 来转发 `OnSubscribe.call` 请求(代码中的 `OperatorSubscribeOn` 继承自 `OnSubscribe`). 来看看具体实现(已略去无关代码):


public void call(final Subscriber super T> subscriber) {
   final Worker inner = scheduler.createWorker;
   inner.schedule(new Action0 {
      @Override
      public void call {
         source.unsafeSubscribe(s);
      }
   });
}

可见, 该函数中做了如下两件事:

  1. 创建一个用于在不同线程执行的 `Worker` 对象(代码中的 inner)
  2. 使用上述 `inner` 在该对象所代表的线程中执行 `Observable.onSubscribe.call` 方法(代码中的 `source.unsafeSubscribe(s);`

梳理一下 `subscribeOn` 的实现方法

再来分析稍微复杂一点儿的 `observeOn`

回顾一下 `lift`


public final 

`lift` 创建创建了一个 `Observable` 和 一个 `OnSubscriber` 对象. 而在 `OnSubscribe` 对象中又创建了一个 `Subscriber` (代码中的 `st`) 对象. 整个 `observeOn` 的重点也就在这个 `st` 对象中. 我们先来看一下 `st` 是如何生成的:

`hook.onLift(operator).call(o);` 实际上调用的就是 `operator.call(o)`. 而 `operator` 是 `OperatorObserveOn` 的实例. 所以看下 `OperatorObserveOn.call` 方法是如何生成 `st` 对象的(已略去无关代码):


public Subscriber super T> call(Subscriber super T> child) {
   if (scheduler instanceof ImmediateScheduler) {
      // avoid overhead, execute directly
      return child;
   } else if (scheduler instanceof TrampolineScheduler) {
      // avoid overhead, execute directly
      return child;
   } else {
      ObserveOnSubscriber

该方法会根据 `scheduler` 的类型决定使用什么方式返回 `Subscriber` 对象. 可见, 如果 child 类型为 `ImmediateScheduler` 或者 `TrampolineScheduler` 等以当前线程为执行环境的类型, 则直接返回 `child` 对象. 本例中, `child` 为 `NewThreadScheduler`, 因此将通过 `ObserveOnSubscriber` 对 `child` 进行包装. 生成一个 proxy subscriber 对象.

至此, 我们可以知道 `observeOn` 是通过以下方法对其后面的 `Subscriber` 进行控制的:

  1. `lift` -> `OnSubscribe.call` -> `proxy subscriber = new Subscriber(original subscriber)` 创建了一个新的 `Subscriber`(实际上是个代理)
  2. 在上述 `proxy subscriber` 中对 `original subscriber` 对象的执行进行转发. 转发过程中, `proxy subscriber` 完全可以自由的控制 `original subscriber` 执行的线程.

整体梳理一下 `subscribeOn` + `observeOn` 的实现方法

Refs:

Grokking 带你入门

Bruce 大头鬼

扔物线大神

读源码!

walfud 彻底搞懂 RxJava 系列

Rx 规范

相关文章

我的Java!越过山丘

当年我是VS程序员,2000年被微软无情抛弃时,我徘徊过,到底继续跟着微软跑还是甩了它!当时兜里没钱,没法再花精力去学习除了名字很像其他都不一样的VS.net……既然甩了微软,那我为什么不直接选一个跨...

Java内存模型的历史变迁

本文通过介绍Java的新/旧内存模型,来展示Java技术的历史变迁。旧的Java内存模型Java使用的是共享内存的并发模型,在线程之间共享变量。Java语言定义了线程模型规范,通过内存模型控制线程与变...

如何识别Java中的内存泄漏

【编者按】作者Martin Gutenbrunner供职于Ruxit,拥有十年的Java Web应用程序架构和管理经验。近日,他在Dzone上撰文分享了Java内存泄漏识别相关经验,由OneAPM工程...

我们为什么要在Android中使用RxJava

感觉RxJava最近风生水起,不学习一下都不好意思了,洒家也是初学RxJava,也是感觉代码好像更复杂更难懂了,看了一篇外文感同身受,简单翻译一下。本文简单介绍使用RxJava优势所在。但可能需要有一...

详解Java中的注解

在Java中,注解(Annotation)引入始于Java5,用来描述Java代码的元信息,通常情况下注解不会直接影响代码的执行,尽管有些注解可以用来做到影响代码执行。注解可以做什么Java中的注解通...

Java反射机制剖析

java反射机制:1.指的是可以于运行时加载,探知和使用编译期间完全未知的类.2.程序在运行状态中, 可以动态加载一个只有名称的类, 对于任意一个已经加载的类,都能够知道这个类的所有属性和方法; 对于...