当前位置: 首页 > news >正文

【JUC系列-13】深入理解DelayQueue延迟队列的底层原理

JUC系列整体栏目


内容链接地址
【一】深入理解JMM内存模型的底层实现原理https://zhenghuisheng.blog.csdn.net/article/details/132400429
【二】深入理解CAS底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132478786
【三】熟练掌握Atomic原子系列基本使用https://blog.csdn.net/zhenghuishengq/article/details/132543379
【四】精通Synchronized底层的实现原理https://blog.csdn.net/zhenghuishengq/article/details/132740980
【五】通过源码分析AQS和ReentrantLock的底层原理https://blog.csdn.net/zhenghuishengq/article/details/132857564
【六】深入理解Semaphore底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132908068
【七】深入理解CountDownLatch底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133343440
【八】深入理解CyclicBarrier底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133378623
【九】深入理解ReentrantReadWriteLock 读写锁的底层实现https://blog.csdn.net/zhenghuishengq/article/details/133629550
【十】深入理解ArrayBlockingQueue的基本使用和底层实现https://blog.csdn.net/zhenghuishengq/article/details/133692023
【十一】深入理解LinkedBlockingQueue的基本使用和底层实现https://blog.csdn.net/zhenghuishengq/article/details/133723652
【十二】深入理解PriorityQueue的基本使用和底层实现https://blog.csdn.net/zhenghuishengq/article/details/133788655
【十三】深入理解DelayQueue的基本使用和底层实现https://blog.csdn.net/zhenghuishengq/article/details/133820599

深入理解DelayQueue延迟队列的底层原理

  • 一,深入理解DelayQueue延迟队列
    • 1,DelayQueue的基本使用
    • 2,DelayQueue的底层源码分析
      • 2.1,DelayQueue类属性
      • 2.2,入队offer方法
      • 2.3,出队take方法
    • 3,总结

一,深入理解DelayQueue延迟队列

延时队列,顾名思义,就是可以实现在一段时间之后在执行这个任务。在分布式场景下可能会更加的选择使用MQ来完成这些操作,但是在单JVM进程中,或者在mq挂了的兜底方案中,会考虑使用这个DelayQueue来完成这个延时任务的。如一些订单超时未支付,任务超时管理,短信异步通知等情况,就可以使用这个延时队列来完成了。

在了解这个DelayQueue延迟队列之前,需要先熟悉上一篇PriorityQueue的基本使用和底层原理,因为这个延迟队列的底层的数据结构,就是通过这个优先级队列来实现的

class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>{//组合了一个优先级队列private final PriorityQueue<E> q = new PriorityQueue<E>();
}

由于这个优先级队列采用的是二叉堆的数据结构,并且采用的是小顶堆的数据结构,因此很容易猜出这个DelayQueue的底层原理了,就是假设5个延时任务,会将最近到期的这个任务排在阻塞队列的前面,因此在出队的时候,就可以保证先过期的先出队。

由于底层是通过这个PriorityQueue的优先级队列实现的,因此这个DelayQueue也是一个无界的阻塞队列,在使用这个延迟队列时,需要实现一个Delayed 的接口。总而言之就是:不保证先进先出,下一个即将过期的任务会排到队列的最前面

1,DelayQueue的基本使用

由于在实际开发中,会有这种订单超时的场景,因此这里主要是模拟一个订单的超时任务,来体验一下这个DelayQueue的基本使用

首先创建一个实现了Delayed接口的OrderDelay订单延时类,Delayed也是Comparable类的一个具体实现

/*** Delayed的具体的方法实现* @Author: zhenghuisheng* @Date: 2023/10/14 0:32*/
@Data
public class OrderDelay implements Delayed {//需要延迟的时间private long delayTime;//订单idprivate Integer orderId;//商品名称private String productName;//构造方法public OrderDelay(long delayTime,Integer orderId,String productName){//需要延迟的 时间 + 当前系统的时间this.delayTime = delayTime + System.currentTimeMillis();this.orderId = orderId;this.productName = productName;}//获取剩余的延时时间@Overridepublic long getDelay(TimeUnit unit) {//到达时间 - 剩余时间long residueTime = this.delayTime - System.currentTimeMillis();return unit.convert(residueTime,TimeUnit.MILLISECONDS);}//实现这个比较器方法@Overridepublic int compareTo(Delayed o) {OrderDelay orderDelay = (OrderDelay)o;return orderDelay.delayTime > this.delayTime ? - 1 : 1;}
}

