ReactiveX 学习笔记(4)过滤数据流

时间:2024-04-27 09:20:49

Filtering Observables

本文主题为过滤 Observable 的操作符。

这里的 Observable 实质上是可观察的数据流。

RxJava操作符(三)Filtering

Debounce / Throttle

ReactiveX - Debounce operator

Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」

Debounce / Throttle 只发送源数据流中满足如下条件的数据:源数据流在发送该数据之后在指定时间段内未发送任何数据。

ReactiveX 学习笔记(4)过滤数据流

  • RxNET
var source = new Subject<int>();
source
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe(i =>
Console.WriteLine("{0:HH:mm:ss.fff} {1}", DateTime.Now, i));
foreach (var i in Enumerable.Range(1, 10))
{
Console.WriteLine("{0:HH:mm:ss.fff} OnNext({1})",DateTime.Now, i);
source.OnNext(i);
Thread.Sleep(100);
}
Console.WriteLine("{0:HH:mm:ss.fff} Sleep(2000)", DateTime.Now);
Thread.Sleep(2000);
foreach (var i in Enumerable.Range(1, 5))
{
Console.WriteLine("{0:HH:mm:ss.fff} OnNext({1})", DateTime.Now, i);
source.OnNext(i);
Thread.Sleep(100);
}
Console.WriteLine("{0:HH:mm:ss.FFF} Sleep(2000)", DateTime.Now);
Thread.Sleep(2000);
/*
21:15:39.602 OnNext(1)
21:15:39.759 OnNext(2)
21:15:39.864 OnNext(3)
21:15:39.969 OnNext(4)
21:15:40.074 OnNext(5)
21:15:40.178 OnNext(6)
21:15:40.282 OnNext(7)
21:15:40.387 OnNext(8)
21:15:40.491 OnNext(9)
21:15:40.596 OnNext(10)
21:15:40.700 Sleep(2000)
21:15:41.105 10
21:15:42.706 OnNext(1)
21:15:42.809 OnNext(2)
21:15:42.911 OnNext(3)
21:15:43.011 OnNext(4)
21:15:43.113 OnNext(5)
21:15:43.217 Sleep(2000)
21:15:43.613 5
*/
  • RxJava

ReactiveX 学习笔记(4)过滤数据流

ReactiveX 学习笔记(4)过滤数据流

ReactiveX 学习笔记(4)过滤数据流

ReactiveX 学习笔记(4)过滤数据流

Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
Observable.interval(100, TimeUnit.MILLISECONDS).take(3))
.scan(0) { acc, v -> acc + 1 }
.debounce(150, TimeUnit.MILLISECONDS)
.dump()
/*
onNext: 3
onNext: 4
onNext: 5
onNext: 9
onComplete
*/
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
Observable.interval(100, TimeUnit.MILLISECONDS).take(3))
.scan(0) { acc, v -> acc + 1 }
.debounce { i -> Observable.timer(i * 50L, TimeUnit.MILLISECONDS) }
.dump()
/*
onNext: 0
onNext: 1
onNext: 3
onNext: 4
onNext: 5
onNext: 9
onComplete
*/
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
Observable.interval(100, TimeUnit.MILLISECONDS).take(3))
.scan(0) { acc, v -> acc + 1 }
.throttleWithTimeout(150, TimeUnit.MILLISECONDS)
.dump()
/*
onNext: 3
onNext: 4
onNext: 5
onNext: 9
onComplete
*/
Observable.interval(150, TimeUnit.MILLISECONDS)
.throttleFirst(1, TimeUnit.SECONDS)
.take(3)
.dump()
/*
onNext: 0
onNext: 7
onNext: 14
onComplete
*/
Observable.interval(150, TimeUnit.MILLISECONDS)
.throttleLast(1, TimeUnit.SECONDS)
.take(3)
.dump()
/*
onNext: 5
onNext: 12
onNext: 18
onComplete
*/

Distinct / DistinctUntilChanged

ReactiveX - Distinct operator

Reactive Extensions再入門 その23「重複を排除するメソッド」

Distinct 忽略源数据流的重复数据,只发送源数据流中其余各项。

DistinctUntilChanged 忽略源数据流中邻近的重复数据,只发送源数据流中其余各项。

ReactiveX 学习笔记(4)过滤数据流

ReactiveX 学习笔记(4)过滤数据流

  • RxNET
var subject = new Subject<int>();
var distinct = subject.Distinct();
subject.Subscribe(
i => Console.WriteLine("{0}", i),
() => Console.WriteLine("subject.OnCompleted()"));
distinct.Subscribe(
i => Console.WriteLine("distinct.OnNext({0})", i),
() => Console.WriteLine("distinct.OnCompleted()"));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(1);
subject.OnNext(1);
subject.OnNext(4);
subject.OnCompleted();
/*
1
distinct.OnNext(1)
2
distinct.OnNext(2)
3
distinct.OnNext(3)
1
1
4
distinct.OnNext(4)
subject.OnCompleted()
distinct.OnCompleted()
*/
var subject = new Subject<int>();
var distinct = subject.DistinctUntilChanged();
subject.Subscribe(
i => Console.WriteLine("{0}", i),
() => Console.WriteLine("subject.OnCompleted()"));
distinct.Subscribe(
i => Console.WriteLine("distinct.OnNext({0})", i),
() => Console.WriteLine("distinct.OnCompleted()"));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(1);
subject.OnNext(1);
subject.OnNext(4);
subject.OnCompleted();
/*
1
distinct.OnNext(1)
2
distinct.OnNext(2)
3
distinct.OnNext(3)
1
distinct.OnNext(1)
1
4
distinct.OnNext(4)
subject.OnCompleted()
distinct.OnCompleted()
*/
  • RxJava

ReactiveX 学习笔记(4)过滤数据流

ReactiveX 学习笔记(4)过滤数据流

val values = Observable.create<Int> { o ->
o.onNext(1)
o.onNext(1)
o.onNext(2)
o.onNext(3)
o.onNext(2)
o.onComplete()
}
values
.distinct()
.dump()
/*
onNext: 1
onNext: 2
onNext: 3
onComplete
*/
val values = Observable.create<String> { o ->
o.onNext("First")
o.onNext("Second")
o.onNext("Third")
o.onNext("Fourth")
o.onNext("Fifth")
o.onComplete()
}
values
.distinct { v -> v[0] }
.dump()
/*
onNext: First
onNext: Second
onNext: Third
onComplete
*/
val values = Observable.create<Int> { o ->
o.onNext(1)
o.onNext(1)
o.onNext(2)
o.onNext(3)
o.onNext(2)
o.onComplete()
}
values
.distinctUntilChanged()
.dump()
/*
onNext: 1
onNext: 2
onNext: 3
onNext: 2
onComplete
*/
val values = Observable.create<String> { o ->
o.onNext("First")
o.onNext("Second")
o.onNext("Third")
o.onNext("Fourth")
o.onNext("Fifth")
o.onComplete()
}
values
.distinctUntilChanged { v -> v[0] }
.dump()
/*
onNext: First
onNext: Second
onNext: Third
onNext: Fourth
onComplete
*/
  • RxSwift
let disposeBag = DisposeBag()
Observable.of("