Rxjava2 介绍与详解实例

时间:2023-03-09 16:34:35
Rxjava2 介绍与详解实例

前言

现在我们可以看到越来越多的开发者都在使用 Rx 相关的技术进行 App,Java 后端等领域进行开发。在开源的社区以及互联网公司,Rx、响应式编程、函数式都是热门的存在。所以笔者将结合自身的学习以及实际使用情况,写一个针对 Rxjava2 的系列文章,一起学习和使用 Rxjava 所带来的便捷。

笔者将利用工作之余,结合 ReactiveX 官方 Wiki 对 Rxjava 的定义与介绍,对相关基础知识、基本操作,常用部分的 API 进行整理,并加上个人理解和相关操作的示例。

相关参考链接:

Rxjava2 系列文章目录:

实例代码:

RX介绍

ReactiveX的历史

ReactiveX 是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是reactivex.io。

什么是ReactiveX

微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。

ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式迭代器模式函数式编程的精华。

RxJava 到底是什么

RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

然而,对于初学者来说,这还是比较含蓄难懂的。因为它是一个总结,而初学者更需要一个入门的介绍或者理解。其实, RxJava 的本质可以总结为异步的概念。说到本质上,它就是一个实现异步操作的库。RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。

RxJava 优点

同样是做异步,为什么去使用它,而不用现成的 Thread,ThreadPoolExecutor,Android的AsyncTask / Handler / ... ?其实就是简洁,易用 !

异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 正如Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁

名词定义

  • Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式。
  • Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念。
  • Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者。
  • Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现。
  • emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一律译为发射。
  • items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项。

Rx模式

使用观察者模式

  • 创建:Rx可以方便的创建事件流和数据流
  • 组合:Rx使用查询式的操作符组合和变换数据流
  • 监听:Rx可以订阅任何可观察的数据流并执行操作

简化代码

  • 函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
  • 简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
  • 异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
  • 轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种

    并发问题

使用Observable的优势

Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。

Observable通过使用最佳的方式访问异步数据序列填补了这个间隙。

类型 单个数据 多个数据
同步 T getData() Iterable getData
异步 Future<T> getData() Observable<T> getData()

Rx的Observable模型让你可以像使用集合数据一样操作异步事件流,对异步事件流使用各种

简单、可组合的操作。

1. Observable可组合

对于单层的异步操作来说,Java中Future对象的处理方式是非常简单有效的,但是一旦涉及到嵌套,它们就开始变得异常繁琐和复杂。使用Future很难很好的组合带条件的异步执行流程(考虑到运行时各种潜在的问题,甚至可以说是不可能的),当然,要想实现还是可以做到的,但是非常困难,或许你可以用 Future.get() ,但这样做,异步执行的优势就完全没有了。从另一方面说,Rx的bservable一开始就是为组合异步数据流准备的。

2. Observable更灵活

Rx的Observable不仅支持处理单独的标量值(就像Future可以做的),也支持数据序列,甚至是无穷的数据流。 Observable 是一个抽象概念,适用于任何场景。Observable拥有它的近亲Iterable的全部优雅与灵活。

Observable是异步的双向push,Iterable是同步的单向pull,对比:

事件 Iterable(pull) Observable(push)
获取数据 T next() onNext(T)
异常处理 throws Exception onError(Exception)
任务完成 !hasNext() onCompleted

3. Observable无偏见

Rx对于对于并发性或异步性没有任何特殊的偏好,Observable可以用任何方式实现,线程池、事件循环、非阻塞IO、Actor模式,任何满足你的需求的,你擅长或偏好的方式都可以。无论你选择怎样实现它,无论底层实现是阻塞的还是非阻塞的,客户端代码将所有与Observable的交互都当做是异步的。

Rx使用依赖:

