1.doOnNext
doOnNext 与 doOnEach 类似:它产生的 Observable 每发射一项数据就会调用它一次,但它的 Action 接受的不是 Notification 参数,而是发射的数据项本身。
// RxJava 1.x demo: the doOnNext action is invoked with each emitted item; here it
// throws for any item greater than 1.
Action1<Integer> rejectItemsAboveOne = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        if (value <= 1) {
            return;
        }
        throw new RuntimeException("Item exceeds maximum value");
    }
};

// Subscriber that prints every notification it receives.
Subscriber<Integer> consolePrinter = new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable failure) {
        System.err.println("Error: " + failure.getMessage());
    }

    @Override
    public void onNext(Integer value) {
        System.out.println("Next: " + value);
    }
};

Observable.just(1, 2, 3)
        .doOnNext(rejectItemsAboveOne)
        .subscribe(consolePrinter);
doOnNext 一般用于在数据到达订阅者之前对其做一些处理,比如保存数据等。下面的例子在 doOnNext 中把频道列表写入数据库:
/**
 * Compares the stored enabled-channel list with the adapter's current one, rewrites the
 * channel table when the emitted flag is {@code true}, and finally posts that flag on
 * {@code RxBus} under {@code NewsTabLayout.TAG}.
 *
 * <p>The source and the {@code doOnNext} database work are scheduled with
 * {@code subscribeOn(Schedulers.io())}; the final consumer is scheduled with
 * {@code observeOn(AndroidSchedulers.mainThread())}. Errors are routed to
 * {@code ErrorAction.error()}.
 */
public void onSaveData() {
    Observable
            .create(new ObservableOnSubscribe<Boolean>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Boolean> e) throws Exception {
                    List<NewsChannelBean> oldItems = dao.query(Constant.NEWS_CHANNEL_ENABLE);
                    // Emits the negation of compare(...); presumably true means the
                    // arrangement changed -- confirm against compare().
                    e.onNext(!compare(oldItems, adapter.getmMyChannelItems()));
                    // Single-value source: signal completion so the stream terminates
                    // instead of staying subscribed indefinitely.
                    e.onComplete();
                }
            })
            .subscribeOn(Schedulers.io())
            .doOnNext(new Consumer<Boolean>() {
                @Override
                public void accept(@NonNull Boolean aBoolean) throws Exception {
                    if (aBoolean) {
                        List<NewsChannelBean> enableItems = adapter.getmMyChannelItems();
                        List<NewsChannelBean> disableItems = adapter.getmOtherChannelItems();
                        // Replace the whole table: clear it, then re-insert both lists
                        // with the list index as the last argument.
                        dao.removeAll();
                        for (int i = 0; i < enableItems.size(); i++) {
                            NewsChannelBean bean = enableItems.get(i);
                            dao.add(bean.getChannelId(), bean.getChannelName(), Constant.NEWS_CHANNEL_ENABLE, i);
                        }
                        for (int i = 0; i < disableItems.size(); i++) {
                            NewsChannelBean bean = disableItems.get(i);
                            dao.add(bean.getChannelId(), bean.getChannelName(), Constant.NEWS_CHANNEL_DISABLE, i);
                        }
                    }
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(@NonNull Boolean isRefresh) throws Exception {
                    RxBus.getInstance().post(NewsTabLayout.TAG, isRefresh);
                }
            }, ErrorAction.error());
}
2、doFinally
当它产生的 Observable 终止之后会被调用,无论是正常终止(onComplete)、异常终止(onError),还是被下游取消订阅(dispose)。
/**
 * Loads one page of the QA remind list and passes a successful result to
 * {@code refreshUI}.
 *
 * <p>Returns early (showing the no-network empty view and a toast) when
 * {@code ContextUtils.checkNetworkConnection} reports no connection. The loading
 * progress, if shown, is dismissed in {@code doFinally}.
 *
 * @param isRefresh      whether this is a pull-to-refresh (or initial) load; when
 *                       {@code true} page 1 is requested, otherwise {@code mPageIndex + 1}
 * @param isNeedProgress whether to show the loading progress while the request runs
 */
private void laodData(final boolean isRefresh, final boolean isNeedProgress) {
    if (!ContextUtils.checkNetworkConnection(getActivity())) {
        mEmptyViewHelper.setNoNetworkEmptyView(true);
        ContextUtils.showToast(getActivity(), R.string.noconnectionremind);
        return;
    }
    mEmptyViewHelper.setNoNetworkEmptyView(false);
    Observable.create(new ObservableOnSubscribe<ClientRecvObject>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<ClientRecvObject> e) throws Exception {
            User loginedUser = LoginUserManager.getLoginedUser(mConfiguration);
            int pageIndex = 1;
            if (!isRefresh) {
                pageIndex = mPageIndex + 1;
            }
            ClientRecvObject remindClient = RemindConnector.getQARemindList(getActivity(), loginedUser, pageIndex);
            e.onNext(remindClient);
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    // Fragment is detached: cancel before any work starts.
                    if (null == getActivity()) {
                        disposable.dispose();
                        return;
                    }
                    if (isNeedProgress) {
                        showLoadingProgress(getString(R.string.loading), true);
                    }
                }
            })
            // This second subscribeOn sits downstream of doOnSubscribe, so it selects the
            // thread doOnSubscribe runs on (main); the first one keeps create() on io.
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    dismissLoadingProgress();
                }
            }).subscribe(new Observer<ClientRecvObject>() {
                private Disposable disposable;

                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    disposable = d;
                }

                @Override
                public void onNext(@NonNull ClientRecvObject clientRecvObject) {
                    if (getActivity() == null) {
                        disposable.dispose();
                        // Fix: stop here. Previously execution fell through and still
                        // advanced mPageIndex and called refreshUI on a detached fragment.
                        return;
                    }
                    if (null != clientRecvObject && clientRecvObject.isSuccess()) {
                        // NOTE(review): mPageIndex is only advanced for load-more and is
                        // not reset here on refresh -- confirm refreshUI (or the caller)
                        // resets it, otherwise load-more after a refresh may skip pages.
                        if (!isRefresh) {
                            mPageIndex++;
                        }
                        RemindList remindList = (RemindList) clientRecvObject.getClientData();
                        refreshUI(isRefresh, remindList);
                    }
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    // NOTE(review): errors are silently dropped; only the progress
                    // dismissal in doFinally happens. Consider surfacing the failure.
                }

                @Override
                public void onComplete() {
                    // Nothing to do: the single result was already handled in onNext.
                }
            });
}