随后创建一个生产者Producer线程任务类,用于将为支付的订单加入到这个延时队列中

@Data
public class Producer implements Runnable {//全局的阻塞队列private DelayQueue queue;//延迟队列订单类对象private  OrderDelay orderDelay;public Producer(DelayQueue queue,OrderDelay orderDelay){this.queue = queue;this.orderDelay = orderDelay;}//添加文件@Overridepublic void run() {try {queue.put(orderDelay);	//加入阻塞队列System.out.println(orderDelay.getProductName() + "加入完毕...");} catch (Exception e) {e.printStackTrace();}}
}

随后创建一个消费者Consumer线程任务类,用于取出即将过期的订单任务

/*** 消费者线程* @Author: zhenghuisheng* @Date: 2023/10/8 20:21*/
@Data
public class Consumer implements Runnable {private DelayQueue queue;public Consumer(DelayQueue delayQueue){this.queue = delayQueue;}@Overridepublic void run() {try {System.out.println(queue.take());} catch (InterruptedException e) {e.printStackTrace();}}
}

然后再创建一个线程池的工具类,用于更好的监控和管理线程

/*** 线程池工具* @author zhenghuisheng* @date : 2023/3/22*/
public class ThreadPoolUtil {/*** io密集型:最大核心线程数为2N,可以给cpu更好的轮换,*           核心线程数不超过2N即可,可以适当留点空间* cpu密集型:最大核心线程数为N或者N+1,N可以充分利用cpu资源,N加1是为了防止缺页造成cpu空闲,*           核心线程数不超过N+1即可* 使用线程池的时机:1,单个任务处理时间比较短 2,需要处理的任务数量很大*/public static synchronized ThreadPoolExecutor getThreadPool() {if (pool == null) {//获取当前机器的cpuint cpuNum = Runtime.getRuntime().availableProcessors();log.info("当前机器的cpu的个数为:" + cpuNum);int maximumPoolSize = cpuNum * 2 ;pool = new ThreadPoolExecutor(maximumPoolSize - 2,maximumPoolSize,5L,   //5sTimeUnit.SECONDS,new LinkedBlockingQueue<>(),  //数组有界队列Executors.defaultThreadFactory(), //默认的线程工厂new ThreadPoolExecutor.AbortPolicy());  //直接抛异常,默认异常}return pool;}
}

最后创建一个有Main方法的测试类,用于对这个DelayQueue进行测试

/*** @Author: zhenghuisheng* @Date: 2023/10/14 1:41*/
public class DelayQueueDemo {//创建一个线程池static ThreadPoolExecutor pool = ThreadPoolUtil.getThreadPool();//创建一个全局的延迟队列static DelayQueue<OrderDelay> delayQueue = new DelayQueue();public static void main(String[] args) throws Exception {//生产者创建任务for (int i = 7; i > 2; i--) {OrderDelay orderDelay = new OrderDelay(i * 1000, i, "id_" + i);//创建生产者线程Producer producerTask = new Producer(delayQueue, orderDelay);//提交到线程池pool.execute(producerTask);}Thread.sleep(50);System.out.println("====生产者线程创建完毕====");//创建消费者线程for (int i = 0; i < 5; i++) {Consumer consumerTask = new Consumer(delayQueue);pool.execute(consumerTask);}}
}

最后看执行结果,先进来但是延迟时间长,所以后出去

id_7加入完毕…
id_5加入完毕…
id_6加入完毕…
id_4加入完毕…
id_3加入完毕…
生产者线程创建完毕
OrderDelay(delayTime=1697221724133, orderId=3, productName=id_3)
OrderDelay(delayTime=1697221725133, orderId=4, productName=id_4)
OrderDelay(delayTime=1697221726132, orderId=5, productName=id_5)
OrderDelay(delayTime=1697221727132, orderId=6, productName=id_6)
OrderDelay(delayTime=1697221728129, orderId=7, productName=id_7)

2,DelayQueue的底层源码分析

2.1,DelayQueue类属性

首先查看这个 DelayQueue 类,也是继承了这个抽象类,也是实现了这个BlockingQueue

class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>

在这个类中首先最重要的就是这个PriorityQueue优先级队列,说明这个延迟队列的底层是通过这个优先级队列实现的

//组合了一个优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();

随后就是一把互斥锁加一个条件队列组成,互斥锁就是offer方法和take方法的互斥,然后这个条件队列是在队列为空时存储这个线程结点的

private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();

还有一个重要的属性,就是一个leader的线程标记,用对记录队头的线程,谁最早过期就记录谁

private Thread leader = null;

最后来看看该类的构造方法,里面是空的,因为里面的offer和take主要是操作这个PriorityQueue类

public DelayQueue() {}

2.2,入队offer方法

接下来直接看这个类的offer方法的具体实现,这下面的逻辑是比较简单的,就是先入队,如果是第一个元素入队,那么回去唤醒条件队列中被阻塞的结点,因为这些结点是队列为空而将线程阻塞的,现在队列已经不为空了

public boolean offer(E e) {final ReentrantLock lock = this.lock;	//获取这把互斥锁lock.lock();		//加锁try {q.offer(e);		//线程入队if (q.peek() == e) {	//如果堆顶为当前元素,表示第一个元素入队leader = null;available.signal();	//那么就会去唤醒因对列为空而被阻塞的线程结点}return true;} finally {lock.unlock();	//解锁}
}

随后依旧是进入上面的offer方法,做一个具体的入队操作,这里需要结合PriorityQueue的属性来看,首先会判断这个数组是否达到设置的最大值或者扩容后的最大值,如果是,则继续扩容

public boolean offer(E e) {if (e == null)	throw new NullPointerException(); 	//结点为空modCount++;	int i = size;if (i >= queue.length)	//达到最大值grow(i + 1);	//扩容size = i + 1;		if (i == 0) queue[0] = e;	//队列为空则直接加入堆顶else	siftUp(i, e);	//否则上浮,堆算法return true;
}

数组扩容的方法如下,先做一个扩容操作,并且最后创建一个新数组,将旧值copy到新数组中,随后将新数组返回假设此时的容量小于64,则扩大原来的容量+2,如果大于64,则扩大原来的容量一倍。

就是说假设此时容量为16,那么第一次扩容就是 16+16+2为34,第二次扩容为34 + 34 + 2为70,第三次扩容为70 + 70*2 = 210

private void grow(int minCapacity) {int oldCapacity = queue.length;// Double size if small; else grow by 50%int newCapacity = oldCapacity + ((oldCapacity < 64) ?(oldCapacity + 2) :(oldCapacity >> 1));// overflow-conscious codeif (newCapacity - MAX_ARRAY_SIZE > 0)newCapacity = hugeCapacity(minCapacity);queue = Arrays.copyOf(queue, newCapacity);
}

如果此时不是第一个结点入队,那么就会调用这个 siftUp 方法,如果有自定义实现的比较器,则用自定义的,否则则直接使用内部默认的比较器

private void siftUp(int k, E x) {if (comparator != null)siftUpUsingComparator(k, x);elsesiftUpComparable(k, x);
}

接着直接来看内部默认实现的这个上浮的方法吧,就是一个小顶堆的入队操作

private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;	//创建一个比较构造器while (k > 0) {	//队列的元素值int parent = (k - 1) >>> 1;	//获取当前结点的父节点的索引,左移一位即可Object e = array[parent];	//根据索引下标取值if (key.compareTo((T) e) >= 0)	//比较和交换,如果当前值大于父节点则不动break;array[k] = e;	//如果当前结点的值小于父结点,则将当前结点改成父结点的值(默认使用的是小顶堆)k = parent;		//k在这个while循环下一定会等于0,因此会走最下面的赋值,就是不断地通过while循环将最小的交换到最上面}array[k] = key;	//如果队列的长度为0,则直接将堆顶元素赋值
}

在成功入队之后,最后会调用这个unlock方法,用于解锁,并且唤醒被阻塞的结点

lock.unlock();

2.3,出队take方法

在结点入队之后,那么接下来就看这个结点出队的方法,出队方法相对来说是稍微多一点的。首先出队第一个头结点,如果已经过期则直接出队,否者获取这个即将过期的时间延迟阻塞,即阻塞到到一定的时间主动唤醒,最后执行这个任务,会在这个for自旋中,可以保证所有的结点出队。并且通过一个临时变量 leader,只需获取最早过期的结点进行阻塞,从而不需关心比该结点更晚过期的结点,从而减少阻塞的数量。

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {	//自旋E first = q.peek();	//头结点出队if (first == null)	//如果队列为空available.await();	//则加入条件队列阻塞else {//获取头结点的过期时间long delay = first.getDelay(NANOSECONDS);	//头结点的过期时间小于0,说明已经过期。则直接出队if (delay <= 0)	return q.poll();first = null; // don't retain ref while waiting//特别说明,这个leader就是用于记录最早过期的那个线程if (leader != null)	//如果已经存在记录的最近过期的结点available.await();	//则阻塞else {Thread thisThread = Thread.currentThread();leader = thisThread;try {//延时阻塞,阻塞到一定时间主动唤醒available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)	available.signal();		//优化,主动唤醒lock.unlock();	//解锁}
}

最后会通过unlock进行一个解锁操作。

lock.unlock();

3,总结

延迟队列的底层是通过这个优先级队列来实现的,越早过期的结点越先出队,内部也是采用ReentrantLock+条件队列来实现安全问题以及性能问题。延迟队列的结构也是无界队列形成的数组,在入队的结点元素需要时Delayed类的具体实现。

http://www.lryc.cn/news/192004.html

相关文章:

  • Leetcode---365周赛
  • Java使用opencv实现人脸识别、人脸比对
  • Redis HyperLogLog的使用
  • Apisix-Ingress服务发现详解
  • spring6-事务
  • JavaFx学习问题2--音频、视频播放失败情况
  • 第55节—— redux-toolkit中的createReducer——了解
  • JUC并发编程——JUC并发编程概述及Lock锁(重点)(基于狂神说的学习笔记)
  • 深入了解 Java 中的时间信息定义、转换、比较和操作
  • 2023年中国智能矿山发展历程及趋势分析:智能矿山健康有序发展[图]
  • acwing算法基础之基础算法--整数离散化算法
  • 基于SSM框架的安全教育平台
  • Kafka生产者使用案例
  • EasyX图形库实现贪吃蛇游戏
  • 利用 Amazon CodeWhisperer 激发孩子的编程兴趣
  • 2023年中国分子筛稀土催化材料竞争格局及行业市场规模分析[图]
  • vue3插件——vue-web-screen-shot——实现页面截图功能
  • 简单总结Centos7安装Tomcat10.0版本
  • ffmpeg中AVCodecContext和AVCodec的关系分析
  • 2023年中国门把手产量、销量及市场规模分析[图]
  • HTML 核心技术点基础详细解析以及综合小案例
  • BAT学习——批处理脚本(也称为BAT文件)常用语法元素与命令
  • AMD AFMF不但能用在游戏,也适用于视频
  • CSS 常用样式浮动属性
  • Linux引导故障排除:从问题到解决方案的详细指南
  • 【vim 学习系列文章 6 -- vim 如何从上次退出的位置打开文件】
  • 怎样学习C#上位机编程?
  • 【算法-动态规划】两个字符串的删除操作-力扣 583
  • 【06】基础知识:typescript中的泛型
  • flutter 绘制原理探究