下列是笔者使用的版本(可根据实际情况进行选择):

  1. 使用Gradle依赖:implementation "io.reactivex.rxjava2:rxjava:2.2.12"
  2. 使用Maven依赖或者Jar包下载 :Rxjava 2.2.12
  3. 其他版本以及相关下载 :Maven

Rxjava的入门基础

1. Observable

1.1 观察者模式

基本概念:Observable (可观察者,即被观察者)、Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer(观察者观察被观察者的通知事件)。

在RxJava中,一个实现了 Observer 接口的对象可以订阅 (subscribe) 一个 Observable 类的实例。订阅者(subscriber) 对 Observable 发射 (emit) 的任何数据或数据序列作出响应。这种模式 简化了并发操作,因为它不需要阻塞等待 Observable 发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。

RxJava 的事件回调方法: onSubscribe()onNext()onCompleted()onError()

  • onSubscribe(): 当被观察者被观察者订阅的时候触发。
  • onNext(): 当被观察者发送数据的时候通过此方法通知观察者数据变换。
  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

注意: 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

1.2 Consumer 和 Action

这两个词意思分别是 消费者(可以理解为消费被观察者发射出来的事件)和 行为(可以理解为响应被观察者的行为)。对于 Observer 中的 4 个回调方法,我们未必都能用得到,如果只需要用到其中的一部分,就需要 Consumer 和 Action 上场了。

简单示例:

	// 1. 进行订阅,subscribe(Observer)
observable.subscribe(observer); System.out.println("---------------------------------------------");
// 2. 进行订阅,subscribe(Consumer onNext)
observable.subscribe(nextConsumer); System.out.println("---------------------------------------------");
// 3. 进行订阅,subscribe(Consumer onNext, Consumer onError)
observable.subscribe(nextConsumer, errorConsumer); System.out.println("---------------------------------------------");
// 4. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted)
observable.subscribe(nextConsumer, errorConsumer, completedAction); System.out.println("---------------------------------------------");
// 5. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted, Consumer onSubscribe)
observable.subscribe(nextConsumer, errorConsumer, completedAction, onSubscribeComsumer);

1.3 Observable的分类

在RxJava中,Observable 有 Hot 与 Cold 之分。

  • Hot Observable : 无论有没有观察者进行订阅,事件始终都会发生。当有多个观察者订阅时,Hot Observable此时与订阅者们的关系时一对多的关系,可以与多个订阅者共享信息。
  • Cold Observable : 只有有观察者订阅了,才开始执行数据流的发送,并且与观察者时一对一的关系。当有多个不同的订阅者时,消息是重新完整发送的,也就是说对于订阅者们来说,它们的事件是彼此独立的。

Javadoc: Observable

2. Flowable

Rxjava2.x 中有这么一个被观察者 Flowable,同样作为被观察者,它和Observable有什么区别呢,在Rxjava2中,Observable不再支持背压,而新增的Flowable支持背压,何为背压,就是异步场景下上游发送事件的速度大于下游处理事件的速度所产生的现象。

Rxjava2 介绍与详解实例

提示:在本系列后面会有详细的单独篇章来介绍和如何使用背压。

Javadoc: Flowable

3. Single

Single 类似于 Observable,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值。

因此,不同于Observable需要三个方法 onNext, onError, onCompleted,订阅Single只需要两个方法:

  • onSuccess: Single发射单个的值到这个方法
  • onError: 如果无法发射需要的值,Single发射一个Throwable对象到这个方法

Single 只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。

Rxjava2 介绍与详解实例

示例代码:

	// Single: 只发送 onSuccess or onError 通知,并且只会发送一次, 第一次发送数据后的都不会在处理
Single.create(new SingleOnSubscribe<String>() { @Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
emitter.onSuccess("Success"); // 发送success通知
emitter.onSuccess("Success2"); // 只能发送一次通知,后续不在处理
}
}).subscribe(new BiConsumer<String, Throwable>() { @Override
public void accept(String t1, Throwable t2) throws Exception {
System.out.println("--> accept: t1 = " + t1 + ", t2 = " + t2);
}
});

