![彻底搞清楚 RxJava 是什么东西 彻底搞清楚 RxJava 是什么东西](https://image.shishitao.com:8440/aHR0cHM6Ly9ia3FzaW1nLmlrYWZhbi5jb20vdXBsb2FkL2NoYXRncHQtcy5wbmc%2FIQ%3D%3D.png?!?w=700&webp=1)
其实从rxjava14年出现到现在,我是去年从一个朋友那里听到的,特别是随着现在app项目越来越大,分层越来越不明确的情况下,rxjava出现了,以至于出现了rxandroid。其实如果你了解观察者模式的话,rxjava并没有你说的那么神秘。再次,我对rxjava并不崇拜,我的原则是怎么写代码简单,代码结构清晰,维护简单,就是好框架。
讲rxjava之前首先说一下Android mvp开发模式。
MVP的工作流程
- Presenter负责逻辑的处理,
- Model提供数据,
- View负责显示。
作为一种新的模式,在MVP中View并不直接使用Model,它们之间的通信是通过Presenter来进行的,所有的交互都发生在Presenter内部,而在MVC中View会从直接Model中读取数据而不是通过 Controller。
接下来说说rxjava
- RxJava 到底是什么
- RxJava 好在哪
- API 介绍和原理简析
- RxJava 的适用场景和使用方式
- 最后
如果你要了解rxjava是什么,由来,以及作用和原理,请点击上面的链接。
针对上面的问题,我们简单的了解下一些基本的概念。
什么是rxJava
rxJava的好处
Android 创造的
AsyncTask
和Handler
,其实都是为了让异步代码更加简洁。RxJava的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。
看下rxjava的例子
rxjava原理简析
Observer
)和被观察者(Observable
)。观察者通过将被观察的对象加到自己的观察队列中,当被观察者发生改变时,就会通知观察者东西已经改变。
Observable
(可观察者,即被观察者)、 Observer
(观察者)、 subscribe
(订阅)、事件。Observable
和Observer
通过 subscribe()
方法实现订阅关系,从而 Observable
可以在需要的时候发出事件来通知 Observer
数据刷新。
RxJava 的事件回调方法除了普通事件
onNext()
(相当于 onClick()
/ onEvent()
)之外,还定义了两个特殊的事件:onCompleted()
onError()
。-
onCompleted()
:
事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的onNext()
发出时,需要触发onCompleted()
方法作为标志。 -
onError()
: 事件队列异常。在事件处理过程中出异常时,onError()
会被触发,同时队列自动终止,不允许再有事件发出。
注意:在一个正确运行的事件序列中, onCompleted()
和 onError()
有且只有一个,也就是说onCompleted()
和 onError()
二者也是互斥的。在响应的队列中只能调用一个。
![彻底搞清楚 RxJava 是什么东西 彻底搞清楚 RxJava 是什么东西](https://image.shishitao.com:8440/aHR0cDovL2ltZy5ibG9nLmNzZG4ubmV0LzIwMTYwNzMxMTAzMjEzMTAwP3dhdGVybWFyay8yL3RleHQvYUhSMGNEb3ZMMkpzYjJjdVkzTmtiaTV1WlhRdi9mb250LzVhNkw1TDJUL2ZvbnRzaXplLzQwMC9maWxsL0kwSkJRa0ZDTUE9PS9kaXNzb2x2ZS83MC9ncmF2aXR5L0NlbnRlcg%3D%3D.jpg?w=700&webp=1)
1) 创建 Observer(被观察者对象)
//Observable部分,被观察者部分 Observable<String> myObservable=Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("我是被观察的对象"); subscriber.onCompleted(); } });
2) 创建Subscriber(观察者对象)
//Subscriber部分,观察者部分 Subscriber<String> mySubscriber=new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { text.setText(s); } };
3) Observer和Subscriber关联
myObservable.subscribe(mySubscriber);
这样就完成了一个简单的rxjava,是不是很简单。
![彻底搞清楚 RxJava 是什么东西 彻底搞清楚 RxJava 是什么东西](https://image.shishitao.com:8440/aHR0cDovL2ltZy5ibG9nLmNzZG4ubmV0LzIwMTYwNzMxMTExMDU2NTU0P3dhdGVybWFyay8yL3RleHQvYUhSMGNEb3ZMMkpzYjJjdVkzTmtiaTV1WlhRdi9mb250LzVhNkw1TDJUL2ZvbnRzaXplLzQwMC9maWxsL0kwSkJRa0ZDTUE9PS9kaXNzb2x2ZS83MC9ncmF2aXR5L0NlbnRlcg%3D%3D.jpg?w=700&webp=1)
除了 subscribe(Observer)
和 subscribe(Subscriber)
,subscribe()
还支持不完整定义的回调,RxJava
会自动根据定义创建出Subscriber
。
Observable.just("Hello, world!") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } });
使用java8的lambda可以使代码更简洁
Observable.just("Hello, world!") .subscribe(s -> System.out.println(s));
然而如果你认为rxjava只有这个用处,那么也什么牛逼的,在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于
RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler
。
Scheduler (线程调度器)
线程控制与调度
subscribe()
,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。而如果要实现线程的调度,就需要scheduler(线程调度器)。已经内置了几个
Scheduler
,它们已经适合大多数的使用场景:-
Schedulers.immediate()
:
直接在当前线程运行,相当于不指定线程。这是默认的Scheduler
。 -
Schedulers.newThread()
:
总是启用新线程,并在新线程执行操作。 -
Schedulers.io()
: I/O
操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler
。行为模式和newThread()
差不多,区别在于io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()
比newThread()
更有效率。不要把计算工作放在io()
中,可以避免创建不必要的线程。 -
Schedulers.computation()
:
计算所使用的Scheduler
。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler
使用的固定的线程池,大小为
CPU 核数。不要把 I/O 操作放在computation()
中,否则 I/O 操作的等待时间会浪费 CPU。 - 另外, Android 还有一个专用的
AndroidSchedulers.mainThread()
,它指定的操作将在
Android 主线程运行。
Sceeduler默认给我们提供了subscribeOn()
和 observeOn()
两个方法来对线程进行控制 。
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { } });
上面这段代码中,由于 subscribeOn(Schedulers.io())
的指定,被创建的事件的内容 1
、2
、3
、4
将会在
IO 线程发出;而由于
observeOn(AndroidScheculers.mainThread()
)的指定,因此
subscriber
数字的打印将发生在主线程。事实上,这种在
subscribe()
subscribeOn(Scheduler.io())
和 observeOn(AndroidSchedulers.mainThread())
的使用方式非常常见,int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } });
这样,加载图片发生在UI线程,而设置显示放到子线程出来,这样就不会出现卡顿。
变换
Observable.just("images/logo.png") // 输入类型 String .map(new Func1<String, Bitmap>() { @Override public Bitmap call(String filePath) { // 参数类型 String return getBitmapFromPath(filePath); // 返回类型 Bitmap } }) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { // 参数类型 Bitmap showBitmap(bitmap); } });
这里出现了一个 Func1
的类。它和 Action1
非常相似,也是
RxJava 的一个接口,用于包装含有一个参数的方法。 Func1
和 Action
的区别在于, Func1
包装的是有返回值的方法。FuncX
和ActionX
的区别在 FuncX
包装的是有返回值的方法。
通过上面的代码我们看到:map()
方法将参数中的 String
对象转换成一个 Bitmap
对象后返回,而在经过 map()
方法后,事件的参数类型也由 String
转为了 Bitmap。这就是最长久的转换。
map()
:
事件对象的直接变换示意图:
![彻底搞清楚 RxJava 是什么东西 彻底搞清楚 RxJava 是什么东西](https://image.shishitao.com:8440/aHR0cDovL2ltZy5ibG9nLmNzZG4ubmV0LzIwMTYwNzMxMTU0ODUxNTQ4P3dhdGVybWFyay8yL3RleHQvYUhSMGNEb3ZMMkpzYjJjdVkzTmtiaTV1WlhRdi9mb250LzVhNkw1TDJUL2ZvbnRzaXplLzQwMC9maWxsL0kwSkJRa0ZDTUE9PS9kaXNzb2x2ZS83MC9ncmF2aXR5L0NlbnRlcg%3D%3D.jpg?w=700&webp=1)
flatMap()
:
这是一个很有用但非常难理解的变换
首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的属性(我选择属性,是因为如果对象可以打印,你们单个属性肯定不是问题)。
Student[] students = ...; Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } ... }; Observable.from(students) .subscribe(subscriber);
写法也很简单,看得也很明白。
![彻底搞清楚 RxJava 是什么东西 彻底搞清楚 RxJava 是什么东西](https://image.shishitao.com:8440/aHR0cDovL2ltZy5ibG9nLmNzZG4ubmV0LzIwMTYwNzMxMTU1MzMzNzU1P3dhdGVybWFyay8yL3RleHQvYUhSMGNEb3ZMMkpzYjJjdVkzTmtiaTV1WlhRdi9mb250LzVhNkw1TDJUL2ZvbnRzaXplLzQwMC9maWxsL0kwSkJRa0ZDTUE9PS9kaXNzb2x2ZS83MC9ncmF2aXR5L0NlbnRlcg%3D%3D.jpg?w=700&webp=1)
变换的原理:lift()
RxJava 的内部,它们是基于同一个基础的变换方法:
lift(Operator)
。首先看一下 lift()
的内部实现(仅核心代码):// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。 // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。 public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } }); }