当前位置: 首页 > article >正文

DelayQueue、ScheduledThreadPoolExecutor 和 PriorityBlockingQueue :怎么利用堆实现定时任务

DelayQueue

DelayQueue 的最大亮点

  • 并不是简单全局锁的“单调队列”实现,而是用Leader-Follower 模式极大减少了线程唤醒的开销。
  • 插入与唤醒、等待与 leader 变更,都通过巧妙的锁和条件变量组合完成。

如果只关注“线程安全的优先队列+全局锁”,那就没什么意思;但 DelayQueue 对并发高效和唤醒机制的优化,是其核心看点。

DelayQueue 的核心结构和实现细节如下:

数据结构

  • 内部主要用一个 PriorityQueue<E> 实现,队列元素要求实现 Delayed 接口(这个接口说明可以排序,可以获得延迟时间。本质是按过期时间从小到大排序的优先队列)。
  • ReentrantLock 实现线程安全,所有与队列相关的操作都加锁。
  • Condition (available) 实现线程间等待与唤醒,以支持阻塞操作。
  • 维护一个 leader 线程变量,用于 Leader-Follower 模式优化等待。
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();

 Leader-Follower 等待优化(有趣实现

目的:减少线程唤醒的“惊群”现象,提高效率。

  • 只有一个线程(leader)会以定时方式等待队头元素的到期时间,其他线程无限期等待。
  • 队头元素变化(特别是有更早到期的元素插入时)会唤醒一个等待线程,使其成为新的 leader。
  • 当 leader 线程被唤醒或完成任务后,要重置 leader 并适当唤醒其他线程。
  • 一个有意思的点是,通过finally 唤醒线程 和 释放锁,会在try中的return之前执行,简化了代码。

关键代码片段(take 方法):

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0L)return q.poll();first = null;if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}

解释:

  • 当队头元素未过期时,只有 leader 线程定时等待,其他线程无限期 await。
  • 新元素插入(如果成为队头)会 leader = null; available.signal();,唤醒(或更换)leader。
  • leader 线程在等待超时或被唤醒后要检查自己是否还是 leader,并适时清理。

Leader 模式的核心设计思想

Leader 模式是一种经典的并发设计模式,在 DelayQueue.take() 中用来避免惊群效应减少CPU浪费

假设没有 Leader 机制,多个线程同时调用 take()

  • 队列头元素还有 5 秒过期
  • 所有线程都会 awaitNanos(5秒)
  • 5 秒后所有线程同时被唤醒
  • 只有一个线程能获取元素,其他线程白白消耗了CPU

 Leader 模式解决方案

if (leader != null)available.await();  // 非Leader线程无限期等待
else {Thread thisThread = Thread.currentThread();leader = thisThread;  // 成为Leadertry {available.awaitNanos(delay);  // 只有Leader定时等待} finally {if (leader == thisThread)leader = null;}
}

Leader 线程职责

  • 精确等待:只等待到队列头元素过期的时间点
  • 独占等待:同一时刻只有一个Leader线程在定时等待
  • 负责唤醒:获取元素后通过 finally 块唤醒其他线程

Follower 线程职责

  • 无限等待:调用 available.await() 直到被唤醒
  • 竞争Leader:被唤醒后重新循环,有机会成为新的Leader

关于条件变量唤醒顺序

条件变量无法控制先唤醒谁

Condition.signal() 的唤醒顺序是不确定的,JVM 规范没有保证FIFO(虽然 hotspot JVM 的实现大都FIFO)。但这不影响Leader模式的正确性:

finally {if (leader == null && q.peek() != null)available.signal();  // 唤醒一个等待的线程lock.unlock();
}

无论唤醒哪个线程,它都会:

  1. 重新检查队列状态
  2. 如果元素未过期且无Leader,就成为新Leader
  3. 如果已有Leader,就继续等待

剩余

offer/put/add 插入逻辑

  • 插入新元素时,如果它成为新的队头(即最早过期),则重置 leader 并 signal 唤醒等待线程。

代码片段:

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}
}

解释:

  • 新元素成为队头时唤醒等待线程,确保及时处理最新到期元素。

poll 和 poll(timeout) 的实现

  • poll() 只取已经过期(getDelay <= 0)的队头元素,否则返回 null。
  • poll(timeout) 支持有超时等待(内部逻辑和 take 类似)。

drainTo

  • 一次性取出所有已过期元素,提高批量处理效率。
for (E first;n < maxElements&& (first = q.peek()) != null&& first.getDelay(NANOSECONDS) <= 0;) {c.add(first);q.poll();++n;
}