输出:

--> accept: t1 = Success,  t2 = null

提示:Single 可以通过 toXXX 方法转换为 Observable, Flowable, Completable与Maybe。

Javadoc: Single

4. Completable

Completable 在创建后,不会发射任何数据, 只有 onCompleteonError 事件,同时没有Observable中的一些操作符,如 map,flatMap。通常与 andThen 操作符结合使用。

Rxjava2 介绍与详解实例

示例代码:

	// 1. Completable:只发送complete 或 error 事件,不发送任何数据
Completable.fromAction(new Action() { @Override
public void run() throws Exception {
System.out.println("Hello World! This is Completable.");
}
}).subscribe(new CompletableObserver() { @Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
} @Override
public void onError(Throwable e) {
System.out.println("--> onError");
} @Override
public void onComplete() {
System.out.println("--> onComplete");
}
}); System.out.println("----------------------------------------------");
// 2. 与 andThen 结合使用,当Completable执行完onCompleted后,执行andThen里的任务
Completable.create(new CompletableOnSubscribe() { @Override
public void subscribe(CompletableEmitter emitter) throws Exception {
Thread.sleep(1000);
System.out.println("--> completed");
emitter.onComplete();
}
}).andThen(Observable.range(1, 5)).subscribe(new Consumer<Integer>() { @Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept: " + t);
}
});

输出:

--> onSubscribe
Hello World! This is Completable.
--> onComplete
----------------------------------------------
--> completed
--> accept: 1
--> accept: 2
--> accept: 3
--> accept: 4
--> accept: 5

提示:Completable 可以通过 toXXX 方法转换为 Observable, Flowable, Single与Maybe。

Javadoc: Completable

5. Maybe

Maybe 是 Rxjava 2.x 以后的新类型,只能发射 0 或者 1 项数据,即使后续有多个数据,后面的数据也不会被处理。可以看做是 Single 与 Completable 结合。

Rxjava2 介绍与详解实例

示例代码:

	// Maybe 只发送0个或者1个数据,后续数据将被忽略
Maybe.create(new MaybeOnSubscribe<String>() { @Override
public void subscribe(MaybeEmitter<String> emitter) throws Exception {
// 如果先发送了,将会调用MaybeObserver的onCompleted方法,如果有数据发送或者调用onError,则不会去调用
// emitter.onComplete();
emitter.onSuccess("Hello"); // 如果发送了第一个数据后续数据将不会被处理
emitter.onSuccess("World");
}
}).subscribe(new MaybeObserver<String>() { @Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
} @Override
public void onSuccess(String t) {
System.out.println("--> onSuccess: " + t);
} @Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
} @Override
public void onComplete() {
System.out.println("--> onComplete");
}
});

输出:

--> onSubscribe
--> onSuccess: Hello

提示:Maybe 可以通过 toXXX 方法转换为 Observable, Flowable, Single与Completable。

Javadoc: Maybe

6. Subject

Subject 可以看成是一个桥梁或者代理,在 RxJava 实现中,它同时充当了 ObserverObservable 的角色。因为它是一个Observer,它可以订阅一个或多个 Observable;又因为它是一个 Observable ,它可以转发它收到(Observe)的数据,也可以发射新的数据。

它既可以是数据源observerable,也可以是数据的订阅者Observer。这个可以通过源码来了解一下。

public abstract class Subject<T> extends Observable<T> implements Observer<T> {
...
}

Subject 实际上还是 Observable,不过它因为实现了Observer接口,可以通过onNext、onComplete、onError方法发射和终止发射数据。

注意: 不要使用just(T)from(T)create(T)来使用Subject,因为会把Subject转换为Obserable。

