RxJava----操作符:转换操作符

时间:2022-02-20 17:45:23

Transforming Observables(转换操作符)

本节介绍转换数据流中数据的方法。在真实世界中, Observable中的数据可以是任意类型的,可能在你的应用中无法直接使用这些数据类型,你需要对这些数据对象进行一些转换。

  • map 和 flatMap 是本节中操作函数的基础。 下面是三种转换方式的示意:
    • Ana(morphism) T –> IObservable
    • Cata(morphism) IObservable –> T
    • Bind IObservable –> IObservable

map

最基础的转换函数就是 map。 map 使用一个转换的参数把源Observable 中的数据转换为另外一种类型的数据。返回的
Observable 中包含了转换后的数据。

public final <R> Observable<R> map(Func1<? super T,? extends R> func)

RxJava----操作符:转换操作符

        Observable<Integer> values = Observable.just("0", "1", "2", "3")
.map(new Func1<String,Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
});
values.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer+"");
}
});

结果:

0
1
2
3

源 Observable 发射的为 String 类型数据,而我们需要的是 int 类型,则可以通过 map 把 String 转换为 int。

如果你认为这种转换太简单了, 完全可以在 Subscriber 中完成,这样在设计架构上并不合理,没有有效的区分职责。
代码设计每个部分都有各自的职责,使用 map 可以有效的确保职责清晰。方便后续修改。

cast

cast 是把一个对象强制转换为子类型的缩写形式。 假设源 Observable为 Observable
RxJava----操作符:转换操作符

         Observable<String> values = Observable.just("0","1","2","3");
values
.cast(Object.class)
.subscribe(new Action1<Object>() {
@Override
public void call(Object object) {
log(object.toString()+":"+object.getClass());
}
});

结果:

0:class java.lang.String
1:class java.lang.String
2:class java.lang.String
3:class java.lang.String

如果遇到类型不一样的对象的话,就会抛出一个 error。

如果你不想处理类型不一样的对象,则可以用 ofType 。 该函数用来判断数据是否为 该类型,如果不是则跳过这个数据。

flatMap

map 把一个数据转换为另外一个数据。而 flatMap 把源 Observable 中的一个数据替换为任意数量的数据,可以为 0 个,也可以为无限个。 flatMap 把源 Observable 中的一个数据转换为一个新的 Observable 发射出去。

public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func)

RxJava----操作符:转换操作符
flatMap 的参数会把 源 Observable 中发射的每个数据转换为一个新的 Observable, 然后 flatMap 再把这些新的 Observable 中发射的数据发射出来。每个新的 Observable 数据都是按照他们产生的顺序发射出来,但是 Observable 之间数据的顺序可能会不一样。
下面通过一个简单的例子来帮助理解 flatMap 。

        Observable<Integer> values = Observable.range(1,3);
values
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return Observable.range(0,integer);
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});;

这里 values 会发射 1 、 2、 3 三个数据。 然后 flatMap 把每个数据变为新的 Observable(Observable.range(0,i)),所以会有 3 个 Observable,这 3个 Observable 分别发射 [0],[0,1] 和 [0,1,2]。最终 flatMap 再把这 3 个新 Observable 发射的数据合并到一个 Observable发射出去。

所以上面的结果如下:
结果:

0
0
1
0
1
2

再看一个示例,把 int 值转换为 字母:

        Observable<Integer> values2 = Observable.just(1);
values2
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return Observable.just(Character.valueOf((char)(integer+64)));
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});

结果:

A

上面的示例,用 map 函数实现会更简单,这里是为了说明 flatMap 另外一种功能,如果你发现源 Observable 中发射的数据不符合你的要求,则你可以返回一个 空的 Observable。这就相当于过滤数据的作用, 例如:

        Observable<Integer> values = Observable.range(0,30);
values
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
if (0 < integer && integer <= 26)
return Observable.just(Character.valueOf((char)(integer+64)));
else
return Observable.empty();
}
})
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log( e.getMessage().toString());
}
@Override
public void onNext(Object o) {
log(o.toString());
}
});

结果:

A
B
C
...
X
Y
Z
Complete!

上面示例源 Observable 一共发射 0 到 29 这 30个数字。在 flatMap 中判断 如果数字大于 0 并且小于等于26,则转换为字母用 并用 Observable.just 生成新的 Observable;其他数字都返回一个Observable.empty() 空 Observable。

注意,flatMap 是把几个新的 Observable 合并为一个 Observable 返回, 只要这些新的 Observable有数据发射出来, flatMap 就会把数据立刻发射出去。所以如果这些新的 Observable 发射数据是异步的,那么 flatMap返回的数据也是异步的。

下面示例中使用 Observable.interval 来生成每个数据对应的新 Observable,由于 interval 返回的Observable 是异步的,所以可以看到最终输出的结果是每当有 Observable 发射数据的时候, flatMap 就返回该数据。

         Observable.just(100, 150)
.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(final Integer integer) {
return Observable.interval(integer,TimeUnit.MILLISECONDS)
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return integer;
}
});
}
})
.take(6)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});

上面的 new Func1<Integer, Observable<?>>()先把参数integer(这里分别为 100 和 150 这两个数字)转换为 Observable.interval(integer, TimeUnit.MILLISECONDS), 每隔 integer 毫秒发射一个数字,这样两个 Observable.interval 都发射同样的数字,只不过发射的时间间隔不一样,所以为了区分打印的结果,我们再用 map(new Func1<Long, Object>() 把结果转换为 integer 。
结果:

100
150
100
100
150
100

可以两个新的 Observable 的数据交织在一起发射出来。

concatMap

  • 如果你不想把新 Observable 中的数据交织在一起发射,则可以选择使用 concatMap 函数。
  • 该函数会等第一个新的 Observable 完成后再发射下一个 新的 Observable 中的数据。
        Observable.just(100, 150)
.concatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(final Integer integer) {
return Observable.interval(integer, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return integer;
}
})
.take(3);
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});

结果:

100
100
100
150
150
150

所以 concatMap 要求新的Observable 不能是无限的,否则该无限 Observable 会阻碍后面的数据发射。为此,上面的示例使用 take 来结束 Observable。

switchMap

switchMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

与flatMap操作符不同的是,switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果,举个例子来说,比如源Observable产生A、B、C三个结果,通过switchMap的自定义映射规则,映射后应该会产生A1、A2、B1、B2、C1、C2,但是在产生B2的同时,C1已经产生了,这样最后的结果就变成A1、A2、B1、C1、C2,B2被舍弃掉了!流程图如下:
RxJava----操作符:转换操作符
以下是flatMap、concatMap和switchMap的运行实例对比:

         //flatMap操作符的运行结果
Observable.just(10, 22, 34).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//10的延迟执行时间为200毫秒、22和34的延迟执行时间为180毫秒
int delay = 200;
if (integer > 10)
delay = 180;

return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("flatMap Next:" + integer);
}
});

//concatMap操作符的运行结果
Observable.just(10, 22, 34).concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//10的延迟执行时间为200毫秒、22和34的延迟执行时间为180毫秒
int delay = 200;
if (integer > 10)
delay = 180;

return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("concatMap Next:" + integer);
}
});

//switchMap操作符的运行结果
Observable.just(10, 22, 34).switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//10的延迟执行时间为200毫秒、22和34的延迟执行时间为180毫秒
int delay = 200;
if (integer > 10)
delay = 180;

return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("switchMap Next:" + integer);
}
});

结果:

flatMap Next:22
flatMap Next:34
flatMap Next:17
flatMap Next:11
flatMap Next:10
flatMap Next:5

switchMap Next:34
switchMap Next:17

concatMap Next:10
concatMap Next:5
concatMap Next:22
concatMap Next:11
concatMap Next:34
concatMap Next:17

这里from返回的是一个Observable,所以【10,5】、【22,11】、【34,17】都是一个整体。switchMap直接去最后一个,也就是【34,17】,。flatMap在180毫秒之后是【22,11】和【34,17】,200毫秒之后是【10,5】。concatMap就是一个接着一个,按顺序来了。

