Rxjava操作符汇总

时间:2022-09-16 15:14:01

《RxJavaEssentials》笔记
参考博客:http://www.tuicool.com/articles/A3uY7rF

一、创建型操作符

1. just()

功能:将一个或多个对象转换成发射这个或这些对象的一个Observable。
用法:

just=  Observable.just(1,3,4).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e("Just--------onCompleted","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.e("Just-----------onError",e.getMessage());
}

@Override
public void onNext(Integer integer) {
Log.e("Just-----------OnNext","="+integer);
}
});

结果:
07-17 18:43:21.812 7734-7734/farmer.zpm.com.rxjavalearn E/Just———–OnNext: =1
07-17 18:43:21.812 7734-7734/farmer.zpm.com.rxjavalearn E/Just———–OnNext: =3
07-17 18:43:21.812 7734-7734/farmer.zpm.com.rxjavalearn E/Just———–OnNext: =4
07-17 18:43:21.812 7734-7734/farmer.zpm.com.rxjavalearn E/Just——–onCompleted: onCompleted

2.from()

功能:将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
用法:

 List<String> list=new ArrayList<String>();
list.add("on");
list.add("one");
list.add("two");
list.add("owe");
Observable.from(list).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e("from----onCompleted","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.e("from---onError",e.getMessage());
}

@Override
public void onNext(String s) {
Log.i("from----OnNext","="+s);
}
});

结果:
07-17 18:46:20.027 10313-10313/farmer.zpm.com.rxjavalearn I/from—-OnNext: =on
07-17 18:46:20.027 10313-10313/farmer.zpm.com.rxjavalearn I/from—-OnNext: =one
07-17 18:46:20.027 10313-10313/farmer.zpm.com.rxjavalearn I/from—-OnNext: =two
07-17 18:46:20.027 10313-10313/farmer.zpm.com.rxjavalearn I/from—-OnNext: =owe
07-17 18:46:20.027 10313-10313/farmer.zpm.com.rxjavalearn E/from—-onCompleted: onCompleted

3.repeat()

功能:创建一个重复发射指定数据或数据序列的Observable。
用法:

 repeat=  Observable.just(1,3,4)
.repeat(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e("repeat------onCompleted","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.e("repeat---------onError",e.getMessage());
}

@Override
public void onNext(Integer integer) {
Log.e("repeat---------OnNext","="+integer);
}
});

结果:
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/repeat———OnNext: =1
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/repeat———OnNext: =3
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/repeat———OnNext: =4
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/repeat———OnNext: =1
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/repeat———OnNext: =3
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/repeat———OnNext: =4
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/repeat——onCompleted: onCompleted

4.defer()

功能:只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable。
用法:

defer= Observable.defer(() -> Observable.just("defer"))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e("defer------onCompleted","onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e("defer---------onError",e.getMessage());
}
@Override
public void onNext(String s) {
Log.e("defer---------OnNext","="+s);
}
})
;

结果:
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/defer———OnNext: =defer
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/defer——onCompleted: onCompleted

5.range()

功能:创建一个发射指定范围的整数序列的Observable。
用法:

 range= Observable.range(10,3).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e("range-------onCompleted","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.e("range----------onError",e.getMessage());
}

@Override
public void onNext(Integer integer) {
Log.e("range----------OnNext","="+integer);
}
});

结果:
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/range———-OnNext: =10
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/range———-OnNext: =11
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/range———-OnNext: =12
07-17 18:50:00.275 13456-13456/farmer.zpm.com.rxjavalearn E/range——-onCompleted: onCompleted

6.interval()

功能: 创建一个按照给定的时间间隔发射整数序列的Observable
用法:
每隔3s发送一次

interval=  Observable.interval(3, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
Log.e("interval----onCompleted","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.e("interval---onError",e.getMessage());
}

@Override
public void onNext(Long aLong) {
Log.e("interval----OnNext","="+aLong);
}
});

结果:
07-17 18:51:52.671 13590-13613/farmer.zpm.com.rxjavalearn E/interval—-OnNext: =0
07-17 18:51:55.670 13590-13613/farmer.zpm.com.rxjavalearn E/interval—-OnNext: =1
07-17 18:51:58.670 13590-13613/farmer.zpm.com.rxjavalearn E/interval—-OnNext: =2
07-17 18:52:01.670 13590-13613/farmer.zpm.com.rxjavalearn E/interval—-OnNext: =3

