并发编程常用工具类(上):CountDownLatch 与 Semaphore 的协作应用
在 Java 并发编程领域,JDK 提供的工具类是简化多线程协作的重要武器。这些工具类基于 AQS(AbstractQueuedSynchronizer)框架实现,封装了复杂的同步逻辑,让开发者无需深入底层即可实现高效的线程协作。本文作为并发工具类系列的第一篇,将重点解析CountDownLatch和Semaphore的核心原理、典型使用场景及实战案例,帮助开发者掌握其在多线程协作中的应用技巧。
一、CountDownLatch:等待多线程完成的计数器
CountDownLatch(倒计时门闩)是一种经典的线程同步工具,其核心功能是让一个或多个线程等待其他线程完成指定操作后再继续执行。它通过一个递减的计数器实现线程间的协调,计数器归零时,所有等待的线程将被唤醒。
1.1 核心原理与方法解析
CountDownLatch的设计基于 “计数器 + 等待唤醒” 机制,核心方法如下:
方法 | 功能描述 |
CountDownLatch(int count) | 构造方法,初始化计数器值(count为需要等待的线程操作数,必须≥0) |
void await() | 调用线程进入阻塞状态,直至计数器归 0 或被中断 |
boolean await(long timeout, TimeUnit unit) | 带超时时间的等待,超时后无论计数器是否归 0,线程都会唤醒并返回false |
void countDown() | 将计数器值减 1,当值为 0 时,唤醒所有因await()阻塞的线程 |
关键特性:CountDownLatch的计数器是一次性的,一旦归 0,后续调用countDown()不会再改变其状态,因此无法重复使用。
1.2 典型场景:主线程等待子线程初始化完成
在大型应用启动过程中,主线程往往需要等待多个初始化任务(如加载配置文件、初始化数据库连接、预热缓存等)完成后才能启动核心业务。CountDownLatch完美适配这种 “等待多任务完成” 的场景。
实战案例:
public class SystemInitDemo {// 初始化计数器,需等待3个核心任务完成private static final CountDownLatch initLatch = new CountDownLatch(3);public static void main(String[] args) throws InterruptedException {System.out.println("系统启动:开始等待初始化任务...");// 启动配置加载任务new Thread(() -> {try {System.out.println("配置加载任务:开始加载系统配置...");Thread.sleep(1500); // 模拟配置加载耗时System.out.println("配置加载任务:完成加载");} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {initLatch.countDown(); // 任务完成,计数器减1}}, "配置线程").start();// 启动数据库连接任务new Thread(() -> {try {System.out.println("数据库任务:开始建立连接池...");Thread.sleep(2000); // 模拟连接池初始化耗时System.out.println("数据库任务:连接池初始化完成");} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {initLatch.countDown();}}, "数据库线程").start();// 启动缓存预热任务new Thread(() -> {try {System.out.println("缓存任务:开始预热热点数据...");Thread.sleep(1000); // 模拟缓存预热耗时System.out.println("缓存任务:热点数据预热完成");} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {initLatch.countDown();}}, "缓存线程").start();// 主线程等待所有初始化任务完成initLatch.await();System.out.println("系统启动:所有初始化任务完成,启动核心服务...");}
}
运行结果:
系统启动:开始等待初始化任务...配置加载任务:开始加载系统配置...数据库任务:开始建立连接池...缓存任务:开始预热热点数据...缓存任务:热点数据预热完成配置加载任务:完成加载数据库任务:连接池初始化完成系统启动:所有初始化任务完成,启动核心服务...
案例解析:
- 主线程通过initLatch.await()阻塞等待,直到 3 个初始化线程都调用countDown()使计数器归 0;
- 即使各任务执行时间不同(数据库任务耗时最长),主线程也会等待所有任务完成后再继续,确保系统启动的完整性;
- finally块中调用countDown()保证即使任务异常,计数器也能正确递减,避免主线程无限等待。
1.3 反向应用:子线程等待主线程指令
CountDownLatch不仅能让主线程等待子线程,还能通过反向设计实现 “子线程等待主线程信号”。例如在并发测试中,让所有测试线程准备就绪后,等待主线程发出 “开始” 指令,确保所有线程同时执行测试代码,消除启动顺序带来的误差。
示例代码:
public class ConcurrentTestDemo {// 计数器初始化为1,代表主线程的"开始"信号private static final CountDownLatch startSignal = new CountDownLatch(1);// 记录并发执行结果private static final AtomicInteger result = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {int threadCount = 5; // 并发线程数// 启动5个测试线程for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + ":准备就绪,等待开始信号");startSignal.await(); // 等待主线程指令// 收到信号后执行并发操作result.incrementAndGet();System.out.println(Thread.currentThread().getName() + ":执行完成");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}, "测试线程-" + i).start();}// 主线程准备3秒后发出开始信号Thread.sleep(3000);System.out.println("主线程:发出开始信号");startSignal.countDown(); // 计数器归0,唤醒所有测试线程// 等待所有测试线程完成(实际场景可再用一个CountDownLatch)Thread.sleep(1000);System.out.println("所有线程执行完成,最终结果:" + result.get()); // 预期结果为5}
}
核心价值:通过startSignal确保所有子线程在同一时间点开始执行,真实模拟高并发场景,提升测试准确性。
二、Semaphore:控制资源并发访问的信号量
Semaphore(信号量)是用于控制资源并发访问数量的工具类,它通过维护一组 “许可”(permit)实现对资源的限流。线程需要先获取许可才能访问资源,访问结束后释放许可,供其他线程使用。
2.1 核心原理与方法解析
Semaphore的核心是 “许可管理”,通过控制许可数量限制并发线程数,核心方法如下:
方法 | 功能描述 |
Semaphore(int permits) | 构造方法,初始化许可数量(permits为允许同时访问的线程数) |
Semaphore(int permits, boolean fair) | 带公平性参数的构造方法,fair=true时按线程请求顺序分配许可 |
void acquire() | 获取 1 个许可,若暂时无可用许可,线程会阻塞等待 |
boolean tryAcquire() | 尝试获取 1 个许可,立即返回结果(成功true/ 失败false),不阻塞 |
boolean tryAcquire(long timeout, TimeUnit unit) | 超时尝试获取许可,超时未获取则返回false |
void release() | 释放 1 个许可,将其归还给信号量 |
int availablePermits() | 返回当前可用的许可数量 |
关键特性:Semaphore的许可数量可以动态调整,release()方法可在未获取许可的情况下释放,从而增加总许可数(需谨慎使用)。
2.2 典型场景:资源池的并发访问控制
数据库连接池、线程池等资源池场景中,资源数量有限,Semaphore可用于限制同时访问资源的线程数,防止因资源耗尽导致的系统异常。
实战案例:
public class ConnectionPoolDemo {// 数据库连接池(模拟10个连接)private static final int POOL_SIZE = 10;private static final List<Connection> connectionPool = new ArrayList<>(POOL_SIZE);// 信号量控制并发访问,许可数等于连接池大小private static final Semaphore semaphore = new Semaphore(POOL_SIZE, true); // 公平模式// 初始化连接池static {for (int i = 0; i < POOL_SIZE; i++) {connectionPool.add(new MockConnection("连接-" + (i + 1)));}}// 获取数据库连接public static Connection getConnection() throws InterruptedException {semaphore.acquire(); // 获取许可(若连接池满则等待)synchronized (connectionPool) {return connectionPool.remove(0); // 从池内取出连接}}// 释放数据库连接public static void releaseConnection(Connection connection) {if (connection != null) {synchronized (connectionPool) {connectionPool.add(connection); // 连接放回池内}semaphore.release(); // 释放许可}}// 模拟数据库连接类static class MockConnection {private String name;MockConnection(String name) { this.name = name; }@Overridepublic String toString() { return name; }}public static void main(String[] args) {// 模拟20个线程并发请求连接for (int i = 0; i < 20; i++) {new Thread(() -> {Connection conn = null;try {conn = getConnection();System.out.println(Thread.currentThread().getName() + "获取到" + conn + ",当前可用许可:" + semaphore.availablePermits());Thread.sleep(1000); // 模拟数据库操作耗时} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {releaseConnection(conn);if (conn != null) {System.out.println(Thread.currentThread().getName() + "释放了" + conn + ",当前可用许可:" + semaphore.availablePermits());}}}, "业务线程-" + i).start();}}
}
运行结果片段:
业务线程-0获取到连接-1,当前可用许可:9业务线程-1获取到连接-2,当前可用许可:8...业务线程-9获取到连接-10,当前可用许可:0// 此时许可耗尽,线程10-19进入等待业务线程-0释放了连接-1,当前可用许可:1业务线程-10获取到连接-1,当前可用许可:0
...
案例解析:
- Semaphore通过 10 个许可限制同时使用连接的线程数,与连接池容量匹配,避免资源过度占用;
- 公平模式(fair=true)确保线程按请求顺序获取许可,减少饥饿现象;
- getConnection()和releaseConnection()通过同步块保证连接池操作的线程安全,结合信号量实现完整的资源管控。
2.3 扩展场景:接口限流与流量控制
Semaphore可用于接口限流,通过控制单位时间内的请求数保护系统稳定。例如限制某 API 每秒最多处理 100 个请求,超出部分直接拒绝或排队等待。
示例代码:
public class ApiRateLimiter {private final Semaphore semaphore;private final int maxRequestsPerSecond; // 每秒最大请求数public ApiRateLimiter(int maxRequestsPerSecond) {this.maxRequestsPerSecond = maxRequestsPerSecond;this.semaphore = new Semaphore(maxRequestsPerSecond);// 定时任务:每秒重置许可数量ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {int permitsToRelease = maxRequestsPerSecond - semaphore.availablePermits();if (permitsToRelease > 0) {semaphore.release(permitsToRelease); // 补充许可至上限}}, 1, 1, TimeUnit.SECONDS);}// 尝试访问APIpublic boolean tryAccess() {return semaphore.tryAcquire();}public static void main(String[] args) {// 限制每秒最多5个请求ApiRateLimiter limiter = new ApiRateLimiter(5);// 模拟10个并发请求for (int i = 0; i < 10; i++) {new Thread(() -> {if (limiter.tryAccess()) {System.out.println(Thread.currentThread().getName() + ":API访问成功");} else {System.out.println(Thread.currentThread().getName() + ":API访问被限流");}}, "请求线程-" + i).start();}}
}
运行结果:
请求线程-0:API访问成功
请求线程-1:API访问成功
请求线程-2:API访问成功
请求线程-3:API访问成功
请求线程-4:API访问成功
请求线程-5:API访问被限流
请求线程-6:API访问被限流
...
限流原理:通过定时任务每秒补充许可,使Semaphore的许可数始终维持在maxRequestsPerSecond,从而实现固定速率的流量控制。
三、CountDownLatch 与 Semaphore 的对比与协同
特性 | CountDownLatch | Semaphore |
核心功能 | 等待多个线程完成操作 | 控制并发访问资源的线程数 |
计数器特性 | 一次性递减,归 0 后不可重置 | 可重复获取和释放,动态调整 |
典型场景 | 初始化协调、并发测试同步 | 资源池控制、接口限流 |
线程协作方向 | 多线程→主线程(或反之) | 线程间竞争资源 |
协同案例:在分布式任务调度中,可用CountDownLatch等待所有任务节点准备就绪,再用Semaphore控制同时执行任务的节点数,实现 “先同步准备,再限流执行” 的流程。
总结
CountDownLatch和Semaphore是解决多线程协作问题的利器:CountDownLatch通过计数器实现线程间的等待协调,适合初始化、测试同步等场景;Semaphore通过许可管理控制资源并发访问,适合资源池、限流等场景。掌握这两个工具类的核心原理和使用技巧,能显著提升并发编程的效率和可靠性。
下一篇将介绍CyclicBarrier、Phaser等其他常用工具类,敬请期待。