RxJava transformation operators: buffer

Date: 2022-04-30 17:45:40

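What the buffer operator does:

1: It collects a fixed number of emitted items into a list and delivers that list to the subscriber in a single onNext call. It then starts a fresh list and repeats until the source has nothing left to emit.

2: It can also collect items periodically: at each time interval, the items gathered so far are delivered as one list and a fresh list is started, until the source has nothing left to emit.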


Example 1: two items per buffer


// RxJava 1.x API (rx.Observable, rx.Observer)
// Deliver the items two at a time
Observable.range(1, 5).buffer(2).subscribe(new Observer<List<Integer>>() {
    @Override
    public void onCompleted() {
        LogUtils.d("-----------------onCompleted:");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("----------------->onError:");
    }

    @Override
    public void onNext(List<Integer> items) {
        // Each call receives one buffer; the last one may hold fewer than 2 items
        LogUtils.d("----------------->onNext:" + items);
    }
});


Output:

02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2]
02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: ----------------->onNext:[3, 4]
02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: ----------------->onNext:[5]
02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: -----------------onCompleted:


Example 2: all items in a single buffer

// count equals the number of items, so everything arrives in one list
Observable.range(1, 5).buffer(5).subscribe(new Observer<List<Integer>>() {
    @Override
    public void onCompleted() {
        LogUtils.d("-----------------onCompleted:");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("----------------->onError:");
    }

    @Override
    public void onNext(List<Integer> items) {
        LogUtils.d("----------------->onNext:" + items);
    }
});

Output:

02-20 15:54:56.423 21917-21917/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2, 3, 4, 5]
02-20 15:54:56.423 21917-21917/com.rxandroid.test1 D/----->: -----------------onCompleted:


Example 3: overlapping buffers that advance one item at a time

// count = 5, skip = 1: a new buffer starts at every item,
// so each list drops the first item of the previous one
Observable.range(1, 5).buffer(5, 1).subscribe(new Observer<List<Integer>>() {
    @Override
    public void onCompleted() {
        LogUtils.d("-----------------onCompleted:");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("----------------->onError:");
    }

    @Override
    public void onNext(List<Integer> items) {
        LogUtils.d("----------------->onNext:" + items);
    }
});

Output:

02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2, 3, 4, 5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[2, 3, 4, 5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[3, 4, 5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[4, 5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: -----------------onCompleted:


Note: when skip == count, buffer(count, skip) behaves the same as buffer(count). The buffers do not overlap, and each emission consumes count items at once.

Observable.range(1, 5).buffer(5, 5).subscribe(new Observer<List<Integer>>() {
    @Override
    public void onCompleted() {
        LogUtils.d("-----------------onCompleted:");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("----------------->onError:");
    }

    @Override
    public void onNext(List<Integer> items) {
        LogUtils.d("----------------->onNext:" + items);
    }
});

Output:

02-20 16:09:24.343 14991-14991/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2, 3, 4, 5]
02-20 16:09:24.343 14991-14991/com.rxandroid.test1 D/----->: -----------------onCompleted:
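
The count and skip behaviour seen in Examples 1 to 3 and in the skip == count note can be modelled in plain Java, without RxJava. This is only a sketch of the semantics as observed in the logs above, not RxJava's implementation; the class BufferModel and its buffer helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of buffer(count, skip). Hypothetical helper, not RxJava code.
class BufferModel {

    // Opens a new buffer every `skip` items; each buffer holds at most `count` items.
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            // Copy the slice so that each buffer is an independent list.
            result.add(new ArrayList<>(source.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(buffer(items, 2, 2)); // same grouping as Example 1
        System.out.println(buffer(items, 5, 1)); // overlapping buffers, as in Example 3
        System.out.println(buffer(items, 5, 5)); // skip == count: a single buffer of five
    }
}
```

With skip smaller than count, consecutive buffers share items; with skip equal to count, every item lands in exactly one buffer.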


Example 4: collecting results periodically

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // Check on every iteration so the loop ends once the subscriber unsubscribes
        while (!subscriber.isUnsubscribed()) {
            // "消息" means "message"; the suffix is the device uptime in milliseconds
            subscriber.onNext("消息" + SystemClock.elapsedRealtime());
            SystemClock.sleep(2000); // emit one message every 2 s
        }
    }
}).subscribeOn(Schedulers.io())
        .buffer(3, TimeUnit.SECONDS) // deliver the collected messages every 3 s
        .subscribe(new Observer<List<String>>() {
            @Override
            public void onCompleted() {
                LogUtils.d("-----------------onCompleted:");
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.d("----------------->onError:");
            }

            @Override
            public void onNext(List<String> messages) {
                LogUtils.d("----------------->onNext:" + messages);
            }
        });

Output:

02-20 16:55:33.283 17087-18151/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370507667, 消息370509668]
02-20 16:55:36.323 17087-18151/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370511668]
02-20 16:55:39.303 17087-18151/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370513669, 消息370515669]
02-20 16:55:54.883 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370529168, 消息370531172]
02-20 16:55:57.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370533184]
02-20 16:56:00.883 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370535184, 消息370537184]
02-20 16:56:03.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370539184]
02-20 16:56:06.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370541185, 消息370543204]
02-20 16:56:09.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370545204]
02-20 16:56:12.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370547204, 消息370549204]
02-20 16:56:15.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[消息370551204]
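
The log alternates between lists of two messages and lists of one because a message is produced every 2 s while the buffer closes every 3 s. The plain-Java sketch below reproduces that pattern with idealised timestamps. It is a simplified model, not RxJava code: the class TimeWindowModel and the bufferByTime helper are hypothetical, and it assumes emissions happen at exactly 0, 2000, 4000, ... ms after subscription, whereas real timings drift by a few milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of time-based buffering. Hypothetical helper, not RxJava code.
class TimeWindowModel {

    // Groups emission times (ms since subscription, ascending, non-negative)
    // into consecutive windows that are each spanMillis long.
    static List<List<Long>> bufferByTime(List<Long> emissionTimes, long spanMillis) {
        List<List<Long>> windows = new ArrayList<>();
        for (long time : emissionTimes) {
            int index = (int) (time / spanMillis);
            // Create any missing windows; a window without emissions stays empty.
            while (windows.size() <= index) {
                windows.add(new ArrayList<>());
            }
            windows.get(index).add(time);
        }
        return windows;
    }

    public static void main(String[] args) {
        // One emission every 2000 ms, collected in 3000 ms windows (as in Example 4).
        List<Long> times = Arrays.asList(0L, 2000L, 4000L, 6000L, 8000L, 10000L);
        System.out.println(bufferByTime(times, 3000));
        // prints [[0, 2000], [4000], [6000, 8000], [10000]]
    }
}
```

The windows hold two, one, two, one emissions, which matches the rhythm of the log above.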