当前位置: 首页 > news >正文

JUC常用线程辅助类详解

一、CountDownLatch

1.1 概述

倒计时门闩, 当执行任务的线程数量到达为0的时候,触发.

  • 可以理解为: 一个教室有多个人,直到所有人走光之后, 班长锁门, 这里需要等待所有人都走完.

一种同步辅助,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

CountDownLatch使用给定的计数进行初始化。 由于调用了countDown方法, await方法会阻塞直到当前计数达到零,然后释放所有等待线程,并且任何后续的await调用都会立即返回。 这是一种一次性现象——计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。

CountDownLatch是一种通用的同步工具,可用于多种目的。 计数为 1 的CountDownLatch用作简单的开/关锁存器或门:调用await所有线程在门处等待,直到它被调用countDown的线程countDown 。 初始化为N的CountDownLatch可用于使一个线程等待,直到N 个线程完成某个操作,或者某个操作已完成 N 次.
在这里插入图片描述

1.2 示例代码

package cn.tcmeta.thread;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** CountDownLatchDemo*/
public class CountDownLatchDemo {public static void main(String[] args) {// 等待的线程数量为5CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {new Thread(() ->{try {System.out.println(Thread.currentThread().getName() + " --> \t" +  " 正在执行!");TimeUnit.MILLISECONDS.sleep(5000);System.out.println(Thread.currentThread().getName() + " --> \t" +  "执行成功!" );// 当任务执行完成之后,要进行减1操作countDownLatch.countDown();}catch (Exception e){e.printStackTrace();}}, "线程:  " + i).start();}try {// 当计数为0时,停止阻塞.相当于打开门闩.执行后续的逻辑countDownLatch.await();}catch (Exception e){e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " --> \t" +  " 所有任务执行完成!");}
}

程序执行结果:
在这里插入图片描述

1.3 使用场景

1.3.1 并行任务启动后等待所有任务完成

场景:主线程需要启动多个子线程并行处理任务(如批量处理数据),并在所有子线程完成后进行结果汇总

CountDownLatch doneLatch = new CountDownLatch(TASK_COUNT);for (int i = 0; i < TASK_COUNT; i++) {new Thread(() -> {try {// 执行子任务(如调用外部API、计算等)} finally {doneLatch.countDown(); // 任务完成时计数器减1}}).start();
}doneLatch.await(); // 主线程阻塞,直到所有子线程完成
System.out.println("所有任务处理完毕,开始汇总结果");

1.3.2 服务启动时依赖资源初始化

场景:微服务启动时需要加载多个独立资源(如数据库连接、缓存预热、配置文件加载),主线程需等待所有资源初始化完成后才启动服务。

CountDownLatch initLatch = new CountDownLatch(3); // 3个依赖资源// 线程1:初始化数据库
new Thread(() -> { initDatabase(); initLatch.countDown(); }).start();// 线程2:预热缓存
new Thread(() -> { warmUpCache(); initLatch.countDown(); }).start();// 线程3:加载配置文件
new Thread(() -> { loadConfig(); initLatch.countDown(); }).start();initLatch.await(); // 等待所有资源初始化
startServer(); // 启动服务

1.3.3 并发性能测试

模拟高并发请求,需要所有请求线程在同一时刻发起操作(如压力测试接口)。

CountDownLatch startLatch = new CountDownLatch(1); // 发令枪
CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT); // 结束标识for (int i = 0; i < THREAD_COUNT; i++) {new Thread(() -> {try {startLatch.await(); // 所有线程在此等待callTestAPI();      // 同时发起请求} finally {endLatch.countDown();}}).start();
}Thread.sleep(500); // 确保所有线程就绪
startLatch.countDown(); // 统一放行
endLatch.await();      // 等待所有请求完成
generateReport();       // 生成测试报告

1.3.4 多阶段任务协作

游戏服务器中,玩家需全部准备就绪后游戏才开始,且需等待所有玩家完成第一关卡后再进入下一关。

// 等待所有玩家准备
CountDownLatch readyLatch = new CountDownLatch(PLAYER_COUNT);
players.forEach(player -> player.prepare(() -> readyLatch.countDown())
);
readyLatch.await();
startGame();// 等待所有玩家完成第一关
CountDownLatch level1Latch = new CountDownLatch(PLAYER_COUNT);
players.forEach(player -> player.finishLevel1(() -> level1Latch.countDown())
);
level1Latch.await();
startLevel2();

1.3.5 分布式任务分片同步

[!tip]

分布式计算中,主节点将任务分发给多个工作节点,需等待所有节点返回结果后再合并。

List<WorkerNode> nodes = getWorkerNodes();
CountDownLatch resultLatch = new CountDownLatch(nodes.size());for (WorkerNode node : nodes) {node.executeTask(dataShard, () -> {// 返回分片处理结果resultLatch.countDown();});
}resultLatch.await();
mergeResults(); // 合并所有分片结果

1.4 关键注意事项

  • 计数器不可重置:CountDownLatch的计数器归零后无法重置,如需重复使用,改用CyclicBarrier。
  • 异常处理:确保countDown()在finally块中调用,避免线程异常导致主线程永久阻塞。
  • 超时控制:使用await(long timeout, TimeUnit unit)防止死锁。
  • 资源释放:等待线程被中断时,需妥善处理资源(如关闭连接)。

二、CyclicBarrier

2.1 概述

一种同步辅助工具,「它允许一组线程全部等待彼此到达公共屏障点」「CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程必须偶尔相互等待。 屏障被称为循环的,因为它可以在等待线程被释放后重新使用.」

「CyclicBarrier支持可选的Runnable命令,该命令在每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程被释放之前。 此屏障操作对于在任何一方继续之前更新共享状态很有用。」

与CountDownLatch类似,但是它不是计数相减, 而是计数相加操作;功能类似;

  • 执行一个线程, 加一个数,当执行的线程数达到指定的线程数的时候,触发一个操作;
  • 集齐七个龙珠,可以召唤神龙, 或者理解成: 同学都到了, 咱们再上课

2.2 基本示例代码

package cn.tcmeta.thread;import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;/*** @author laoren*  CyclicBarrier类似于CountDownLatch, 不同的是这个是做加法,相当于一个屏障.*          可以理解为: 集齐七颗龙珠,就可以召唤神龙了. 它的计数是相加,达到某个值之后, 触发后续操作;*/
public class CyclicBarrierDemo {public static void main(String[] args) {/*** 构造方法说明:*      参数说明:*              parities: 在屏障到达之前必须调用 await 的线程数*              barrierAction: 到达屏障点触发的动作.*     public CyclicBarrier(int parties, Runnable barrierAction) {*         if (parties <= 0) throw new IllegalArgumentException();*         this.parties = parties;*         this.count = parties;*         this.barrierCommand = barrierAction;*     }*/CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("已经集齐了七颗龙珠,神龙现世!"));for (int i = 0; i < 7; i++) {int count = i;new Thread(() ->{System.out.println(Thread.currentThread().getName() + " --> \t" +  " 已经集齐了 " + count + " 龙珠了!");try {TimeUnit.MILLISECONDS.sleep(2000);// 未到达屏障点cyclicBarrier.await();}catch (Exception e){e.printStackTrace();}}, "线程 - " + i).start();}}
}

在这里插入图片描述

2.3 使用场景

2.3.1 并行计算分阶段处理

分布式计算中,每个线程处理数据分片,需等待所有线程完成当前阶段才能进入下一阶段(如机器学习模型的迭代训练)

class DataProcessor {final int THREAD_COUNT = 4;final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> System.out.println("当前阶段完成,开始下一阶段"));void process() {ExecutorService exec = Executors.newFixedThreadPool(THREAD_COUNT);for (int i = 0; i < THREAD_COUNT; i++) {exec.submit(() -> {for (int phase = 1; phase <= 3; phase++) {  // 3个处理阶段processPhase(phase);  // 当前阶段计算barrier.await();     // 等待其他线程}});}exec.shutdown();}void processPhase(int phase) {// 模拟分片计算System.out.println(Thread.currentThread().getName() + " 完成阶段" + phase);}
}

在这里插入图片描述

2.3.2 多玩家游戏同步

游戏关卡中,所有玩家必须完成当前关卡才能同时进入下一关

package cn.tcmeta.thread;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;class GameServer {final int PLAYERS = 3;final CyclicBarrier levelBarrier = new CyclicBarrier(PLAYERS, () -> System.out.println("所有玩家通关!开始新关卡"));void startGame() {for (int i = 1; i <= PLAYERS; i++) {new Thread(() -> {while (true) {// completeLevel();  // 玩家完成当前关卡try {levelBarrier.await();  // 等待其他玩家} catch (InterruptedException | BrokenBarrierException e) {throw new RuntimeException(e);}}}, "玩家"+i).start();}}
}

2.3.3 性能测试压力生成

需要精确控制所有压测线程在同一毫秒发起请求

package cn.tcmeta.thread;import java.util.concurrent.CyclicBarrier;class StressTester {final int THREADS = 100;final CyclicBarrier startBarrier = new CyclicBarrier(THREADS);void test() throws Exception {for (int i = 0; i < THREADS; i++) {new Thread(() -> {awaitBarrier();    // 等待发令枪sendRequest();     // 发送API请求}).start();}}private void sendRequest() {System.out.println("Sending request...");}void awaitBarrier() {try {startBarrier.await();  // 所有线程在此同步} catch (Exception e) { /*...*/ }}public static void main(String[] args) throws Exception {new StressTester().test();}
}

在这里插入图片描述

2.3.4 遗传算法迭代

并行遗传算法中,每代种群需完成评估后才能进行交叉变异

class GeneticAlgorithm {final int POPULATION_SIZE = 50;final CyclicBarrier generationBarrier = new CyclicBarrier(POPULATION_SIZE);void evolve() {ExecutorService pool = Executors.newCachedThreadPool();for (int i = 0; i < POPULATION_SIZE; i++) {pool.execute(() -> {for (int gen = 0; gen < 1000; gen++) {evaluateFitness();  // 评估个体适应度generationBarrier.await();crossover();        // 等待后执行交叉操作}});}pool.shutdown();}
}

2.3.5 金融交易对账系统

银行每日需等待所有分行上传数据后执行全局对账

class ReconciliationSystem {final int BRANCHES = 20;final CyclicBarrier dailyBarrier = new CyclicBarrier(BRANCHES, this::runReconciliation);void startDailyJob() {for (String branch : getBranches()) {new Thread(() -> {uploadData(branch);    // 分行上传数据dailyBarrier.await();  // 等待其他分行}).start();}}void runReconciliation() {System.out.println("开始全局对账..."); }
}

2.4 CyclicBarrier核心特性

在这里插入图片描述

2.5 最佳实践

最佳实践:

  • 「在循环体内使用 CyclicBarrier 处理分阶段任务」
  • 「栅栏动作中避免长时间阻塞(会延迟线程释放)」
  • 「配合 ExecutorService 管理线程生命周期」
  • 「使用 isBroken() 检测屏障状态并处理异常」

三、Semaphore

3.1 概述

3.1.1 核心概念

信号量, 信号灯; 计数信号量。 从概念上讲,信号量维护一组许可; 可以做限流使用;

  • 可以控制同时运行的线程数, 「线程可以有好多个线程,但是它可以控制同时执行的线程个数」
  • 类似于抢车位的概念或者是厕所抢坑的例子; - 当前有7辆车, 「七个操作线程」,车位有3个「信号量是3」

3.1.2 构造方法

// 可用的初始许可证数量
public Semaphore(int permits) {sync = new NonfairSync(permits);
}// 数量, 是否为公平锁
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

3.1.3 重点方法

// 1. acquire(), 获取操作. 当一个线程调用`acquire`操作时, 它要么通过成功获取信号量「信号量加1操作」, 要么一直等待下去;下到有线程释放信息号或者超时. 
// 2. release(), 释放操作,实际上会将信号量的值加1操作,然后唤醒等待的线程;

3.2 示例代码

3.2.1 共享资源互斥

package cn.tcmeta.thread;import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;/*** @author laoren*/
public class SemaphoreDemo {public static void main(String[] args) {// 信号量,初始化为1, 表示只有一个线程可以操作.Semaphore semaphore = new Semaphore(1);new Thread(() ->{// 获取锁操作try {semaphore.acquire();System.out.println(Thread.currentThread().getName() + " --> \t" +  " 拿到了锁哦...");// 模拟一下业务操作try {TimeUnit.MILLISECONDS.sleep(3000);}catch (Exception e){e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " --> \t" +  " 完成了业务哦..");} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 释放锁semaphore.release();}}, "王麻子").start();new Thread(() ->{try {semaphore.acquire();System.out.println(Thread.currentThread().getName() + " --> \t" +  " 拿到了锁哦...");// 模拟一下业务操作try {TimeUnit.MILLISECONDS.sleep(3000);}catch (Exception e){e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " --> \t" +  " 完成了业务哦..");}catch(Exception e){e.printStackTrace();}finally {semaphore.release();}}, "肥七").start();}
}

在这里插入图片描述

3.2.2 并发线程数控制

场景:

  • 目前有三个坑位「信号量数量为3」
  • 有10个人抢这三个坑住「线程数量」
package cn.tcmeta.thread;import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;public class SemaphoreDemo2 {public static final int PERMITS = 3;public static final int PERSON_COUNT = 10;public static void main(String[] args) {// 定义信号量Semaphore semaphore = new Semaphore(PERMITS);for (int i = 0; i < PERSON_COUNT; i++) {new Thread(() ->{try {semaphore.acquire();System.out.println(Thread.currentThread().getName() + " --> \t" +  " 抢到了坑位了!");try {TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 1);System.out.println(Thread.currentThread().getName() + " --> \t" +  " 啊啊啊,,舒服...释放坑位 ~~~");}catch (Exception e){e.printStackTrace();}}catch(Exception e){e.printStackTrace();}finally {semaphore.release();}}, "线程: " + i).start();}}
}

在这里插入图片描述

3.3 使用场景

  • Semaphore(信号量)是Java中用于控制并发线程访问资源数量的同步工具。

  • 它维护了一组许可证(permits),线程在访问资源前需要获取许可证,访问后释放许可证.

3.3.1 数据库连接池管理

限制同时使用的数据库连接数量,防止资源耗尽

public class ConnectionPool {private final Semaphore semaphore;private final List<Connection> connections = new ArrayList<>();public ConnectionPool(int poolSize) {semaphore = new Semaphore(poolSize, true); // 公平模式for (int i = 0; i < poolSize; i++) {connections.add(createConnection());}}public Connection getConnection() throws InterruptedException {semaphore.acquire(); // 获取许可证return getAvailableConnection();}public void releaseConnection(Connection conn) {returnConnection(conn);semaphore.release(); // 释放许可证}private synchronized Connection getAvailableConnection() {return connections.remove(0);}private synchronized void returnConnection(Connection conn) {connections.add(conn);}private Connection createConnection() {// 创建数据库连接return mock(Connection.class);}
}

在这里插入图片描述

3.3.2 限流器

控制API接口每秒最大请求数量

public class RateLimiter {private final Semaphore semaphore;private final int maxPermits;private final ScheduledExecutorService scheduler;public RateLimiter(int permitsPerSecond) {this.maxPermits = permitsPerSecond;semaphore = new Semaphore(permitsPerSecond);scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {int available = semaphore.availablePermits();if (available < maxPermits) {semaphore.release(maxPermits - available);}}, 0, 1, TimeUnit.SECONDS);}public boolean tryAcquire() {return semaphore.tryAcquire();}public void acquire() throws InterruptedException {semaphore.acquire();}
}

3.3.3 停车场管理系统

模拟停车场车位管理

public class CarPark {private final Semaphore parkingSlots;private final int totalSlots;private final Set<String> parkedCars = new HashSet<>();public CarPark(int totalSlots) {this.totalSlots = totalSlots;this.parkingSlots = new Semaphore(totalSlots, true); // 公平模式}public boolean parkCar(String carId) {if (parkingSlots.tryAcquire()) {synchronized (this) {parkedCars.add(carId);}System.out.println(carId + " parked. Available slots: " + parkingSlots.availablePermits());return true;}System.out.println(carId + " failed to park. No available slots.");return false;}public void leaveCar(String carId) {synchronized (this) {if (parkedCars.remove(carId)) {parkingSlots.release();System.out.println(carId + " left. Available slots: " + parkingSlots.availablePermits());}}}
}

3.3.4 打印机池管理

多线程共享有限打印机资源

public class PrinterPool {private final Semaphore semaphore;private final List<Printer> printers = new ArrayList<>();public PrinterPool(int printerCount) {semaphore = new Semaphore(printerCount);for (int i = 0; i < printerCount; i++) {printers.add(new Printer("Printer-" + (i+1)));}}public void printDocument(String document) throws InterruptedException {semaphore.acquire();Printer printer = null;try {printer = getAvailablePrinter();printer.print(document);} finally {if (printer != null) {returnPrinter(printer);}semaphore.release();}}private synchronized Printer getAvailablePrinter() {return printers.remove(0);}private synchronized void returnPrinter(Printer printer) {printers.add(printer);}
}

在这里插入图片描述

3.3.5服务调用限流

限制对第三方服务的并发调用数量

public class ExternalServiceInvoker {private final Semaphore semaphore;private final int maxConcurrentCalls;public ExternalServiceInvoker(int maxConcurrentCalls) {this.maxConcurrentCalls = maxConcurrentCalls;this.semaphore = new Semaphore(maxConcurrentCalls);}public String invokeService(String request) {if (!semaphore.tryAcquire()) {throw new ServiceOverloadException("Too many concurrent requests");}try {return callExternalService(request);} finally {semaphore.release();}}private String callExternalService(String request) {// 调用外部服务return "Response for: " + request;}
}

3.3.6 生产者-消费者模型(有界缓冲区)

使用Semaphore实现生产者-消费者模式

public class BoundedBuffer<E> {private final Semaphore availableItems;private final Semaphore availableSpaces;private final Queue<E> buffer = new LinkedList<>();private final int capacity;public BoundedBuffer(int capacity) {this.capacity = capacity;this.availableItems = new Semaphore(0);this.availableSpaces = new Semaphore(capacity);}public void put(E item) throws InterruptedException {availableSpaces.acquire(); // 等待空槽synchronized (this) {buffer.add(item);}availableItems.release(); // 增加可用项目}public E take() throws InterruptedException {availableItems.acquire(); // 等待可用项目E item;synchronized (this) {item = buffer.poll();}availableSpaces.release(); // 增加空槽return item;}
}

在这里插入图片描述

3.4 Semaphore关键特性总结

在这里插入图片描述

3.5 使用建议

  • 资源保护:当需要保护有限资源时使用Semaphore
  • 公平性考虑:在资源竞争激烈时使用公平模式防止线程饥饿
  • 异常处理:确保在finally块中释放许可证,避免许可证泄漏
  • 许可证数量:合理设置许可证数量,过多失去限制意义,过少影响性能
  • 替代方案:对于简单计数场景,考虑使用CountDownLatch或CyclicBarrier

四、LockSupport

用于创建锁和其他同步类的基本线程阻塞基元。

4.1 重要方法

4.1.1 使用线程进入阻塞状态

public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);U.park(false, 0L);setBlocker(t, null);
}

4.1.2 唤醒某个线程

public static void unpark(Thread thread) {if (thread != null)U.unpark(thread);
}

4.2 示例代码

需求:

  • 开启一条线程, 循环输出10个数;
  • 当打印到第五个数时,线程阻塞住.
  • 10s之后再继续打印其值;
package cn.tcmeta.thread;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;/*** @author laoren*/
public class LockSupportDemo {public static void main(String[] args) {Thread t1 = new Thread(() -> {for (int i = 0; i < 10; i++) {try {TimeUnit.MILLISECONDS.sleep(1000);System.out.println(Thread.currentThread().getName() + " --> \t" + " 当前值是: " + i);if (i == 5) {System.out.println(Thread.currentThread().getName() + " --> \t" +  " 被阻塞了..等一会继续干活....");LockSupport.park(); // 阻塞当前线程}} catch (Exception e) {e.printStackTrace();}}}, "print-info-thread: ");t1.start(); // 启动打印的线程try {// 10秒钟睡眼TimeUnit.SECONDS.sleep(10);LockSupport.unpark(t1); // 唤醒t1线程,继续干活} catch (Exception e) {e.printStackTrace();}}
}

在这里插入图片描述

4.3 使用场景

LockSupport 是 Java 并发包中一个强大的线程阻塞工具,提供了比传统 wait/notify 更灵活、更底层的线程控制能力。

4.3.1 构建高级同步工具(如 AQS)

在实现自定义锁或同步器时,使用 LockSupport 作为底层阻塞机制

public class SimpleReentrantLock {private volatile Thread owner = null;private int lockCount = 0;private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();public void lock() {Thread current = Thread.currentThread();if (owner == current) {lockCount++;return;}waiters.add(current);while (owner != null || !waiters.peek().equals(current) ||!compareAndSetOwner(null, current)) {LockSupport.park(this); // 阻塞当前线程}waiters.remove();lockCount = 1;}public void unlock() {if (owner != Thread.currentThread()) {throw new IllegalMonitorStateException();}if (--lockCount == 0) {owner = null;// 唤醒队首等待线程Thread next = waiters.peek();if (next != null) {LockSupport.unpark(next);}}}private boolean compareAndSetOwner(Thread expect, Thread update) {// 原子更新ownerreturn UNSAFE.compareAndSwapObject(this, ownerOffset, expect, update);}// Unsafe 相关初始化代码省略...
}

在这里插入图片描述

4.3.2 中断敏感的阻塞操作

实现可响应中断的等待机制,避免 Thread.sleep() 的局限性

public class InterruptibleTask {private volatile boolean completed = false;public void doTask() {Thread worker = new Thread(() -> {// 模拟长时间任务try {Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}completed = true;LockSupport.unpark(Thread.currentThread()); // 通知完成});worker.start();// 等待任务完成,可响应中断while (!completed) {LockSupport.park(this);if (Thread.interrupted()) {System.out.println("Task waiting interrupted!");worker.interrupt();break;}}}
}

4.3.3 定时阻塞与精确唤醒

需要精确控制线程阻塞时间的场景(如定时任务调度)

public class PrecisionScheduler {private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();public void scheduleAt(Runnable task, long deadlineNanos) {Thread worker = new Thread(() -> {long now;while ((now = System.nanoTime()) < deadlineNanos) {long nanosToWait = deadlineNanos - now;if (nanosToWait > 0) {LockSupport.parkNanos(nanosToWait);}}task.run();});worker.start();}// 取消任务(提前唤醒)public void cancel(Thread worker) {LockSupport.unpark(worker);}
}

4.3.4 线程协作与屏障

实现自定义的线程协作机制(替代 CountDownLatch 或 CyclicBarrier)

public class CustomBarrier {private final int parties;private final AtomicInteger count = new AtomicInteger();private final Thread[] waiters;public CustomBarrier(int parties) {this.parties = parties;this.waiters = new Thread[parties];}public void await() {int index = count.getAndIncrement();if (index < parties - 1) {waiters[index] = Thread.currentThread();LockSupport.park(this); // 阻塞直到所有线程到达} else {// 最后一个线程唤醒所有等待线程for (int i = 0; i < parties - 1; i++) {LockSupport.unpark(waiters[i]);}count.set(0); // 重置屏障}}
}

在这里插入图片描述

4.3.5 高性能消息队列

实现无锁或低争用的生产者-消费者模型

public class LockSupportQueue<T> {private static class Node<T> {T item;volatile Node<T> next;}private volatile Node<T> head;private volatile Node<T> tail;private volatile Thread consumer;public LockSupportQueue() {Node<T> dummy = new Node<>();head = dummy;tail = dummy;}public void put(T item) {Node<T> node = new Node<>();node.item = item;Node<T> t = tail;tail = node;t.next = node;// 唤醒等待的消费者if (consumer != null) {LockSupport.unpark(consumer);consumer = null;}}public T take() throws InterruptedException {Node<T> h = head;Node<T> first = h.next;if (first != null) {head = first;return first.item;}// 队列为空,注册自己为消费者并阻塞consumer = Thread.currentThread();while (first == null) {LockSupport.park();if (Thread.interrupted()) {throw new InterruptedException();}h = head;first = h.next;}head = first;return first.item;}
}

4.3.6 死锁检测与恢复

实现带超时的资源获取,避免死锁

public class DeadlockAvoider {private final Lock resourceLock = new ReentrantLock();public boolean tryLockWithTimeout(long timeout, TimeUnit unit) {Thread current = Thread.currentThread();final long deadline = System.nanoTime() + unit.toNanos(timeout);// 尝试获取锁if (resourceLock.tryLock()) {return true;}// 设置超时线程Thread timeoutThread = new Thread(() -> {try {Thread.sleep(unit.toMillis(timeout));LockSupport.unpark(current); // 超时唤醒} catch (InterruptedException e) {Thread.currentThread().interrupt();}});timeoutThread.start();// 阻塞直到获取锁或超时while (System.nanoTime() < deadline) {if (resourceLock.tryLock()) {timeoutThread.interrupt();return true;}LockSupport.parkUntil(deadline);}return false;}
}

4.5 LockSupport核心优势

在这里插入图片描述

4.6 使用注意事项

  • 许可不可累加:多次调用 unpark 不会累积许可(最多只有一个有效许可)
  • 中断处理:调用 park 后线程被中断会立即返回,但不会清除中断状态
  • 虚假唤醒:和 Object.wait() 类似,park 也可能出现虚假唤醒
  • 同步配合:通常需要与其他同步机制(如 volatile 变量)配合使用
  • 避免滥用:在普通业务代码中优先使用高级并发工具类

4.7 LockSupport vs Object.wait/notify

在这里插入图片描述

LockSupport 是构建 Java 并发框架的基石(如 AQS),在开发高性能并发工具时尤其有用。对于日常业务开发,优先考虑使用基于 LockSupport 构建的高级并发工具(如 ReentrantLock, CountDownLatch 等)。

五、Exchanger

5.1 概述

线程可以配对和交换元素对的同步点。 每个线程在进入exchange方法时呈现一些对象,与伙伴线程匹配,并在返回时接收其伙伴的对象。 Exchanger 可以被视为SynchronousQueue的双向形式。 交换器可能在遗传算法和管道设计等应用中很有用。

5.2 基本示例

package cn.tcmeta.thread;import java.util.concurrent.Exchanger;public class ExchangerDemo {// 初始化Exchanger对象public static final Exchanger<String> EXCHANGER = new Exchanger<>();public static void main(String[] args) {new Thread(() ->{String s1 = "它们都老了吗?它们在哪里呀,幸运的是我,曾陪他们开放............";System.out.println(Thread.currentThread().getName() + " --> \t" +  "交换之前: " + s1);try {s1 = EXCHANGER.exchange(s1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() + " --> \t" +  " 交换之后: " + s1);}, "那些花: ").start();new Thread(() ->{String s1 = "明天你是否会想起,你昨天写的日记............";System.out.println(Thread.currentThread().getName() + " --> \t" +  "交换之前: " + s1);try {s1 = EXCHANGER.exchange(s1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() + " --> \t" +  " 交换之后: " + s1);}, "同桌的你: ").start();}
}

在这里插入图片描述

5.3 使用场景

Exchanger 是 Java 并发包中一个独特的同步工具,用于两个线程之间交换数据。它提供了一个同步点,在这个点上两个线程可以交换彼此的对象。

5.3.1 生产者-消费者缓冲区交换

双缓冲技术中,生产者和消费者交换缓冲区引用,避免数据复制开销

public class DoubleBufferProcessor {private final Exchanger<Buffer> exchanger = new Exchanger<>();private final ExecutorService executor = Executors.newFixedThreadPool(2);static class Buffer {byte[] data = new byte[1024];int size;void fill(InputStream in) throws IOException {size = in.read(data);}void process() {// 处理缓冲区数据System.out.println("Processing " + size + " bytes");}}public void start(InputStream input) {executor.submit(() -> producerTask(input));executor.submit(() -> consumerTask());}private void producerTask(InputStream input) {Buffer current = new Buffer();try {while (true) {current.fill(input);  // 填充缓冲区current = exchanger.exchange(current);  // 交换缓冲区}} catch (IOException | InterruptedException e) {Thread.currentThread().interrupt();}}private void consumerTask() {Buffer current = new Buffer();try {while (true) {current = exchanger.exchange(current);  // 交换缓冲区current.process();  // 处理数据current.size = 0;  // 清空缓冲区}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

在这里插入图片描述

5.3.2 遗传算法交叉操作

在并行遗传算法中,两个线程交换染色体进行交叉操作

public class GeneticAlgorithm {private final Exchanger<Chromosome> exchanger = new Exchanger<>();private final int populationSize;public GeneticAlgorithm(int populationSize) {this.populationSize = populationSize;}public void evolve() {ExecutorService pool = Executors.newFixedThreadPool(populationSize);for (int i = 0; i < populationSize; i += 2) {pool.submit(new Individual(i));pool.submit(new Individual(i + 1));}}class Individual implements Runnable {private Chromosome chromosome;Individual(int id) {this.chromosome = new Chromosome(id);}@Overridepublic void run() {try {chromosome.evaluateFitness();// 与配对的个体交换染色体Chromosome partner = exchanger.exchange(chromosome);// 执行交叉操作chromosome = chromosome.crossover(partner);chromosome.mutate();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}static class Chromosome {// 染色体实现Chromosome crossover(Chromosome other) { return this; }void evaluateFitness() {}void mutate() {}}
}

5.3.3 网络协议握手交换

实现自定义协议中双方交换密钥或初始化向量

public class SecureChannel {private final Exchanger<byte[]> keyExchanger = new Exchanger<>();public void establishChannel(Socket socket1, Socket socket2) {new Thread(() -> partyTask(socket1)).start();new Thread(() -> partyTask(socket2)).start();}private void partyTask(Socket socket) {try {// 生成随机密钥byte[] myKey = generateSessionKey();// 发送公钥给对方OutputStream out = socket.getOutputStream();out.write(myKey);out.flush();// 接收对方的公钥InputStream in = socket.getInputStream();byte[] theirKey = new byte[myKey.length];in.read(theirKey);// 交换密钥并计算共享密钥byte[] sharedKey = keyExchanger.exchange(combineKeys(myKey, theirKey));// 使用共享密钥开始通信startEncryptedCommunication(socket, sharedKey);} catch (IOException | InterruptedException e) {handleError(e);}}
}

5.3.4 游戏玩家物品交易

在线游戏中两个玩家安全交换物品

public class PlayerTradingSystem {private final Exchanger<TradeOffer> exchanger = new Exchanger<>();public void trade(Player player1, Player player2) {new Thread(() -> playerTradeTask(player1, player2)).start();new Thread(() -> playerTradeTask(player2, player1)).start();}private void playerTradeTask(Player player, Player partner) {try {// 玩家选择要交易的物品TradeOffer myOffer = player.createTradeOffer();// 显示交易界面player.showTradeInterface(partner);// 等待双方确认if (player.confirmTrade()) {// 交换交易内容TradeOffer theirOffer = exchanger.exchange(myOffer);// 执行交易player.completeTrade(theirOffer);} else {player.cancelTrade();}} catch (InterruptedException e) {player.cancelTrade();Thread.currentThread().interrupt();}}static class TradeOffer {List<Item> items;int gold;}
}

在这里插入图片描述

5.3.5 测试工具中的请求-响应交换

在性能测试工具中,一个线程发送请求,另一个线程验证响应

public class RequestResponseValidator {private final Exchanger<HttpResponse> exchanger = new Exchanger<>();public void testEndpoint(String url) {new Thread(() -> requesterTask(url)).start();new Thread(() -> validatorTask()).start();}private void requesterTask(String url) {try {HttpClient client = HttpClient.newHttpClient();HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());// 交换响应给验证器exchanger.exchange(response);} catch (IOException | InterruptedException | URISyntaxException e) {handleError(e);}}private void validatorTask() {try {// 获取响应并验证HttpResponse<?> response = exchanger.exchange(null);validateResponse(response);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

5.3.6 数据流水线阶段交换

在数据处理流水线中,两个相邻阶段交换处理单元

public class DataPipeline {private final Exchanger<DataBatch> exchanger = new Exchanger<>();public void processData(DataSource source) {new Thread(() -> stage1(source)).start();new Thread(() -> stage2()).start();}private void stage1(DataSource source) {DataBatch batch = new DataBatch();try {while (source.hasMore()) {// 提取数据batch.extract(source);// 预处理batch.preprocess();// 交换给下一阶段batch = exchanger.exchange(batch);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void stage2() {DataBatch batch = new DataBatch();try {while (true) {// 获取处理批次batch = exchanger.exchange(batch);// 分析数据batch.analyze();// 存储结果batch.storeResults();// 清空批次batch.clear();}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

5.4 Exchanger核心特性总结

在这里插入图片描述
在这里插入图片描述

5.5 使用注意事项

  • 仅限两个线程:Exchanger 严格用于两个线程间的交换,多线程行为未定义
  • 死锁风险:如果一个线程未调用 exchange,另一个线程会永久阻塞
  • 对象复用:交换后对象会被对方线程修改,需要适当同步或使用不可变对象
  • 性能考虑:适合中低频交换场景,高频交换可能成为瓶颈
  • 超时设置:生产环境中应始终使用带超时的 exchange 方法
  • 资源清理:确保线程中断时正确处理资源释放

5.6 典型应用场景对比

在这里插入图片描述

Exchanger 是 Java 并发工具包中的一颗"隐藏宝石",特别适合需要精确控制两个线程间数据交换的场景。在正确的场景下使用 Exchanger 可以简化设计并提高性能,但需要注意其严格的线程配对要求。

六、没有了

学习快乐!!!

http://www.lryc.cn/news/623694.html

相关文章:

  • JavaScript 性能优化实战大纲
  • [GLM-4.5] LLM推理服务器(SGLang/vLLM) | 工具与推理解析器
  • c_str()函数的详细解析
  • 【PHP】Hyperf:接入 Nacos
  • Python | 解决 matplotlib 中文乱码
  • 基于MATLAB多智能体强化学习的出租车资源配置优化系统设计与实现
  • [论文阅读] 人工智能 + 职业教育 | 从技能操作者到技术反思者:生成式AI驱动职业教育学习范式转型
  • 豆包 Java的23种设计模式
  • 微调 AnomalyCLIP——基于对象无关提示学习与全局 - 局部优化的零样本异常检测框架性能验证
  • 迅速掌握Git通用指令
  • 7 索引的监控
  • 编程算法实例-整数分解质因数
  • Mac(五)自定义鼠标滚轮方向 LinearMouse
  • 又一家茑屋书店即将歇业,精品书店的未来在哪里?
  • Bee1.17.25更新Bug,完善功能.不支持NOSQL,分库分表Sharding(2.X版有)
  • Spark03-RDD02-常用的Action算子
  • YOLO12 改进、魔改|频域自注意力求解器FSAS,通过频域高效计算自注意力,在降低时间与空间复杂度的同时保留关键特征信息,提升遮挡、小目标检测
  • PostgreSQL——用户管理
  • 【IDEA】设置Debug调试时调试器不进入特定类(Spring框架、Mybatis框架)
  • Day3--滑动窗口与双指针--2461. 长度为 K 子数组中的最大和,1423. 可获得的最大点数,1052. 爱生气的书店老板
  • 【算法】模拟专题
  • JavaScript性能优化实战(三):DOM操作性能优化
  • openEuler等Linux系统中如何复制移动硬盘的数据
  • 【Luogu】每日一题——Day20. P4366 [Code+#4] 最短路 (图论)
  • 计算机网络 Session 劫持 原理和防御措施
  • 【Luogu】每日一题——Day21. P3556 [POI 2013] MOR-Tales of seafaring (图论)
  • 裸机框架:按键模组
  • 深度学习之优化器
  • 概率论基础教程第4章 随机变量(一)
  • 《Cocos游戏开发入门一本通》第四章