【笔记】RxJava2.0新特性简单介绍并实现RxBus

时间:2022-02-10 17:47:34

RxJava2.0新特性

简单总结介绍下:

  1. 不再支持传null,传了直接正常结束或者抛异常;
  2. Observable不再支持背压,新加入Flowable支持非阻塞的背压,并且所有操作符都强制支持背压;
  3. Single类可单独发送onSuccess或onError消息;
  4. Completable只是改变了命名;
  5. 新增Maybe,可以说是Single和Completable结合体,只能发送0或1个事件或错误;
  6. 很多基础类实现了类似Publisher<T>的接口;
  7. 支持背压的都是FlowableProcessor<T>的子类,Subject不再支持T -> R的转换;
  8. TestSubject被废弃;
  9. SerializedSubject 不再是公共方法,需要使用Subject.toSerialized()和FlowableProcessor.toSerialized()替代;
  10. GroupedObservable编程抽象类;
  11. 可以自己实现功能接口,并且所有功能接口都会抛出异常,不需要try-catch;
  12. 减少组件数量,Action0由io.reactivex.functions.Action和Scheduler代替,Action1重命名Consumer,Action2重命名BiConsumer,ActionN被Consumer<Object[]>代替;
  13. Functions按照Java8明明风格命名;
  14. 使用轻量级Subscriber接口,整合请求管理和取消,为Flowable定义抽象类,支持外部取消dispose(),onCompleted重命名,
  15. request前必须完成初始化工作;
  16. 1.0中Subscription重命名成Disposable;
  17. Reactive-Streams规范的操作符支持背压,异常会在onNext抛出,Observable完全不支持背压;
  18. Reactive-Streams compliance
  19. 重新设计RxJavaPlugins,RxJavaHooks功能被加入到了RxJavaPlugins;
  20. 调度类调整;
  21. 每个基础类都有create操作符用于支持背压和取消;
  22. Subscriber和Observer不允许主动抛出异常;
  23. 为了支持内部测试,所有基础类都有test()方法;
  24. TestObserver可在订阅前取消TestSubscriber/TestObserver;
  25. 阻塞终端成为可能;
  26. 1.x中使用Mockito和Observer的用户需要使用Subscriber.onSubscribe进行初始化请求;
  27. 大部分操作符仍然保留,但进行了重载;
  28. 1.x Observable 到 2.x Flowable
  29. 实例方法
  30. 不同返回值
  31. 移除方法
  32. doOnCancel/doOnDispose/unsubscribeOn变化说明

以上是个人大致总结,详情还是看官方文档 What's different in 2.0中文翻译,虽然变动很多,但在实际使用中估计就背压和部分类的改变可能有所影响。

RxBus

RxBus不是一个库,而是一种模式,是使用RxJava来实现EventBus。用RxBus来替代EventBus减少了程序对第三方库的应用。

直接上代码

public class RxBus {
private static RxBus instance;

public static RxBus getInstance() {
if (instance == null) {
synchronized (RxBus.class) {
if (instance == null) {
instance = new RxBus();
}
}
}
return instance;
}

private Subject<Object> subject;
private HashMap<Object, CompositeDisposable> disposableHashMap;

private RxBus() {
subject = PublishSubject.create().toSerialized();
disposableHashMap = new HashMap<>();
}

/**
* 发送事件
*
* @param event 递送的事件
*/
public void post(Object event) {
subject.onNext(event);
}

/**
* 返回指定类型的带背压的Flowable实例
*
* @param type
* @param <T>
* @return
*/
private <T> Flowable<T> getObservable(Class<T> type) {
return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}

/**
* 订阅事件
* @param subscriber 订阅者对象
* @param type 事件的类型
* @param next 事件的处理程序
* @param error 事件的异常处理
* @param <T>
*/
public <T> void subscribe(Object subscriber, Class<T> type, Consumer<T> next, Consumer<Throwable> error) {
Disposable disposable = getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next, error);
addSubscription(subscriber, disposable);
}

/**
* 保存订阅后的disposable
*
* @param subscriber
* @param disposable
*/
private void addSubscription(Object subscriber, Disposable disposable) {
if (disposableHashMap.get(subscriber) != null) {
disposableHashMap.get(subscriber).add(disposable);
} else {
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(disposable);
disposableHashMap.put(subscriber, compositeDisposable);
}
}

/**
* 是否有观察者订阅
*
* @return
*/
public boolean hasObservers() {
return subject.hasObservers();
}

/**
* 取消订阅
*
* @param subscriber
*/
public void unSubscribe(Object subscriber) {
if (disposableHashMap.containsKey(subscriber)) {
disposableHashMap.get(subscriber).dispose();
disposableHashMap.remove(subscriber);
}
}
}
单例实现,使用中只需要订阅事件subscribe和发送事件post即可,与RxJava1.x实现的RxBus几乎一样,熟悉的用法,不一样的实现而已。