当前位置: 首页 > news >正文

【JUC并发编程】ArrayBlockingQueue和LinkedBlockingQueue源码2分钟看完

文章目录

  • 1、BlockingQueue
    • 1)接口方法
    • 2)阻塞队列分类
  • 2、ArrayBlockingQueue
    • 1)构造函数
    • 2)put()入队
    • 3)take()出队
  • 3、LinkedBlockingQueue
    • 1)构造函数
    • 2)put()入队
    • 3)take()出队

1、BlockingQueue

BlockingQueue是JUC包下提供的一个阻塞队列 接口;

1)接口方法

在这里插入图片描述

队列操作

  • 抛出异常:add(e)、remove()、element()
  • 返回特定值:offer()队尾入队/poll()删除队头元素/peek()
  • 一直阻塞:put(e)/take()
  • 超时退出:offer(e,time,unit)/poll(time,unit)

其中:BlockingQueue 不接受 null 元素。试图 add 、 put 或 offer ⼀个 null 元素时,某些实现会抛出 NullPointerException 。

在这里插入图片描述

2)阻塞队列分类

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列。

  • LinkedBlockingQueue:由链表组成的有界阻塞队列。

  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列(默认升序排序)。

  • DelayQueue:支持延时获取元素的无界阻塞队列。

    队列使用PriorityQueue来实现。

  • SynchronousQueue:一个不存储元素的阻塞队列。

  • LinkedTransferQueue:由链表组成的无界阻塞队列。

    其多了tryTransfer()方法和transfer()方法。

    • transfer():可以把生产者传入的元素立刻传输给消费者。如果没有consumer在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
    • tryTransfer():用来试探生产者传入的元素是否能直接传给消费者。不管是否有consumer正在等待接收元素,都立刻返回。

2、ArrayBlockingQueue

ArrayBlockingQueue是由数组结构组成的有界阻塞队列。通过ReentrantLock保证线程安全、并实现 Producer-Consumer模式。

1)构造函数

在这里插入图片描述

从构造函数可知,默认采用非公平锁;

public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();// 底层使用对象数组保存元素this.items = new Object[capacity];// 初始化需要加锁使用的ReentrantLock实例,默认采用非公平锁lock = new ReentrantLock(fair);/*** 判断队列是 空 or 满*     notEmpty⽤于执⾏take时进⾏await()等待操作,put时进⾏signal()唤醒操作*     notFull⽤于执⾏take时进⾏signal()唤醒操作,put时进⾏await()等待操作*/notEmpty = lock.newCondition();notFull =  lock.newCondition();
}

方法释义:

  • 构造函数的入参capacity指定了底层存储元素数组⻓度的⼤⼩;
  • 初始化需要加锁使⽤的ReentrantLock实例,默认采⽤的是⾮公平锁;
  • 基于Lock的Condition判断队列是 空 or 满
    • notEmpty⽤于执⾏take时进⾏await()等待操作,put时进⾏signal()唤醒操作;
    • notFull⽤于执⾏take时进⾏signal()唤醒操作,put时进⾏await()等待操作;

2)put()入队

public void put(E e) throws InterruptedException {// 入参不能为空checkNotNull(e);final ReentrantLock lock = this.lock;// 加可中断锁lock.lockInterruptibly();try {while (count == items.length)// 队列满了,则阻塞等待signal唤醒,同时释放次有的锁。notFull.await();// 入队操作enqueue(e);} finally {lock.unlock();}
}private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;// 如果数据已经插入到数组末尾,则重置putIndex为0,从0开始继续插入。if (++putIndex == items.length)putIndex = 0;count++;// 通知take线程解除阻塞notEmpty.signal();
}

方法释义:

  • ⾸先尝试获得可中断锁,即:lock.lockInterruptibly(),当执⾏interrupt操作时,该锁可以被中断。
  • 如果数组中元素的个数(count)等于数组的⻓度了,说明队列已经满了,在该线程上执⾏等待操作:notFull.await(); ,等待signal唤醒。
  • 如果队列没有满,则调⽤enqueue(e)⽅法执⾏⼊列操作;
    1. ⼊列操作⾸先会将待插⼊值x放⼊数组下标为putIndex的位置上,然后再将putIndex加1,来指向下⼀次插⼊的下标位置。
      • 如果加1后的putIndex等于了数组的⻓度,那么说明已经越界了(因为putIndex是从0开始的);做循环式插⼊,重置putIndex为0,从0开始继续插入。
  • 最后,执⾏count++来计算元素总个数;并调⽤notEmpty.signal()⽅法来解除阻塞;
    • 当队列为空的时候,执⾏take⽅法会被notEmpty.await()阻塞;
    • 此处就是通知take线程解除阻塞

