RxJava开发精要6 - 组合Observables

时间:2023-05-15 17:12:02

上一章中。我们学到怎样转换可观測序列。我们也看到了map(),scan(),groupBY(),以及很多其它实用的函数的实际样例,它们帮助我们操作Observable来创建我们想要的Observable。

本章中,我们将研究组合函数并学习怎样同一时候处理多个Observables来创建我们想要的Observable。

Merge

在异步的世界常常会创建这种场景。我们有多个来源可是仅仅想有一个结果:多输入,单输出。RxJava的merge()方法将帮助你把两个甚至很多其它的Observables合并到他们发射的数据里。下图给出了把两个序列合并在一个终于发射的Observable。

RxJava开发精要6 - 组合Observables

正如你看到的那样,发射的数据被交叉合并到一个Observable里面。

注意假设你同步的合并Observable,它们将连接在一起而且不会交叉。

像通常一样。我们用我们的App和已安装的App列表来创建了一个“真实世界”的样例。我们还须要第二个Observable。我们能够创建一个单独的应用列表然后逆序。当然没有实际的意义,仅仅是为了这个样例。第二个列表,我们的loadList()函数像以下这样:

private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
List reversedApps = Lists.reverse(apps);
Observable<AppInfo> observableApps =Observable.from(apps);
Observable<AppInfo> observableReversedApps =Observable.from(reversedApps);
Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps); mergedObserbable.subscribe(new Observer<AppInfo>(){
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
} @Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "One of the two Observable threw an error!", Toast.LENGTH_SHORT).show();
mSwipeRefreshLayout.setRefreshing(false);
} @Override
public void onNext(AppInfoappInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
}
});
}

我们创建了Observable和observableApps数据以及新的observableReversedApps逆序列表。使用Observable.merge(),我们能够创建新的ObservableMergedObservable在单个可观測序列中发射源Observables发出的全部数据。

正如你能看到的,每一个方法签名都是一样的,因此我们的观察者无需在意不论什么不同就能够复用代码。结果例如以下:

RxJava开发精要6 - 组合Observables

注意错误时的toast消息。你能够觉得每一个Observable抛出的错误将会打断合并。假设你须要避免这种情况。RxJava提供了mergeDelayError(),它能从一个Observable中继续发射数据即便是当中有一个抛出了错误。当全部的Observables都完毕时,mergeDelayError()将会发射onError()。例如以下图所看到的:

RxJava开发精要6 - 组合Observables

ZIP

我们在处理多源时可能会带来这样一种场景:多从个Observables接收数据,处理它们。然后将它们合并成一个新的可观測序列来使用。RxJava有一个特殊的方法能够完毕:zip()合并两个或者多个Observables发射出的数据项,依据指定的函数Func*变换它们。并发射一个新值。下图展示了zip()方法怎样处理发射的“numbers”和“letters”然后将它们合并一个新的数据项:

RxJava开发精要6 - 组合Observables

对于“真实世界”的样例来说,我们将使用已安装的应用列表和一个新的动态的Observable来让样例变得有点有趣味。

Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

tictocObservable变量使用interval()函数每秒生成一个Long类型的数据:简单且高效。正如之前所说的,我们须要一个Func对象。

由于它须要传两个參数,所以是Func2:

private AppInfo updateTitle(AppInfoappInfo, Long time) {
appInfo.setName(time + " " + appInfo.getName());
return appInfo;
}

如今我们的loadList()函数变成这样:

private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> observableApp = Observable.from(apps); Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS); Observable.zip(observableApp, tictoc,
(AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
} @Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
} @Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}

正如你看到的那样。zip()函数有三个參数:两个Observables和一个Func2

细致一看会发现observeOn()函数。

它将在下一章中解说:如今我们能够小试一下。

结果例如以下:

RxJava开发精要6 - 组合Observables

Join

前面两个方法。zip()merge()方法作用在发射数据的范畴内,在决定怎样操作值之前有些场景我们须要考虑时间的。RxJava的join()函数基于时间窗体将两个Observables发射的数据结合在一起。

RxJava开发精要6 - 组合Observables

为了正确的理解上一张图。我们解释下join()须要的參数:

  • 第二个Observable和源Observable结合。
  • Func1參数:在指定的由时间窗体定义时间间隔内,源Observable发射的数据和从第二个Observable发射的数据相互配合返回的Observable。
  • Func1參数:在指定的由时间窗体定义时间间隔内,第二个Observable发射的数据和从源Observable发射的数据相互配合返回的Observable。
  • Func2參数:定义已发射的数据怎样与新发射的数据项相结合。
  • 例如以下练习的样例。我们能够改动loadList()函数像以下这样:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE); Observable<AppInfo> appsSequence =
Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(position -> {
return apps.get(position.intValue());
}); Observable<Long> tictoc = Observable.interval(1000,TimeUnit.MILLISECONDS); appsSequence.join(
tictoc,
appInfo -> Observable.timer(2,TimeUnit.SECONDS),
time -> Observable.timer(0, TimeUnit.SECONDS),
this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.take(10)
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
} @Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
} @Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}

