前言

PriorityBlockingQueue 優(yōu)先級隊列,線程安全(添加、讀取都進行了加鎖)、無界、讀阻塞的隊列,底層采用的堆結構實現(xiàn)(二叉樹),默認是小根堆,最小的或者最大的元素會一直置頂,每次獲取都取最頂端的數(shù)據(jù)
隊列創(chuàng)建
小根堆
PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>();
大根堆
PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o2 - o1;
}
});
應用場景
有任務要執(zhí)行,可以對任務加一個優(yōu)先級的權重,這樣隊列會識別出來,對該任務優(yōu)先進行出隊。
我們來看一個具體例子,例子中定義了一個將要放入“優(yōu)先阻塞隊列”的任務類,并且定義了一個任務工場類和一個任務執(zhí)行類,在任務工場類中產(chǎn)生了各種不同優(yōu)先級的任務,將其添加到隊列中,在任務執(zhí)行類中,任務被一個個取出并執(zhí)行。
package com.niuh.queue.priority;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* <p>
* PriorityBlockingQueue使用示例
* </p>
*/
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception {
Random random = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
exec.execute(new PrioritizedTaskProducer(queue, exec)); // 這里需要注意,往PriorityBlockingQueue中添加任務和取出任務的
exec.execute(new PrioritizedTaskConsumer(queue)); // 步驟是同時進行的,因而輸出結果并不一定是有序的
}
}
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
private Random random = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
@Override
public int compareTo(PrioritizedTask o) {
return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0); // 定義優(yōu)先級計算方式
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
} catch (InterruptedException e) {
}
System.out.println(this);
}
@Override
public String toString() {
return String.format("[%1$-3d]", priority) + " Task " + id;
}
public String summary() {
return "(" + id + ": " + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService exec) {
super(-1);
this.exec = exec;
}
@Override
public void run() {
int count = 0;
for (PrioritizedTask pt : sequence) {
System.out.print(pt.summary());
if (++count % 5 == 0) {
System.out.println();
}
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable {
private Random random = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
this.queue = queue;
this.exec = exec;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(random.nextInt(10))); // 往PriorityBlockingQueue中添加隨機優(yōu)先級的任務
Thread.yield();
}
try {
for (int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10)); // 往PriorityBlockingQueue中添加優(yōu)先級為10的任務
}
for (int i = 0; i < 10; i++) {
queue.add(new PrioritizedTask(i));// 往PriorityBlockingQueue中添加優(yōu)先級為1-10的任務
}
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch (InterruptedException e) {
}
System.out.println("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> queue;
public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
queue.take().run(); // 任務的消費者,從PriorityBlockingQueue中取出任務執(zhí)行
}
} catch (InterruptedException e) {
}
System.out.println("Finished PrioritizedTaskConsumer");
}
}
工作原理
PriorityBlockingQueue 是 JDK1.5 的時候出來的一個阻塞隊列。但是該隊列入隊的時候是不會阻塞的,永遠會加到隊尾。下面我們介紹下它的幾個特點:
- PriorityBlockingQueue 和 ArrayBlockingQueue 一樣是基于數(shù)組實現(xiàn)的,但后者在初始化時需要指定長度,前者默認長度是 11。
- 該隊列可以說是真正的無界隊列,它在隊列滿的時候會進行擴容,而前面說的無界阻塞隊列其實都有有界,只是界限太大可以忽略(最大值是 2147483647)
- 該隊列屬于權重隊列,可以理解為它可以進行排序,但是排序不是從小到大排或從大到小排,是基于數(shù)組的堆結構(具體如何排下面會進行分析)
- 出隊方式和前面的也不同,是根據(jù)權重來進行出隊,和前面所說隊列中那種先進先出或者先進后出方式不同。
- 其存入的元素必須實現(xiàn)Comparator,或者在創(chuàng)建隊列的時候自定義Comparator。
注意:
- 堆結構實際上是一種完全二叉樹。關于二叉樹可以查看 《樹、二叉樹、二叉搜索樹的實現(xiàn)和特性》
- 堆又分為大頂堆和小頂堆 。大頂堆中第一個元素肯定是所有元素中最大的,小頂堆中第一個元素是所有元素中最小的。關于二叉堆可以查看《堆和二叉堆的實現(xiàn)和特性》
源碼分析
定義
PriorityBlockingQueue的類繼承關系如下:
其包含的方法定義如下:
成員屬性
從下面的字段我們可以知道,該隊列可以排序,使用顯示鎖來保證操作的原子性,在空隊列時,出隊線程會堵塞等。
/**
* 默認數(shù)組長度
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* 最大達容量,分配時超出可能會出現(xiàn) OutOfMemoryError 異常
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 隊列,存儲我們的元素
*/
private transient Object[] queue;
/**
* 隊列長度
*/
private transient int size;
/**
* 比較器,入隊進行權重的比較
*/
private transient Comparator<? super E> comparator;
/**
* 顯示鎖
*/
private final ReentrantLock lock;
/**
* 空隊列時進行線程阻塞的 Condition 對象
*/
private final Condition notEmpty;
構造函數(shù)
/**
* 默認構造,使用長度為 11 的數(shù)組,比較器為空
*/
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* 自定義數(shù)據(jù)長度構造,比較器為空
*/
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
/**
* 自定義數(shù)組長度,可以自定義比較器
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
/**
* 構造函數(shù),帶有初始內容的隊列
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();
}
入隊方法
入隊方法,下面可以看到 put 方法最終會調用 offer 方法,所以我們只看 offer 方法即可。
offer(E e)
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
//判斷是否為空
if (e == null)
throw new NullPointerException();
//顯示鎖
final ReentrantLock lock = this.lock;
lock.lock();
//定義臨時對象
int n, cap;
Object[] array;
//判斷數(shù)組是否滿了
while ((n = size) >= (cap = (array = queue).length))
//數(shù)組擴容
tryGrow(array, cap);
try {
//拿到比較器
Comparator<? super E> cmp = comparator;
//判斷是否有自定義比較器
if (cmp == null)
//堆上浮
siftUpComparable(n, e, array);
else
//使用自定義比較器進行堆上浮
siftUpUsingComparator(n, e, array, cmp);
//隊列長度 +1
size = n + 1;
//喚醒休眠的出隊線程
notEmpty.signal();
} finally {
//釋放鎖
lock.unlock();
}
return true;
}
siftUpComparable(int k, T x, Object[] array)
上浮調整比較器方法的實現(xiàn)
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
//無符號向左移,目的是找到放入位置的父節(jié)點
int parent = (k - 1) >>> 1;
//拿到父節(jié)點的值
Object e = array[parent];
//比較是否大于該元素,不大于就沒比較交換
if (key.compareTo((T) e) >= 0)
break;
//以下都是元素位置交換
array[k] = e;
k = parent;
}
array[k] = key;
}
根據(jù)上面的代碼,可以看出這是完全二叉樹在進行上浮調整。調整入隊的元素,找出最小的,將元素排列有序化。簡單理解就是:父節(jié)點元素值一定要比它的子節(jié)點得小,如果父節(jié)點大于子節(jié)點了,那就兩者位置進行交換。
入隊圖解
例子:85 添加到二叉堆中(大頂堆)
package com.niuh.queue.priority;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
/**
* <p>
* PriorityBlockingQueue 簡單演示 demo
* </p>
*/
public class TestPriorityBlockingQueue {
public static void main(String[] args) throws InterruptedException {
// 大頂堆
PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o2 - o1;
}
});
concurrentLinkedQueue.offer(90);
concurrentLinkedQueue.offer(80);
concurrentLinkedQueue.offer(70);
concurrentLinkedQueue.offer(60);
concurrentLinkedQueue.offer(40);
concurrentLinkedQueue.offer(30);
concurrentLinkedQueue.offer(20);
concurrentLinkedQueue.offer(10);
concurrentLinkedQueue.offer(50);
concurrentLinkedQueue.offer(85);
//輸出元素排列
concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" "));
//取出元素
Integer take = concurrentLinkedQueue.take();
System.out.println();
concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" "));
}
}
操作的細節(jié)分為兩步:
- 第一步:首先把新元素插入到堆的尾部再說;(新的元素可能是特別大或者特別小,那么要做的一件事情就是重新維護一下堆的所有元素,把新元素挪到這個堆的相應的位置)
- 第二步:依次向上調整整個堆的結構,就叫 HeapifyUp
85 按照上面講的先插入到堆的尾部,也就是一維數(shù)組的尾部,一維數(shù)組的尾部的話就上圖的位置,因為這是一個完全二叉樹,所以它的尾部就是50后面這個結點。插進來之后這個時候就破壞了堆,它的每一個結點都要大于它的兒子的這種屬性了,接下來要做的事情就是要把 85 依次地向上浮動,怎么浮動?就是 85 大于它的父親結點,那么就和父親結點進行交換,直到走到根如果大于根的話,就和根也進行交換。

