RxJava学习笔记(三)--- 线程调度Scheduler

时间:2021-12-28 13:58:01

线程调度

在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

Scheduler 的 API

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。 不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Schedulers.from(executor):使用指定的Executor作为调度器
  • Schedulers.trampoline():当其它排队的任务完成后,在当前线程排队开始执行
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。

  1. subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
  2. observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

    Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer number) {
    Log.d(tag, "number:" + number);
    }
    });

    上面这段代码中,由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。事实上,这种在subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

注意:多次使用subscribeOn()和observeOn()可以切换事件的发生线程和回调线程。

Scheduler拓展

除了将这些调度器传递给RxJava的Observable操作符,你也可以用它们调度你自己的任务。下面的示例展示了Scheduler.Worker的用法:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

@Override
public void call() {
yourWork();
}

});
// some time later...
worker.unsubscribe();

递归调度器

要调度递归的方法调用,你可以使用schedule,然后再用schedule(this),示例:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

@Override
public void call() {
yourWork();
// recurse until unsubscribed (schedule will do nothing if unsubscribed)
worker.schedule(this);
}

});
// some time later...
worker.unsubscribe();

检查或设置取消订阅状态

Worker类的对象实现了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,所以你可以在订阅取消时停止任务,或者从正在调度的任务内部取消订阅,示例:

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

@Override
public void call() {
while(!worker.isUnsubscribed()) {
status = yourWork();
if(QUIT == status) { worker.unsubscribe(); }
}
}

});

Worker同时是Subscription,因此你可以(通常也应该)调用它的unsubscribe方法通知可以挂起任务和释放资源了。

延时和周期调度器

你可以使用schedule(action,delayTime,timeUnit)在指定的调度器上延时执行你的任务,下面例子中的任务将在500毫秒之后开始执行:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

使用另一个版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法让你可以安排一个定期执行的任务,下面例子的任务将在500毫秒之后执行,然后每250毫秒执行一次:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

测试调度器

TestScheduler让你可以对调度器的时钟表现进行手动微调。这对依赖精确时间安排的任务的测试很有用处。这个调度器有三个额外的方法:

  • advanceTimeTo(time,unit) 向前波动调度器的时钟到一个指定的时间点
  • advanceTimeBy(time,unit) 将调度器的时钟向前拨动一个指定的时间段
  • triggerActions( ) 开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间

线程调度器(Scheduler)是将RxJava从同步观察者模式转到异步观察者模式的一个重要工具,有了它才能更好的将RxJava应用到项目中。

参考:

https://github.com/ReactiveX/RxJava

http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/1012/3572.html#toc_1

http://blog.csdn.net/lzyzsd/article/details/44094895

http://www.ithao123.cn/content-9344110.html

http://blog.csdn.net/tangxl2008008/article/details/51334295

http://www.jianshu.com/p/129a26c8ba9e

https://github.com/mcxiaoke/RxDocs/blob/master/Scheduler.md