flatMapIterable

flatMapIterable 和 flatMap 类似,区别是 flatMap 参数把每个数据转换为 一个新的 Observable,而 flatMapIterable 参数把一个数据转换为一个新的 iterable 对象。
例如下面是一个把参数转换为 iterable 的函数:

public static Iterable<Integer> toList(int start, int count) {
List<Integer> list = new ArrayList<>();
for (int i=start ; i<start+count ; i++) {
list.add(i);
}
return list;
}

然后可以这样使用该函数作为 flatMapIterable 的参数:

Observable.range(1, 3)
.flatMapIterable(new Func1<Integer, Iterable<?>>() {
@Override
public Iterable<?> call(Integer integer) {
return toList(1,integer);
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});

结果:

1
1
2
1
2
3

flatMapIterable 把生成的 3 个 iterable 合并为一个 Observable 发射。

作为 Rx 开发者,我们需要知道在 Rx 中应该使用 Observable 数据流来发射数据而不要混合使用传统的iterable。但是如果你无法控制数据的来源,提供数据的一方只提供 iterable数据,则依然可以直接使用这些数据。flatMapIterable 把多个 iterable 的数据按照顺序发射出来,不会交织发射。

flatMapIterable 还有另外一个重载函数可以用源 Observable 发射的数据来处理新的 iterable 中的每个数据:

        Observable.range(1, 3)
.flatMapIterable(new Func1<Integer, Iterable<?>>() {
@Override
public Iterable<?> call(Integer integer) {
return toList(1,integer);
}
}, new Func2<Integer, Object, Object>() {
@Override
public Object call(Integer integer, Object o) {
return integer * (Integer) o;
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});

结果:

1
2
4
3
6
9

注意,上面的 Func2()call(Integer integer, Object o)中integer参数取值为 源 Observable 发射出来的数据,也就是 1、 2、 3. 而 o参数取值为toList(1,integer) 参数生成的 iterable 中的每个数据,也就是分别为 [1]、[1,2]、[1,2,3],所以最终的结果就是:[11], [12, 22], [13, 23, 33].

buffer

buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅者,订阅者处理后,清空buffer列表,同时接收下一次收集的结果并提交给订阅者,周而复始。

需要注意的是,一旦源Observable在产生结果的过程中出现异常,即使buffer已经存在收集到的结果,订阅者也会马上收到这个异常,并结束整个过程。

buffer的名字很怪,但是原理很简单,流程图如下:
RxJava----操作符:转换操作符

        final int[] items = new int[]{1, 3, 5, 7, 9};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
if (subscriber.isUnsubscribed()) return;
Random random = new Random();
while (true) {
int i = items[random.nextInt(items.length)];
subscriber.onNext(i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
observable.buffer(3, TimeUnit.SECONDS).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
log(integers.toString());
}
});

结果:

[3, 1, 7]
[5, 9, 7]
[1, 5, 3]
[7, 1, 5]
...

buffer有很多重载的方法:

public final <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) 
public final Observable<List<T>> buffer(int count)
public final Observable<List<T>> buffer(int count, int skip)
public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler)
public final <TOpening, TClosing> Observable<List<T>> buffer(Observable<? extends TOpening> bufferOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> bufferClosingSelector)
public final <B> Observable<List<T>> buffer(Observable<B> boundary)
public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity)

window

window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理。

window操作符有很多个重载方法,这里只举一个简单的例子,其流程图如下:
RxJava----操作符:转换操作符

 Observable.interval(1, TimeUnit.SECONDS).take(12)
.window(3, TimeUnit.SECONDS)
.subscribe(new Action1<Observable<Long>>() {
@Override
public void call(Observable<Long> observable) {
log("subdivide begin......");
observable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Next:" + aLong);
}
});
}
});

结果:

subdivide begin…… 
Next:0
Next:1
subdivide begin……
Next:2
Next:3
Next:4
subdivide begin……
Next:5
Next:6
Next:7
subdivide begin……
Next:8
Next:9
Next:10
subdivide begin……
Next:11

scan

scan 和 reduce 很像,不一样的地方在于 scan会发射所有中间的结算结果。

