RxJava使用详解--组合操作符

时间:2022-01-15 14:40:41

RxJava使用详解系列文章

RxJava使用详解--创建操作符

RxJava使用详解--转换操作符

RxJava使用详解--过滤操作符


详细的例子可以查看文章末尾的源码

RxJava使用详解--组合操作符

这篇文章主要讲RxJava中常见的组合操作符


1.combineLatest操作符把两个Observable产生的结果进行合并,合并的结果组成一个新的Observable。下面的栗子是ob2中的每一个数据项都与ob1中的最后一项进行相加,将生成的结果组成一个新的Observable对象.

combineLatest操作符可以接受2-9个Observable作为参数,最后一个Observable中的每一个数据项,都与前面Observable中的最后一项进行规则运算。也就是call方法中的最后一个值参数是最后一个Observable的每一项数据,
前面的参数是前面每一个Observable的最后一项数据,固定不变的。
combineLatest(List,FuncN)操作符可以接受一个Observable的list集合,集合中最后一个Observable中的每一项数据,会跟前面每一个Observable对象的最后一项数据进行规则运算
默认不在任何特定的调度器上执行。

RxJava使用详解--组合操作符
Observable<Integer> ob1 = Observable.just(1,2,3);
Observable<Integer> ob2 = Observable.just(4,5,6);
Observable<Integer> ob3 = Observable.just(7,8,9);

List<Observable<Integer>> list = new ArrayList<>();
list.add(ob1);list.add(ob2);list.add(ob3);

Observable.combineLatest(ob1, ob2, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
System.out.println("combineLatest(o1,o2,Func2):"+"o1:" + integer +" o2:"+ integer2 );
return integer + integer2;//这里进行合并的规则,可以用函数进行运算返回一个数据
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("combineLatest(o1,o2,Func2) onNext:" + integer);
}
});
输出结果:
combineLatest(o1,o2,Func2):o1:3 o2:4
combineLatest(o1,o2,Func2) onNext:7
combineLatest(o1,o2,Func2):o1:3 o2:5
combineLatest(o1,o2,Func2) onNext:8
combineLatest(o1,o2,Func2):o1:3 o2:6
combineLatest(o1,o2,Func2) onNext:9

