RxJava的几种Scheduler
最近在做和RxJava相关的工作。RxJava是一个基于Observer模式1的异步式框架,用来处理Event based tasks比较合适。它的实现是基于Java的Thread库。
这篇文章探讨RxJava里面的几种Scheduler,首先是single类型的Scheduler:
Scheduler scheduler = Schedulers.single();
这个是最简单的一种Scheduler,就是一个单线程的实现。我们可以通过下面的代码来做验证:
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
public class Main {
public static void main(String args[]) throws Exception {
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker1 = scheduler.createWorker();
Scheduler.Worker worker2 = scheduler.createWorker();
worker1.schedule(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("worker1");
});
worker1.schedule(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("worker1 #2");
});
worker2.schedule(() -> {
System.out.println("worker2");
});
Thread.sleep(4000);
}
}
如上图所示,我们从这个single类型的Scheduler里面创建了两个worker,分别是worker1
和worker2
。其中worker1
在schedule两个任务的时候,第一个任务要休眠1秒,第二个任务要休眠2秒。然后后面worker2
进行schedule。
因为我们的Scheduler是single类型的,也就是单线程的,所以worker2的任务应该被worker1的两个任务给block住。执行上面的代码,看结果,就可以验证这一点:
worker1
worker1 #2
worker2
在实际的执行过程中,worker2确实等待worker1的任务执行完成后,才开始执行。此外,可以看到worker1内部的两个任务也是顺序执行。
接下来我们换一种Scheduler来执行任务,使用newThread()
类型的Scheduler:
Scheduler scheduler = Schedulers.newThread();
这种类型的Scheduler会给每个worker开启新线程,因此替换之后,worker2
应该会比worker1
先执行完成。更改完Scheduler类型之后,重新运行上面的代码,得到结果如下:
worker2
worker1
worker1 #2
可以看到结果和预期一致。值得注意的是,在worker1这边,两个任务仍然是顺序执行的。也就是说worker内部不会再开启新的线程执行提交进来的任务。
Scheduler剩下的几种类型都是多线程的,只不过对线程的创建和管理方式有所不同。比如:
Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());
向上面这个.from()
类型的,就是要用户自己提交一个ExecutorService
。ExecutorService
是Java的线程池的一个管理接口,属于Java并发领域的的基础库,就不在这篇文章里面过多说明。上面的代码等于给Scheduler提供了一个CachedThreadPool
。
接下来还有io()
和computation()
类型的Scheduler:
Schedulers.io();
Schedulers.computation();
这两种Scheduler都是自行管理线程池。其中io()
类型的Scheduler会根据任务数量自动扩大线程池里面的线程数量,而computation()
类型的则会根据机器的CPU数量,有一个最大的线程上限,达到这个上限的时候,线程就不再创建了,worker手里的任务就要排队,等待有了线程资源再执行。
具体的内部实现不是这篇文章要讲的,本篇的重点是帮助大家理解Scheduler是如何通过worker来执行任务的,以及Scheduler内部是如何通过多线程来让worker进行并行处理的。
-
https://en.wikipedia.org/wiki/Observer_pattern ↩
- 上一篇 正则表达式的边界
- 下一篇 使用Autotools编译项目(下)