JUC常用线程辅助类详解
一、CountDownLatch
1.1 概述
倒计时门闩, 当执行任务的线程数量到达为0的时候,触发.
- 可以理解为: 一个教室有多个人,直到所有人走光之后, 班长锁门, 这里需要等待所有人都走完.
一种同步辅助,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
CountDownLatch使用给定的计数进行初始化。 由于调用了countDown方法, await方法会阻塞直到当前计数达到零,然后释放所有等待线程,并且任何后续的await调用都会立即返回。 这是一种一次性现象——计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。
CountDownLatch是一种通用的同步工具,可用于多种目的。 计数为 1 的CountDownLatch用作简单的开/关锁存器或门:调用await所有线程在门处等待,直到它被调用countDown的线程countDown 。 初始化为N的CountDownLatch可用于使一个线程等待,直到N 个线程完成某个操作,或者某个操作已完成 N 次.
1.2 示例代码
package cn.tcmeta.thread;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** CountDownLatchDemo*/
public class CountDownLatchDemo {public static void main(String[] args) {// 等待的线程数量为5CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {new Thread(() ->{try {System.out.println(Thread.currentThread().getName() + " --> \t" + " 正在执行!");TimeUnit.MILLISECONDS.sleep(5000);System.out.println(Thread.currentThread().getName() + " --> \t" + "执行成功!" );// 当任务执行完成之后,要进行减1操作countDownLatch.countDown();}catch (Exception e){e.printStackTrace();}}, "线程: " + i).start();}try {// 当计数为0时,停止阻塞.相当于打开门闩.执行后续的逻辑countDownLatch.await();}catch (Exception e){e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " --> \t" + " 所有任务执行完成!");}
}
程序执行结果:
1.3 使用场景
1.3.1 并行任务启动后等待所有任务完成
场景:主线程需要启动多个子线程并行处理任务(如批量处理数据),并在所有子线程完成后进行结果汇总
CountDownLatch doneLatch = new CountDownLatch(TASK_COUNT);for (int i = 0; i < TASK_COUNT; i++) {new Thread(() -> {try {// 执行子任务(如调用外部API、计算等)} finally {doneLatch.countDown(); // 任务完成时计数器减1}}).start();
}doneLatch.await(); // 主线程阻塞,直到所有子线程完成
System.out.println("所有任务处理完毕,开始汇总结果");
1.3.2 服务启动时依赖资源初始化
场景:微服务启动时需要加载多个独立资源(如数据库连接、缓存预热、配置文件加载),主线程需等待所有资源初始化完成后才启动服务。
CountDownLatch initLatch = new CountDownLatch(3); // 3个依赖资源// 线程1:初始化数据库
new Thread(() -> { initDatabase(); initLatch.countDown(); }).start();// 线程2:预热缓存
new Thread(() -> { warmUpCache(); initLatch.countDown(); }).start();// 线程3:加载配置文件
new Thread(() -> { loadConfig(); initLatch.countDown(); }).start();initLatch.await(); // 等待所有资源初始化
startServer(); // 启动服务
1.3.3 并发性能测试
模拟高并发请求,需要所有请求线程在同一时刻发起操作(如压力测试接口)。
CountDownLatch startLatch = new CountDownLatch(1); // 发令枪
CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT); // 结束标识for (int i = 0; i < THREAD_COUNT; i++) {new Thread(() -> {try {startLatch.await(); // 所有线程在此等待callTestAPI(); // 同时发起请求} finally {endLatch.countDown();}}).start();
}Thread.sleep(500); // 确保所有线程就绪
startLatch.countDown(); // 统一放行
endLatch.await(); // 等待所有请求完成
generateReport(); // 生成测试报告
1.3.4 多阶段任务协作
游戏服务器中,玩家需全部准备就绪后游戏才开始,且需等待所有玩家完成第一关卡后再进入下一关。
// 等待所有玩家准备
CountDownLatch readyLatch = new CountDownLatch(PLAYER_COUNT);
players.forEach(player -> player.prepare(() -> readyLatch.countDown())
);
readyLatch.await();
startGame();// 等待所有玩家完成第一关
CountDownLatch level1Latch = new CountDownLatch(PLAYER_COUNT);
players.forEach(player -> player.finishLevel1(() -> level1Latch.countDown())
);
level1Latch.await();
startLevel2();
1.3.5 分布式任务分片同步
[!tip]
分布式计算中,主节点将任务分发给多个工作节点,需等待所有节点返回结果后再合并。
List<WorkerNode> nodes = getWorkerNodes();
CountDownLatch resultLatch = new CountDownLatch(nodes.size());for (WorkerNode node : nodes) {node.executeTask(dataShard, () -> {// 返回分片处理结果resultLatch.countDown();});
}resultLatch.await();
mergeResults(); // 合并所有分片结果
1.4 关键注意事项
- 计数器不可重置:CountDownLatch的计数器归零后无法重置,如需重复使用,改用CyclicBarrier。
- 异常处理:确保countDown()在finally块中调用,避免线程异常导致主线程永久阻塞。
- 超时控制:使用await(long timeout, TimeUnit unit)防止死锁。
- 资源释放:等待线程被中断时,需妥善处理资源(如关闭连接)。
二、CyclicBarrier
2.1 概述
一种同步辅助工具,「它允许一组线程全部等待彼此到达公共屏障点」。 「CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程必须偶尔相互等待。 屏障被称为循环的,因为它可以在等待线程被释放后重新使用.」
「CyclicBarrier支持可选的Runnable命令,该命令在每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程被释放之前。 此屏障操作对于在任何一方继续之前更新共享状态很有用。」
与CountDownLatch类似,但是它不是计数相减, 而是计数相加操作;功能类似;
- 执行一个线程, 加一个数,当执行的线程数达到指定的线程数的时候,触发一个操作;
- 集齐七个龙珠,可以召唤神龙, 或者理解成: 同学都到了, 咱们再上课
2.2 基本示例代码
package cn.tcmeta.thread;import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;/*** @author laoren* CyclicBarrier类似于CountDownLatch, 不同的是这个是做加法,相当于一个屏障.* 可以理解为: 集齐七颗龙珠,就可以召唤神龙了. 它的计数是相加,达到某个值之后, 触发后续操作;*/
public class CyclicBarrierDemo {public static void main(String[] args) {/*** 构造方法说明:* 参数说明:* parities: 在屏障到达之前必须调用 await 的线程数* barrierAction: 到达屏障点触发的动作.* public CyclicBarrier(int parties, Runnable barrierAction) {* if (parties <= 0) throw new IllegalArgumentException();* this.parties = parties;* this.count = parties;* this.barrierCommand = barrierAction;* }*/CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("已经集齐了七颗龙珠,神龙现世!"));for (int i = 0; i < 7; i++) {int count = i;new Thread(() ->{System.out.println(Thread.currentThread().getName() + " --> \t" + " 已经集齐了 " + count + " 龙珠了!");try {TimeUnit.MILLISECONDS.sleep(2000);// 未到达屏障点cyclicBarrier.await();}catch (Exception e){e.printStackTrace();}}, "线程 - " + i).start();}}
}
2.3 使用场景
2.3.1 并行计算分阶段处理
分布式计算中,每个线程处理数据分片,需等待所有线程完成当前阶段才能进入下一阶段(如机器学习模型的迭代训练)
class DataProcessor {final int THREAD_COUNT = 4;final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> System.out.println("当前阶段完成,开始下一阶段"));void process() {ExecutorService exec = Executors.newFixedThreadPool(THREAD_COUNT);for (int i = 0; i < THREAD_COUNT; i++) {exec.submit(() -> {for (int phase = 1; phase <= 3; phase++) { // 3个处理阶段processPhase(phase); // 当前阶段计算barrier.await(); // 等待其他线程}});}exec.shutdown();}void processPhase(int phase) {// 模拟分片计算System.out.println(Thread.currentThread().getName() + " 完成阶段" + phase);}
}
2.3.2 多玩家游戏同步
游戏关卡中,所有玩家必须完成当前关卡才能同时进入下一关
package cn.tcmeta.thread;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;class GameServer {final int PLAYERS = 3;final CyclicBarrier levelBarrier = new CyclicBarrier(PLAYERS, () -> System.out.println("所有玩家通关!开始新关卡"));void startGame() {for (int i = 1; i <= PLAYERS; i++) {new Thread(() -> {while (true) {// completeLevel(); // 玩家完成当前关卡try {levelBarrier.await(); // 等待其他玩家} catch (InterruptedException | BrokenBarrierException e) {throw new RuntimeException(e);}}}, "玩家"+i).start();}}
}
2.3.3 性能测试压力生成
需要精确控制所有压测线程在同一毫秒发起请求
package cn.tcmeta.thread;import java.util.concurrent.CyclicBarrier;class StressTester {final int THREADS = 100;final CyclicBarrier startBarrier = new CyclicBarrier(THREADS);void test() throws Exception {for (int i = 0; i < THREADS; i++) {new Thread(() -> {awaitBarrier(); // 等待发令枪sendRequest(); // 发送API请求}).start();}}private void sendRequest() {System.out.println("Sending request...");}void awaitBarrier() {try {startBarrier.await(); // 所有线程在此同步} catch (Exception e) { /*...*/ }}public static void main(String[] args) throws Exception {new StressTester().test();}
}
2.3.4 遗传算法迭代
并行遗传算法中,每代种群需完成评估后才能进行交叉变异
class GeneticAlgorithm {final int POPULATION_SIZE = 50;final CyclicBarrier generationBarrier = new CyclicBarrier(POPULATION_SIZE);void evolve() {ExecutorService pool = Executors.newCachedThreadPool();for (int i = 0; i < POPULATION_SIZE; i++) {pool.execute(() -> {for (int gen = 0; gen < 1000; gen++) {evaluateFitness(); // 评估个体适应度generationBarrier.await();crossover(); // 等待后执行交叉操作}});}pool.shutdown();}
}
2.3.5 金融交易对账系统
银行每日需等待所有分行上传数据后执行全局对账
class ReconciliationSystem {final int BRANCHES = 20;final CyclicBarrier dailyBarrier = new CyclicBarrier(BRANCHES, this::runReconciliation);void startDailyJob() {for (String branch : getBranches()) {new Thread(() -> {uploadData(branch); // 分行上传数据dailyBarrier.await(); // 等待其他分行}).start();}}void runReconciliation() {System.out.println("开始全局对账..."); }
}
2.4 CyclicBarrier核心特性
2.5 最佳实践
最佳实践:
- 「在循环体内使用 CyclicBarrier 处理分阶段任务」
- 「栅栏动作中避免长时间阻塞(会延迟线程释放)」
- 「配合 ExecutorService 管理线程生命周期」
- 「使用 isBroken() 检测屏障状态并处理异常」
三、Semaphore
3.1 概述
3.1.1 核心概念
信号量, 信号灯; 计数信号量。 从概念上讲,信号量维护一组许可; 可以做限流使用;
- 可以控制同时运行的线程数, 「线程可以有好多个线程,但是它可以控制同时执行的线程个数」
- 类似于抢车位的概念或者是厕所抢坑的例子; - 当前有7辆车, 「七个操作线程」,车位有3个「信号量是3」
3.1.2 构造方法
// 可用的初始许可证数量
public Semaphore(int permits) {sync = new NonfairSync(permits);
}// 数量, 是否为公平锁
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
3.1.3 重点方法
// 1. acquire(), 获取操作. 当一个线程调用`acquire`操作时, 它要么通过成功获取信号量「信号量加1操作」, 要么一直等待下去;下到有线程释放信息号或者超时.
// 2. release(), 释放操作,实际上会将信号量的值加1操作,然后唤醒等待的线程;
3.2 示例代码
3.2.1 共享资源互斥
package cn.tcmeta.thread;import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;/*** @author laoren*/
public class SemaphoreDemo {public static void main(String[] args) {// 信号量,初始化为1, 表示只有一个线程可以操作.Semaphore semaphore = new Semaphore(1);new Thread(() ->{// 获取锁操作try {semaphore.acquire();System.out.println(Thread.currentThread().getName() + " --> \t" + " 拿到了锁哦...");// 模拟一下业务操作try {TimeUnit.MILLISECONDS.sleep(3000);}catch (Exception e){e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " --> \t" + " 完成了业务哦..");} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 释放锁semaphore.release();}}, "王麻子").start();new Thread(() ->{try {semaphore.acquire();System.out.println(Thread.currentThread().getName() + " --> \t" + " 拿到了锁哦...");// 模拟一下业务操作try {TimeUnit.MILLISECONDS.sleep(3000);}catch (Exception e){e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " --> \t" + " 完成了业务哦..");}catch(Exception e){e.printStackTrace();}finally {semaphore.release();}}, "肥七").start();}
}
3.2.2 并发线程数控制
场景:
- 目前有三个坑位「信号量数量为3」
- 有10个人抢这三个坑住「线程数量」
package cn.tcmeta.thread;import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;public class SemaphoreDemo2 {public static final int PERMITS = 3;public static final int PERSON_COUNT = 10;public static void main(String[] args) {// 定义信号量Semaphore semaphore = new Semaphore(PERMITS);for (int i = 0; i < PERSON_COUNT; i++) {new Thread(() ->{try {semaphore.acquire();System.out.println(Thread.currentThread().getName() + " --> \t" + " 抢到了坑位了!");try {TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 1);System.out.println(Thread.currentThread().getName() + " --> \t" + " 啊啊啊,,舒服...释放坑位 ~~~");}catch (Exception e){e.printStackTrace();}}catch(Exception e){e.printStackTrace();}finally {semaphore.release();}}, "线程: " + i).start();}}
}
3.3 使用场景
Semaphore(信号量)是Java中用于控制并发线程访问资源数量的同步工具。
它维护了一组许可证(permits),线程在访问资源前需要获取许可证,访问后释放许可证.
3.3.1 数据库连接池管理
限制同时使用的数据库连接数量,防止资源耗尽
public class ConnectionPool {private final Semaphore semaphore;private final List<Connection> connections = new ArrayList<>();public ConnectionPool(int poolSize) {semaphore = new Semaphore(poolSize, true); // 公平模式for (int i = 0; i < poolSize; i++) {connections.add(createConnection());}}public Connection getConnection() throws InterruptedException {semaphore.acquire(); // 获取许可证return getAvailableConnection();}public void releaseConnection(Connection conn) {returnConnection(conn);semaphore.release(); // 释放许可证}private synchronized Connection getAvailableConnection() {return connections.remove(0);}private synchronized void returnConnection(Connection conn) {connections.add(conn);}private Connection createConnection() {// 创建数据库连接return mock(Connection.class);}
}
3.3.2 限流器
控制API接口每秒最大请求数量
public class RateLimiter {private final Semaphore semaphore;private final int maxPermits;private final ScheduledExecutorService scheduler;public RateLimiter(int permitsPerSecond) {this.maxPermits = permitsPerSecond;semaphore = new Semaphore(permitsPerSecond);scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {int available = semaphore.availablePermits();if (available < maxPermits) {semaphore.release(maxPermits - available);}}, 0, 1, TimeUnit.SECONDS);}public boolean tryAcquire() {return semaphore.tryAcquire();}public void acquire() throws InterruptedException {semaphore.acquire();}
}
3.3.3 停车场管理系统
模拟停车场车位管理
public class CarPark {private final Semaphore parkingSlots;private final int totalSlots;private final Set<String> parkedCars = new HashSet<>();public CarPark(int totalSlots) {this.totalSlots = totalSlots;this.parkingSlots = new Semaphore(totalSlots, true); // 公平模式}public boolean parkCar(String carId) {if (parkingSlots.tryAcquire()) {synchronized (this) {parkedCars.add(carId);}System.out.println(carId + " parked. Available slots: " + parkingSlots.availablePermits());return true;}System.out.println(carId + " failed to park. No available slots.");return false;}public void leaveCar(String carId) {synchronized (this) {if (parkedCars.remove(carId)) {parkingSlots.release();System.out.println(carId + " left. Available slots: " + parkingSlots.availablePermits());}}}
}
3.3.4 打印机池管理
多线程共享有限打印机资源
public class PrinterPool {private final Semaphore semaphore;private final List<Printer> printers = new ArrayList<>();public PrinterPool(int printerCount) {semaphore = new Semaphore(printerCount);for (int i = 0; i < printerCount; i++) {printers.add(new Printer("Printer-" + (i+1)));}}public void printDocument(String document) throws InterruptedException {semaphore.acquire();Printer printer = null;try {printer = getAvailablePrinter();printer.print(document);} finally {if (printer != null) {returnPrinter(printer);}semaphore.release();}}private synchronized Printer getAvailablePrinter() {return printers.remove(0);}private synchronized void returnPrinter(Printer printer) {printers.add(printer);}
}
3.3.5服务调用限流
限制对第三方服务的并发调用数量
public class ExternalServiceInvoker {private final Semaphore semaphore;private final int maxConcurrentCalls;public ExternalServiceInvoker(int maxConcurrentCalls) {this.maxConcurrentCalls = maxConcurrentCalls;this.semaphore = new Semaphore(maxConcurrentCalls);}public String invokeService(String request) {if (!semaphore.tryAcquire()) {throw new ServiceOverloadException("Too many concurrent requests");}try {return callExternalService(request);} finally {semaphore.release();}}private String callExternalService(String request) {// 调用外部服务return "Response for: " + request;}
}
3.3.6 生产者-消费者模型(有界缓冲区)
使用Semaphore实现生产者-消费者模式
public class BoundedBuffer<E> {private final Semaphore availableItems;private final Semaphore availableSpaces;private final Queue<E> buffer = new LinkedList<>();private final int capacity;public BoundedBuffer(int capacity) {this.capacity = capacity;this.availableItems = new Semaphore(0);this.availableSpaces = new Semaphore(capacity);}public void put(E item) throws InterruptedException {availableSpaces.acquire(); // 等待空槽synchronized (this) {buffer.add(item);}availableItems.release(); // 增加可用项目}public E take() throws InterruptedException {availableItems.acquire(); // 等待可用项目E item;synchronized (this) {item = buffer.poll();}availableSpaces.release(); // 增加空槽return item;}
}
3.4 Semaphore关键特性总结
3.5 使用建议
- 资源保护:当需要保护有限资源时使用Semaphore
- 公平性考虑:在资源竞争激烈时使用公平模式防止线程饥饿
- 异常处理:确保在finally块中释放许可证,避免许可证泄漏
- 许可证数量:合理设置许可证数量,过多失去限制意义,过少影响性能
- 替代方案:对于简单计数场景,考虑使用CountDownLatch或CyclicBarrier
四、LockSupport
用于创建锁和其他同步类的基本线程阻塞基元。
4.1 重要方法
4.1.1 使用线程进入阻塞状态
public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);U.park(false, 0L);setBlocker(t, null);
}
4.1.2 唤醒某个线程
public static void unpark(Thread thread) {if (thread != null)U.unpark(thread);
}
4.2 示例代码
需求:
- 开启一条线程, 循环输出10个数;
- 当打印到第五个数时,线程阻塞住.
- 10s之后再继续打印其值;
package cn.tcmeta.thread;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;/*** @author laoren*/
public class LockSupportDemo {public static void main(String[] args) {Thread t1 = new Thread(() -> {for (int i = 0; i < 10; i++) {try {TimeUnit.MILLISECONDS.sleep(1000);System.out.println(Thread.currentThread().getName() + " --> \t" + " 当前值是: " + i);if (i == 5) {System.out.println(Thread.currentThread().getName() + " --> \t" + " 被阻塞了..等一会继续干活....");LockSupport.park(); // 阻塞当前线程}} catch (Exception e) {e.printStackTrace();}}}, "print-info-thread: ");t1.start(); // 启动打印的线程try {// 10秒钟睡眼TimeUnit.SECONDS.sleep(10);LockSupport.unpark(t1); // 唤醒t1线程,继续干活} catch (Exception e) {e.printStackTrace();}}
}
4.3 使用场景
LockSupport 是 Java 并发包中一个强大的线程阻塞工具,提供了比传统 wait/notify 更灵活、更底层的线程控制能力。
4.3.1 构建高级同步工具(如 AQS)
在实现自定义锁或同步器时,使用 LockSupport 作为底层阻塞机制
public class SimpleReentrantLock {private volatile Thread owner = null;private int lockCount = 0;private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();public void lock() {Thread current = Thread.currentThread();if (owner == current) {lockCount++;return;}waiters.add(current);while (owner != null || !waiters.peek().equals(current) ||!compareAndSetOwner(null, current)) {LockSupport.park(this); // 阻塞当前线程}waiters.remove();lockCount = 1;}public void unlock() {if (owner != Thread.currentThread()) {throw new IllegalMonitorStateException();}if (--lockCount == 0) {owner = null;// 唤醒队首等待线程Thread next = waiters.peek();if (next != null) {LockSupport.unpark(next);}}}private boolean compareAndSetOwner(Thread expect, Thread update) {// 原子更新ownerreturn UNSAFE.compareAndSwapObject(this, ownerOffset, expect, update);}// Unsafe 相关初始化代码省略...
}
4.3.2 中断敏感的阻塞操作
实现可响应中断的等待机制,避免 Thread.sleep() 的局限性
public class InterruptibleTask {private volatile boolean completed = false;public void doTask() {Thread worker = new Thread(() -> {// 模拟长时间任务try {Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}completed = true;LockSupport.unpark(Thread.currentThread()); // 通知完成});worker.start();// 等待任务完成,可响应中断while (!completed) {LockSupport.park(this);if (Thread.interrupted()) {System.out.println("Task waiting interrupted!");worker.interrupt();break;}}}
}
4.3.3 定时阻塞与精确唤醒
需要精确控制线程阻塞时间的场景(如定时任务调度)
public class PrecisionScheduler {private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();public void scheduleAt(Runnable task, long deadlineNanos) {Thread worker = new Thread(() -> {long now;while ((now = System.nanoTime()) < deadlineNanos) {long nanosToWait = deadlineNanos - now;if (nanosToWait > 0) {LockSupport.parkNanos(nanosToWait);}}task.run();});worker.start();}// 取消任务(提前唤醒)public void cancel(Thread worker) {LockSupport.unpark(worker);}
}
4.3.4 线程协作与屏障
实现自定义的线程协作机制(替代 CountDownLatch 或 CyclicBarrier)
public class CustomBarrier {private final int parties;private final AtomicInteger count = new AtomicInteger();private final Thread[] waiters;public CustomBarrier(int parties) {this.parties = parties;this.waiters = new Thread[parties];}public void await() {int index = count.getAndIncrement();if (index < parties - 1) {waiters[index] = Thread.currentThread();LockSupport.park(this); // 阻塞直到所有线程到达} else {// 最后一个线程唤醒所有等待线程for (int i = 0; i < parties - 1; i++) {LockSupport.unpark(waiters[i]);}count.set(0); // 重置屏障}}
}
4.3.5 高性能消息队列
实现无锁或低争用的生产者-消费者模型
public class LockSupportQueue<T> {private static class Node<T> {T item;volatile Node<T> next;}private volatile Node<T> head;private volatile Node<T> tail;private volatile Thread consumer;public LockSupportQueue() {Node<T> dummy = new Node<>();head = dummy;tail = dummy;}public void put(T item) {Node<T> node = new Node<>();node.item = item;Node<T> t = tail;tail = node;t.next = node;// 唤醒等待的消费者if (consumer != null) {LockSupport.unpark(consumer);consumer = null;}}public T take() throws InterruptedException {Node<T> h = head;Node<T> first = h.next;if (first != null) {head = first;return first.item;}// 队列为空,注册自己为消费者并阻塞consumer = Thread.currentThread();while (first == null) {LockSupport.park();if (Thread.interrupted()) {throw new InterruptedException();}h = head;first = h.next;}head = first;return first.item;}
}
4.3.6 死锁检测与恢复
实现带超时的资源获取,避免死锁
public class DeadlockAvoider {private final Lock resourceLock = new ReentrantLock();public boolean tryLockWithTimeout(long timeout, TimeUnit unit) {Thread current = Thread.currentThread();final long deadline = System.nanoTime() + unit.toNanos(timeout);// 尝试获取锁if (resourceLock.tryLock()) {return true;}// 设置超时线程Thread timeoutThread = new Thread(() -> {try {Thread.sleep(unit.toMillis(timeout));LockSupport.unpark(current); // 超时唤醒} catch (InterruptedException e) {Thread.currentThread().interrupt();}});timeoutThread.start();// 阻塞直到获取锁或超时while (System.nanoTime() < deadline) {if (resourceLock.tryLock()) {timeoutThread.interrupt();return true;}LockSupport.parkUntil(deadline);}return false;}
}
4.5 LockSupport核心优势
4.6 使用注意事项
- 许可不可累加:多次调用 unpark 不会累积许可(最多只有一个有效许可)
- 中断处理:调用 park 后线程被中断会立即返回,但不会清除中断状态
- 虚假唤醒:和 Object.wait() 类似,park 也可能出现虚假唤醒
- 同步配合:通常需要与其他同步机制(如 volatile 变量)配合使用
- 避免滥用:在普通业务代码中优先使用高级并发工具类
4.7 LockSupport vs Object.wait/notify
LockSupport 是构建 Java 并发框架的基石(如 AQS),在开发高性能并发工具时尤其有用。对于日常业务开发,优先考虑使用基于 LockSupport 构建的高级并发工具(如 ReentrantLock, CountDownLatch 等)。
五、Exchanger
5.1 概述
线程可以配对和交换元素对的同步点。 每个线程在进入exchange方法时呈现一些对象,与伙伴线程匹配,并在返回时接收其伙伴的对象。 Exchanger 可以被视为SynchronousQueue的双向形式。 交换器可能在遗传算法和管道设计等应用中很有用。
5.2 基本示例
package cn.tcmeta.thread;import java.util.concurrent.Exchanger;public class ExchangerDemo {// 初始化Exchanger对象public static final Exchanger<String> EXCHANGER = new Exchanger<>();public static void main(String[] args) {new Thread(() ->{String s1 = "它们都老了吗?它们在哪里呀,幸运的是我,曾陪他们开放............";System.out.println(Thread.currentThread().getName() + " --> \t" + "交换之前: " + s1);try {s1 = EXCHANGER.exchange(s1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() + " --> \t" + " 交换之后: " + s1);}, "那些花: ").start();new Thread(() ->{String s1 = "明天你是否会想起,你昨天写的日记............";System.out.println(Thread.currentThread().getName() + " --> \t" + "交换之前: " + s1);try {s1 = EXCHANGER.exchange(s1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() + " --> \t" + " 交换之后: " + s1);}, "同桌的你: ").start();}
}
5.3 使用场景
Exchanger 是 Java 并发包中一个独特的同步工具,用于两个线程之间交换数据。它提供了一个同步点,在这个点上两个线程可以交换彼此的对象。
5.3.1 生产者-消费者缓冲区交换
双缓冲技术中,生产者和消费者交换缓冲区引用,避免数据复制开销
public class DoubleBufferProcessor {private final Exchanger<Buffer> exchanger = new Exchanger<>();private final ExecutorService executor = Executors.newFixedThreadPool(2);static class Buffer {byte[] data = new byte[1024];int size;void fill(InputStream in) throws IOException {size = in.read(data);}void process() {// 处理缓冲区数据System.out.println("Processing " + size + " bytes");}}public void start(InputStream input) {executor.submit(() -> producerTask(input));executor.submit(() -> consumerTask());}private void producerTask(InputStream input) {Buffer current = new Buffer();try {while (true) {current.fill(input); // 填充缓冲区current = exchanger.exchange(current); // 交换缓冲区}} catch (IOException | InterruptedException e) {Thread.currentThread().interrupt();}}private void consumerTask() {Buffer current = new Buffer();try {while (true) {current = exchanger.exchange(current); // 交换缓冲区current.process(); // 处理数据current.size = 0; // 清空缓冲区}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
5.3.2 遗传算法交叉操作
在并行遗传算法中,两个线程交换染色体进行交叉操作
public class GeneticAlgorithm {private final Exchanger<Chromosome> exchanger = new Exchanger<>();private final int populationSize;public GeneticAlgorithm(int populationSize) {this.populationSize = populationSize;}public void evolve() {ExecutorService pool = Executors.newFixedThreadPool(populationSize);for (int i = 0; i < populationSize; i += 2) {pool.submit(new Individual(i));pool.submit(new Individual(i + 1));}}class Individual implements Runnable {private Chromosome chromosome;Individual(int id) {this.chromosome = new Chromosome(id);}@Overridepublic void run() {try {chromosome.evaluateFitness();// 与配对的个体交换染色体Chromosome partner = exchanger.exchange(chromosome);// 执行交叉操作chromosome = chromosome.crossover(partner);chromosome.mutate();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}static class Chromosome {// 染色体实现Chromosome crossover(Chromosome other) { return this; }void evaluateFitness() {}void mutate() {}}
}
5.3.3 网络协议握手交换
实现自定义协议中双方交换密钥或初始化向量
public class SecureChannel {private final Exchanger<byte[]> keyExchanger = new Exchanger<>();public void establishChannel(Socket socket1, Socket socket2) {new Thread(() -> partyTask(socket1)).start();new Thread(() -> partyTask(socket2)).start();}private void partyTask(Socket socket) {try {// 生成随机密钥byte[] myKey = generateSessionKey();// 发送公钥给对方OutputStream out = socket.getOutputStream();out.write(myKey);out.flush();// 接收对方的公钥InputStream in = socket.getInputStream();byte[] theirKey = new byte[myKey.length];in.read(theirKey);// 交换密钥并计算共享密钥byte[] sharedKey = keyExchanger.exchange(combineKeys(myKey, theirKey));// 使用共享密钥开始通信startEncryptedCommunication(socket, sharedKey);} catch (IOException | InterruptedException e) {handleError(e);}}
}
5.3.4 游戏玩家物品交易
在线游戏中两个玩家安全交换物品
public class PlayerTradingSystem {private final Exchanger<TradeOffer> exchanger = new Exchanger<>();public void trade(Player player1, Player player2) {new Thread(() -> playerTradeTask(player1, player2)).start();new Thread(() -> playerTradeTask(player2, player1)).start();}private void playerTradeTask(Player player, Player partner) {try {// 玩家选择要交易的物品TradeOffer myOffer = player.createTradeOffer();// 显示交易界面player.showTradeInterface(partner);// 等待双方确认if (player.confirmTrade()) {// 交换交易内容TradeOffer theirOffer = exchanger.exchange(myOffer);// 执行交易player.completeTrade(theirOffer);} else {player.cancelTrade();}} catch (InterruptedException e) {player.cancelTrade();Thread.currentThread().interrupt();}}static class TradeOffer {List<Item> items;int gold;}
}
5.3.5 测试工具中的请求-响应交换
在性能测试工具中,一个线程发送请求,另一个线程验证响应
public class RequestResponseValidator {private final Exchanger<HttpResponse> exchanger = new Exchanger<>();public void testEndpoint(String url) {new Thread(() -> requesterTask(url)).start();new Thread(() -> validatorTask()).start();}private void requesterTask(String url) {try {HttpClient client = HttpClient.newHttpClient();HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());// 交换响应给验证器exchanger.exchange(response);} catch (IOException | InterruptedException | URISyntaxException e) {handleError(e);}}private void validatorTask() {try {// 获取响应并验证HttpResponse<?> response = exchanger.exchange(null);validateResponse(response);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
5.3.6 数据流水线阶段交换
在数据处理流水线中,两个相邻阶段交换处理单元
public class DataPipeline {private final Exchanger<DataBatch> exchanger = new Exchanger<>();public void processData(DataSource source) {new Thread(() -> stage1(source)).start();new Thread(() -> stage2()).start();}private void stage1(DataSource source) {DataBatch batch = new DataBatch();try {while (source.hasMore()) {// 提取数据batch.extract(source);// 预处理batch.preprocess();// 交换给下一阶段batch = exchanger.exchange(batch);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void stage2() {DataBatch batch = new DataBatch();try {while (true) {// 获取处理批次batch = exchanger.exchange(batch);// 分析数据batch.analyze();// 存储结果batch.storeResults();// 清空批次batch.clear();}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
5.4 Exchanger核心特性总结
5.5 使用注意事项
- 仅限两个线程:Exchanger 严格用于两个线程间的交换,多线程行为未定义
- 死锁风险:如果一个线程未调用 exchange,另一个线程会永久阻塞
- 对象复用:交换后对象会被对方线程修改,需要适当同步或使用不可变对象
- 性能考虑:适合中低频交换场景,高频交换可能成为瓶颈
- 超时设置:生产环境中应始终使用带超时的 exchange 方法
- 资源清理:确保线程中断时正确处理资源释放
5.6 典型应用场景对比
Exchanger 是 Java 并发工具包中的一颗"隐藏宝石",特别适合需要精确控制两个线程间数据交换的场景。在正确的场景下使用 Exchanger 可以简化设计并提高性能,但需要注意其严格的线程配对要求。
六、没有了
学习快乐!!!