在 Rxjava 中,官方一共为我们提供了几种Subject:

  • AsyncSubject (仅释放接收到的最后一个数据)
  • BehaviorSubject (释放订阅前最后一个数据和订阅后接收到的所有数据)
  • PublishSubject (释放订阅后接收到的数据)
  • ReplaySubject (释放接收到的所有数据)
  • UnicastSubject (仅支持订阅一次的Subject)
  • Serialized(串行化)
  • TestSubject(在2.x中被TestScheduler和TestObserver替代)

6.1 AsyncSubject

AsyncSubject 仅释放 onComplete() 之前的最后一个数据(必须调用subject.onComplete()才会发送数据,否则观察者不会接收到任何数据)。

可以获取数据业务逻辑的最后的结果数据。

Rxjava2 介绍与详解实例

注意: 如果因异常(Error)终止,将不会向后续的Observer释放数据,但是会向Observer传递一个异常通知。

实例代码:

	// 注意: 不要使用just(T)、from(T)、create(T)来使用Subject,因为会把Subject转换为Obserable
// 无论订阅的时候AsyncSubject是否Completed,均可以收到最后一个值的回调
AsyncSubject<String> asyncSubject = AsyncSubject.create();
asyncSubject.onNext("emitter 1");
asyncSubject.onNext("emitter 2");
asyncSubject.onNext("emitter 3");
asyncSubject.onNext("emitter 4");
asyncSubject.onNext("emitter 5"); // 此时订阅后将近发送此项数据
// asyncSubject.onNext(1/0 + ""); // 发生error时将不会有数据发射,仅发送error通知
asyncSubject.onComplete(); // 订阅后只会接收最后一个数据
asyncSubject.subscribe(new Observer<String>() { @Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
} @Override
public void onNext(String t) {
System.out.println("--> onNext = " + t);
} @Override
public void onError(Throwable e) {
System.out.println("--> onError = " + e);
} @Override
public void onComplete() {
System.out.println("--> onComplete");
}
});

输出:

--> onSubscribe
--> onNext = emitter 5
--> onComplete

Javadoc: AsyncSubject

6.2 BehaviorSubject

当观察者订阅 BehaviorSubject 时,它开始发射原始Observable在订阅前的最后一个发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。

可以缓存订阅前最后一次发出的数据,以及订阅后发送的所有数据。

Rxjava2 介绍与详解实例

注意: 如果因异常(Error)终止,将不会向后续的Observer释放数据,但是会向Observer传递一个异常通知。

实例代码:

	// 创建无默认值的BehaviorSubject
BehaviorSubject<Integer> subject = BehaviorSubject.create();
// 创建有默认值的BehaviorSubject
BehaviorSubject<Integer> subjectDefault = BehaviorSubject.createDefault(-1); // 观察者对象
Observer<Integer> observer = new Observer<Integer>() { @Override
public void onSubscribe(Disposable d) {
System.out.println("--------------------------------");
System.out.println("--> onSubscribe");
} @Override
public void onNext(Integer t) {
System.out.println("--> onNext: " + t);
} @Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
} @Override
public void onComplete() {
System.out.println("--> onComplete");
}
}; // 1. 无数据发送的时候,发送默认值
// subjectDefault.subscribe(observer); // 2. 此时会发射所有订阅后正常发射的数据: 1, 2, 3, 4, error
// subject.subscribe(observer);
subject.onNext(1);
subject.onNext(2);
subject.onNext(3); // 3. 此时会发射订阅前的一个数据及后面正常发射的数据: 3, 4, error
// subject.subscribe(observer);
subject.onNext(4);
subject.onError(new NullPointerException()); // 4. 此时不会发射后续数据,仅发送Error通知
// subject.subscribe(observer);
subject.onNext(5);
subject.onComplete(); // 5. 此时没有数据发射,如果有error存在的话,将会发送error
subject.subscribe(observer);

输出:

--------------------------------
--> onSubscribe
--> onNext: -1
--------------------------------
--> onSubscribe
--> onNext: 1
--> onNext: 2
--> onNext: 3
--> onNext: 4
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onNext: 3
--> onNext: 4
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onError: java.lang.NullPointerException