ScheduledThreadPoolExecutor 结构分析

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor 并实现 ScheduledExecutorService,专门用于执行延迟任务和周期性任务。

public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService

关键字段设计

// 控制关闭后周期性任务的执行策略
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;// 任务取消时是否立即从队列移除
volatile boolean removeOnCancel;// 全局序列号生成器,保证FIFO顺序
private static final AtomicLong sequencer = new AtomicLong();

ScheduledFutureTask - 智能任务包装器

这是最核心的内部类,实现了延迟和周期性执行的逻辑:

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {private final long sequenceNumber;     // FIFO排序保证private volatile long time;            // 纳秒级触发时间private final long period;             // 周期值:正数=固定频率,负数=固定延迟,0=一次性RunnableScheduledFuture<V> outerTask = this;  // 支持装饰器模式int heapIndex;                         // 堆中的索引位置,O(1)删除的关键
}

核心算法 - 周期性任务的重新调度:

public void run() {if (!canRunInCurrentRunState(this))cancel(false);else if (!isPeriodic())super.run();  // 一次性任务直接执行else if (super.runAndReset()) {  // 周期性任务,执行但不设置结果setNextRunTime();            // 计算下次执行时间reExecutePeriodic(outerTask); // 重新加入队列}
}private void setNextRunTime() {long p = period;if (p > 0)time += p;        // 固定频率:基于上次开始时间elsetime = triggerTime(-p);  // 固定延迟:基于当前时间
}

DelayedWorkQueue - 高性能延迟队列

基于最小堆的无界延迟队列,这是一个节点维护数组索引的堆,支持直接删除非堆顶元素:

核心数据结构

private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private Thread leader;  // Leader-Follower模式的领导线程
private final Condition available = lock.newCondition();

O(log n) 插入和维护堆性质

private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1;  // 父节点索引RunnableScheduledFuture<?> e = queue[parent];if (key.compareTo(e) >= 0)   // 已满足堆性质break;queue[k] = e;setIndex(e, k);              // 更新任务的堆索引k = parent;}queue[k] = key;setIndex(key, k);
}

O(1) 任务取消 - 利用heapIndex

public boolean remove(Object x) {final ReentrantLock lock = this.lock;lock.lock();try {int i = indexOf(x);if (i < 0) return false;setIndex(queue[i], -1);      // 标记为已删除int s = --size;RunnableScheduledFuture<?> replacement = queue[s];queue[s] = null;if (s != i) {siftDown(i, replacement);  // 向下调整if (queue[i] == replacement)siftUp(i, replacement); // 可能需要向上调整}return true;} finally {lock.unlock();}
}

Leader-Follower模式的take()

这是解决"惊群效应"的经典实现:

public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0L)return finishPoll(first);first = null; // 避免内存泄漏if (leader != null)available.await();  // 非leader线程无限等待else {Thread thisThread = Thread.currentThread();leader = thisThread;  // 成为leadertry {available.awaitNanos(delay);  // 只等待必要时间} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();  // 唤醒新的leaderlock.unlock();}
}

延迟执行的核心逻辑

private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);  // 加入延迟队列// 双重检查:加入后可能状态改变if (!canRunInCurrentRunState(task) && remove(task))task.cancel(false);elseensurePrestart();  // 确保有线程处理任务}
}

触发时间计算的边界处理

long triggerTime(long delay) {return System.nanoTime() + Math.min(delay, MAX_NANOS);
}private static final long MAX_NANOS = (Long.MAX_VALUE >>> 1) - 1;

设计总结

  1. heapIndex优化:每个任务记录在堆中的位置,实现O(1)删除
  2. Leader-Follower模式:避免多线程等待时的惊群效应
  3. 双重状态检查:任务加入队列后再次检查执行状态
  4. period符号语义:用正负号区分固定频率和固定延迟
  5. 序列号保证FIFO:相同时间的任务按提交顺序执行

这些实现细节使得 ScheduledThreadPoolExecutor 在处理大量延迟和周期性任务时具有优秀的性能表现。

PriorityBlockingQueue 结构分析

PriorityBlockingQueue是一个基于数组实现的二叉堆结构的无界阻塞优先队列,其核心特色在于动态扩容机制堆维护算法的线程安全实现。

主要成员变量

private transient Object[] queue;                    // 存储元素的数组(二叉堆)
private transient int size;                         // 当前元素数量
private transient Comparator<? super E> comparator; // 比较器
private final ReentrantLock lock;                   // 主锁
private final Condition notEmpty;                   // 等待条件
private transient volatile int allocationSpinLock;  // 扩容自旋锁

🔥 双锁机制的动态扩容