我们有一个新的对象appsSequence,它是一个每秒从我们已安装的app列表发射app数据的可观測序列。

tictoc这个Observable数据每秒仅仅发射一个新的Long型整数。为了合并它们,我们须要指定两个Func1变量:

appInfo -> Observable.timer(2, TimeUnit.SECONDS)

time -> Observable.timer(0, TimeUnit.SECONDS)

上面描写叙述了两个时间窗体。

以下一行描写叙述我们怎样使用Func2将两个发射的数据结合在一起。

this::updateTitle

结果例如以下:

RxJava开发精要6 - 组合Observables

它看起来有点乱,可是注意app的名字和我们指定的时间窗体,我们能够看到:一旦第二个数据发射了我们就会将它与源数据结合,但我们用同一个源数据有2秒钟。这就是为什么标题反复数字累加的原因。

值得一提的是,为了简单起见,也有一个join()操作符作用于字符串然后简单的和发射的字符串连接成终于的字符串。

RxJava开发精要6 - 组合Observables

combineLatest

RxJava的combineLatest()函数有点像zip()函数的特殊形式。

正如我们已经学习的,zip()作用于近期未打包的两个Observables。相反。combineLatest()作用于近期发射的数据项:假设Observable1发射了A而且Observable2发射了B和C,combineLatest()将会分组处理AB和AC,例如以下图所看到的:

RxJava开发精要6 - 组合Observables

combineLatest()函数接受二到九个Observable作为參数,假设有须要的话或者单个Observables列表作为參数。

从之前的样例中把loadList()函数借用过来,我们能够改动一下来用于combineLatest()实现“真实世界”这个样例:

private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(position ->apps.get(position.intValue()));
Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
Observable.combineLatest(appsSequence, tictoc,
this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() { @Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
} @Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
} @Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}

这我们使用了两个Observables:一个是每秒钟从我们已安装的应用列表发射一个App数据,第二个是每隔1.5秒发射一个Long型整数。

我们将他们结合起来并运行updateTitle()函数,结果例如以下:

RxJava开发精要6 - 组合Observables

正如你看到的。由于不同的时间间隔,AppInfo对象如我们所预料的那样有时候会反复。

And,Then和When

在将来另一些zip()满足不了的场景。

如复杂的架构,或者是仅仅为了个人爱好。你能够使用And/Then/When解决方式。

它们在RxJava的joins包下,使用Pattern和Plan作为中介,将发射的数据集合并到一起。

RxJava开发精要6 - 组合Observables

我们的loadList()函数将会被改动从这样:

private void loadList(List<AppInfo> apps) {

    mRecyclerView.setVisibility(View.VISIBLE);

    Observable<AppInfo> observableApp = Observable.from(apps);

    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

    Pattern2<AppInfo, Long> pattern = JoinObservable.from(observableApp).and(tictoc); 

    Plan0<AppInfo> plan = pattern.then(this::updateTitle);

    JoinObservable
.when(plan)
.toObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() { @Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
} @Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
} @Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo); mRecyclerView.smoothScrollToPosition(position);
}
});
}

和通常一样。我们有两个发射的序列。observableApp,发射我们安装的应用列表数据。tictoc每秒发射一个Long型整数。如今我们用and()连接源Observable和第二个Observable。

JoinObservable.from(observableApp).and(tictoc);

这里创建一个pattern对象,使用这个对象我们能够创建一个Plan对象:”我们有两个发射数据的Observables,then()是做什么的?”

pattern.then(this::updateTitle);

如今我们有了一个Plan对象而且当plan发生时我们能够决定接下来发生的事情。

.when(plan).toObservable()

这时候,我们能够订阅新的Observable,正如我们总是做的那样。

Switch

有这样一个复杂的场景就是在一个subscribe-unsubscribe的序列里我们能够从一个Observable自己主动取消订阅来订阅一个新的Observable。

RxJava的switch(),正如定义的,将一个发射多个Observables的Observable转换成另一个单独的Observable。后者发射那些Observables近期发射的数据项。

给出一个发射多个Observables序列的源Observable,switch()订阅到源Observable然后開始发射由第一个发射的Observable发射的一样的数据。当源Observable发射一个新的Observable时,switch()马上取消订阅前一个发射数据的Observable(因此打断了从它那里发射的数据流)然后订阅一个新的Observable,并開始发射它的数据。

RxJava开发精要6 - 组合Observables

StartWith

我们已经学到怎样连接多个Observables并追加指定的值到一个发射序列里。

RxJava的startWith()concat()的相应部分。正如concat()向发射数据的Observable追加数据那样,在Observable開始发射他们的数据之前。 startWith()通过传递一个參数来先发射一个数据序列。

RxJava开发精要6 - 组合Observables

总结

这章中。我们学习了怎样将两个或者很多其它个Observable结合来创建一个新的可观測序列。

我们将能够merge Observable。join Observables 。zip Observables 并在几种情况下把他们结合在一起。

下一章。我们将介绍调度器,它将非常easy的帮助我们创建主线程以及提高我们应用程序的性能。我们也将学习怎样正确的运行长任务或者I/O任务来获得更好的性能。