目錄
- 一、PriorityBlockingQueue概述
- 二、PriorityBlockingQueue源碼解析
- 1.容器
- 2.比較器
- 3.構造函數
- 4.添加元素
- 5.獲取元素
- 6.維護堆性質
- 總結
PriorityBlockingQueue是Java中實現了堆數據結構的線程安全的有界阻塞隊列。它可以在多線程場景下安全地進行元素添加、刪除和獲取操作,而且可以根據元素的優先級進行排序。本篇博客將會深入解讀PriorityBlockingQueue的源碼實現。
一、PriorityBlockingQueue概述
PriorityBlockingQueue類實現了BlockingQueue接口,它是一個線程安全的隊列,繼承自AbstractQueue類,而AbstractQueue類又實現了Queue接口。PriorityBlockingQueue是一個有界的隊列,其容量可以在構造函數中進行指定,若不指定則默認大小為Integer.MAX_VALUE。同時,PriorityBlockingQueue也支持根據元素的優先級進行排序,這是由于PriorityBlockingQueue內部實現了一個堆數據結構。
二、PriorityBlockingQueue源碼解析
1.容器
PriorityBlockingQueue內部使用了一個Object類型的數組queue來存儲元素,同時使用了一個int類型的變量size來記錄元素的數量。下面是PriorityBlockingQueue類中的定義:
private transient Object[] queue; private transient int size;
2.比較器
PriorityBlockingQueue可以根據元素的優先級進行排序,這是由于PriorityBlockingQueue內部維護了一個小根堆或大根堆。在構造函數中,我們可以選擇使用元素自身的比較方式或是自定義比較器來進行元素的排序。若未指定比較器,則PriorityBlockingQueue將使用元素自身的比較方式進行排序。
private final Comparator<? super E> comparator;
3.構造函數
PriorityBlockingQueue提供了多個構造函數,我們可以選擇使用無參構造函數來創建一個默認容量為Integer.MAX_VALUE的PriorityBlockingQueue,或是使用帶有初始容量或自定義比較器的構造函數。下面是PriorityBlockingQueue類的兩個構造函數:
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.queue = new Object[initialCapacity]; this.comparator = comparator; }
4.添加元素
PriorityBlockingQueue中添加元素的方法為offer()方法,它會首先檢查容量是否足夠,如果容量不足則會進行擴容操作,擴容的方式是將原數組長度增加一半。接著,它會將新元素添加到隊列的末尾,并通過siftUp()方法將元素上濾到合適的位置,以維護堆的性質。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (n == 0) { array[0] = e; } else { siftUp(n, e, array, cmp); } size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
5.獲取元素
PriorityBlockingQueue中獲取元素的方法為take()方法,它會首先檢查隊列是否為空,如果隊列為空則會將當前線程阻塞,直到有元素被添加到隊列中。接著,它會獲取隊列的頭部元素,并通過siftDown()方法將隊列的末尾元素移動到頭部,以維護堆的性質。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while (size == 0) notEmpty.await(); result = extract(); } finally { lock.unlock(); } return result; } private E extract() { final Object[] array = queue; final E result = (E) array[0]; final int n = --size; final E x = (E) array[n]; array[n] = null; if (n != 0) siftDown(0, x, array, comparator); return result; }
6.維護堆性質
PriorityBlockingQueue使用小根堆或大根堆來維護元素的優先級,這里我們以小根堆為例。小根堆的特點是父節點的值小于等于左右子節點的值,PriorityBlockingQueue中的堆是通過數組來實現的。當添加元素時,會將新元素添加到隊列的末尾,并通過siftUp()方法將元素上濾到合適的位置,以維護堆的性質。當獲取元素時,會獲取隊列的頭部元素,并通過siftDown()方法將隊列的末尾元素移動到頭部,以維護堆的性質。下面是siftUp()和siftDown()方法的代碼實現:
private static <T> void siftUp(int k, T x, Object[] array, Comparator<? super T> cmp) { if (cmp != null) siftUpUsingComparator(k, x, array, cmp); else siftUpComparable(k, x, array); } @SuppressWarnings("unchecked") private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; } @SuppressWarnings("unchecked") private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } private static <T> void siftDown(int k, T x, Object[] array, Comparator<? super T> cmp) { if (cmp != null) siftDownUsingComparator(k, x, array, cmp); else siftDownComparable(k, x, array); } @SuppressWarnings("unchecked") private static <T> void siftDownUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < size && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } @SuppressWarnings("unchecked") private static <T> void siftDownComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < size && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; }
siftUp()方法和siftDown()方法都使用了siftUpUsingComparator()方法和siftDownUsingComparator()方法,它們是使用Comparator來實現堆的上濾和下濾的。當PriorityBlockingQueue沒有指定Comparator時,會使用元素自身的Comparable來實現堆的上濾和下濾。
總結
PriorityBlockingQueue是Java并發包中的一個線程安全的、支持優先級隊列的類,它使用小根堆或大根堆來維護元素的優先級。PriorityBlockingQueue的內部實現使用了ReentrantLock和Condition來實現線程安全的操作,同時使用了數組來實現堆。PriorityBlockingQueue的核心方法包括添加元素的offer()方法,獲取元素的take()方法,以及維護堆性質的siftUp()方法和siftDown()方法。PriorityBlockingQueue的使用方式與普通隊列類似,但可以根據元素的優先級進行排序和處理。
以上就是詳解Java并發編程中的優先級隊列PriorityBlockingQueue的詳細內容,更多關于Java PriorityBlockingQueue的資料請關注其它相關文章!
原文地址:https://juejin.cn/post/7229492628873674807