Javadoc: BehaviorSubject

6.3 PublishSubject

PublishSubject 只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。需要注意的是,PublishSubject 可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间间隙内,可能有一个或多个数据可能会丢失。如果要确保来自原始Observable的所有数据都被分发,你需要这样做:使用Create创建那个Observable以便手动给它引入 "冷" Observable的行为(当所有观察者都已经订阅时才开始发射数据),或者改用 ReplaySubject

如果 PublishSubject 在订阅前已经调用了 onComplete() 方法,则观察者不会接收到数据。

Rxjava2 介绍与详解实例

注意: 如果因异常(Error)终止,将不会向后续的Observer释放数据,但是会向Observer传递一个异常通知。

实例代码:

	// 释放订阅后接收到正常发射的数据,有error将不会发射任何数据
PublishSubject<Integer> subject = PublishSubject.create();
// 观察者对象
Observer<Integer> observer = new Observer<Integer>() { @Override
public void onSubscribe(Disposable d) {
System.out.println("--------------------------------");
System.out.println("--> onSubscribe");
} @Override
public void onNext(Integer t) {
System.out.println("--> onNext: " + t);
} @Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
} @Override
public void onComplete() {
System.out.println("--> onComplete");
}
}; // 1. 此时订阅将释放后续正常发射的数据: 1,2, 3, 4, error
// subject.subscribe(observer);
subject.onNext(1);
subject.onNext(2); // 2. 此时订阅,发射后续正常发射的数据:3, 4, error
// subject.subscribe(observer);
subject.onNext(3);
subject.onNext(4); // 此时将不会发送任何数据,直接发送error
subject.onError(new NullPointerException());
subject.onNext(5);
subject.onComplete(); // 3. 此时订阅如果有error,仅发送error,否则无数据发射
subject.subscribe(observer);

输出:

--------------------------------
--> onSubscribe
--> onNext: 1
--> onNext: 2
--> onNext: 3
--> onNext: 4
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onNext: 3
--> onNext: 4
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onError: java.lang.NullPointerException

Javadoc: PublishSubject

6.4 ReplaySubject

ReplaySubject 会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。也 有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。

