RxJava的几种Scheduler

最近在做和RxJava相关的工作。RxJava是一个基于Observer模式1的异步式框架,用来处理Event based tasks比较合适。它的实现是基于Java的Thread库。

这篇文章探讨RxJava里面的几种Scheduler,首先是single类型的Scheduler:

Scheduler scheduler = Schedulers.single();

这个是最简单的一种Scheduler,就是一个单线程的实现。我们可以通过下面的代码来做验证:

import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

public class Main {
	public static void main(String args[]) throws Exception {
		Scheduler scheduler = Schedulers.newThread();
		Scheduler.Worker worker1 = scheduler.createWorker();
		Scheduler.Worker worker2 = scheduler.createWorker();

		worker1.schedule(() -> {
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("worker1");
		});
		worker1.schedule(() -> {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("worker1 #2");
		});
		worker2.schedule(() -> {
			System.out.println("worker2");
		});

		Thread.sleep(4000);
	}
}

如上图所示,我们从这个single类型的Scheduler里面创建了两个worker,分别是worker1worker2。其中worker1在schedule两个任务的时候,第一个任务要休眠1秒,第二个任务要休眠2秒。然后后面worker2进行schedule。

因为我们的Scheduler是single类型的,也就是单线程的,所以worker2的任务应该被worker1的两个任务给block住。执行上面的代码,看结果,就可以验证这一点:

worker1
worker1 #2
worker2

在实际的执行过程中,worker2确实等待worker1的任务执行完成后,才开始执行。此外,可以看到worker1内部的两个任务也是顺序执行。

接下来我们换一种Scheduler来执行任务,使用newThread()类型的Scheduler:

Scheduler scheduler = Schedulers.newThread();

这种类型的Scheduler会给每个worker开启新线程,因此替换之后,worker2应该会比worker1先执行完成。更改完Scheduler类型之后,重新运行上面的代码,得到结果如下:

worker2
worker1
worker1 #2

可以看到结果和预期一致。值得注意的是,在worker1这边,两个任务仍然是顺序执行的。也就是说worker内部不会再开启新的线程执行提交进来的任务。

Scheduler剩下的几种类型都是多线程的,只不过对线程的创建和管理方式有所不同。比如:

Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());

向上面这个.from()类型的,就是要用户自己提交一个ExecutorServiceExecutorService是Java的线程池的一个管理接口,属于Java并发领域的的基础库,就不在这篇文章里面过多说明。上面的代码等于给Scheduler提供了一个CachedThreadPool

接下来还有io()computation()类型的Scheduler:

Schedulers.io();
Schedulers.computation();

这两种Scheduler都是自行管理线程池。其中io()类型的Scheduler会根据任务数量自动扩大线程池里面的线程数量,而computation()类型的则会根据机器的CPU数量,有一个最大的线程上限,达到这个上限的时候,线程就不再创建了,worker手里的任务就要排队,等待有了线程资源再执行。

具体的内部实现不是这篇文章要讲的,本篇的重点是帮助大家理解Scheduler是如何通过worker来执行任务的,以及Scheduler内部是如何通过多线程来让worker进行并行处理的。

  1. https://en.wikipedia.org/wiki/Observer_pattern 

My Github Page: https://github.com/liweinan

Powered by Jekyll and Theme by solid

If you have any question want to ask or find bugs regarding with my blog posts, please report it here:
https://github.com/liweinan/liweinan.github.io/issues