当前位置: 首页 > news >正文

并发编程常用工具类(下):CyclicBarrier 与 Phaser 的协同应用

在并发编程中,除了CountDownLatch和Semaphore,CyclicBarrier和Phaser也是实现多线程协作的重要工具。它们在处理多阶段任务同步、动态调整参与线程等场景中展现出独特价值。本文作为并发工具类系列的第二篇,将深入解析CyclicBarrier和Phaser的核心机制、实战案例及适用场景,帮助开发者构建更灵活的线程协作模型。

一、CyclicBarrier:循环屏障的多阶段协作

CyclicBarrier(循环屏障)的设计初衷是让一组线程在到达某个屏障点时暂停,直至所有线程都到达后再共同继续执行。与CountDownLatch的一次性使用不同,CyclicBarrier的计数器可通过reset()方法重置,支持多轮次的线程同步,这也是其 “循环” 特性的由来。

1.1 核心原理与方法解析

CyclicBarrier基于 “屏障点 + 集体唤醒” 机制实现,核心方法如下:

方法

功能描述

CyclicBarrier(int parties)

构造方法,指定参与同步的线程数量(parties)

CyclicBarrier(int parties, Runnable barrierAction)

带屏障动作的构造方法,所有线程到达后先执行该动作

int await()

线程到达屏障点后阻塞等待,返回当前线程的到达顺序(0~parties-1)

int await(long timeout, TimeUnit unit)

带超时的等待,超时后屏障被打破,抛出TimeoutException

void reset()

重置屏障至初始状态,所有等待线程将收到BrokenBarrierException

int getNumberWaiting()

返回当前正在屏障点等待的线程数

boolean isBroken()

判断屏障是否被打破(如线程中断、超时等)

关键特性:CyclicBarrier的核心是 “所有线程必须同时到达屏障点”,适用于多阶段任务中各阶段的同步,且支持重复使用。

1.2 典型场景:分阶段数据处理

在数据处理流程中,常需将任务分为多个阶段(如数据采集→清洗→分析→存储),每个阶段需所有线程完成当前工作后才能进入下一阶段。CyclicBarrier能完美管控这种分阶段协作。

实战案例

public class DataProcessDemo {// 3个线程参与处理,所有线程到达后执行阶段总结动作private static final CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("\n=== 所有线程完成当前阶段,进入下一阶段 ==="));public static void main(String[] args) {// 启动3个数据处理线程for (int i = 0; i < 3; i++) {new Thread(new DataProcessor(i), "处理线程-" + i).start();}}static class DataProcessor implements Runnable {private int threadId;public DataProcessor(int threadId) {this.threadId = threadId;}@Overridepublic void run() {try {// 第一阶段:数据采集System.out.println("线程" + threadId + ":开始数据采集");Thread.sleep((long) (Math.random() * 1000));System.out.println("线程" + threadId + ":数据采集完成,等待其他线程");barrier.await();// 第二阶段:数据清洗System.out.println("线程" + threadId + ":开始数据清洗");Thread.sleep((long) (Math.random() * 1000));System.out.println("线程" + threadId + ":数据清洗完成,等待其他线程");barrier.await();// 第三阶段:数据存储System.out.println("线程" + threadId + ":开始数据存储");Thread.sleep((long) (Math.random() * 1000));System.out.println("线程" + threadId + ":数据存储完成,等待其他线程");barrier.await();System.out.println("线程" + threadId + ":所有阶段处理完成");} catch (InterruptedException | BrokenBarrierException e) {System.out.println("线程" + threadId + ":屏障被打破,异常信息:" + e.getMessage());}}}
}

运行结果片段


线程0:开始数据采集线程1:开始数据采集线程2:开始数据采集线程1:数据采集完成,等待其他线程线程0:数据采集完成,等待其他线程线程2:数据采集完成,等待其他线程=== 所有线程完成当前阶段,进入下一阶段 ===线程0:开始数据清洗线程1:开始数据清洗线程2:开始数据清洗...

案例解析

  • 每个线程完成阶段任务后调用barrier.await(),阻塞等待其他线程;
  • 当 3 个线程均到达屏障点时,先执行屏障动作(打印阶段总结),再唤醒所有线程进入下一阶段;
  • 若任何线程在等待过程中被中断或超时,屏障会被标记为 “打破”,所有线程将收到BrokenBarrierException,避免部分线程无限等待。

1.3 与 CountDownLatch 的核心差异

尽管两者都能实现线程同步,但适用场景截然不同:

维度

CyclicBarrier

CountDownLatch

复用性

可通过reset()重置,支持多轮同步

计数器归 0 后不可复用,一次性使用

同步逻辑

所有线程相互等待(线程→线程)