Observable.combineLatest(ob1, ob2, ob3, new Func3<Integer, Integer, Integer, Integer>() {    @Override//这里进行合并的规则,可以用函数进行运算返回一个数据    public Integer call(Integer integer, Integer integer2, Integer integer3) {        System.out.println("combineLatest(o1,o2,o3,Func3):"+"o1:" + integer +" o2:"+ integer2 +" o3:"+ integer3);        return integer + integer2 + integer3;    }}).subscribe(new Action1<Integer>() {    @Override    public void call(Integer integer) {        System.out.println("combineLatest(o1,o2,o3,Func3) onNext:" + integer);    }});
输出结果:
combineLatest(o1,o2,o3,Func3):o1:3 o2:6 o3:7
combineLatest(o1,o2,o3,Func3) onNext:16
combineLatest(o1,o2,o3,Func3):o1:3 o2:6 o3:8
combineLatest(o1,o2,o3,Func3) onNext:17
combineLatest(o1,o2,o3,Func3):o1:3 o2:6 o3:9
combineLatest(o1,o2,o3,Func3) onNext:18

Observable.combineLatest(list, new FuncN<String>() {    @Override//这里进行合并的规则,可以用函数进行运算返回一个数据    public String call(Object... args) {        String concat = "";       for (Object value : args){           System.out.println("combineLatest(List,FuncN) value:" + value);           concat += value;       }        return concat;    }}).subscribe(new Action1<String>() {    @Override    public void call(String s) {        System.out.println("combineLatest(List,FuncN) onNext:" + s);    }});
输出结果:
combineLatest(List,FuncN) value:3
combineLatest(List,FuncN) value:6
combineLatest(List,FuncN) value:7
combineLatest(List,FuncN) onNext:367
combineLatest(List,FuncN) value:3
combineLatest(List,FuncN) value:6
combineLatest(List,FuncN) value:8
combineLatest(List,FuncN) onNext:368
combineLatest(List,FuncN) value:3
combineLatest(List,FuncN) value:6
combineLatest(List,FuncN) value:9
combineLatest(List,FuncN) onNext:369

2.join操作符将两个Observable产生的结果合并成一个新Observable对象,join操作符可以控制每个Observable产生结果的生命周期。
参数解释 ob1.join(ob2,ob1产生结果生命周期控制函数,ob2产生结果生命周期控制函数,ob1和ob2合并结果的规则)groupJoin()操作符第四个参数与join操作符不同,详细的运行栗子查看

RxJava使用详解--组合操作符
Observable<Integer> ob1 = Observable.just(1,2);Observable<Integer> ob2 = Observable.just(3,4);//join操作符ob1.join(ob2, new Func1<Integer, Observable<Integer>>() {//ob1产生结果生命周期控制函数    @Override    public Observable<Integer> call(Integer integer) {        //使ob1延迟200毫秒执行        return Observable.just(integer).delay(200, TimeUnit.MILLISECONDS);    }}, new Func1<Integer, Observable<Integer>>() {//ob2产生结果声明周期控制函数    @Override    public Observable<Integer> call(Integer integer) {        //使ob2延迟200毫秒执行        return Observable.just(integer).delay(200, TimeUnit.MILLISECONDS);    }}, new Func2<Integer, Integer, Integer>() {//ob1 和ob2产生结果的合并规则    @Override    public Integer call(Integer integer1, Integer integer2) {        System.out.println("join(ob2,Func1,Func1,Func2) " + "integer1:" +integer1+ " integer2:" + integer2);        return integer1 + integer2;    }}).subscribe(new Action1<Integer>() {    @Override    public void call(Integer integer) {        System.out.println("join(ob2,Func1,Func1,Func2) " + integer);    }});
输出结果:
join(ob2,Func1,Func1,Func2) integer1:1 integer2:3
join(ob2,Func1,Func1,Func2) 4
join(ob2,Func1,Func1,Func2) integer1:2 integer2:3
join(ob2,Func1,Func1,Func2) 5
join(ob2,Func1,Func1,Func2) integer1:1 integer2:4
join(ob2,Func1,Func1,Func2) 5
join(ob2,Func1,Func1,Func2) integer1:2 integer2:4
join(ob2,Func1,Func1,Func2) 6

//groupJoin操作符ob1.groupJoin(ob2, new Func1<Integer, Observable<Integer>>() {//ob1产生结果生命周期控制函数    @Override    public Observable<Integer> call(Integer integer) {        //使ob1延迟1600毫秒执行        return Observable.just(integer).delay(1600, TimeUnit.MILLISECONDS);    }}, new Func1<Integer, Observable<Integer>>() {//ob2产生结果声明周期控制函数    @Override    public Observable<Integer> call(Integer integer) {        //使ob2延迟600毫秒执行        return Observable.just(integer).delay(600, TimeUnit.MILLISECONDS);    }}, new Func2<Integer, Observable<Integer>, Observable<Integer>>() {    @Override    public Observable<Integer> call(final Integer integer1, Observable<Integer> observable) {        return observable.map(new Func1<Integer, Integer>() {            @Override            public Integer call(Integer integer2) {                System.out.println("groupJoin(ob2,Func1,Func1,Func2) " + "integer1:" + integer1 + " integer2:" + integer2);                return integer1 + integer2;            }        });    }}). subscribe(new Action1<Observable<Integer>>() {    @Override    public void call(Observable<Integer> observable) {        observable.subscribe(new Action1<Integer>() {            @Override            public void call(Integer integer) {                System.out.println("groupJoin(ob2,Func1,Func1,Func2) onNnext:" + integer);            }        });    }});
输出结果:
groupJoin(ob2,Func1,Func1,Func2) integer1:1 integer2:3
groupJoin(ob2,Func1,Func1,Func2) onNnext:4
groupJoin(ob2,Func1,Func1,Func2) integer1:2 integer2:3
groupJoin(ob2,Func1,Func1,Func2) onNnext:5
groupJoin(ob2,Func1,Func1,Func2) integer1:1 integer2:4
groupJoin(ob2,Func1,Func1,Func2) onNnext:5
groupJoin(ob2,Func1,Func1,Func2) integer1:2 integer2:4
groupJoin(ob2,Func1,Func1,Func2) onNnext:6
3.merge操作符 将多个Observalbe发射的数据项,合并到一个Observable中再发射出去,可能会让合并的Observable发射的数据交错(concat是连接不会出现交错),如果在合并的途中出现错误,就会立即将错误提交给订阅者,将终止合并后的Observable
mergeDelayError操作符类似于merge操作符,唯一不同就是如果在合并途中出现错误,不会立即发射错误通知,而是保留错误直到合并后的Observable将所有的数据发射完成,
此时才会将onError提交给订阅者。
合并多个Observable也可以通过传递一个Observalbe列表List、数组。
RxJava使用详解--组合操作符

Observable<Integer> ob1 = Observable.just(1,2,3).delay(100, TimeUnit.MILLISECONDS);
Observable<Integer> ob2 = Observable.just(4,5,6)/*.delay(100, TimeUnit.MILLISECONDS)*/;

Observable.merge(ob1,ob2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("merge(ob1,ob2) onNext:" + integer);
}
});
输出结果:

12-20 13:18:42.341 31603-31603/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:4
12-20 13:18:42.341 31603-31603/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:5
12-20 13:18:42.341 31603-31603/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:6
12-20 13:18:42.440 31603-31672/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:1
12-20 13:18:42.441 31603-31672/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:2
12-20 13:18:42.441 31603-31672/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:3


4.startWith操作符是在源Observable提交结果之前插入指定的数据,可以是数值,也可以是Observable对象

RxJava使用详解--组合操作符

Observable.just(1,2,3).startWith(0).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("startWith(T) onNext:" + integer);
}
});
System.out.println(" - - - - - - - - ");
Observable<Integer> ob2 = Observable.just(4,5,6);
Observable.just(1,2,3).startWith(ob2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("startWith(T) onNext:" + integer);
}
});
输出结果:

startWith(T) onNext:0
startWith(T) onNext:1
startWith(T) onNext:2
startWith(T) onNext:3
 - - - - - -  - -
startWith(T) onNext:4
startWith(T) onNext:5
startWith(T) onNext:6
startWith(T) onNext:1
startWith(T) onNext:2
startWith(T) onNext:3


5.switchOnNext操作符是把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者(看源码中的例子)

RxJava使用详解--组合操作符


6.zip操作符严格按照顺序进行组合Observable,假设两个Observable合并,ob1发射2个数据,ob2发射3个数据,最终合并的胡发射2个合并的数据。

zipWith操作符与上面类似,具体的看下面的例子
默认不在特定的调度器上执行

RxJava使用详解--组合操作符

Observable<Integer> ob1 = Observable.just(1,2,3);
Observable<Integer> ob2 = Observable.just(4,5,6);

Observable.zip(ob1, ob2, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer1, Integer integer2) {
System.out.println("zip(ob1,ob2,Func2) integer1:" + integer1 +" integer2:"+integer2);
return integer1 + integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("zip(ob1,ob2,Func2) onNext:" + integer);
}
});
System.out.println("- - - - - - - -");
ob1.zipWith(ob2, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer1, Integer integer2) {
System.out.println("ob1.zipWith(ob2,Func2) integer1:" + integer1 +" integer2:"+integer2);
return integer1 + integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("ob1.zipWith(ob2,Func2) " + integer );
}
});
输出结果:

zip(ob1,ob2,Func2) integer1:1 integer2:4
zip(ob1,ob2,Func2) onNext:5
zip(ob1,ob2,Func2) integer1:2 integer2:5
zip(ob1,ob2,Func2) onNext:7
zip(ob1,ob2,Func2) integer1:3 integer2:6
zip(ob1,ob2,Func2) onNext:9
- - - - - - - -
ob1.zipWith(ob2,Func2) integer1:1 integer2:4
ob1.zipWith(ob2,Func2)  5
ob1.zipWith(ob2,Func2) integer1:2 integer2:5
ob1.zipWith(ob2,Func2)  7
ob1.zipWith(ob2,Func2) integer1:3 integer2:6
ob1.zipWith(ob2,Func2)  9

更多详细内容和例子,可以查看源码