public final Observable<T> scan(Func2<T,T,T> accumulator)

RxJava----操作符:转换操作符

通过上图可以看到和 reduce 的区别, reduce 只是最后把计算结果发射出来,而 scan 把每次的计算结果都发射出来。

        Observable<Integer> values = Observable.range(0, 5);
values.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
})
// .takeLast()//实现reduce
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Sum:Complete!");
}
@Override
public void onError(Throwable e) {
log("Sum:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Sum:" + integer);
}
});

结果:

Sum: 0
Sum: 1
Sum: 3
Sum: 6
Sum: 10
Sum: Complete!
reduce 可以通过 scan 来实现: reduce(acc) = scan(acc).takeLast() 。所以 scan 比 reduce 更加通用。
  • 源 Observable 发射数据,经过 scan 处理后 scan 也发射一个处理后的数据,
  • 所以 scan 并不要求源 Observable 完成发射。
  • 下面示例实现了 查找已经发射数据中的最小值的功能:
        Subject<Integer, Integer> values = ReplaySubject.create();
values
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Values:Complete!");
}
@Override
public void onError(Throwable e) {
log("Values:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Values:" + integer);
}
});
values
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return (integer<integer2) ? integer : integer2;
}
})
.distinctUntilChanged()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Min:Complete!");
}
@Override
public void onError(Throwable e) {
log("Min:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Min:" + integer);
}
});
values.onNext(2);
values.onNext(3);
values.onNext(1);
values.onNext(4);
values.onCompleted();

结果:


Values: 2
Min: 2
Values: 3
Values: 1
Min: 1
Values: 4
Values: Completed
Min: Completed

groupBy

groupBy 是 toMultimap 函数的 Rx 方式的实现。groupBy 根据每个源Observable 发射的值来计算一个
key, 然后为每个 key 创建一个新的 Observable并把key 一样的值发射到对应的新 Observable 中。

public final <K> Observable<GroupedObservable<K,T>> groupBy(Func1<? super T,? extends K> keySelector)

RxJava----操作符:转换操作符

  • 返回的结果为 GroupedObservable。 每次发现一个新的key,内部就生成一个新的 GroupedObservable并发射出来。和普通的 Observable 相比 多了一个 getKey 函数来获取 分组的 key。来自于源Observable中的值会被发射到对应 key 的 GroupedObservable 中。
  • 嵌套的 Observable 导致方法的定义比较复杂,但是提供了随时发射数据的优势,没必要等源Observable 发射完成了才能返回数据。
  • 下面的示例中使用了一堆单词作为源Observable的数据,然后根据每个单词的首字母作为分组的 key,最后把每个分组的 最后一个单词打印出来:
         Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
values.groupBy(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s.charAt(0);
}
})
.subscribe(new Action1<GroupedObservable<Object, String>>() {
@Override
public void call(final GroupedObservable<Object, String> objectStringGroupedObservable) {
objectStringGroupedObservable.last().subscribe(new Action1<String>() {
@Override
public void call(String s) {
log( objectStringGroupedObservable.getKey() +":" + s);
}
});
}
});
t:third
f:fifth
s:sixth

上面的代码使用了嵌套的 Subscriber,但Rx 功能之一就是为了避免嵌套回调函数,所以下面演示了如何避免嵌套:

         Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
values.groupBy(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s.charAt(0);
}
})
.flatMap(new Func1<GroupedObservable<Object, String>, Observable<?>>() {
@Override
public Observable<?> call(final GroupedObservable<Object, String> objectStringGroupedObservable) {
return objectStringGroupedObservable.last().map(new Func1<String, Object>() {
@Override
public Object call(String s) {
return objectStringGroupedObservable.getKey() +":" + s;
}
});
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});

结果:

s: sixth
t: third
f: fifth

项目源码 GitHub求赞,谢谢!
引用:
RxJava 教程第二部分:事件流基础之 转换数据流 - 云在千峰
Android RxJava使用介绍(三) RxJava的操作符 - 呼啸而过的专栏 - 博客频道 - CSDN.NET