当前位置: 首页 > news >正文

java并发编程 PriorityBlockingQueue详解

文章目录

  • 1 PriorityBlockingQueue是什么
  • 2 核心属性详解
  • 3 核心方法详解
    • 3.1 offer(E e)
    • 3.2 poll()
    • 3.3 take()
    • 3.4 peek()
  • 4 总结


1 PriorityBlockingQueue是什么

PriorityBlockingQueue类上的注释描述:一个无界阻塞队列,它使用与类PriorityQueue相同的排序规则,并提供阻塞检索操作。
PriorityQueue又是什么:基于优先级的 堆 的无限制优先级队列,PriorityQueue是一个小顶堆。

2 核心属性详解

利用数组实现小顶堆,利用ReentrantLock 保证元素插入和移除的线程安全。同时因为是无界队列,所以需要扩容机制,此时引入遍历allocationSpinLock 对他unsafe的cas操作,表示谁去扩容。

	//元素存放的集合容器,堆结构也是个数组,所以需要数组集合private transient Object[] queue;//元素的数量private transient int size;//指定的元素的比较规则private transient Comparator<? super E> comparator;//保证线程安全的锁private final ReentrantLock lock;//当获取元素的线程因为集合中无元素而阻塞,会使用该等待条件去实现private final Condition notEmpty;/*** 扩容时候使用的cas锁,因为扩容要保证线程安全,数组扩容是要new一个新数组的*/private transient volatile int allocationSpinLock;//序列化和反序列化使用的private PriorityQueue<E> q;

3 核心方法详解

3.1 offer(E e)

put操作一样,因为无界队列,所以没有存在容量满了,阻塞等待获取元素的线程唤醒
整体逻辑就是元素放入堆中,如果容量不够就进行扩容,ReentrantLock 保证了这些操作是线程安全的

    public boolean offer(E e) {if (e == null)throw new NullPointerException();//获取锁final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;//如果当前元素数量已经达到了数组的长度上限,则需要扩容while ((n = size) >= (cap = (array = queue).length))//对当前数组进行扩容 扩容代码下面有解释tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;// 如果没指定比较器,就代表你的类实现了Comparable接口if (cmp == null)//添加元素到堆里 堆排序算法siftUpComparable(n, e, array);else//使用指定的比较器进行入堆siftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}//扩容代码描述private void tryGrow(Object[] array, int oldCap) {//先释放锁,因为当前线程需要干扩容的活,不需要阻塞别的线程,这样可能别的线程执行到这扩容已经好了,//就可以执行if (newArray != null && queue == array) ture成立的条件了lock.unlock();Object[] newArray = null;//通过扩容锁,减小锁的粒度,只有一个线程能去开辟新的数组if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {//当前数组长度小于64的时候长度加2,否则长度翻倍int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));//长度到达上限抛异常                      if (newCap - MAX_ARRAY_SIZE > 0) {// possible overflow//MAX_ARRAY_SIZE 是int最大值减8 此时还能慢慢的加,不过一般来说都会OOM了int minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}//if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}//此时可能线程没拿到扩容锁,且newArray = new Object[newCap];还没执行到if (newArray == null)Thread.yield();//再次获取主锁lock.lock();//此时两种情况//1.到这newArray = new Object[newCap];还没执行到,此时newArray == null,queue还没被替换,所以该方法结束之后,循环又回来了,释放锁。。。再次争抢锁//2.newArray = new Object[newCap];已执行,此时因为获取的是主锁,只有一个线程能执行底下的queue = newArray;和数组copy操作//这样,其他线程获取到锁之后就会往新的数组中添加元素了if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}

如果队列中数组元素特别多,此时扩容一次需要的时间就会相对增加,其他线程阻塞的时间就会边长。

3.2 poll()

获取一个元素。

    public E poll() {//获取锁final ReentrantLock lock = this.lock;lock.lock();try {//拿出堆顶元素 return dequeue();} finally {lock.unlock();}}private E dequeue() {//n是当前元素结合中左后一个元素int n = size - 1;if (n < 0)return null;else {Object[] array = queue;//堆顶就是0下标E result = (E) array[0];E x = (E) array[n];array[n] = null;//堆顶被取出,此时把末尾元素拿上来去重新比较排序保证堆的完整性Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}

3.3 take()

相对于poll 多了等待操作

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)//区别在这notEmpty.await();} finally {lock.unlock();}return result;
}

3.4 peek()

获取堆顶元素,但不移除

 public E peek() {//获取锁final ReentrantLock lock = this.lock;lock.lock();try {//返回堆顶元素return (size == 0) ? null : (E) queue[0];} finally {lock.unlock();}}

4 总结

PriorityBlockingQueue是一个小顶堆的数据结构的类,使用了ReentrantLock来保证线程安全。可以通过传入的比较器去自定义小顶堆的比较规则,或者实现Comparable接口。

http://www.lryc.cn/news/152277.html

相关文章:

  • SpringMVC_基本使用
  • 大屏开发,浏览器的可视区域和设备的分辨率
  • 【微服务部署】06-日志集成
  • 【Python】python使用docxtpl生成word模板
  • C++学习笔记总结练习:多态与虚函数
  • linux 批量更改指定后辍文件的可执行权限
  • 数据结构(Java实现)-Map和Set
  • C++进程、线程、内存管理
  • 打车系统网约车系统开发支持APP公众号H5小程序版本源码
  • HTTP/1.1协议的状态码
  • SpringCloud(十)——ElasticSearch简单了解(一)初识ElasticSearch和RestClient
  • CAD文字显示?问号解决
  • Calico切换网络模式无效
  • 数据生成 | MATLAB实现GAN生成对抗网络结合SVM支持向量机的数据生成
  • iOS - 资源按需加载 - ODR
  • arduino仿真 SimulIDE1.0仿真器
  • vue实现导出excel的多种方式
  • redis实战-实现优惠券秒杀解决超卖问题
  • C语言:截断+整型提升+算数转换练习
  • Java后端开发面试题——多线程
  • Redis 学习笔记
  • 华为云新生代开发者招募
  • DockerFile简明教程
  • Cygwin是什么?是Windows还是Linux?
  • 成集云 | 多维表格自动化管理jira Server项目 | 解决方案
  • 数据结构(Java实现)-排序
  • C++------vector【STL】
  • Matlab(变量与文本读取)
  • WebGPU学习(8)---使用RenderBundle
  • 【前端】常用功能合集