基于Rxjava2的事件总线:Rxbus

时间:2022-09-16 17:46:48

以前的项目中使用的是EventBus来实现事件的通知和订阅,RxJava2发布之后就使用了新的方式:RxBus,减少添加的依赖库。如果有什么错误的地方,或者有更好的建议的欢迎大家在下边留言,互相学习。

没有背压处理(Backpressure)的 RxBus

import android.support.annotation.NonNull;

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;


public class RxBus {

private final Subject<Object> mBus;

private RxBus() {
mBus = PublishSubject.create().toSerialized();
}

public static RxBus getInstance() {
return Holder.BUS;
}

public void post(@NonNull Object obj) {
mBus.onNext(obj);
}

public <T> Observable<T> register(Class<T> tClass) {
return mBus.ofType(tClass);
}

public Observable<Object> register() {
return mBus;
}

public boolean hasObservers() {
return mBus.hasObservers();
}

public void unregisterAll() {
//会将所有由mBus生成的Observable都置completed状态,后续的所有消息都收不到了
mBus.onComplete();
}

private static class Holder {
private static final RxBus BUS = new RxBus();
}

}

有背压(Backpressure)处理的RxBus:

import android.support.annotation.NonNull;

import io.reactivex.Flowable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;

public class RxBus {

private final FlowableProcessor<Object> mBus;

private RxBus() {
mBus = PublishProcessor.create().toSerialized();
}

private static class Holder {
private static RxBus instance = new RxBus();
}

public static RxBus getInstance() {
return Holder.instance;
}

public void post(@NonNull Object obj) {
mBus.onNext(obj);
}

public <T> Flowable<T> register(Class<T> clz) {
return mBus.ofType(clz);
}

public Flowable<Object> register() {
return mBus;
}

public void unregisterAll() {
//会将所有由mBus生成的Flowable都置completed状态后续的所有消息都收不到了
mBus.onComplete();
}

public boolean hasSubscribers() {
return mBus.hasSubscribers();
}
}

在发送消息的activity中代码:

RxBus.getInstance().post("111");

在接收消息的activity中代码:

RxBus.getInstance().register(String.class).subscribe(new Consumer<String>() {
@Override
public void accept(String integer) throws Exception {
toast(integer);
}
});

像上边直接传基本数据类型在实际项目中不推荐这样使用。我们可以自定义消息类(或者直接传JavaBean),例如:

public class MsgEvent<T> {

private T data;

private String mMsg;
private int type;
private int request;

public MsgEvent(T data) {
this.data = data;
}

public MsgEvent(int request, int type, String msg) {
this.type = type;
this.mMsg = msg;
this.request = request;
}
public String getMsg(){
return mMsg;
}
public int getType(){
return type;
}
public int getRequest(){ return request; }

public T getData(){return data;}

}

在发送消息的时候,自己定义消息:

RxBus.getInstance().post(new MsgEvent(11,45,"今天天气很好"));

在接收消息的时候,选择性接收消息:

        RxBus.getInstance().register(MsgEvent.class).subscribe(new Consumer<MsgEvent>() {
@Override
public void accept(MsgEvent msg) throws Exception {
if (msg.getRequest() == 11) {
tv.setText(msg.getMsg());
}
}
});

这里说明一下unregisterAll()方法,这个方法一旦调用了以后,所有的消息都是收不到的,所以如果要调用的话,建议在退出程序的Activity里面调用。

public void unregisterAll() {
//会将所有由mBus生成的Observable都置completed状态,后续的所有消息都收不到了
mBus.onComplete();
}

效果图:
基于Rxjava2的事件总线:Rxbus

点击下载源码