一组线程等待另一组线程(线程组→线程组)

核心动作

线程到达屏障点后阻塞,需集体唤醒

线程完成任务后递减计数器,无需等待

典型场景

分阶段任务的各阶段同步

初始化等待、事件通知

示例对比:用CountDownLatch实现上述分阶段任务需为每个阶段创建新的计数器,而CyclicBarrier可通过同一实例完成所有阶段同步,代码更简洁。

二、Phaser:动态调整的阶段同步器

Phaser是 Java 7 引入的高级同步工具,兼具CountDownLatch和CyclicBarrier的功能,且支持动态调整参与线程数量(注册 / 注销),适用于线程数量动态变化的多阶段任务。

2.1 核心原理与方法解析

Phaser通过 “阶段(phase)” 和 “参与者(party)” 概念实现同步,核心方法如下:

方法

功能描述

Phaser(int parties)

构造方法,指定初始参与者数量

int register()

注册一个参与者,返回当前阶段号

boolean deregister()

注销一个参与者,返回是否为最后一个参与者

int arriveAndAwaitAdvance()

当前参与者到达阶段终点,等待其他参与者后进入下一阶段

int arriveAndDeregister()

到达阶段终点并注销,适用于完成所有任务的参与者

int getPhase()

返回当前阶段号(从 0 开始,溢出后重置为 0)

int getRegisteredParties()

返回当前注册的参与者数量

关键特性:Phaser的阶段号随所有参与者到达而递增,支持动态增减参与者,且可通过重写onAdvance(int phase, int registeredParties)方法自定义阶段切换逻辑。

2.2 典型场景:动态线程的多阶段任务

在分布式计算或并行处理中,线程可能因任务完成而退出,或因新任务加入而新增,Phaser能灵活应对这种动态变化。

实战案例

public class DynamicTaskDemo {public static void main(String[] args) throws InterruptedException {// 初始3个参与者,重写阶段切换逻辑Phaser phaser = new Phaser(3) {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.println("\n=== 阶段" + phase + "完成,当前参与者:" + registeredParties + " ===");// 当参与者为0或完成3个阶段时终止return registeredParties == 0 || phase >= 2;}};// 启动3个初始任务线程for (int i = 0; i < 3; i++) {new Thread(new DynamicWorker(phaser, i), "初始线程-" + i).start();}// 主线程等待所有阶段完成while (!phaser.isTerminated()) {Thread.sleep(100);}System.out.println("\n所有阶段完成,Phaser终止");}static class DynamicWorker implements Runnable {private Phaser phaser;private int workerId;public DynamicWorker(Phaser phaser, int workerId) {this.phaser = phaser;this.workerId = workerId;}@Overridepublic void run() {try {// 阶段0:数据准备System.out.println("线程" + workerId + ":阶段0准备中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndAwaitAdvance(); // 等待进入阶段1// 线程0在阶段1后注册新参与者if (workerId == 0 && phaser.getPhase() == 1) {phaser.register();new Thread(new DynamicWorker(phaser, 3), "新增线程-3").start();System.out.println("线程0:注册新参与者,当前参与者数:" + phaser.getRegisteredParties());}// 阶段1:数据处理System.out.println("线程" + workerId + ":阶段1处理中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndAwaitAdvance(); // 等待进入阶段2// 线程1在阶段2后注销if (workerId == 1) {phaser.deregister();System.out.println("线程1:已注销,当前参与者数:" + phaser.getRegisteredParties());return; // 线程1完成任务退出}// 阶段2:结果汇总System.out.println("线程" + workerId + ":阶段2汇总中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndDeregister(); // 完成后注销} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}

运行结果片段

线程0:阶段0准备中...线程1:阶段0准备中...线程2:阶段0准备中...=== 阶段0完成,当前参与者:3 ===线程0:阶段1处理中...线程1:阶段1处理中...线程2:阶段1处理中...线程0:注册新参与者,当前参与者数:4新增线程-3:阶段1处理中...=== 阶段1完成,当前参与者:4 ===线程0:阶段2汇总中...线程2:阶段2汇总中...线程1:已注销,当前参与者数:3新增线程-3:阶段2汇总中...=== 阶段2完成,当前参与者:3 ===所有阶段完成,Phaser终止

案例解析

