当前位置: 首页 > news >正文

阻塞队列BlockingQueue详解

一、阻塞队列介绍

1、队列

 队列入队从队首开始添加,直至队尾;出队从队首出队,直至队尾,所以入队和出队的顺序是一样的

Queue接口

  • add(E) :在指定队列容量条件下添加元素,若成功返回true,若当前队列没有可用空间抛出IllegalStateException异常
  • offer(E):在指定队列容量条件下添加元素,若成功返回true,若当前队列没有可用空间返回false
  • remove():返回并删除此队列的头部元素,若队列为空会抛出异常
  • poll():返回并删除此队列的头部元素,若队列为空会返回null
  • element():返回头部元素,但不删除,队列为空会抛出异常
  • peek():返回头部元素,但不删除,队列为空返回null

2、阻塞队列

BlockingQueue规范定义了添加和删除阻塞队列的方法,很多阻塞队列都是基于BlockingQueue实现的,具体原理:当阻塞队列插入数据时,如果队列已满,线程会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程会阻塞等待直到队列非空

1)BlockingQueue接口

  • put():将指定元素插入队列,如果必要等待队列空间变为可用
  • take():返回并删除队列中的头部元素,如果必要直到等待某个元素可用
  • offer(E, long, TimeUnit):将指定的元素插入此队列,指定的等待时间等待必要的可用空间
  • poll(long, TimeUnit):返回并删除此队列的头部元素,指定的等待时间,直到等待某个元素可用
2)应用场景
  • 线程池:线程池中线程创建的个数超过核心线程数,会放入到等待队列中,如果队列空了,核心线程又没有要处理的任务,会进入等待,直到队列中有新的任务
  • 生产者-消费者模式:当生产者线程发现队列满了会陷入等待,直到有消费者线程进行消费并唤醒生产者线程;当消费者线程发现队列中没有可处理消息会陷入等待,直到生产者线程进行生产并唤醒消费者线程,阻塞队列可以避免线程间的竞争
  • 消息队列:可以把消息放到队列中,进行消息的异步处理
  • 缓存系统:使用contains()方法判断是否包含某个元素,利用阻塞队列来缓存数据,避免多线程更新缓存的竞争
  • 并发任务处理:将任务提交到队列中,消费之后出队,避免重复消费

3、JUC包下的阻塞队列

二、ArrayBlockingQueue

ArrayBlockingQueue采用Object数组方式存储数据,创建ArrayBlockingQueue必须指定容量大小,属于有界队列,采用ReentrantLock保证线程安全,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择

1、使用

public class ArrayBlockingQueueTest {private static final int QUEUE_CAPACITY = 5;private static final int PRODUCER_DELAY_MS = 1000;private static final int CONSUMER_DELAY_MS = 2000;public static void main(String[] args) throws InterruptedException {// 创建一个容量为QUEUE_CAPACITY的阻塞队列BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);// 创建一个生产者线程Runnable producer = () -> {while (true) {try {// 在队列满时阻塞queue.put("producer");System.out.println("生产了一个元素,队列中元素个数:" + queue.size());Thread.sleep(PRODUCER_DELAY_MS);} catch (InterruptedException e) {e.printStackTrace();}}};new Thread(producer).start();// 创建一个消费者线程Runnable consumer = () -> {while (true) {try {// 在队列为空时阻塞String element = queue.take();System.out.println("消费了一个元素,队列中元素个数:" + queue.size());Thread.sleep(CONSUMER_DELAY_MS);} catch (InterruptedException e) {e.printStackTrace();}}};new Thread(consumer).start();}
}

生产者少休眠1s,生产的快,当生产者添加第六个元素时会陷入等待 

2、源码分析

  • items:数组元素数组
  • takeIndex:下一个待取出元素索引
  • putIndex:下一个待添加元素索引
  • count:元素个数
  • lock:内置锁
  • notEmpty:消费者
  • notFull:生产者
入队详解

https://www.processon.com/view/link/64c8c537b9f7806c73dadbb4

出队详解

https://www.processon.com/view/link/64c8c92fb9f7806c73daea85

为什么ArrayBlockingQueue对数组操作要设计成双指针?

如果用一个指针,对数组的删除或者添加操作,数组中的元素都要往前或者往后移动,这样导致时间复杂度为O(n),而使用双指针可以前移后移,可以提升操作的性能,时间复杂度为O(1)

三、LinkedBlockingQueue

LinkedBlockingQueue是基于链表实现的阻塞队列,队列默认大小为Integer.MAX_VALUE,由于这个数值比较大,LinkedBlockingQueue也被称为无界队列,LinkedBlockingQueue每个元素都会占用内存,为防止OOM还是设置一个队列大小

1、使用

和ArrayBlockingQueue使用基本差不多

  • LinkedBlockingQueue():队列大小为2的32次方减1
  • LinkedBlockingQueue(Collection<? extends E>):队列大小为2的32次方减1,按照传入集合初始化队列数据
  • LinkedBlockingQueue(int):传入参数指定队列大小

2、源码分析

相比ArrayBlockingQueue读写只一把独占锁的实现,LinkedBlockingQueue读写分了两把锁