85 再繼續(xù)往前走之后,它要和 80 再進行比較,同理可得:也就是說這個結點每次和它的父親比,如果它大于它的父親的話就交換,直到它不再大于它的父親。
出隊方法
入隊列的方法說完后,我們來說說出隊列的方法。PriorityBlockingQueue提供了多種出隊操作的實現(xiàn)來滿足不同情況下的需求,如下:
- E take();
- E poll();
- E poll(long timeout, TimeUnit unit);
- E peek()
poll 和 peek 與上面類似,這里不做說明
take()
出隊方法,該方法會阻塞
public E take() throws InterruptedException {
//顯示鎖
final ReentrantLock lock = this.lock;
//可中斷鎖
lock.lockInterruptibly();
//結果接收對象
E result;
try {
//判斷隊列是否為空
while ( (result = dequeue()) == null)
//線程阻塞
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
dequeue()
我們再來看看具體出隊方法的實現(xiàn),dequeue方法
private E dequeue() {
//長度減少 1
int n = size - 1;
//判斷隊列中是否有元素
if (n < 0)
return null;
else {
//隊列對象
Object[] array = queue;
//取出第一個元素
E result = (E) array[0];
//拿出最后一個元素
E x = (E) array[n];
//置空
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
//下沉調整
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
//成功則減少隊列中的元素數(shù)量
size = n;
return result;
}
總體就是找到父節(jié)點與兩個子節(jié)點中最小的一個節(jié)點,然后進行交換位置,不斷重復,由上而下的交換。
siftDownComparable(int k, T x, Object[] array, int n)
再來看看下沉比較器方法的實現(xiàn)
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
//判斷隊列長度
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
//找到隊列最后一個元素的父節(jié)點的索引。
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
//拿到 k 節(jié)點下的左子節(jié)點
int child = (k << 1) + 1; // assume left child is least
//取得子節(jié)點對應的值
Object c = array[child];
//取得 k 右子節(jié)點的索引
int right = child + 1;
//比較右節(jié)點的索引是否小于隊列長度和左右子節(jié)點的值進行比較
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
//比較父節(jié)點值是否大于子節(jié)點
if (key.compareTo((T) c) <= 0)
break;
//下面都是元素替換
array[k] = c;
k = child;
}
array[k] = key;
}
}
出隊圖解
將堆尾元素替換到頂部(即堆頂被替代刪除掉)
依次從根部向下調整整個堆的結構(一直到堆尾即可) HeapifyDown
例子:90 從二叉堆中刪除(大頂堆)

總結
PriorityBlockingQueue 真的是個神奇的隊列,可以實現(xiàn)優(yōu)先出隊。最特別的是它只有一個鎖,入隊操作永遠成功,而出隊只有在空隊列的時候才會進行線程阻塞。可以說有一定的應用場景吧,比如:有任務要執(zhí)行,可以對任務加一個優(yōu)先級的權重,這樣隊列會識別出來,對該任務優(yōu)先進行出隊。