[Android开发] RxJava2之路十二- 异步和连接操作符例子Demo

时间:2021-01-27 17:48:48

一、操作符列表

1.1 异步操作符

异步操作符属于单独的rxjava-async模块,它们用于将同步对象转换为Observable。不支持Rxjava2.0,如果使用Rxjava1.0的话,可以导入下面的包就可以使用异步操作符了。

compile 'io.reactivex:rxjava-async-util:0.21.0'
名称 解析
start() 创建一个Observable,它发射一个函数的返回值
toAsync() / asyncAction()/ asyncFunc() 将一个函数或者Action转换为已Observable,它执行这个函数并发射函数的返回值
startFuture() 将一个返回Future的函数转换为一个Observable,它发射Future的返回值
deferFuture() 将一个返回Observable的Future转换为一个Observable,但是并不尝试获取这个Future返回的Observable,直到有订阅者订阅它
forEachFuture() 传递Subscriber方法给一个Subscriber,但是同时表现得像一个Future一样阻塞直到它完成
fromAction() 将一个Action转换为Observable,当一个订阅者订阅时,它执行这个action并发射它的返回值
fromCallable() 将一个Callable转换为Observable,当一个订阅者订阅时,它执行这个Callable并发射Callable的返回值,或者发射异常
fromRunnable() 将一个Runnable转换为Observable,当一个订阅者订阅时,它执行这个Runnable并发射Runnable的返回值
runAsync() 返回一个StoppableObservable,它发射某个Scheduler上指定的Action生成的多个actions

1.2 连接操作符

一个可连接的Observable与普通的Observable差不多,除了这一点:可连接的Observabe在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。

名称 解析
ConnectableObservable.connect() 指示一个可连接的Observable开始发射数据
Observable.publish() 将一个Observable转换为一个可连接的Observable
Observable.replay() 确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅
ConnectableObservable.refCount() 让一个可连接的Observable表现得像一个普通的Observable

二、连接操作符

2.1 connect / publish操作符

connect 是ConnectableObservable接口的一个方法,使用publish操作符可以将一个普通的Observable转换为一个ConnectableObservable,ConnectableObservable是Observable的子类 。

调用ConnectableObservable的connect方法会让它后面的Observable开始给发射数据给订阅者。

connect方法返回一个Disposable对象,可以调用它的dispose方法让Observable停止发射数据给观察者。

    /**
*
*/

private void testPublishCollect() {
//创建一个被观察者发射源
Observable<Long> observable = Observable.just(1L,2L,3L);

//publish操作符将普通Observable转换为可连接的Observable
ConnectableObservable<Long> connectableObservable = observable.publish();

//第一个订阅者订阅,不会开始发射数据
connectableObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong);
}
});

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

Log.e(TAG, "开始connect: ");
//如果不调用connect方法,connectableObservable则不会发射数据
connectableObservable.connect();

}

输出为:

03-03 12:27:59.845 13824-13824/cn.com.minstone.rxjavalearn E/CollectActivity@@: 开始connect: 
03-03 12:27:59.845 13824-13824/cn.com.minstone.rxjavalearn E/CollectActivity@@: accept: 1
03-03 12:27:59.845 13824-13824/cn.com.minstone.rxjavalearn E/CollectActivity@@: accept: 2
03-03 12:27:59.845 13824-13824/cn.com.minstone.rxjavalearn E/CollectActivity@@: accept: 3

2.2 replay操作符

保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅

Replay操作符返回的ConnectableObservable会缓存订阅者订阅之前已经发射的数据,这样即使有订阅者在其发射数据开始之后进行订阅也能收到之前发射过的数据。Replay操作符能指定缓存的大小或者时间,这样能避免耗费太多内存。

    private void testReplay() {
//使用replay转换为ConnectableObservable,
ConnectableObservable<Long> connectableObservable = Observable.just(1L, 2L, 3L).replay();

//用replay的话,延迟订阅,这样子也可以接收到数据
connectableObservable.delaySubscription(3000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "connect accept: " + aLong);
}
});

Log.e(TAG, "开始connect");
connectableObservable.connect();

}

输出内容为:

    private void testReplay() {
//使用replay转换为ConnectableObservable,
ConnectableObservable<Long> connectableObservable = Observable.just(1L, 2L, 3L).replay();

//用replay的话,延迟订阅,这样子也可以接收到数据
connectableObservable.delaySubscription(3000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "connect accept: " + aLong);
}
});

Log.e(TAG, "开始connect");
connectableObservable.connect();

}

如果这里reply改为publish的话,订阅者就收不到信息了,输出内容就会如下

03-03 16:02:15.177 31981-31981/cn.com.minstone.rxjavalearn E/CollectActivity@@: 开始connect

2.3 refCount操作符

publish相反,让一个可连接的Observable转换为普通的Observable。

如果转化后有观察者者对其进行订阅将会开始发射数据。

    private void testRefCount() {
//转换为ConnectableObservable
ConnectableObservable<Long> connectableObservable = Observable.just(1L, 2L, 3L).publish();

//订阅,这时候不会输出,要connect之后才会接收到
connectableObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "connect accept: " + aLong);
}
});

Log.e(TAG, "开始connect");
connectableObservable.connect();


//转换为Observable
Observable<Long> observable = connectableObservable.refCount();


observable.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "observable accept: "+aLong);
}
});


}

输出内容为:

03-03 15:52:29.675 30457-30457/cn.com.minstone.rxjavalearn E/CollectActivity@@: 开始connect
03-03 15:52:29.675 30457-30457/cn.com.minstone.rxjavalearn E/CollectActivity@@: connect accept: 1
03-03 15:52:29.675 30457-30457/cn.com.minstone.rxjavalearn E/CollectActivity@@: connect accept: 2
03-03 15:52:29.675 30457-30457/cn.com.minstone.rxjavalearn E/CollectActivity@@: connect accept: 3
03-03 15:52:29.675 30457-30457/cn.com.minstone.rxjavalearn E/CollectActivity@@: observable accept: 1
03-03 15:52:29.685 30457-30457/cn.com.minstone.rxjavalearn E/CollectActivity@@: observable accept: 2
03-03 15:52:29.685 30457-30457/cn.com.minstone.rxjavalearn E/CollectActivity@@: observable accept: 3