7.timer()

功能:创建一个在给定的延时之后发射单个数据的Observable
用法:

timer_2= Observable.timer(3, TimeUnit.SECONDS)//每隔3s
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
Log.e("timer----onCompleted","onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e("timer---onError",e.getMessage());
}
@Override
public void onNext(Long number) {
Log.e("timer----OnNext","="+number);
}
});

结果:
07-17 18:55:58.756 17166-17185/farmer.zpm.com.rxjavalearn E/timer—-OnNext: =0
07-17 18:55:58.756 17166-17185/farmer.zpm.com.rxjavalearn E/timer—-onCompleted: onCompleted

8.empty( )

功能:创建一个什么都不做直接通知完成的Observable

9.error( )

功能:创建一个什么都不做直接通知错误的Observable

10.never( )

功能: 创建一个不发射任何数据的Observable

二、过滤操作符

1.filter( )

功能: 过滤数据。
用法:
该例子打印以“o”开头,“e”结尾的字符串。

List<String> list=new ArrayList<String>();
list.add("on");
list.add("one");
list.add("two");
list.add("owe");
filter= Observable.from(list)
.filter(s -> s.startsWith("o"))
.filter(s->s.endsWith("e"))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e("filter----onCompleted","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.e("filter---onError",e.getMessage());
}

@Override
public void onNext(String s) {
Log.i("filter----OnNext","="+s);
}
});

结果:
07-17 18:57:35.067 18518-18518/farmer.zpm.com.rxjavalearn I/filter—-OnNext: =one
07-17 18:57:35.067 18518-18518/farmer.zpm.com.rxjavalearn I/filter—-OnNext: =owe
07-17 18:57:35.067 18518-18518/farmer.zpm.com.rxjavalearn E/filter—-onCompleted: onCompleted

2.take( ) takeLast()

功能: take( )只发射开始的N项数据,takeLast()只发射最后的N项数据。
Rxjava操作符汇总
Rxjava操作符汇总
用法:

  Observable.just(1,2,3,4,5,6,7,8,9,10)
.take(4)
.takeLast(2)
.subscribe(integer -> { Log.e("take------","="+integer); });

结果:
07-17 18:57:35.069 18518-18518/farmer.zpm.com.rxjavalearn E/take——: =3
07-17 18:57:35.069 18518-18518/farmer.zpm.com.rxjavalearn E/take——: =4

3.distinct( ) distinctUntilChanged()

功能: distinct( )过滤掉重复数据;distinctUntilChanged()过滤掉连续重复的数据。
Rxjava操作符汇总
用法:

 Observable.just(1,2,1,1,3,4,5).distinct().subscribe(integer -> {
Log.e("distinct---","="+integer);
});
Observable.just(1,2,1,1,3,4,5).distinctUntilChanged().subscribe(integer -> {
Log.e("distinctUntilChanged--","="+integer);
});

结果:
07-17 18:57:35.069 18518-18518/farmer.zpm.com.rxjavalearn E/distinct—: =1
07-17 18:57:35.069 18518-18518/farmer.zpm.com.rxjavalearn E/distinct—: =2
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/distinct—: =3
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/distinct—: =4
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/distinct—: =5
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/distinctUntilChanged–: =1
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/distinctUntilChanged–: =2
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/distinctUntilChanged–: =1
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/distinctUntilChanged–: =3
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/distinctUntilChanged–: =4
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/distinctUntilChanged–: =5

4.first() last()

功能:first()只发射第一项数据,last()只发射最后的一项数据。
Rxjava操作符汇总
Rxjava操作符汇总
用法:

        Observable.just(1,2,3,4,5,6,7).first()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("first","="+integer);
}
});

结果:
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/first——: =1

5.skip() skipLast()

功能:skip( ) 跳过开始的N项数据,skipLast( ) — 跳过最后的N项数据。
Rxjava操作符汇总
Rxjava操作符汇总
用法:

 Observable.just(1,2,3,4,5,6,7).skip(3)
.subscribe(integer -> {
Log.e("skip------","="+integer);
});

结果:
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/skip——: =4
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/skip——: =5
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/skip——: =6
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/skip——: =7

6.elementAt( ) elementAtOrDefault( )

功能:elementAt( ) 发射第N项数据,elementAtOrDefault( )发射第N项数据,如果Observable数据少于N项就发射默认值。
Rxjava操作符汇总
用法:

