当前位置: 首页 > article >正文

Java并发编程-线程池(三)

文章目录

  • 线程池实现原理
    • addWorker(Runnable firstTask, boolean core)
      • 1. 状态检查:校验线程池是否允许添加线程
      • 2. 工作线程数调整:CAS保证并发安全
      • 3. 初始化变量
      • 4. 创建 `Worker` 对象并获取线程
      • 5. 加锁保证线程安全
      • 6. 启动工作线程
      • 7. 异常处理
      • 核心作用

线程池实现原理

接下来,我们进入 addWork 方法, 在创建线程前会获取全局锁:

addWorker(Runnable firstTask, boolean core)

/* Set containing all worker threads in pool. Accessed only when* holding mainLock.*/
private final HashSet<Worker> workers = new HashSet<Worker>();
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))/ 若状态为STOP/TIDYING/TERMINATED(rs >= SHUTDOWN),直接拒绝新增线程。例外情况:当状态为SHUTDOWN且满足以下条件时允许添加 非核心线程 处理残留任务:1. firstTask == null(线程不携带新任务)2. workQueue非空(队列中仍有待处理任务)*/return false;for (;;) {int wc = workerCountOf(c); // 工作线程数(低29位)if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))// 容量校验return false;if (compareAndIncrementWorkerCount(c))// CAS递增workerCountbreak retry; // 成功则退出整个retry循环c = ctl.get();  // 重新读取当前ctlif (runStateOf(c) != rs)continue retry; // 状态变化则重试外层循环// 若仅workerCount变化,继续内层循环重试CAS}}boolean workerStarted = false; //标记工作线程是否成功启动boolean workerAdded = false;//标记线程是否被成功添加到线程池Worker w = null;try {final ReentrantLock mainLock = this.mainLock;w = new Worker(firstTask);//通过线程工厂创建Worker对象(后面讲)final Thread t = w.thread;//获取其绑定的线程if (t != null) {mainLock.lock(); // 获取全局锁try {//重新检查线程池状态int c = ctl.get();int rs = runStateOf(c);if (rs < SHUTDOWN || //线程池处于RUNNING状态时,允许添加新线程(rs == SHUTDOWN && firstTask == null)) {//SHUTDOWN状态下仅允许添加无初始任务的线程(处理队列中的剩余任务)if (t.isAlive()) // 防止重复启动throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//若成功添加Worker,则启动其线程(真正开始消费任务)t.start();workerStarted = true;}}} finally {if (! workerStarted)//失败回滚:若未成功启动(如线程池已关闭),调用addWorkerFailed回滚资源addWorkerFailed(w);}return workerStarted;
}
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w); //移除WorkerdecrementWorkerCount();//减少计数tryTerminate(); //尝试终止线程池} finally {mainLock.unlock();}
}

1. 状态检查:校验线程池是否允许添加线程

int c = ctl.get();
int rs = runStateOf(c); // 运行状态(高3位)
if (rs >= SHUTDOWN &&!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))return false;
  • 关键条件:

    • 若状态为STOP/TIDYING/TERMINATEDrs >= SHUTDOWN),**直接拒绝新增线程**。

    • 例外情况:当状态为SHUTDOWN且满足以下条件时允许添加**非核心线程**处理残留任务:

      • firstTask == null(线程不携带新任务)

      • workQueue非空(队列中仍有待处理任务),确保线程关闭时仍能处理队列残留任务

2. 工作线程数调整:CAS保证并发安全

for (;;) {int wc = workerCountOf(c); // 工作线程数(低29位)// 容量校验if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS递增workerCountif (compareAndIncrementWorkerCount(c))break retry; // 成功则退出整个retry循环// CAS失败后处理c = ctl.get(); // 重新读取当前ctlif (runStateOf(c) != rs)continue retry; // 状态变化则重试外层循环// 若仅workerCount变化,继续内层循环重试CAS
}
  • 双重校验逻辑:

    • 容量控制:通过参数core决定上限为核心/最大线程数,避免资源溢出。

    • CAS操作:通过compareAndIncrementWorkerCount原子性增加线程数,防止多线程竞争导致计数错误。

    • 失败处理:若检测到状态变化(如线程池关闭),重新进行外层状态检查。若仅线程数变化,则仅重试内层循环。

    • 双重循环:外层处理状态变化,内层处理线程数变更,分离关注点,提升并发效率。