最有趣的设计是tryGrow()方法中的双锁策略

private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // 先释放主锁!Object[] newArray = null;if (allocationSpinLock == 0 &&ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {try {int growth = (oldCap < 64) ? (oldCap + 2) : (oldCap >> 1);int newCap = ArraysSupport.newLength(oldCap, 1, growth);if (queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) Thread.yield(); // 让出CPU给其他扩容线程lock.lock(); // 重新获取主锁if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}
}

算法亮点

  • 主锁释放:扩容时释放主锁,允许消费者继续take()操作
  • CAS自旋锁:用allocationSpinLock控制扩容竞争,避免多线程重复扩容
  • 容量策略:小数组快速增长(oldCap + 2),大数组按50%增长(oldCap >> 1)

🔥 智能的元素移除算法

removeAt(i)方法展现了 根据下标进行移除 双向堆调整的精妙:

private void removeAt(int i) {final Object[] es = queue;final int n = size - 1;if (n == i) // 移除最后一个元素es[i] = null;else {E moved = (E) es[n]; // 用最后一个元素填补es[n] = null;// 先向下调整siftDownComparable(i, moved, es, n);// 如果元素没动,说明需要向上调整if (es[i] == moved) {siftUpComparable(i, moved, es);}}size = n;
}

算法思路

  1. 最后一个元素替换被删除的元素
  2. 先尝试向下调整siftDown
  3. 如果元素位置未变,再向上调整siftUp
  4. 这样确保堆性质得到恢复

🔥 高效的堆调整算法

向下调整siftDownComparable)实现了标准的堆化:

private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // 只需要检查到叶子节点的父节点while (k < half) {int child = (k << 1) + 1; // 左子节点Object c = es[child];int right = child + 1;// 选择较小的子节点if (right < n && ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)c = es[child = right];if (key.compareTo((T) c) <= 0)break;es[k] = c;k = child;}es[k] = key;
}

关键优化

  • half = n >>> 1:只遍历到非叶子节点
  • 左右子节点比较:(k << 1) + 1 和 (k << 1) + 2
  • 向上冒泡较小元素,维持最小堆性质

架构特色

线程安全策略

  • 单一主锁:所有公共操作都通过ReentrantLock串行化
  • 扩容优化:扩容时释放主锁 + CAS自旋锁,提高并发性能
  • 条件等待:空队列时take()操作通过notEmpty.await()阻塞

性能设计

  • 无界队列remainingCapacity()始终返回Integer.MAX_VALUE
  • 批量操作drainTo()直接调用dequeue(),避免重复加锁
  • 快照迭代器iterator()基于数组副本,弱一致性遍历

这种设计使得PriorityBlockingQueue在保证线程安全的同时,通过巧妙的双锁机制和堆算法优化,实现了较高的并发性能。

http://www.lryc.cn/news/2404181.html

相关文章:

  • Kafka 消息模式实战:从简单队列到流处理(二)
  • 大数据(2) 大数据处理架构Hadoop
  • 【Kotlin】注解反射扩展
  • 固定ip和非固定ip的区别是什么?如何固定ip地址
  • 升级centos 7.9内核到 5.4.x
  • Nginx 安全设置配置
  • 协程的常用阻塞函数
  • 探索NoSQL注入的奥秘:如何消除MongoDB查询中的前置与后置条件
  • 使用矩阵乘法+线段树解决区间历史和问题的一种通用解法
  • React Navive初识
  • scss(sass)中 的使用说明
  • 如何从浏览器中导出网站证书
  • 低功耗MQTT物联网架构Java实现揭秘
  • 总结HTML中的文本标签
  • python版若依框架开发:前端开发规范
  • AI推理服务的高可用架构设计
  • GPU集群故障分析:大型AI训练中的硬件问题与影响
  • ideal2022.3.1版本编译项目报java: OutOfMemoryError: insufficient memory
  • centos7编译安装LNMP架构
  • 接口限频算法:漏桶算法、令牌桶算法、滑动窗口算法
  • Spring Boot 3.3 + MyBatis 基础教程:从入门到实践
  • 征文投稿:如何写一份实用的技术文档?——以软件配置为例
  • 【后端】RPC
  • 详细讲解Flutter GetX的使用
  • ReLU 新生:从死亡困境到强势回归
  • tensorflow image_dataset_from_directory 训练数据集构建
  • QuickJS 如何发送一封邮件 ?
  • clickhouse 和 influxdb 选型
  • GOOUUU ESP32-S3-CAM 果云科技开发板开发指南(一)(超详细!)Vscode+espidf 通过摄像头拍摄照片并存取到SD卡中,文末附源码
  • C++学习思路