Observable.just(1,2,3,4,5,6,7).elementAt(3)
.subscribe(integer -> {
Log.e("skip------","="+integer);
});

结果:
07-17 18:57:35.070 18518-18518/farmer.zpm.com.rxjavalearn E/elementAt——: =4

7.sample( ) or throttleLast( )

功能: 定期发射Observable最近的数据。
Rxjava操作符汇总
用法:

        Observable.interval(4,TimeUnit.SECONDS).sample(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long along) {
Log.e("sample------","="+along);
}
});

结果:
07-17 19:02:03.684 22453-22478/farmer.zpm.com.rxjavalearn E/sample——: =0
07-17 19:02:07.685 22453-22478/farmer.zpm.com.rxjavalearn E/sample——: =1
07-17 19:02:11.684 22453-22478/farmer.zpm.com.rxjavalearn E/sample——: =2
07-17 19:02:15.684 22453-22478/farmer.zpm.com.rxjavalearn E/sample——: =3

8.throttleFirst( )

功能:定期发射Observable发射的第一项数据

9.throttleWithTimeout( ) or debounce( )

功能:只有当Observable在指定的时间后还没有发射数据时,才发射一个数据。
用法:

  Observable.interval(5,TimeUnit.SECONDS).debounce(2,TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e("debounce-----","="+aLong);
}
});

结果:
07-17 19:05:39.310 25326-25432/farmer.zpm.com.rxjavalearn E/debounce—–: =0
07-17 19:05:44.309 25326-25432/farmer.zpm.com.rxjavalearn E/debounce—–: =1
07-17 19:05:49.310 25326-25432/farmer.zpm.com.rxjavalearn E/debounce—–: =2
07-17 19:05:54.309 25326-25432/farmer.zpm.com.rxjavalearn E/debounce—–: =3

10.timeout( )

功能:如果在一个指定的时间段后还没发射数据,就发射一个异常
Rxjava操作符汇总
用法:

 Observable.interval(5, TimeUnit.SECONDS).timeout(3,TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
Log.e("timeout----","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.e("timeout----","onError");
}

@Override
public void onNext(Long aLong) {
Log.e("timeout----","="+aLong);
}
});

结果:
07-17 19:09:43.433 27390-27421/farmer.zpm.com.rxjavalearn E/timeout—-: onError

三、变换操作符

1.map()

功能:对序列的每一项都应用一个函数来变换Observable发射的数据序列。
Rxjava操作符汇总
用法:

 Observable.just(1,2,3,4)
.map(integer -> integer*10)
.subscribe(integer -> {Log.i("map------","="+integer);});

结果:
07-17 19:24:58.783 9557-9557/farmer.zpm.com.rxjavalearn I/map——: =10
07-17 19:24:58.783 9557-9557/farmer.zpm.com.rxjavalearn I/map——: =20
07-17 19:24:58.783 9557-9557/farmer.zpm.com.rxjavalearn I/map——: =30
07-17 19:24:58.783 9557-9557/farmer.zpm.com.rxjavalearn I/map——: =40

2.FlatMap()

功能:将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable。
Rxjava操作符汇总
用法:
http://blog.csdn.net/zhupumao/article/details/51893067
结果:
http://blog.csdn.net/zhupumao/article/details/51893067

3.scan()

功能:scan可以认为具有累加器的功能,可以将前一个的值传递给后面使用。
Rxjava操作符汇总
用法:

 Observable.just(1,2,3,4)
.scan((integer, integer2) -> integer+integer2)
.subscribe(integer -> {Log.e("scan----","="+integer);});

结果:
07-17 19:24:58.784 9557-9557/farmer.zpm.com.rxjavalearn E/scan—-: =1
07-17 19:24:58.784 9557-9557/farmer.zpm.com.rxjavalearn E/scan—-: =3
07-17 19:24:58.784 9557-9557/farmer.zpm.com.rxjavalearn E/scan—-: =6
07-17 19:24:58.784 9557-9557/farmer.zpm.com.rxjavalearn E/scan—-: =10

4.GroupBy()

功能:将原来的Observable转变为一个新的可以发射Observables的Observable,每一个Observable可以发射一组不同的数据。
Rxjava操作符汇总
用法:

Observable.just(1,2,3,4,5,6,7,8,9)
.groupBy(integer -> integer%3)
.subscribe(result -> { result.subscribe(integer -> {
Log.e("groupBy","key="+result.getKey()+"--value"+integer);
});
});

结果:
07-17 19:57:08.079 6995-6995/farmer.zpm.com.rxjavalearn E/groupBy: key=1–value1
07-17 19:57:08.079 6995-6995/farmer.zpm.com.rxjavalearn E/groupBy: key=2–value2
07-17 19:57:08.079 6995-6995/farmer.zpm.com.rxjavalearn E/groupBy: key=0–value3
07-17 19:57:08.079 6995-6995/farmer.zpm.com.rxjavalearn E/groupBy: key=1–value4
07-17 19:57:08.080 6995-6995/farmer.zpm.com.rxjavalearn E/groupBy: key=2–value5
07-17 19:57:08.080 6995-6995/farmer.zpm.com.rxjavalearn E/groupBy: key=0–value6
07-17 19:57:08.080 6995-6995/farmer.zpm.com.rxjavalearn E/groupBy: key=1–value7
07-17 19:57:08.080 6995-6995/farmer.zpm.com.rxjavalearn E/groupBy: key=2–value8
07-17 19:57:08.080 6995-6995/farmer.zpm.com.rxjavalearn E/groupBy: key=0–value9

5.buffer( )

功能: 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个。
Rxjava操作符汇总
Rxjava操作符汇总
用法:

 Observable.just(1,2,3,4,5,6,7,8,9).buffer(4).subscribe(integers -> {
Log.e("buffer---","size="+integers.size());
for (int i:integers ) {
Log.e("buffer---","i="+i);
}
});

结果:
07-17 20:09:22.640 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: size=4
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: i=1
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: i=2
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: i=3
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: i=4
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: size=4
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: i=5
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: i=6
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: i=7
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: i=8
07-17 20:09:22.641 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: size=1
07-17 20:09:22.642 18252-18252/farmer.zpm.com.rxjavalearn E/buffer—: i=9
6.window( )
功能:和buffer()功能类似,只不过window发射的是Observables,buffer()发射的是List。
Rxjava操作符汇总
7.cast()
功能:在发射之前强制将Observable发射的所有数据转换为指定类型。是map的特殊用法。
Rxjava操作符汇总

四、结合型操作符

1.Merge

功能:将多个Observable合并为一个。
Rxjava操作符汇总
用法:

 Observable.interval(2,TimeUnit.SECONDS)
.mergeWith(Observable.interval(1,TimeUnit.SECONDS))
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e("mergeWith---","="+aLong);
}
});

结果;

07-17 20:31:29.730 32130-32164/farmer.zpm.com.rxjavalearn E/mergeWith—: =0
07-17 20:31:30.730 32130-32163/farmer.zpm.com.rxjavalearn E/mergeWith—: =0
07-17 20:31:30.730 32130-32164/farmer.zpm.com.rxjavalearn E/mergeWith—: =1
07-17 20:31:31.730 32130-32164/farmer.zpm.com.rxjavalearn E/mergeWith—: =2
07-17 20:31:32.729 32130-32163/farmer.zpm.com.rxjavalearn E/mergeWith—: =1
07-17 20:31:32.730 32130-32164/farmer.zpm.com.rxjavalearn E/mergeWith—: =3
07-17 20:31:33.730 32130-32164/farmer.zpm.com.rxjavalearn E/mergeWith—: =4
07-17 20:31:34.729 32130-32163/farmer.zpm.com.rxjavalearn E/mergeWith—: =2
07-17 20:31:34.730 32130-32164/farmer.zpm.com.rxjavalearn E/mergeWith—: =5
07-17 20:31:35.730 32130-32164/farmer.zpm.com.rxjavalearn E/mergeWith—: =6

2.Zip

功能:使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。
Rxjava操作符汇总
用法:

        Observable.interval(2, TimeUnit.SECONDS)
.zipWith(Observable.just("one", "two"), new Func2<Long, String, String>() {
@Override
public String call(Long aLong, String s) {
return aLong+"="+s;
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("zipWith---",s);
}
});

结果://只有两行
07-17 20:42:53.592 10788-10819/farmer.zpm.com.rxjavalearn E/zipWith—: 0=one
07-17 20:42:55.593 10788-10819/farmer.zpm.com.rxjavalearn E/zipWith—: 1=two
以后有空继续更新!!!!!!