3. 初始化变量

初始化 workerStartedworkerAddedfalse,表示工作线程未启动且未成功添加到工作线程集合。Worker 对象 w 的初始值为 null

4. 创建 Worker 对象并获取线程

w = new Worker(firstTask);
final Thread t = w.thread;

通过线程工厂创建 Worker 对象,并获取其绑定的线程 t。若线程工厂创建失败(如返回 null),后续逻辑直接跳转至第7步异常处理。

5. 加锁保证线程安全

获取 mainLock(全局锁),确保对线程池状态的检查和修改、workers 集合的操作是原子的:

6. 启动工作线程

if (workerAdded) {t.start();  // 启动线程workerStarted = true;
}

如果 Worker 成功添加到集合,则启动其绑定的线程。

7. 异常处理

finally {if (!workerStarted)addWorkerFailed(w); // 回滚失败的 Worker 操作
}
  • 若线程未启动(如锁获取后线程池已关闭),调用 addWorkerFailed 移除 Worker 并更新线程池状态。

核心作用

这段代码是线程池 ThreadPoolExecutor中添加工作线程的核心逻辑,它会创建Worker,Worker内置了一个线程(由线程工厂创建),这个线程会在Worker创建后启动,专门用于执行Worker的run()方法。另外还实现了:

  1. 线程安全的 Worker管理:通过 ReentrantLock 保证对共享变量(如 workers 集合)的原子操作。

  2. 状态双重检查:防止在加锁期间线程池状态发生变化。

  1. 异常回滚机制:确保部分失败的操作能被正确处理,保证线程池的稳定性。
http://www.lryc.cn/news/2380398.html

相关文章:

  • 《黑马前端ajax+node.js+webpack+git教程》(笔记)——node.js教程+webpack教程(nodejs教程)
  • Flink 快速入门
  • 高效管理多后端服务:Nginx 配置与实践指南
  • 阻塞队列:线程安全与生产者消费者模型解析
  • 【入门|Docker】基础知识扫盲:什么是 Docker?
  • 如何利用 Java 爬虫获得某书笔记详情:实战指南
  • 【MYSQL】基本查询,表的增删查改
  • 在嵌入式系统中, 一般链路层断开多久,断开TCP为好
  • Android Studio 日志系统详解
  • 基于matlab的D2D 功率控制仿真
  • 互联网大厂Java面试:从基础到复杂场景的技术挑战
  • 使用Redission来实现布隆过滤器
  • 为 Windows 和 Ubuntu 中设定代理服务器的详细方法
  • Feign异步模式丢失上下文问题
  • OpenCV阈值处理完全指南:从基础到高级应用
  • 【AWS入门】Amazon SageMaker简介
  • ArcGIS Pro 3.4 二次开发 - 内容
  • 如何在 MongoDB 中设计文档结构?与关系型数据库的表结构设计有何不同?
  • MYSQL 故障排查与生产环境优化
  • 解决使用@JsonFormat(pattern = “yyyy-MM-dd HH:mm:ss“, timezone = “GMT+8“)时区转换无效的问题
  • 计算机网络概要
  • Word压缩解决方案
  • Spring Boot开发—— 整合Lucene构建轻量级毫秒级响应的全文检索引擎
  • TDengine 2025年产品路线图
  • vue3中element-plus修改el-tooltip的宽度
  • Ubuntu服务器部署多语言项目(Node.js/Python)方式实践
  • 计算机网络 - 2.基础协议
  • Kafka消息路由分区机制深度解析:架构设计与实现原理
  • 机器学习中采样哪些事
  • 初识css,css语法怎样学好css以及常见问题与避坑