RxJava2的do系列操作符之doOnNext和doFinally

时间:2022-05-29 14:33:38

1、doOnNext
doOnNext产生的Observable每发射一项数据就会调用一次它注册的回调;与doOnEach不同,这个回调接收的不是Notification参数,而是发射的数据项本身。

// RxJava 2 form of the example: doOnNext takes a Consumer, and a full Observer
// must also implement onSubscribe / onComplete.
// Emits 1, 2, 3; doOnNext sees every item before the observer does.
Observable.just(1, 2, 3)
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer item) throws Exception {
                // An exception thrown here is delivered to the observer's onError
                // and the item that triggered it is not passed to onNext.
                if (item > 1) {
                    throw new RuntimeException("Item exceeds maximum value");
                }
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                // Nothing to set up for this example.
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }

            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Sequence complete.");
            }
        });

doOnNext一般用于在数据到达subscribe的观察者之前先对数据做一些处理,比如保存数据等:

   /**
    * Saves the channel lists held by the adapter when they differ from the stored ones.
    *
    * <p>The source compares the channels returned by
    * {@code dao.query(Constant.NEWS_CHANNEL_ENABLE)} with
    * {@code adapter.getmMyChannelItems()} and emits {@code true} when
    * {@code compare(...)} reports them as not equal. For a {@code true} value the
    * {@code doOnNext} step clears the table and re-inserts both adapter lists.
    * The emitted flag is then posted to {@code RxBus} under {@code NewsTabLayout.TAG}
    * after {@code observeOn(AndroidSchedulers.mainThread())}; errors go to
    * {@code ErrorAction.error()}.
    */
   public void onSaveData() {

        Observable
                .create(new ObservableOnSubscribe<Boolean>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Boolean> e) throws Exception {
                        List<NewsChannelBean> oldItems = dao.query(Constant.NEWS_CHANNEL_ENABLE);
                        e.onNext(!compare(oldItems, adapter.getmMyChannelItems()));
                        // Single-value source: signal completion so the chain terminates
                        // instead of staying open after the one emission.
                        e.onComplete();
                    }
                })
                .subscribeOn(Schedulers.io())
                // Placed before observeOn, so the database writes happen upstream of the
                // switch to the main thread.
                .doOnNext(new Consumer<Boolean>() {
                    @Override
                    public void accept(@NonNull Boolean aBoolean) throws Exception {
                        if (aBoolean) {
                            List<NewsChannelBean> enableItems = adapter.getmMyChannelItems();
                            List<NewsChannelBean> disableItems = adapter.getmOtherChannelItems();
                            // Replace the stored channels wholesale with the adapter's lists.
                            dao.removeAll();
                            // The loop index is passed as the last argument; presumably the
                            // channel's position within its list (confirm against the dao).
                            for (int i = 0; i < enableItems.size(); i++) {
                                NewsChannelBean bean = enableItems.get(i);
                                dao.add(bean.getChannelId(), bean.getChannelName(), Constant.NEWS_CHANNEL_ENABLE, i);
                            }
                            for (int i = 0; i < disableItems.size(); i++) {
                                NewsChannelBean bean = disableItems.get(i);
                                dao.add(bean.getChannelId(), bean.getChannelName(), Constant.NEWS_CHANNEL_DISABLE, i);
                            }
                        }
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(@NonNull Boolean isRefresh) throws Exception {
                        RxBus.getInstance().post(NewsTabLayout.TAG, isRefresh);
                    }
                }, ErrorAction.error());
   }

2、doFinally
当它产生的Observable终止之后会被调用,无论是正常终止还是异常终止(下游取消订阅时同样会被调用)。

    /**
     * Loads data.
     *
     * <p>Returns early with the no-network empty view and a toast when
     * {@code ContextUtils.checkNetworkConnection} fails. Otherwise it requests one
     * page through {@code RemindConnector.getQARemindList} and passes a successful
     * result to {@code refreshUI}. The loading progress is dismissed in
     * {@code doFinally}.
     *
     * @param isRefresh      whether this is a pull-to-refresh (initial load); when
     *                       {@code false} the page after {@code mPageIndex} is requested
     * @param isNeedProgress whether the loading progress should be shown
     */
    private void laodData(final boolean isRefresh, final boolean isNeedProgress) {
        if (!ContextUtils.checkNetworkConnection(getActivity())) {
            mEmptyViewHelper.setNoNetworkEmptyView(true);
            ContextUtils.showToast(getActivity(), R.string.noconnectionremind);
            return;
        }
        mEmptyViewHelper.setNoNetworkEmptyView(false);

        Observable.create(new ObservableOnSubscribe<ClientRecvObject>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<ClientRecvObject> e) throws Exception {
                User loginedUser = LoginUserManager.getLoginedUser(mConfiguration);
                // A refresh always requests page 1; otherwise request the next page.
                int pageIndex = 1;
                if (!isRefresh) {
                    pageIndex = mPageIndex + 1;
                }
                ClientRecvObject remindClient = RemindConnector.getQARemindList(getActivity(), loginedUser, pageIndex);
                e.onNext(remindClient);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io())
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        // No activity attached: cancel before any work starts.
                        if (null == getActivity()) {
                            disposable.dispose();
                            return;
                        }

                        if (isNeedProgress) {
                            showLoadingProgress(getString(R.string.loading), true);
                        }
                    }
                })
                // This subscribeOn sits downstream of doOnSubscribe, so the progress
                // dialog above is shown on the main thread.
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(AndroidSchedulers.mainThread())
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        dismissLoadingProgress();
                    }
                }).subscribe(new Observer<ClientRecvObject>() {

            private Disposable disposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                disposable = d;
            }

            @Override
            public void onNext(@NonNull ClientRecvObject clientRecvObject) {
                if (getActivity() == null) {
                    disposable.dispose();
                    // Stop here: without an activity the UI must not be refreshed.
                    return;
                }
                if (null != clientRecvObject && clientRecvObject.isSuccess()) {
                    // Advance the page counter only once the next page actually loaded.
                    if (!isRefresh) {
                        mPageIndex++;
                    }

                    RemindList remindList = (RemindList) clientRecvObject.getClientData();
                    refreshUI(isRefresh, remindList);
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                // NOTE(review): errors are silently dropped here; only the progress is
                // dismissed (via doFinally). Consider logging or showing a message.
            }

            @Override
            public void onComplete() {
                // Nothing to do; the progress is dismissed by doFinally.
            }
        });
    }