当前位置: 首页 > news >正文

Java 线程池ThreadPoolExecutor源码解读

目录

1,创建线程的方式

1.1 继承自Thread

1.2 通过FutureTask+Callable实现

1.3 使用线程池实现

2,ThreadExecutorPool概述

2.1 重要的常量

2.2 拒绝策略

3,源码分析

3.1 ThreadPoolExecutor#execute

3.2 ThreadPoolExecutor#addWorker

3.3 ThreadPoolExecutor#getTask

3.4 线程池的线程复用逻辑

4,总结


本文分成以下几个部分:

  1. 创建线程的方式
  2. ThreadPoolExecutor概述
  3. ThreadPoolExecutor#execute ThreadPoolExecutor#addWorker方法解析

1,创建线程的方式

1.1 继承自Thread

写一个类继承自Thread,并且调用start方法即可开启线程。这个是老生常谈的实现方法了,继承Thread的时候需要实现run,如果直接调用run的话是无法开启线程的。调用start最后会调用到native的start()方法,然后如果是Linux的话,底层使用的是Linux的api pthread。

1.3 使用线程池实现

使用线程池主要类似于这样:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2,4, 10, TimeUnit.MINUTES,                  new LinkedBlockingDeque<>()); executor.execute(() -> {     System.out.println("hello world"); });

其实JDK也提供了一些默认的线程池创建方法,但是一般都不推荐使用,因为这些方法可能不符合我们常规的业务需求。所以一般都使用手动创建的方式实现。

2,ThreadExecutorPool概述

2.1 重要的常量

// 高3位是线程池状态 低29为是线程的最大线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 低29位全是1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits// 111 正常接受任务
private static final int RUNNING    = -1 << COUNT_BITS;// 000 Executor#shutdown
// 不接收新的任务 阻塞队列中正在进行的任务也会正常处理
private static final int SHUTDOWN   =  0 << COUNT_BITS;// 001 Exectuor#shutdownNow
// 不接收新的任务,不去处理阻塞队列中的任务,正在进行的任务也不会处理
private static final int STOP       =  1 << COUNT_BITS;// 010 表示当前的线程池即将结束
private static final int TIDYING    =  2 << COUNT_BITS;// Exectuor#terminated
// 011 线程池结束
private static final int TERMINATED =  3 << COUNT_BITS;

ctl这个AtomicInteger是基于自旋锁+CAS操作实现的。Executor本身也有生命周期,根据数值大小排序的生命周期状态是:

Running < Shutdown < Stop < Tidying < Terminated

  • Running:当前的线程池中的线程正常运行,而且线程池接受新的Runnable
  • Shutdown:在调用了shutdown方法会走到这个状态,并且此时不接收新的Runnable,但是会将阻塞队列中的Runnable处理完成
  • Stop:在调用shutdownNow之后会走到这个状态,不接收新的Runnable,同时会暂停正在执行的线程
  • Tidying:是一个中间的过渡状态,可能做一个内容的清理工作等等。
  • Terminated:在调用terminated方法之后会到这个状态,线程池结束

        在Java线程池ThreadPoolExecutor的实现中,RUNNING = -1 << COUNT_BITS 是定义线程池运行状态的关键常量,其原理如下:‌

  • COUNT_BITS 通常为29(32位整型的低29位用于记录线程数)
  • -1 的二进制补码表示是11111111 11111111 11111111 11111111
  • 左移29位后得到高3位111(十进制值-536870912),低29位全0
  • RUNNING 状态值最小(高3位111),表示线程池能接收新任务并处理队列任务
  • 其他状态(SHUTDOWN/STOP/TIDYING/TERMINATED)的高3位依次递减(000~010

2.2 拒绝策略

AbortPolicy

直接会抛出异常

    /*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}

CallerRunsPolicy

    /*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}

这块直接调用了Runnable实现的run方法。run方法会在Excecutor所在的线程中执行,所以如果是耗时操作也会出现问题。

DiscardPolicy

    /*** A handler for rejected tasks that silently discards the* rejected task.*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}

直接会放弃,什么都不会做

DiscardOldestPolicy

    /*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}

会尝试获取ThreadPoolExecutor中的队列,然后将队列头,也就是最开始的一个出栈。  

3,源码分析

任务加入线程池是一个这样的过程:

首先会询问核心线程是否有没有分配到的,通常是和核心线程数进行比较。如果核心线程都满了,就会通过阻塞队列进行缓冲。如果阻塞队列都放满了,就会看非核心线程是否到了最大的线程数,如果达到了最大线程,就会执行拒绝策略。

下面我们会通过看execute+addWorker的源码来还原这个过程。

3.1 ThreadPoolExecutor#execute

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 如果工作线程数小于核心线程线程数if (workerCountOf(c) < corePoolSize) {// 创建核心线程if (addWorker(command, true))return;// 考虑并发的情况的影响c = ctl.get();}// 如果线程池是Running状态,并且阻塞队列可以放入任务if (isRunning(c) && workQueue.offer(command)) {// 考虑并发的情况的影响int recheck = ctl.get();// 如果当前不是Running状态 执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 阻塞队列有任务,但是工作线程数为0,创建非核心线程执行任务else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 创建非核心线程执行任务 创建失败 执行拒绝策略else if (!addWorker(command, false))reject(command);
}

execute的代码中首先通过ctl进行位运算的分解获取当前的工作线程数,优先使用核心线程。然后下面会放入到阻塞队列中,如果阻塞队列中都放不下,再会看工作线程是否达到最大线程数。如果以上的执行都不能放入这个任务,就执行拒绝策略。

上面的代码中可以看到前面两步,也就是询问核心线程和询问阻塞队列是否放满,第三步看工作线程是否达到最大线程数是在addWorker中的

3.2 ThreadPoolExecutor#addWorker

part1 判断部分

addWorker的代码拆分成两部分来看,第一部分是进行一些条件判断:

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果当前的线程池的状态是SHUTDOWNif (rs >= SHUTDOWN &&// 下面的判断等于// rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()// 1. 如果当前的状态已经是TIDYING 或者Terminated -&gt; 肯定不能添加Worker// 2. 如果当前的状态是Shutdown,但是传入一个非空的task,已经不接收新的任务了 -&gt; 肯定///    不能添加Worker// 3. 如果当亲的状态是Shutdown,而且传入的task不为空,如果阻塞队列是空,之前的任务// 全部处理完成了 -&gt; 肯定不能添加Worker! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);// 是否超过线程数(核心线程或者是最大线程)if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 如果CAS操作完成成功if (compareAndIncrementWorkerCount(c))break retry;// 如果CAS操作失败,重新循环进行修改WorkerCountc = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;}}

特别是if中的第二个条件有点复杂,传入firstTask为空的情况是当阻塞队列中有任务,但是工作线程为0时,一般情况下firstTask肯定不为null。具体三种情况为什么需要返回false的原因我写在注释中了~

下面我们来看看执行的逻辑

part2 执行部分

boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个新的Workerw = new Worker(firstTask);// 从Worker中取出线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());// 如果现在的线程池是Running状态 或 当前的线程池虽然是Shutdown// 但是还是需要处理阻塞队列中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null))  {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 放入HashSetworkers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;// 确认加入workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start(); // 启动线程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;

总的来说addWorker除了进行Worker的构建和添加到Workers之外,还进行了Worker中线程的启动,这块是真正执行我们定义的逻辑的地方。

我们再来看看Worker:

Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// Worker也是一个Runnable,以当前的Worker作为Runnable构建新的线程this.thread = getThreadFactory().newThread(this);
}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &amp;&amp;runStateAtLeast(ctl.get(), STOP))) &amp;&amp; !wt.isInterrupted())wt.interrupt();try {// 空方法// 省略异常处理的逻辑beforeExecute(wt, task);task.run();afterExecute(task, thrown);} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

因为Worker本身也是一个Runnable,所以当调用start的时候会执行Woker的run方法,Worker#run调用了runWorker。在runWorker中,我们重写的run会被执行。同时提供了两个钩子:beforeExecute和afterExecute,这两个方法本身是空实现,我们可以自行定义执行一些操作。

作为判断条件的代码我使用黄色底的字体标记出来了,具体的逻辑就是这样。

3.3 ThreadPoolExecutor#getTask

getTask其实就是从阻塞队列wokerQueue中获取task这样一件事情。

3.4 线程池的线程复用逻辑

这块直接上图,在addWorker中会执行Worker中的Thread#start,我们知道执行完成start之后就不能再次调用start。线程池与其说他是复用Thread,不如说他是不断地向Thread中填充新的Runnable,然后调用run,减少了创建Thread的开销。我们仔细看看addWorker的核心代码:

// ThreadPoolExecutor#runWorker
while (task != null || (task = getTask()) != null) {// ...task.run();// ...
}

不断地从workerQueue中取出新的Task,然后执行run。如果wokerQueue为空,getTask就会阻塞,等到有了新的Task再执行。

4,总结

  1. JDK中提供了一些可以直接启动线程池的方式,但是我们最好自己写一个ThreadPoolExecutor进行调整参数。ThreadPoolExecutor有以下几个核心参数:核心线程数、最大线程数、线程存活时间、阻塞队列
  2. ThreadPoolExecutor是有5中状态的,Running,Shutdown,Stop,Tidying,terminated。
  3. execute比较好理解,我们使用Runnable添加到ThreadPoolExecutor之后,首先会创建核心线程,核心线程其实就是一个标志位为true的Worker。Worker内部有一个Thread,会在addWorker方法中启动(Thread#start)。但是ThreadPoolExecutor其实并不会立刻【放过】Worker中的Thread。如果后续有runnable被放到阻塞队列之后,会从阻塞队列中读取。这点其实也是复用机制的关键。
http://www.lryc.cn/news/627034.html

相关文章:

  • 服务器内存条不识别及服务器内存位置图
  • linux的sysctl系统以及systemd系统。
  • 【网络运维】Linux 文本处理利器:sed 命令
  • MYSQL-增删查改CRUD
  • uni-app跨端开发最后一公里:详解应用上架各大应用商店全流程
  • 生产级的雪花算法
  • 自动驾驶导航信号使用方式调研
  • C语言实现全排列(非递归法)(以猪八戒买包子的故事为例解释)
  • SpringBoot 整合 Langchain4j RAG 技术深度使用解析
  • imx6ull-驱动开发篇30——Linux 非阻塞IO实验
  • redis---常用数据类型及内部编码
  • 设计具有功能安全和网络安全能力的新型半导体芯片
  • 攻克PostgreSQL专家认证
  • Unicode 字符串转 UTF-8 编码算法剖析
  • JVM面试精选 20 题(终)
  • SQL count(*)与 sum 区别
  • 第三阶段数据-4:SqlHelper类,数据库删除,DataTable创建
  • STM32F4 内存管理介绍及应用
  • 建模工具Sparx EA的多视图协作教程
  • PyTorch - Developer Notes
  • 吴恩达 Machine Learning(Class 3)
  • 国产化PDF处理控件Spire.PDF教程:如何使用 Python 添加水印到 PDF
  • Linux命令大全-ps命令
  • Linux系统之部署nullboard任务管理工具
  • 基于springboot中学信息技术课程教学网站
  • 栈上创建和堆上创建区别
  • Nginx 的完整配置文件结构、配置语法以及模块详解
  • 设计模式1-单例模式
  • 继续记事本项目
  • 盲盒商城h5源码搭建可二开幸运盲盒回收转增定制开发教程