  •  item:元素存储的数据
  • next:下一个节点,单项链表结构
  • capacity:队列容量
  • count:元素数量
  • head:链表表头
  • last链表表尾
  • takeLock:出队操作竞争的锁对象
  • notEmpty:当队列无元素时,会让进行takeLock的线程陷入等待,直到有线程唤醒
  • putLock:入队操作竞争的锁对象
  • notFull:当队列满了,会让进行putLock的线程陷入等待,直到有线程唤醒

初始化LinkedBlockingQueue对象时,会创建一个属性item为null的Node对象

入队详解

https://www.processon.com/view/link/64c8f6d5b9f7806c73db4fc9

出队详解

https://www.processon.com/view/link/64c8ff0e7807695f1493090f

3、LinkedBlockingQueue和ArrayBlockingQueue对比

  • 队列大小:ArrayBlockingQueue必须指定容量大小,LinkedBlockingQueue可以不指定,LinkedBlockingQueue如果添加比删除快会导致OOM
  • 数组存储容器不同:ArrayBlockingQueue采用数组存储数据,LinkedBlockingQueue采用对象链表方式存储数据;就因为会产生Node对象,并发量大时会对gc产生较大的影响
  • ArrayBlockingQueue添加和删除都是争抢同一个锁资源,LinkedBlockingQueue添加和删除进行了锁分离,LinkedBlockingQueue高并发场景下可以并行的进行入队和出队操作

四、DelayQueue

可以使用队列消息延迟消费,实现接口回调通知、token超时失效、订单超时失效

1、使用

public class DelayQueueTest {public static void main(String[] args) throws InterruptedException {DelayQueue<Order> delayQueue = new DelayQueue<>();delayQueue.put(new Order("order1", System.currentTimeMillis(), 5000));delayQueue.put(new Order("order2", System.currentTimeMillis(), 2000));delayQueue.put(new Order("order3", System.currentTimeMillis(), 3000));while (!delayQueue.isEmpty()) {Order take = delayQueue.take();System.out.println("处理订单:"+take.getOrderId());}}static class Order implements Delayed {private String orderId;private long createTime;private long delayTime;public Order(String orderId, long createTime, long delayTime) {this.orderId = orderId;this.createTime = createTime;this.delayTime = delayTime;}public String getOrderId() {return orderId;}@Overridepublic long getDelay(TimeUnit unit) {// 订单创建时间+延迟时间-当前时间=剩余延迟时间long diff = createTime + delayTime - System.currentTimeMillis();return unit.convert(diff, unit);}@Overridepublic int compareTo(Delayed o) {// 比较两个订单之间差多长时间long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);return Long.compare(diff, 0);}}
}

2、源码分析

  • lock:用于保证线程安全
  • q: 优先级队列,存储元素,用于保证延迟低的优先执行
  • leader:用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
  • available:条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知
入队详解

https://www.processon.com/view/link/64c91879470d721c4e3be985

出队详解

https://www.processon.com/view/link/64c9185fc1af4746895281e7

五、如何选择适合的阻塞队列

1、选择策略

通常我们可以从以下 5 个角度考虑,来选择合适的阻塞队列:

功能

第 1 个需要考虑的就是功能层面,比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。

容量

第 2 个需要考虑的是容量,或者说是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。

能否扩容

第 3 个需要考虑的是能否扩容。因为有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。

内存结构

第 4 个需要考虑的点就是内存结构。我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。

性能

第 5 点就是从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。

2、线程池对于阻塞队列的选择

线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。

Executors类下的线程池类型:

  • FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
  • CachedThreadPool 选取的是 SynchronousQueue
  • ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列
http://www.lryc.cn/news/110611.html

相关文章:

  • pygame贪吃蛇游戏
  • Mac系统下使用远程桌面连接Windows系统
  • 使用 OpenCV 和深度学习对黑白图像进行着色
  • 从价值的角度看,为何 POSE 通证值得长期看好
  • pytorch的CrossEntropyLoss交叉熵损失函数默认reduction是平均值
  • OKR管理策略:为开发团队注入动力
  • C++二叉搜索树剖析
  • 升级你的GitHub终端认证方式:从密码到令牌
  • 【力扣】链表题目总结
  • Thunar配置自定义动作
  • Python 开发工具 Pycharm —— 使用技巧Lv.3
  • 51单片机(普中HC6800-EM3 V3.0)实验例程软件分析 实验三 LED流水灯
  • 深度学习与计算机相结合:直播实时美颜SDK的创新之路
  • Unity寻找子物体的方法
  • 车载软件架构 —— 车载软件安全启动关键技术解读
  • 2023-08-05——JVM Method Area(方法区)
  • 【前端知识】React 基础巩固(四十六)——自定义Hook的应用
  • Swish - Mac 触控板手势窗口管理工具[macOS]
  • 【雕爷学编程】MicroPython动手做(31)——物联网之Easy IoT 2
  • C# 简单模拟 程序内部 消息订阅发布功能
  • 第六章 支持向量机
  • Docker基本操作之删除容器Container和删除镜像IMAGE
  • vue 3.0 + element-ui MessageBox弹出框的 让文本框显示文字 placeholder
  • QT生成可执行文件的步骤
  • 一分钟学会JS获取当前年近五年的年份
  • 14 springboot项目——首页跳转实现
  • IL汇编语言读取控制台输入和转换为整数
  • 什么是跨链 DeFi?
  • Linux下C/C++的gdb工具与Python的pdb工具常见用法之对比
  • 从入门到专业:探索Python中的判断与循环技巧!