[Android开发] RxJava2之路七 - 错误处理操作符例子Demo

时间:2022-05-20 17:48:01

一、错误处理操作符列表

用于对Observable发射的 onError 通知做出响应或者从错误中恢复,例如,你
可以:

  • 吞掉这个错误,切换到一个备用的Observable继续发射数据
  • 吞掉这个错误然后发射默认值
  • 吞掉这个错误并立即尝试重启这个Observable
  • 吞掉这个错误,在一些回退间隔后重启这个Observable
名称 解析
onErrorResumeNext() 指示Observable在遇到错误时发射一个数据序列
onErrorReturn() 让Observable遇到错误时发射一个特殊的项并且正常终止。方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法。
onExceptionResumeNext() 指示Observable遇到错误时继续发射数据
retry() 指示Observable遇到错误时重试
retryWhen() 指示Observable遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable
retryUntil() 指示Observable遇到错误时,是否让Observable重新订阅

二、错误处理操作符

2.1 onErrorReturn操作符

让Observable遇到错误时发射一个特殊的项并且正常终止,onErrorRetrun能够捕获在它之前发生的异常,它之后流中的操作发生的异常就它就不会管了。

    public void testOnErrorReturn() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i = 0; i<= 3 ;i++){
if(i == 2){
e.onError(new Throwable("出现错误了"));
}else{
e.onNext(i+"");
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
}

e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.onErrorReturn(new Function<Throwable, String>() {
@Override
public String apply(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "在onErrorReturn处理了: "+throwable.toString() );
//拦截到错误之后,返回一个结果发射,然后就正常结束了。
return "1";
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});
}

上面的代码输出结果为 :

02-20 19:18:40.761 13445-13461/? E/ErrorActivity: 收到消息: 0
02-20 19:18:41.762 13445-13461/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 1
02-20 19:18:42.763 13445-13461/cn.com.minstone.rxjavalearn E/ErrorActivity: 在onErrorReturn处理了: java.lang.Throwable: 出现错误了
02-20 19:18:42.763 13445-13461/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 1

2.2 onErrorResumeNext操作符

和onErrorNext不同的是,onErrorResumeNext是返回一个重新定义的Observable,onErrorNext返回的是发射的数据格式。

ps: 注意onErrorResumeNext拦截的错误是Throwable,不能拦截Exception。 不然它会将错误传递给观察者的onError方法。要拦截Exception请用onExceptionResumeNext

来个栗子:

    public void testOnErrorResumeReturn() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i = 0; i<= 3 ;i++){
if(i == 2){
//这里是Throwable
e.onError(new Throwable("出现错误了"));
}else{
e.onNext(i+"");
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
}

e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> apply(@NonNull Throwable throwable) throws Exception {
//拦截到错误之后,重新定义了被观察者
return Observable.just("重新定义了被观察者");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});
}

输出为:

02-21 09:11:08.691 25682-25697/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 0
02-21 09:11:09.692 25682-25697/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 1
02-21 09:11:10.693 25682-25697/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 重新定义了被观察者

2.3 onExceptionResumeNext操作符

onExceptionResumeNext 和 onErrorResumeNext基本一样,也是收到错误重新定义了新的被观察者。但是有一点不用: 如果onErrorResumeNext收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,onExceptionResumeNext则会继续拦截

ps: 注意onExceptionResumeNext拦截的错误是Exception,不能拦截Throwable。 不然它会将错误传递给观察者的onError方法。要拦截Throwable请用onErrorResumeNext

    public void testOnExceptionResumeReturn() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i = 0; i<= 3 ;i++){
if(i == 2){
//注意这里是Exception
e.onError(new Exception("出现错误了"));
}else{
e.onNext(i+"");
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
}

e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.onExceptionResumeNext(new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("错误替换的消息");
observer.onComplete();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});
}

输出的结果是 :

02-21 09:29:25.631 27412-27480/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 0
02-21 09:29:26.632 27412-27480/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 1
02-21 09:29:27.633 27412-27480/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 错误替换的消息

2.4 retry操作符

重试的意思,拦截到错误,然后让 被观察者重新发射数据。Throwable和Exception都额可以拦截

它有五种参数方法:
- retry(): 让被观察者重新发射数据,要是一直错误就一直发送了
- retry(BiPredicate): interger是第几次重新发送,Throwable是错误的内容
- retry(long time): 最多让被观察者重新发射数据多少次
- retry(long time,Predicate predicate): 最多让被观察者重新发射数据多少次,在predicate里面进行判断拦截 返回是否继续
- retry(Predicate predicate): 在predicate里面进行判断拦截 返回是否继续

    public void testRetry(){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for(int i = 0; i<= 3 ;i++){
if(i == 2){
e.onError(new Exception("出现错误了"));
}else{
e.onNext(i+"");
}
try{
Thread.sleep(1000);
}catch (Exception ex){
ex.printStackTrace();
}
}

e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
/*.retry(new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "retry错误: "+throwable.toString());

//返回假就是不让重新发射数据了,调用观察者的onError就终止了。
//返回真就是让被观察者重新发射请求
return true;
}
})*/

/*.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
Log.e(TAG, "retry错误: "+integer+" "+throwable.toString());

//返回假就是不让重新发射数据了,调用观察者的onError就终止了。
//返回真就是让被观察者重新发射请求
return true;
}
})*/

// .retry(3) //最多让被观察者重新发射数据3次
.retry(3, new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "retry错误: "+throwable.toString());
//最多让被观察者重新发射数据3次,但是这里返回值可以进行处理
//返回假就是不让重新发射数据了,调用观察者的onError就终止了。
//返回真就是让被观察者重新发射请求
return true;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});
}

2.5 retryWhen操作符

retryWhen和retry类似,区别是: retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable, retryWhen观察它的结果再决定是不是要重新订阅原始的Observable。
如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。

ps: 这里如果里面的throwableObservable不进行处理,那么onNext也会拦截处理,这里有个坑。

栗子:

    public void testRetryWhen() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i = 0; i <= 3; i++) {
if (i == 2) {
e.onError(new Exception("出现错误了"));
} else {
e.onNext(i + "");
}
try {
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
}

e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {

//这里可以发送新的被观察者 Observable
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {

//如果发射的onError就终止
return Observable.error(new Throwable("retryWhen终止啦"));
}
});

}
})

.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "收到消息: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "结果错误: " + throwable.toString());
}
});

}

输出结果为:

02-21 11:12:17.572 6471-6608/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 0
02-21 11:12:18.573 6471-6608/cn.com.minstone.rxjavalearn E/ErrorActivity: 收到消息: 1
02-21 11:12:19.574 6471-6608/cn.com.minstone.rxjavalearn E/ErrorActivity: 结果错误: java.lang.Throwable: retryWhen终止啦

2.5 retryUntil操作符

Rxjava2才有的操作符。
很简单,作用和retry(Predicate)基本一致,返回真就是不重新订阅了。