  • Phaser初始注册 3 个参与者,阶段 0 完成后进入阶段 1;
  • 线程 0 在阶段 1 动态注册新参与者(线程 3),参与者数变为 4;
  • 线程 1 在阶段 1 完成后注销,参与者数减为 3;
  • 所有参与者完成阶段 2 后,onAdvance()返回true,Phaser终止;
  • 动态注册 / 注销功能使Phaser能适应线程数量变化,比CyclicBarrier更灵活。

2.3 高级特性:分层 Phaser

对于复杂任务,可通过Phaser的分层机制(父 Phaser 管理子 Phaser)减少单个 Phaser 的竞争压力。例如,将 1000 个线程分为 10 组,每组由一个子 Phaser 管理,子 Phaser 再注册到父 Phaser,实现 “局部同步→全局同步” 的层级协作。

代码示例

public class HierarchicalPhaserDemo {public static void main(String[] args) {// 父Phaser,初始0个参与者Phaser root = new Phaser(0) {@Overrideprotected boolean onAdvance(int phase, int parties) {System.out.println("全局阶段" + phase + "完成,参与组:" + parties);return phase >= 1; // 完成2个全局阶段后终止}};// 创建3个子Phaser,父Phaser为rootPhaser[] children = new Phaser[3];for (int i = 0; i < 3; i++) {children[i] = new Phaser(root, 2); // 每个子Phaser管理2个线程}// 启动6个线程(3组×2)for (int i = 0; i < 3; i++) {int groupId = i;for (int j = 0; j < 2; j++) {new Thread(() -> {for (int phase = 0; phase < 2; phase++) {System.out.println("组" + groupId + "线程" + Thread.currentThread().getId() + ":完成局部阶段" + phase);children[groupId].arriveAndAwaitAdvance(); // 等待组内同步}children[groupId].arriveAndDeregister(); // 完成后注销}).start();}}}
}

核心价值:分层机制降低了单个 Phaser 的竞争频率,提升高并发场景下的性能。

三、四大工具类的综合对比与选型

工具类

核心能力

灵活性

典型场景

适用线程数

CountDownLatch

等待多线程完成

低(一次性)

初始化、事件通知

固定

Semaphore

控制资源并发数

中(动态许可)

资源池、限流

不固定

CyclicBarrier

多阶段线程同步

中(可重置)

分阶段任务

固定

Phaser

动态阶段同步

高(动态注册)

动态线程任务、分层同步

动态变化

选型建议

  • 简单等待场景用CountDownLatch;
  • 资源限流场景用Semaphore;
  • 固定线程的多阶段任务用CyclicBarrier;
  • 动态线程或复杂分层任务用Phaser。

总结

CyclicBarrier和Phaser为多线程协作提供了更灵活的解决方案:CyclicBarrier通过循环屏障实现固定线程的多阶段同步,适合分步骤协同工作;Phaser则支持动态调整参与者,能应对线程数量变化的复杂场景。

结合上一篇的CountDownLatch和Semaphore,这四类工具类基本覆盖了常见的线程协作需求。在实际开发中,需根据线程数量是否固定、是否多阶段任务、是否需要动态调整等因素选择合适的工具,以实现高效、可靠的并发控制。

掌握这些工具类的核心原理和适用场景,不仅能简化并发代码的编写,更能提升系统在高并发场景下的稳定性和性能。

http://www.lryc.cn/news/610445.html

相关文章:

  • (论文速读)RMT:Retentive+ViT的视觉新骨干
  • Hadoop HDFS 3.3.4 讲解~
  • 嵌入式知识篇---闪存
  • mysql 数据库系统坏了,物理拷贝出数据怎么读取
  • Deepoc 赋能送餐机器人:从机械执行到具身智能的革命性跨越
  • JavaScript 中的流程控制语句详解
  • 机器学习实战:逻辑回归深度解析与欺诈检测评估指标详解(二)
  • Redis缓存详解及常见问题解决方案
  • MySQL 基本操作入门指南
  • MCP进阶:工业协议与AI智能体的融合革命
  • 使用 SecureCRT 连接华为 eNSP 模拟器的方法
  • typeof和instanceof区别
  • Linux学习记录(八)文件共享
  • 认识pytorch与pytorch lightning
  • BackgroundTasks 如何巧妙驾驭多任务并发?
  • 我的创作纪念日____在 CSDN一年来的成长历程和收获
  • openvela之内存管理
  • Linux 磁盘管理与分区配置
  • VUE+SPRINGBOOT从0-1打造前后端-前后台系统-注册实现
  • 向量魔法:Embedding如何赋能大模型理解世界
  • Go语言select
  • Git基础玩法简单描述
  • 【LeetCode刷题集】--排序(一)
  • ICCV2025 Tracking相关paper汇总和解读(19篇)
  • ubuntu 20.04 C和C++的标准头文件都放在哪个目录?
  • windows双系统下ubuntu20.04安装教程
  • HTTPS有哪些优点
  • Jeston + TensorRT + Realsense D435i + ROS noetic + Yolo11 各版本模型目标检测
  • Flink CDC 介绍
  • Field and wave electromagnetics 复习