DelayQueue、ScheduledThreadPoolExecutor 和 PriorityBlockingQueue :怎么利用堆实现定时任务
DelayQueue
DelayQueue 的最大亮点:
- 并不是简单全局锁的“单调队列”实现,而是用Leader-Follower 模式极大减少了线程唤醒的开销。
- 插入与唤醒、等待与 leader 变更,都通过巧妙的锁和条件变量组合完成。
如果只关注“线程安全的优先队列+全局锁”,那就没什么意思;但 DelayQueue 对并发高效和唤醒机制的优化,是其核心看点。
DelayQueue 的核心结构和实现细节如下:
数据结构
- 内部主要用一个
PriorityQueue<E>
实现,队列元素要求实现Delayed
接口(这个接口说明可以排序,可以获得延迟时间。本质是按过期时间从小到大排序的优先队列)。 - 用
ReentrantLock
实现线程安全,所有与队列相关的操作都加锁。 - 用
Condition
(available
) 实现线程间等待与唤醒,以支持阻塞操作。 - 维护一个
leader
线程变量,用于 Leader-Follower 模式优化等待。
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();
Leader-Follower 等待优化(有趣实现)
目的:减少线程唤醒的“惊群”现象,提高效率。
- 只有一个线程(leader)会以定时方式等待队头元素的到期时间,其他线程无限期等待。
- 队头元素变化(特别是有更早到期的元素插入时)会唤醒一个等待线程,使其成为新的 leader。
- 当 leader 线程被唤醒或完成任务后,要重置 leader 并适当唤醒其他线程。
- 一个有意思的点是,通过finally 唤醒线程 和 释放锁,会在try中的return之前执行,简化了代码。
关键代码片段(take 方法):
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0L)return q.poll();first = null;if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}
解释:
- 当队头元素未过期时,只有 leader 线程定时等待,其他线程无限期 await。
- 新元素插入(如果成为队头)会
leader = null; available.signal();
,唤醒(或更换)leader。 - leader 线程在等待超时或被唤醒后要检查自己是否还是 leader,并适时清理。
Leader 模式的核心设计思想
Leader 模式是一种经典的并发设计模式,在 DelayQueue.take()
中用来避免惊群效应和减少CPU浪费。
假设没有 Leader 机制,多个线程同时调用 take()
:
- 队列头元素还有 5 秒过期
- 所有线程都会
awaitNanos(5秒)
- 5 秒后所有线程同时被唤醒
- 只有一个线程能获取元素,其他线程白白消耗了CPU
Leader 模式解决方案
if (leader != null)available.await(); // 非Leader线程无限期等待
else {Thread thisThread = Thread.currentThread();leader = thisThread; // 成为Leadertry {available.awaitNanos(delay); // 只有Leader定时等待} finally {if (leader == thisThread)leader = null;}
}
Leader 线程职责
- 精确等待:只等待到队列头元素过期的时间点
- 独占等待:同一时刻只有一个Leader线程在定时等待
- 负责唤醒:获取元素后通过
finally
块唤醒其他线程
Follower 线程职责
- 无限等待:调用
available.await()
直到被唤醒 - 竞争Leader:被唤醒后重新循环,有机会成为新的Leader
关于条件变量唤醒顺序
条件变量无法控制先唤醒谁
Condition.signal()
的唤醒顺序是不确定的,JVM 规范没有保证FIFO(虽然 hotspot JVM 的实现大都FIFO)。但这不影响Leader模式的正确性:
finally {if (leader == null && q.peek() != null)available.signal(); // 唤醒一个等待的线程lock.unlock();
}
无论唤醒哪个线程,它都会:
- 重新检查队列状态
- 如果元素未过期且无Leader,就成为新Leader
- 如果已有Leader,就继续等待
剩余
offer/put/add 插入逻辑
- 插入新元素时,如果它成为新的队头(即最早过期),则重置 leader 并 signal 唤醒等待线程。
代码片段:
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}
}
解释:
- 新元素成为队头时唤醒等待线程,确保及时处理最新到期元素。
poll 和 poll(timeout) 的实现
poll()
只取已经过期(getDelay <= 0)的队头元素,否则返回 null。poll(timeout)
支持有超时等待(内部逻辑和 take 类似)。
drainTo
- 一次性取出所有已过期元素,提高批量处理效率。
for (E first;n < maxElements&& (first = q.peek()) != null&& first.getDelay(NANOSECONDS) <= 0;) {c.add(first);q.poll();++n;
}
ScheduledThreadPoolExecutor 结构分析
ScheduledThreadPoolExecutor
继承自 ThreadPoolExecutor
并实现 ScheduledExecutorService
,专门用于执行延迟任务和周期性任务。
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService
关键字段设计
// 控制关闭后周期性任务的执行策略
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;// 任务取消时是否立即从队列移除
volatile boolean removeOnCancel;// 全局序列号生成器,保证FIFO顺序
private static final AtomicLong sequencer = new AtomicLong();
ScheduledFutureTask - 智能任务包装器
这是最核心的内部类,实现了延迟和周期性执行的逻辑:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {private final long sequenceNumber; // FIFO排序保证private volatile long time; // 纳秒级触发时间private final long period; // 周期值:正数=固定频率,负数=固定延迟,0=一次性RunnableScheduledFuture<V> outerTask = this; // 支持装饰器模式int heapIndex; // 堆中的索引位置,O(1)删除的关键
}
核心算法 - 周期性任务的重新调度:
public void run() {if (!canRunInCurrentRunState(this))cancel(false);else if (!isPeriodic())super.run(); // 一次性任务直接执行else if (super.runAndReset()) { // 周期性任务,执行但不设置结果setNextRunTime(); // 计算下次执行时间reExecutePeriodic(outerTask); // 重新加入队列}
}private void setNextRunTime() {long p = period;if (p > 0)time += p; // 固定频率:基于上次开始时间elsetime = triggerTime(-p); // 固定延迟:基于当前时间
}
DelayedWorkQueue - 高性能延迟队列
基于最小堆的无界延迟队列,这是一个节点维护数组索引的堆,支持直接删除非堆顶元素:
核心数据结构
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private Thread leader; // Leader-Follower模式的领导线程
private final Condition available = lock.newCondition();
O(log n) 插入和维护堆性质
private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1; // 父节点索引RunnableScheduledFuture<?> e = queue[parent];if (key.compareTo(e) >= 0) // 已满足堆性质break;queue[k] = e;setIndex(e, k); // 更新任务的堆索引k = parent;}queue[k] = key;setIndex(key, k);
}
O(1) 任务取消 - 利用heapIndex
public boolean remove(Object x) {final ReentrantLock lock = this.lock;lock.lock();try {int i = indexOf(x);if (i < 0) return false;setIndex(queue[i], -1); // 标记为已删除int s = --size;RunnableScheduledFuture<?> replacement = queue[s];queue[s] = null;if (s != i) {siftDown(i, replacement); // 向下调整if (queue[i] == replacement)siftUp(i, replacement); // 可能需要向上调整}return true;} finally {lock.unlock();}
}
Leader-Follower模式的take()
这是解决"惊群效应"的经典实现:
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0L)return finishPoll(first);first = null; // 避免内存泄漏if (leader != null)available.await(); // 非leader线程无限等待else {Thread thisThread = Thread.currentThread();leader = thisThread; // 成为leadertry {available.awaitNanos(delay); // 只等待必要时间} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal(); // 唤醒新的leaderlock.unlock();}
}
延迟执行的核心逻辑
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task); // 加入延迟队列// 双重检查:加入后可能状态改变if (!canRunInCurrentRunState(task) && remove(task))task.cancel(false);elseensurePrestart(); // 确保有线程处理任务}
}
触发时间计算的边界处理
long triggerTime(long delay) {return System.nanoTime() + Math.min(delay, MAX_NANOS);
}private static final long MAX_NANOS = (Long.MAX_VALUE >>> 1) - 1;
设计总结
- heapIndex优化:每个任务记录在堆中的位置,实现O(1)删除
- Leader-Follower模式:避免多线程等待时的惊群效应
- 双重状态检查:任务加入队列后再次检查执行状态
- period符号语义:用正负号区分固定频率和固定延迟
- 序列号保证FIFO:相同时间的任务按提交顺序执行
这些实现细节使得 ScheduledThreadPoolExecutor
在处理大量延迟和周期性任务时具有优秀的性能表现。
PriorityBlockingQueue 结构分析
PriorityBlockingQueue
是一个基于数组实现的二叉堆结构的无界阻塞优先队列,其核心特色在于动态扩容机制和堆维护算法的线程安全实现。
主要成员变量
private transient Object[] queue; // 存储元素的数组(二叉堆)
private transient int size; // 当前元素数量
private transient Comparator<? super E> comparator; // 比较器
private final ReentrantLock lock; // 主锁
private final Condition notEmpty; // 等待条件
private transient volatile int allocationSpinLock; // 扩容自旋锁
🔥 双锁机制的动态扩容
最有趣的设计是tryGrow()
方法中的双锁策略:
private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // 先释放主锁!Object[] newArray = null;if (allocationSpinLock == 0 &&ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {try {int growth = (oldCap < 64) ? (oldCap + 2) : (oldCap >> 1);int newCap = ArraysSupport.newLength(oldCap, 1, growth);if (queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) Thread.yield(); // 让出CPU给其他扩容线程lock.lock(); // 重新获取主锁if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}
}
算法亮点:
- 主锁释放:扩容时释放主锁,允许消费者继续
take()
操作 - CAS自旋锁:用
allocationSpinLock
控制扩容竞争,避免多线程重复扩容 - 容量策略:小数组快速增长(
oldCap + 2
),大数组按50%增长(oldCap >> 1
)
🔥 智能的元素移除算法
removeAt(i)
方法展现了 根据下标进行移除 双向堆调整的精妙:
private void removeAt(int i) {final Object[] es = queue;final int n = size - 1;if (n == i) // 移除最后一个元素es[i] = null;else {E moved = (E) es[n]; // 用最后一个元素填补es[n] = null;// 先向下调整siftDownComparable(i, moved, es, n);// 如果元素没动,说明需要向上调整if (es[i] == moved) {siftUpComparable(i, moved, es);}}size = n;
}
算法思路:
- 用最后一个元素替换被删除的元素
- 先尝试向下调整(
siftDown
) - 如果元素位置未变,再向上调整(
siftUp
) - 这样确保堆性质得到恢复
🔥 高效的堆调整算法
向下调整(siftDownComparable
)实现了标准的堆化:
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // 只需要检查到叶子节点的父节点while (k < half) {int child = (k << 1) + 1; // 左子节点Object c = es[child];int right = child + 1;// 选择较小的子节点if (right < n && ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)c = es[child = right];if (key.compareTo((T) c) <= 0)break;es[k] = c;k = child;}es[k] = key;
}
关键优化:
half = n >>> 1
:只遍历到非叶子节点- 左右子节点比较:
(k << 1) + 1
和(k << 1) + 2
- 向上冒泡较小元素,维持最小堆性质
架构特色
线程安全策略
- 单一主锁:所有公共操作都通过
ReentrantLock
串行化 - 扩容优化:扩容时释放主锁 + CAS自旋锁,提高并发性能
- 条件等待:空队列时
take()
操作通过notEmpty.await()
阻塞
性能设计
- 无界队列:
remainingCapacity()
始终返回Integer.MAX_VALUE
- 批量操作:
drainTo()
直接调用dequeue()
,避免重复加锁 - 快照迭代器:
iterator()
基于数组副本,弱一致性遍历
这种设计使得PriorityBlockingQueue
在保证线程安全的同时,通过巧妙的双锁机制和堆算法优化,实现了较高的并发性能。