如果你把 ReplaySubject 当作一个观察者使用,注意不要从多个线程中调用它的onNext方法 (包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议, 给Subject的结果增加了不确定性。

Rxjava2 介绍与详解实例

Rxjava2 介绍与详解实例

ReplaySubject 还可以限制缓存数据的数量,限制缓存的时间:

  • create(bufferAge):指定内部缓存,减少内部缓存区增长过多的重分配
  • createWithSize(maxAge):指定订阅后接受之前已经发射过数据的 maxAge 个数据项
  • createWithTime(timeout, TimeUnit, Scheduler) :接受订阅后接受之前已经发射过指定 timeout 时间段内的数据项

实例代码:

	// 1. 接受收到的所有数据以及通知,对每隔Observer都执行相同的独立的操作
ReplaySubject<Integer> subject = ReplaySubject.create(); // 2. 指定内部缓存大小,此方法避免在内部缓冲区增长以容纳新缓冲区时过多的数组重分配
// ReplaySubject<Integer> subject = ReplaySubject.create(5); // 3. createWithSize(count)
// 指定保留订阅前数据项的个数的Subject,会发射订阅前count个数据和后续的数据
// ReplaySubject<Integer> subject = ReplaySubject.createWithSize(1); // 4. createWithTime(maxAge, unit, scheduler)
// 指定保留订阅前指定maxAge时间段内数据和后续的数据
// ReplaySubject<Integer> subject = ReplaySubject.createWithTime(1, TimeUnit.MILLISECONDS, Schedulers.trampoline()); // 创建Observer(观察者), 可以接受Observable所有通知
Observer<Integer> observer = new Observer<Integer>() { public void onSubscribe(Disposable d) {
System.out.println("----------------------------------");
System.out.println("--> onSubscribe");
} public void onNext(Integer t) {
System.out.println("--> onNext = " + t);
} public void onError(Throwable e) {
System.out.println("--> onError: " + e);
} public void onComplete() {
System.out.println("--> onComplete");
}
}; // 正常接受所有Observable的数据和通知
subject.subscribe(observer);
subject.onNext(1);
subject.onNext(2);
subject.onNext(3); // 正常接受所有Observable的数据和通知
subject.subscribe(observer);
subject.onNext(4);
// 如果有error,则发送error通知,不影响任何一个观察者数据与通知接受
// subject.onError(new NullPointerException());
subject.onNext(5);
subject.onComplete(); // 正常接受所有Observable的数据和通知
subject.subscribe(observer);

输出:

----------------------------------
--> onSubscribe
--> onNext = 1
--> onNext = 2
--> onNext = 3
----------------------------------
--> onSubscribe
--> onNext = 1
--> onNext = 2
--> onNext = 3
--> onNext = 4
--> onNext = 4
--> onNext = 5
--> onNext = 5
--> onComplete
--> onComplete
----------------------------------
--> onSubscribe
--> onNext = 1
--> onNext = 2
--> onNext = 3
--> onNext = 4
--> onNext = 5
--> onComplete

Javadoc: ReplaySubject

6.5 UnicastSubject

UnicastSubject 是仅支持订阅一次的 Subject ,如果多个订阅者试图订阅这个 Subject 将会受到 IllegalStateException

常用于一次性消费或安全场合,如网络结算,支付等。

实例代码:

	// 创建UnicastSubject,只能被订阅一次,不能再次被订阅
UnicastSubject<Integer> subject = UnicastSubject.create(); // 创建Observer(观察者), 可以接受Observable所有通知
Observer<Integer> observer = new Observer<Integer>() { public void onSubscribe(Disposable d) {
System.out.println("--------------------------------");
System.out.println("--> onSubscribe");
} public void onNext(Integer t) {
System.out.println("--> onNext = " + t);
} public void onError(Throwable e) {
System.out.println("--> onError: " + e);
} public void onComplete() {
System.out.println("--> onComplete");
}
};
// 订阅后,此subject将不可以再被订阅了
subject.subscribe(observer); subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
// 此时会有IllegalStateException,因为只能订阅一次,不能重复订阅
subject.subscribe(observer);
subject.onNext(4);
subject.onNext(5);
subject.onComplete(); // 此时会有IllegalStateException,因为只能被订阅一次,不能重复订阅
subject.subscribe(observer);

输出:

--------------------------------
--> onSubscribe
--> onNext = 1
--> onNext = 2
--> onNext = 3
--------------------------------
--> onSubscribe
--> onError: java.lang.IllegalStateException: Only a single observer allowed.
--> onNext = 4
--> onNext = 5
--> onComplete
--------------------------------
--> onSubscribe
--> onError: java.lang.IllegalStateException: Only a single observer allowed.

Javadoc: UnicastSubject

6.6 SerializedSubject

在并发情况下,不推荐使用通常的Subject对象,此时会产生多次调用产生一系列不可控的问题,而是推荐使用 SerializedSubject,并发时只允许一个线程调用onNext等方法,将Subject 串行化 后,所有其他的Observable和Subject方法都是线程安全的。

注意: 在Rxjava2 中 SerializedSubject 是一个不公开(不是public)的类型,意味着不可以直接创建使用,但是可以通过Subject.toSerialized()方法将Subject对象串行化保证其线程安全。同时也提供了 SerializedObserver,SerializedSubscriber等来包装对象成为串行化对象。

实例代码:

	// 创建Subject
ReplaySubject<String> subject = ReplaySubject.create(); // 通过toSerialized()进行串行化
Subject<String> serialized = subject.toSerialized(); // 订阅
serialized.subscribe(new Consumer<String>() { @Override
public void accept(String t) throws Exception {
System.out.println("--> accept: " + t + ", ReceiverThreadID: " + Thread.currentThread().getId());
}
}); // 多线程执行
for (int i = 0; i < 10; i++) {
final int value = i + 1;
new Thread(new Runnable() { @Override
public void run() {
serialized.onNext(value + "-SendThreadID: " + Thread.currentThread().getId());
}
}).start();
} System.in.read(); System.out.println("---------------------------------------------------------------------"); // 创建一个 SerializedObserver来进行串行化,保证线程安全
// 注意:只保证同时只有一个线程调用 onNext, onCompleted, onError方法,并不是将所有emit的值放到一个线程上然后处理
SerializedObserver<String> observer = new SerializedObserver<String>(new Observer<String>() { @Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
} @Override
public void onNext(String t) {
System.out.println("--> onNext: " + t + ", ReceiverThreadID: " + Thread.currentThread().getId());
} @Override
public void onError(Throwable e) {
System.out.println("--> onError");
} @Override
public void onComplete() {
System.out.println("--> onComplete");
}
}); // 订阅
subject.subscribe(observer); // 多线程执行
for (int i = 0; i < 10; i++) {
final int value = i + 1;
new Thread(new Runnable() { @Override
public void run() {
subject.onNext(value + "-SendThreadID: " + Thread.currentThread().getId());
// if (value == 10) {
// subject.onComplete();
// }
}
}).start();
} System.in.read();