3)take()出队

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)// 如果队列为空,则线程阻塞 等待signal唤醒,释放持有的锁notEmpty.await();// 执行出队操作return dequeue();} finally {lock.unlock();}
}private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")// 获取takeIndex下标的元素E x = (E) items[takeIndex];// 将takeIndex下标下的袁术置为null,便于后面GC回收items[takeIndex] = null;// 如果出队操作的到了数组末尾,则重置takeIndex,从0开始继续取出if (++takeIndex == items.length)takeIndex = 0;count--;// 默认itrs为null,不会走进if代码段。if (itrs != null)itrs.elementDequeued();// 通知put线程解除阻塞notFull.signal();return x;
}

方法释义:

  • take⽅法和put⽅法类似,区别是出队的指针是takeIndex;

  • ⾸先尝试获得可中断锁,即:lock.lockInterruptibly(),当执⾏interrupt操作时,该锁可以被中断。

  • 如果队列中为空;执⾏notEmpty.await()将线程阻塞 等待signal唤醒,释放持有的锁。

    • 当调⽤put⽅法向队列中放⼊元素之后 ,会调⽤notEmpty.signal⽅法对等待的线程执⾏唤醒操作;
  • 如果出队操作的到了数组末尾,则重置takeIndex,从0开始继续取出;

  • 出队执⾏完毕后,调⽤notFull.signal⽅法来唤醒在notFull上⾯

    await的线程。 通知put线程解除阻塞。

3、LinkedBlockingQueue

LinkedBlockingQueue是由链表结构组成的有界阻塞队列。通过ReentrantLock保证线程安全、并实现 Producer-Consumer模式。

1)构造函数

如果不指定容量,则默认LinkedBlockingQueue是无界阻塞队列(capacity = Integer.MAX_VALUE)

在这里插入图片描述

构造函数中会创建⼀个空的节点,作为整个链表的头节点。

2)put()入队

在这里插入图片描述

private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node;
}

整个put()流程和ArrayBlockingQueue基本一致,对于链表容量的统计会额外采用一个AtomiceInteger类型的变量count维护。

    • 最后唤醒put()线程的代码段上有一个 c == 0的判断,这里的c是入队操作之前的数量。
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

这里有个比较有意思的点:

  • 入队操作添加完元素之后,如果发现当前队列的元素数量还没到最大容量,则尝试唤醒其他put()操作阻塞的线程;
if (c + 1 < capacity)notFull.signal();

3)take()出队

在这里插入图片描述

private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;
}

整个take()流程和ArrayBlockingQueue基本一致,稍微看一下即可。

  • 最后唤醒take()线程的代码段上有一个 c == capacity的判断,这里的c是出队操作之前的数量。

和LinkedBlockingQueue的put()操作一样:

  • 出队操作移除完元素之后,如果发现当前队列的元素数量 > 1,则尝试唤醒其他take()操作阻塞的线程;
if (c > 1)notEmpty.signal();
http://www.lryc.cn/news/3919.html

相关文章:

  • GitHub个人资料自述与管理主题设置
  • Express篇-连接mysql
  • win10 安装rabbitMQ详细步骤
  • 【成为架构师课程系列】一线架构师:6个经典困惑及其解法
  • 光耦合器的定义与概述
  • 谷粒商城--品牌管理详情
  • stack、queue和priority_queue
  • 面试题(二十二)消息队列与搜索引擎
  • Spring Security in Action 第三章 SpringSecurity管理用户
  • Java面试——maven篇
  • 基于微信小程序的游戏账号交易小程序
  • Matlab绘制隐函数总结-二维和三维
  • 如何直观地理解傅立叶变换?频域和时域的理解
  • STC15读取内部ID示例程序
  • Xml格式化与高亮显示
  • 【GlobalMapper精品教程】045:空间分析工具(2)——相交
  • 4年外包终上岸,我只能说这类公司能不去就不去..
  • sklearn降维算法1 - 降维思想与PCA实现
  • 「期末复习」线性代数
  • 伏并网低电压穿越技术
  • opencv的环境搭建
  • C++智能指针
  • MongoDB--》MongoDB数据库以及可视化工具的安装与使用—保姆级教程
  • JAVA 基础题
  • Flutter desktop端多屏幕展示问题处理
  • 每天10个前端小知识 【Day 9】
  • Elasticsearch的读写搜索过程
  • 线上服务质量的问题该如何去处理?你有什么思路?
  • IOC 配置,依赖注入的三种方式
  • 自动机,即有限状态机