Java 线程池ThreadPoolExecutor源码解读
目录
1,创建线程的方式
1.1 继承自Thread
1.2 通过FutureTask+Callable实现
1.3 使用线程池实现
2,ThreadExecutorPool概述
2.1 重要的常量
2.2 拒绝策略
3,源码分析
3.1 ThreadPoolExecutor#execute
3.2 ThreadPoolExecutor#addWorker
3.3 ThreadPoolExecutor#getTask
3.4 线程池的线程复用逻辑
4,总结
本文分成以下几个部分:
- 创建线程的方式
- ThreadPoolExecutor概述
- ThreadPoolExecutor#execute ThreadPoolExecutor#addWorker方法解析
1,创建线程的方式
1.1 继承自Thread
写一个类继承自Thread,并且调用start方法即可开启线程。这个是老生常谈的实现方法了,继承Thread的时候需要实现run,如果直接调用run的话是无法开启线程的。调用start最后会调用到native的start()方法,然后如果是Linux的话,底层使用的是Linux的api pthread。
1.3 使用线程池实现
使用线程池主要类似于这样:
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,4, 10, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); executor.execute(() -> { System.out.println("hello world"); });
其实JDK也提供了一些默认的线程池创建方法,但是一般都不推荐使用,因为这些方法可能不符合我们常规的业务需求。所以一般都使用手动创建的方式实现。
2,ThreadExecutorPool概述
2.1 重要的常量
// 高3位是线程池状态 低29为是线程的最大线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 低29位全是1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits// 111 正常接受任务
private static final int RUNNING = -1 << COUNT_BITS;// 000 Executor#shutdown
// 不接收新的任务 阻塞队列中正在进行的任务也会正常处理
private static final int SHUTDOWN = 0 << COUNT_BITS;// 001 Exectuor#shutdownNow
// 不接收新的任务,不去处理阻塞队列中的任务,正在进行的任务也不会处理
private static final int STOP = 1 << COUNT_BITS;// 010 表示当前的线程池即将结束
private static final int TIDYING = 2 << COUNT_BITS;// Exectuor#terminated
// 011 线程池结束
private static final int TERMINATED = 3 << COUNT_BITS;
ctl这个AtomicInteger是基于自旋锁+CAS操作实现的。Executor本身也有生命周期,根据数值大小排序的生命周期状态是:
Running < Shutdown < Stop < Tidying < Terminated
- Running:当前的线程池中的线程正常运行,而且线程池接受新的Runnable
- Shutdown:在调用了shutdown方法会走到这个状态,并且此时不接收新的Runnable,但是会将阻塞队列中的Runnable处理完成
- Stop:在调用shutdownNow之后会走到这个状态,不接收新的Runnable,同时会暂停正在执行的线程
- Tidying:是一个中间的过渡状态,可能做一个内容的清理工作等等。
- Terminated:在调用terminated方法之后会到这个状态,线程池结束
在Java线程池ThreadPoolExecutor的实现中,RUNNING = -1 << COUNT_BITS
是定义线程池运行状态的关键常量,其原理如下:
COUNT_BITS
通常为29(32位整型的低29位用于记录线程数)-1
的二进制补码表示是11111111 11111111 11111111 11111111
- 左移29位后得到高3位
111
(十进制值-536870912),低29位全0 RUNNING
状态值最小(高3位111
),表示线程池能接收新任务并处理队列任务- 其他状态(SHUTDOWN/STOP/TIDYING/TERMINATED)的高3位依次递减(
000
~010
)
2.2 拒绝策略
AbortPolicy
直接会抛出异常
/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
CallerRunsPolicy
/*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}
这块直接调用了Runnable实现的run方法。run方法会在Excecutor所在的线程中执行,所以如果是耗时操作也会出现问题。
DiscardPolicy
/*** A handler for rejected tasks that silently discards the* rejected task.*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
直接会放弃,什么都不会做
DiscardOldestPolicy
/*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}
会尝试获取ThreadPoolExecutor中的队列,然后将队列头,也就是最开始的一个出栈。
3,源码分析
任务加入线程池是一个这样的过程:
首先会询问核心线程是否有没有分配到的,通常是和核心线程数进行比较。如果核心线程都满了,就会通过阻塞队列进行缓冲。如果阻塞队列都放满了,就会看非核心线程是否到了最大的线程数,如果达到了最大线程,就会执行拒绝策略。
下面我们会通过看execute+addWorker的源码来还原这个过程。
3.1 ThreadPoolExecutor#execute
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 如果工作线程数小于核心线程线程数if (workerCountOf(c) < corePoolSize) {// 创建核心线程if (addWorker(command, true))return;// 考虑并发的情况的影响c = ctl.get();}// 如果线程池是Running状态,并且阻塞队列可以放入任务if (isRunning(c) && workQueue.offer(command)) {// 考虑并发的情况的影响int recheck = ctl.get();// 如果当前不是Running状态 执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 阻塞队列有任务,但是工作线程数为0,创建非核心线程执行任务else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 创建非核心线程执行任务 创建失败 执行拒绝策略else if (!addWorker(command, false))reject(command);
}
execute的代码中首先通过ctl进行位运算的分解获取当前的工作线程数,优先使用核心线程。然后下面会放入到阻塞队列中,如果阻塞队列中都放不下,再会看工作线程是否达到最大线程数。如果以上的执行都不能放入这个任务,就执行拒绝策略。
上面的代码中可以看到前面两步,也就是询问核心线程和询问阻塞队列是否放满,第三步看工作线程是否达到最大线程数是在addWorker中的
3.2 ThreadPoolExecutor#addWorker
part1 判断部分
addWorker的代码拆分成两部分来看,第一部分是进行一些条件判断:
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果当前的线程池的状态是SHUTDOWNif (rs >= SHUTDOWN &&// 下面的判断等于// rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()// 1. 如果当前的状态已经是TIDYING 或者Terminated -> 肯定不能添加Worker// 2. 如果当前的状态是Shutdown,但是传入一个非空的task,已经不接收新的任务了 -> 肯定/// 不能添加Worker// 3. 如果当亲的状态是Shutdown,而且传入的task不为空,如果阻塞队列是空,之前的任务// 全部处理完成了 -> 肯定不能添加Worker! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);// 是否超过线程数(核心线程或者是最大线程)if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 如果CAS操作完成成功if (compareAndIncrementWorkerCount(c))break retry;// 如果CAS操作失败,重新循环进行修改WorkerCountc = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;}}
特别是if中的第二个条件有点复杂,传入firstTask为空的情况是当阻塞队列中有任务,但是工作线程为0时,一般情况下firstTask肯定不为null。具体三种情况为什么需要返回false的原因我写在注释中了~
下面我们来看看执行的逻辑
part2 执行部分
boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个新的Workerw = new Worker(firstTask);// 从Worker中取出线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());// 如果现在的线程池是Running状态 或 当前的线程池虽然是Shutdown// 但是还是需要处理阻塞队列中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 放入HashSetworkers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;// 确认加入workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start(); // 启动线程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
总的来说addWorker除了进行Worker的构建和添加到Workers之外,还进行了Worker中线程的启动,这块是真正执行我们定义的逻辑的地方。
我们再来看看Worker:
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// Worker也是一个Runnable,以当前的Worker作为Runnable构建新的线程this.thread = getThreadFactory().newThread(this);
}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {// 空方法// 省略异常处理的逻辑beforeExecute(wt, task);task.run();afterExecute(task, thrown);} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
因为Worker本身也是一个Runnable,所以当调用start的时候会执行Woker的run方法,Worker#run调用了runWorker。在runWorker中,我们重写的run会被执行。同时提供了两个钩子:beforeExecute和afterExecute,这两个方法本身是空实现,我们可以自行定义执行一些操作。
作为判断条件的代码我使用黄色底的字体标记出来了,具体的逻辑就是这样。
3.3 ThreadPoolExecutor#getTask
getTask其实就是从阻塞队列wokerQueue中获取task这样一件事情。
3.4 线程池的线程复用逻辑
这块直接上图,在addWorker中会执行Worker中的Thread#start,我们知道执行完成start之后就不能再次调用start。线程池与其说他是复用Thread,不如说他是不断地向Thread中填充新的Runnable,然后调用run,减少了创建Thread的开销。我们仔细看看addWorker的核心代码:
// ThreadPoolExecutor#runWorker
while (task != null || (task = getTask()) != null) {// ...task.run();// ...
}
不断地从workerQueue中取出新的Task,然后执行run。如果wokerQueue为空,getTask就会阻塞,等到有了新的Task再执行。
4,总结
- JDK中提供了一些可以直接启动线程池的方式,但是我们最好自己写一个ThreadPoolExecutor进行调整参数。ThreadPoolExecutor有以下几个核心参数:核心线程数、最大线程数、线程存活时间、阻塞队列
- ThreadPoolExecutor是有5中状态的,Running,Shutdown,Stop,Tidying,terminated。
- execute比较好理解,我们使用Runnable添加到ThreadPoolExecutor之后,首先会创建核心线程,核心线程其实就是一个标志位为true的Worker。Worker内部有一个Thread,会在addWorker方法中启动(Thread#start)。但是ThreadPoolExecutor其实并不会立刻【放过】Worker中的Thread。如果后续有runnable被放到阻塞队列之后,会从阻塞队列中读取。这点其实也是复用机制的关键。