输出:

--> accept: 1-SendThreadID: 11, ReceiverThreadID: 11
--> accept: 2-SendThreadID: 12, ReceiverThreadID: 11
--> accept: 10-SendThreadID: 20, ReceiverThreadID: 11
--> accept: 9-SendThreadID: 19, ReceiverThreadID: 11
--> accept: 8-SendThreadID: 18, ReceiverThreadID: 11
--> accept: 7-SendThreadID: 17, ReceiverThreadID: 11
--> accept: 6-SendThreadID: 16, ReceiverThreadID: 11
--> accept: 4-SendThreadID: 14, ReceiverThreadID: 11
--> accept: 5-SendThreadID: 15, ReceiverThreadID: 11
--> accept: 3-SendThreadID: 13, ReceiverThreadID: 11
---------------------------------------------------------------------
--> onSubscribe
--> onNext: 1-SendThreadID: 11, ReceiverThreadID: 11
--> onNext: 3-SendThreadID: 13, ReceiverThreadID: 11
--> onNext: 4-SendThreadID: 14, ReceiverThreadID: 11
--> onNext: 5-SendThreadID: 15, ReceiverThreadID: 11
--> onNext: 6-SendThreadID: 16, ReceiverThreadID: 16
--> onNext: 7-SendThreadID: 17, ReceiverThreadID: 16
--> onNext: 8-SendThreadID: 18, ReceiverThreadID: 16
--> onNext: 9-SendThreadID: 19, ReceiverThreadID: 16
--> onNext: 10-SendThreadID: 20, ReceiverThreadID: 16

6.7 TestSubject

Rxjava2 中已经取消了TestSubject,使用TestSchedulerTestObserver替代,下面主要以 TestObserver 为例进行介绍。

TestObserver 是一个一个记录事件并允许对其进行断言的观察者,多用于测试场合。一般可以创建一个TestObserver 对象或者从Observable 或者 Subject 中直接调用 test() 方法获取。

实例代码:

	// Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(100);
