Android框架学习之RxJava(二)操作符

时间:2021-09-18 17:49:31

一.创建型操作符
1.create:用于创建Observable的操作符。

Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
//subscriber.onStart(); 这个方法在订阅后自动执行
subscriber.onNext("Hello RxAndroid");
subscriber.onCompleted();
}
});

2.from:把其他类型的对象和数据类型转化成Observable.

Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable.from(items)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});

3.just:也是把其他类型的对象和数据类型转化成Observable,它和from操作符很像,只是方法的参数不太一样。

Observable.just(1,2,3,4)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});

4.defer:直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行,defer操作符能够保证Observable的状态是最新的(不太明白)。

Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just("hello rxAndroid");
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("xx",s);
}
});

5.timer:给定的延时之后发射数据项为0的Observable

Observable.timer(2000,TimeUnit.MILLISECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//2000延迟两秒
Log.d("xx",aLong.toString()); // 0
}
});

6.interval:按照给定的时间间隔发射从0开始的整数序列

 Observable.interval(2, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//每隔2秒发送数据项,从0开始计数
//0,1,2,3....
}
});

7.range:发射指定范围的整数序列

Observable.range(2,5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("xx",integer.toString());// 2,3,4,5,6
//从2开始发射5个数据
}
});

8.empty:什么都不做直接执行onCompleted;
9.error:什么都不做直接调用onError;
10.never:什么都不做;

Observable.empty();
Observable.error(new Throwable());
Observable.never();

二.合并型操作符
1.concat:连接符

    Observable<Integer> just = Observable.just(1, 2);
Observable<Integer> just1 = Observable.just(3, 4, 5);
Observable.concat(just,just1)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i("XX",integer.toString());
//1,2,3,4,5
}
});

2.startWith:在数据前面添加一项数据

Observable.just(1,2)
.startWith(6,7,8)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i("XX",integer.toString());
//6,7,8,1,2
}
});

3.merge:按照时间线连接数据。

Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(Schedulers.io());
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});

4.zip:压缩

Observable<Integer>  o1=Observable.just(1,2,3,4);
Observable<Integer> o2=Observable.just(4,5,6);


Observable.zip(o1, o2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer item1, Integer item2) {
return item1+"and"+item2;
}
})
.subscribe(item->Log.d("JG",item));
//1and4,2and5,3and6

5.combineLatest(不懂)

三.变换操作符
1.map:映射, 对输入数据进行转换, 如大写

Observable.just(1,2,3,4,5,6).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
//对源Observable产生的结果,都统一乘以3处理
return integer*3;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("next:" + integer);
}
});
//把Integer 转换成String
Single.just(4).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
})
.subscribe(new SingleSubscriber<String>() {
@Override
public void onSuccess(String value) {
//mValueDisplay.setText(value);
}

@Override
public void onError(Throwable error) {

}
});

2.flatMap:将数据集合装换成一个Observable对象,然后再在这个Observable对象里面继续操作数据。

Observable.just(1,2)
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(final Integer integer) {
return Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(integer *100);
subscriber.onCompleted();
// 100,200
}
});
}
});

3.flatMapIterable: 和flatMap的作用一样,只不过生成的是Iterable而不是Observable

4.concatMap:类似flatMap。

5.switchMap(这三个不是太懂)

6.groupBy:将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。

 Observable.just(2,3,5,6)
.groupBy(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {//分组
return integer%2==0?"偶数":"奇数";
}
})
.subscribe(new Action1<GroupedObservable<String, Integer>>() {
@Override
public void call(GroupedObservable<String, Integer> o) {

o.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("xx",o.getKey()+":"+integer.toString()); //偶数:2,奇数:3,...
}
});
}
})

7.buffer(不懂)
8.window(不懂)

四.转换操作符
1.toList:将数据装换成list

 Observable.just(2,3,4,5)
.toList()
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {

}
});

2.toSortedList:转化成有序的list

Observable.just(6,2,3,4,5)
.toSortedList(new Func2<Integer, Integer, Integer>() {//自定义排序
@Override
public Integer call(Integer integer, Integer integer2) {
return integer-integer2; //>0 升序 ,<0 降序
}
})
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
Log.d("xx",integers.toString()); // [2, 3, 4, 5, 6]
}
});

3.toMap:将数据装换成map

 Observable.just(6,2,3,4,5)
.toMap(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "key:" + integer; //根据数据项生成map的key
}
}, new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "value:"+integer; //根据数据项生成map的kvalue
}
}).subscribe(new Action1<Map<String, String>>() {
@Override
public void call(Map<String, String> stringStringMap) {
Log.d("xx",stringStringMap.toString()); // {key:6=value:6, key:5=value:5, key:4=value:4, key:2=value:2, key:3=value:3}
}
});

4.toMultiMap:和tomap类似,只是map的value是一个集合

五.过滤操作符
1.filter:过滤

 Observable.just(3,4,5,6)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>4;
}
})
.subscribe(item->Log.d("xx",item.toString())); //5,6

2.ofType:过滤指定类型数据

3.take:发射开始的N项数据或者一定时间内的数据

Observable.just(1,2,3,4)
.take(2)//发射前2个数据项
.take(1000, TimeUnit.MILLISECONDS)//发射1000ms内的数据

4.takeLast:只发射最后的N项数据或者一定时间内的数据

Observable.just(1,2,3,4)
.takeLast(2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//3,4
}
});

5.takeFirst:提取满足条件的第一项

6.first/firstOrDefault:只发射第一项(或者满足某个条件的第一项)数据

Observable.just(1,2,3,4)
.first()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);//1
}
});

7.last/lastOrDefault:只发射最后一项(或者满足某个条件的最后一项)数据,可以指定默认值.

8.skip:跳过开始的N项数据或者一定时间内的数据

Observable.just(1,2,3)
.skip(1)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//2,3
}
});

9.skipLast:跳过最后的N项数据或者一定时间内的数据

10.elementAt/elementAtOrDefault:发射某一项数据,如果超过了范围可以的指定默认值.

11.ignoreElements:丢弃所有数据,只发射错误或正常终止的通知

12.distinct:过滤重复数据

Observable.just(1,1,2,3,4,4)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {

}

13.distinctUntilChanged:过滤掉连续重复的数据

14.throttleFirst:定期发射Observable发射的第一项数据

15.throttleWithTimeout/debounce:发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时 才进行发射

16.sample/throttleLast:定期发射Observable最近的数据

17.timeout: 如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable

六.条件/布尔操作符

1.all: 判断所有的数据项是否满足某个条件
2.exists: 判断是否存在数据项满足某个条件
3.contains: 判断在发射的所有数据项中是否包含指定的数据
4.sequenceEqual: 用于判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)
5.isEmpty: 用于判断Observable发射完毕时,有没有发射数据
6.amb: 给定多个Observable,只让第一个发射数据的Observable发射全部数据,其他Observable将会被忽略