java并发编程 PriorityBlockingQueue详解
文章目录
- 1 PriorityBlockingQueue是什么
- 2 核心属性详解
- 3 核心方法详解
- 3.1 offer(E e)
- 3.2 poll()
- 3.3 take()
- 3.4 peek()
- 4 总结
1 PriorityBlockingQueue是什么
PriorityBlockingQueue类上的注释描述:一个无界阻塞队列,它使用与类PriorityQueue相同的排序规则,并提供阻塞检索操作。
PriorityQueue又是什么:基于优先级的 堆 的无限制优先级队列,PriorityQueue是一个小顶堆。
2 核心属性详解
利用数组实现小顶堆,利用ReentrantLock 保证元素插入和移除的线程安全。同时因为是无界队列,所以需要扩容机制,此时引入遍历allocationSpinLock 对他unsafe的cas操作,表示谁去扩容。
//元素存放的集合容器,堆结构也是个数组,所以需要数组集合private transient Object[] queue;//元素的数量private transient int size;//指定的元素的比较规则private transient Comparator<? super E> comparator;//保证线程安全的锁private final ReentrantLock lock;//当获取元素的线程因为集合中无元素而阻塞,会使用该等待条件去实现private final Condition notEmpty;/*** 扩容时候使用的cas锁,因为扩容要保证线程安全,数组扩容是要new一个新数组的*/private transient volatile int allocationSpinLock;//序列化和反序列化使用的private PriorityQueue<E> q;
3 核心方法详解
3.1 offer(E e)
put操作一样,因为无界队列,所以没有存在容量满了,阻塞等待获取元素的线程唤醒
整体逻辑就是元素放入堆中,如果容量不够就进行扩容,ReentrantLock 保证了这些操作是线程安全的
public boolean offer(E e) {if (e == null)throw new NullPointerException();//获取锁final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;//如果当前元素数量已经达到了数组的长度上限,则需要扩容while ((n = size) >= (cap = (array = queue).length))//对当前数组进行扩容 扩容代码下面有解释tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;// 如果没指定比较器,就代表你的类实现了Comparable接口if (cmp == null)//添加元素到堆里 堆排序算法siftUpComparable(n, e, array);else//使用指定的比较器进行入堆siftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}//扩容代码描述private void tryGrow(Object[] array, int oldCap) {//先释放锁,因为当前线程需要干扩容的活,不需要阻塞别的线程,这样可能别的线程执行到这扩容已经好了,//就可以执行if (newArray != null && queue == array) ture成立的条件了lock.unlock();Object[] newArray = null;//通过扩容锁,减小锁的粒度,只有一个线程能去开辟新的数组if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {//当前数组长度小于64的时候长度加2,否则长度翻倍int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));//长度到达上限抛异常 if (newCap - MAX_ARRAY_SIZE > 0) {// possible overflow//MAX_ARRAY_SIZE 是int最大值减8 此时还能慢慢的加,不过一般来说都会OOM了int minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}//if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}//此时可能线程没拿到扩容锁,且newArray = new Object[newCap];还没执行到if (newArray == null)Thread.yield();//再次获取主锁lock.lock();//此时两种情况//1.到这newArray = new Object[newCap];还没执行到,此时newArray == null,queue还没被替换,所以该方法结束之后,循环又回来了,释放锁。。。再次争抢锁//2.newArray = new Object[newCap];已执行,此时因为获取的是主锁,只有一个线程能执行底下的queue = newArray;和数组copy操作//这样,其他线程获取到锁之后就会往新的数组中添加元素了if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}
如果队列中数组元素特别多,此时扩容一次需要的时间就会相对增加,其他线程阻塞的时间就会边长。
3.2 poll()
获取一个元素。
public E poll() {//获取锁final ReentrantLock lock = this.lock;lock.lock();try {//拿出堆顶元素 return dequeue();} finally {lock.unlock();}}private E dequeue() {//n是当前元素结合中左后一个元素int n = size - 1;if (n < 0)return null;else {Object[] array = queue;//堆顶就是0下标E result = (E) array[0];E x = (E) array[n];array[n] = null;//堆顶被取出,此时把末尾元素拿上来去重新比较排序保证堆的完整性Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}
3.3 take()
相对于poll 多了等待操作
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)//区别在这notEmpty.await();} finally {lock.unlock();}return result;
}
3.4 peek()
获取堆顶元素,但不移除
public E peek() {//获取锁final ReentrantLock lock = this.lock;lock.lock();try {//返回堆顶元素return (size == 0) ? null : (E) queue[0];} finally {lock.unlock();}}
4 总结
PriorityBlockingQueue是一个小顶堆的数据结构的类,使用了ReentrantLock来保证线程安全。可以通过传入的比较器去自定义小顶堆的比较规则,或者实现Comparable接口。