emitter.onError(new NullPointerException());
emitter.onComplete();
}
}); // 1. 创建TestObserver对象
TestObserver<Integer> testObserver = TestObserver.create(new Observer<Integer>() { @Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe:");
} @Override
public void onNext(Integer t) {
System.out.println("--> onNext: " + t);
} @Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
} @Override
public void onComplete() {
System.out.println("--> onComplete:");
}
}); observable.subscribe(testObserver);
try {
// 断言是否为收到订阅,但是没有事件发送
testObserver.assertEmpty();
// 断言是否收到onComplete()
testObserver.assertComplete();
// 断言没有数据100发送
testObserver.assertNever(100);
// 断言接收数据结果
testObserver.assertResult(1, 2, 3);
// 断言异常
testObserver.assertError(NullPointerException.class);
... 更多请参考Api
} catch (Error e) {
System.out.println("Error: " + e);
} System.out.println("-----------------------------------------------");
// Subject
AsyncSubject<Object> subject = AsyncSubject.create(); // 2. 从Observable或者Subject中获取TestObserver对象
TestObserver<Integer> test = observable.test();
TestObserver<Object> test2 = subject.test();
System.out.println(test.values()); // received onNext values
try {
// 断言是否为收到订阅,但是没有事件发送
test.assertEmpty();
test2.assertEmpty();
// 断言是否收到onComplete()
test.assertComplete();
// 断言没有数据100发送
test.assertNever(100);
// 断言接收数据结果
test.assertResult(1, 2, 3);
// 断言异常
test.assertError(NullPointerException.class);
... 更多请参考Api
} catch (Error e) {
System.out.println("Error: " + e);
}

输出(当出现断言不匹配的情况,会有相应Error抛出):

--> onSubscribe:
--> onNext: 1
--> onNext: 2
--> onNext: 3
--> onNext: 100
--> onError: java.lang.NullPointerException
Error: java.lang.AssertionError: Value counts differ; expected: 0 but was: 4 (latch = 0, values = 4, errors = 1, completions = 0)
-----------------------------------------------
[1, 2, 3, 100]
Error: java.lang.AssertionError: Value counts differ; expected: 0 but was: 4 (latch = 0, values = 4, errors = 1, completions = 0)

Javadoc: TestObserver

6.8 Processor

ProcessSubject 的作用和使用相同。Process 是 Rxjava2 中的新功能,它是一个接口,继承自 Subscriber、Publish。与Subject 最大的区别是 Process 支持背压,关于背压,后续将会有专题文章来做详细介绍。

7. Scheduler

如果你想给Observable操作符链添加多线程功能,你可以指定操作符(或者特定的 Observable)在特定的调度器(Scheduler)上执行。

某些ReactiveX的Observable操作符有一些变体,它们可以接受一个Scheduler参数。这个参数指定操作符将它们的部分或全部任务放在一个特定的调度器上执行。

使用ObserveOn和SubscribeOn操作符,你可以让Observable在一个特定的调度器上执行, ObserveOn指示一个Observable在一个特定的调度器上调用观察者的onNext, onError和 onCompleted方法,SubscribeOn更进一步,它指示Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。

调度器的种类

下表展示了RxJava中可用的调度器种类:

调度器类型 作用
Schedulers.computation() 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io()),默认线程数等于处理器的数量 。
Schedulers.from(executor) 使用指定的Executor作为调度器。
Schedulers.trampoline() 调度在当前线程上工作,但不立即执行。当其它排队的任务完成后,在当前线程排队开始执行。
Schedulers.io() 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用 Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。
Schedulers.newThread() 为每个任务创建一个新线程
Schedulers.single() 一个默认的、共享的、单线程支持的调度器实例,用于需要在同一后台线程上强顺序执行的工作。

关于Rxjava中的线程模型、线程转换操作、调度器的使用等后面会有专题文章来详细介绍。

小结:

本章主要介绍了Rxjava的概念与添加使用依赖、Rxjava中的观察者模式、Observable、Flowable、Subject,Schedule等基础对象的介绍与使用,应该可以对Rxjava的概念及基本对象有了基本的认识和了解,以及简单的上手使用。

有关Rxjava2 的其他相关部分内容,后续将有系列的文章来介绍,请关注上面的实时文章目录。