一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 阻塞隊列—PriorityBlockingQueue源碼分析

阻塞隊列—PriorityBlockingQueue源碼分析

2020-11-25 00:01今日頭條一角錢技術 Java教程

PriorityBlockingQueue 優(yōu)先級隊列,線程安全(添加、讀取都進行了加鎖)、無界、讀阻塞的隊列,底層采用的堆結構實現(xiàn)(二叉樹),默認是小根堆,最小的或者最大的元素會一直置頂,每次獲取都取最頂端的數(shù)據(jù)

阻塞隊列—PriorityBlockingQueue源碼分析

 前言

阻塞隊列—PriorityBlockingQueue源碼分析

PriorityBlockingQueue 優(yōu)先級隊列,線程安全(添加、讀取都進行了加鎖)、無界、讀阻塞的隊列,底層采用的堆結構實現(xiàn)(二叉樹),默認是小根堆,最小的或者最大的元素會一直置頂,每次獲取都取最頂端的數(shù)據(jù)

隊列創(chuàng)建

小根堆

PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(); 

大根堆

PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() { 

 @Override 

 public int compare(Integer o1, Integer o2) { 

  return o2 - o1; 

 } 

}); 

應用場景

有任務要執(zhí)行,可以對任務加一個優(yōu)先級的權重,這樣隊列會識別出來,對該任務優(yōu)先進行出隊。

我們來看一個具體例子,例子中定義了一個將要放入“優(yōu)先阻塞隊列”的任務類,并且定義了一個任務工場類和一個任務執(zhí)行類,在任務工場類中產(chǎn)生了各種不同優(yōu)先級的任務,將其添加到隊列中,在任務執(zhí)行類中,任務被一個個取出并執(zhí)行。

package com.niuh.queue.priority; 

 

import java.util.ArrayList; 

import java.util.List; 

import java.util.Queue; 

import java.util.Random; 

import java.util.concurrent.ExecutorService; 

import java.util.concurrent.Executors; 

import java.util.concurrent.PriorityBlockingQueue; 

import java.util.concurrent.TimeUnit; 

 

/** 

 * <p> 

 * PriorityBlockingQueue使用示例 

 * </p> 

 */ 

public class PriorityBlockingQueueDemo { 

 

    public static void main(String[] args) throws Exception { 

        Random random = new Random(47); 

        ExecutorService exec = Executors.newCachedThreadPool(); 

        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(); 

        exec.execute(new PrioritizedTaskProducer(queue, exec)); // 這里需要注意,往PriorityBlockingQueue中添加任務和取出任務的 

        exec.execute(new PrioritizedTaskConsumer(queue)); // 步驟是同時進行的,因而輸出結果并不一定是有序的 

    } 

 

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> { 

    private Random random = new Random(47); 

    private static int counter = 0; 

    private final int id = counter++; 

    private final int priority; 

 

    protected static List<PrioritizedTask> sequence = new ArrayList<>(); 

 

    public PrioritizedTask(int priority) { 

        this.priority = priority; 

        sequence.add(this); 

    } 

 

    @Override 

    public int compareTo(PrioritizedTask o) { 

        return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0);  // 定義優(yōu)先級計算方式 

    } 

 

    @Override 

    public void run() { 

        try { 

            TimeUnit.MILLISECONDS.sleep(random.nextInt(250)); 

        } catch (InterruptedException e) { 

        } 

        System.out.println(this); 

    } 

 

    @Override 

    public String toString() { 

        return String.format("[%1$-3d]", priority) + " Task " + id; 

    } 

 

    public String summary() { 

        return "(" + id + ": " + priority + ")"

    } 

 

    public static class EndSentinel extends PrioritizedTask { 

        private ExecutorService exec

 

        public EndSentinel(ExecutorService exec) { 

            super(-1); 

            this.exec = exec

        } 

 

        @Override 

        public void run() { 

            int count = 0; 

            for (PrioritizedTask pt : sequence) { 

                System.out.print(pt.summary()); 

                if (++count % 5 == 0) { 

                    System.out.println(); 

                } 

            } 

            System.out.println(); 

            System.out.println(this + " Calling shutdownNow()"); 

            exec.shutdownNow(); 

        } 

    } 

 

class PrioritizedTaskProducer implements Runnable { 

    private Random random = new Random(47); 

    private Queue<Runnable> queue; 

    private ExecutorService exec

 

    public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) { 

        this.queue = queue; 

        this.exec = exec

    } 

 

    @Override 

    public void run() { 

        for (int i = 0; i < 20; i++) { 

            queue.add(new PrioritizedTask(random.nextInt(10))); // 往PriorityBlockingQueue中添加隨機優(yōu)先級的任務 

            Thread.yield(); 

        } 

        try { 

            for (int i = 0; i < 10; i++) { 

                TimeUnit.MILLISECONDS.sleep(250); 

                queue.add(new PrioritizedTask(10)); // 往PriorityBlockingQueue中添加優(yōu)先級為10的任務 

            } 

            for (int i = 0; i < 10; i++) { 

                queue.add(new PrioritizedTask(i));// 往PriorityBlockingQueue中添加優(yōu)先級為1-10的任務 

            } 

            queue.add(new PrioritizedTask.EndSentinel(exec)); 

        } catch (InterruptedException e) { 

        } 

        System.out.println("Finished PrioritizedTaskProducer"); 

    } 

 

class PrioritizedTaskConsumer implements Runnable { 

    private PriorityBlockingQueue<Runnable> queue; 

 

    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) { 

        this.queue = queue; 

    } 

 

    @Override 

    public void run() { 

        try { 

            while (!Thread.interrupted()) { 

                queue.take().run(); // 任務的消費者,從PriorityBlockingQueue中取出任務執(zhí)行 

            } 

        } catch (InterruptedException e) { 

        } 

        System.out.println("Finished PrioritizedTaskConsumer"); 

    } 

工作原理

PriorityBlockingQueue 是 JDK1.5 的時候出來的一個阻塞隊列。但是該隊列入隊的時候是不會阻塞的,永遠會加到隊尾。下面我們介紹下它的幾個特點:

  • PriorityBlockingQueue 和 ArrayBlockingQueue 一樣是基于數(shù)組實現(xiàn)的,但后者在初始化時需要指定長度,前者默認長度是 11。
  • 該隊列可以說是真正的無界隊列,它在隊列滿的時候會進行擴容,而前面說的無界阻塞隊列其實都有有界,只是界限太大可以忽略(最大值是 2147483647)
  • 該隊列屬于權重隊列,可以理解為它可以進行排序,但是排序不是從小到大排或從大到小排,是基于數(shù)組的堆結構(具體如何排下面會進行分析)
  • 出隊方式和前面的也不同,是根據(jù)權重來進行出隊,和前面所說隊列中那種先進先出或者先進后出方式不同。
  • 其存入的元素必須實現(xiàn)Comparator,或者在創(chuàng)建隊列的時候自定義Comparator。

注意:

  1. 堆結構實際上是一種完全二叉樹。關于二叉樹可以查看 《樹、二叉樹、二叉搜索樹的實現(xiàn)和特性》
  2. 堆又分為大頂堆和小頂堆 。大頂堆中第一個元素肯定是所有元素中最大的,小頂堆中第一個元素是所有元素中最小的。關于二叉堆可以查看《堆和二叉堆的實現(xiàn)和特性》

源碼分析

定義

PriorityBlockingQueue的類繼承關系如下:

阻塞隊列—PriorityBlockingQueue源碼分析

其包含的方法定義如下:

阻塞隊列—PriorityBlockingQueue源碼分析

成員屬性

從下面的字段我們可以知道,該隊列可以排序,使用顯示鎖來保證操作的原子性,在空隊列時,出隊線程會堵塞等。

/** 

* 默認數(shù)組長度 

*/ 

private static final int DEFAULT_INITIAL_CAPACITY = 11; 

 

/** 

 * 最大達容量,分配時超出可能會出現(xiàn) OutOfMemoryError 異常 

 */ 

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 

 

/** 

 * 隊列,存儲我們的元素 

 */ 

private transient Object[] queue; 

 

/** 

 * 隊列長度 

 */ 

private transient int size

 

/** 

 * 比較器,入隊進行權重的比較 

 */ 

private transient Comparator<? super E> comparator; 

 

/** 

 * 顯示鎖 

 */ 

private final ReentrantLock lock; 

 

/** 

 * 空隊列時進行線程阻塞的 Condition 對象 

 */ 

private final Condition notEmpty; 

構造函數(shù)

/** 

* 默認構造,使用長度為 11 的數(shù)組,比較器為空 

*/ 

public PriorityBlockingQueue() { 

    this(DEFAULT_INITIAL_CAPACITY, null); 

/** 

* 自定義數(shù)據(jù)長度構造,比較器為空 

*/ 

public PriorityBlockingQueue(int initialCapacity) { 

    this(initialCapacity, null); 

/** 

* 自定義數(shù)組長度,可以自定義比較器 

*/ 

public PriorityBlockingQueue(int initialCapacity, 

                             Comparator<? super E> comparator) { 

    if (initialCapacity < 1) 

        throw new IllegalArgumentException(); 

    this.lock = new ReentrantLock(); 

    this.notEmpty = lock.newCondition(); 

    this.comparator = comparator; 

    this.queue = new Object[initialCapacity]; 

/** 

* 構造函數(shù),帶有初始內容的隊列 

*/ 

public PriorityBlockingQueue(Collection<? extends E> c) { 

    this.lock = new ReentrantLock(); 

    this.notEmpty = lock.newCondition(); 

    boolean heapify = true; // true if not known to be in heap order 

    boolean screen = true;  // true if must screen for nulls 

    if (c instanceof SortedSet<?>) { 

        SortedSet<? extends E> ss = (SortedSet<? extends E>) c; 

        this.comparator = (Comparator<? super E>) ss.comparator(); 

        heapify = false

    } 

    else if (c instanceof PriorityBlockingQueue<?>) { 

        PriorityBlockingQueue<? extends E> pq = 

            (PriorityBlockingQueue<? extends E>) c; 

        this.comparator = (Comparator<? super E>) pq.comparator(); 

        screen = false

        if (pq.getClass() == PriorityBlockingQueue.class) // exact match 

            heapify = false

    } 

    Object[] a = c.toArray(); 

    int n = a.length; 

    // If c.toArray incorrectly doesn't return Object[], copy it. 

    if (a.getClass() != Object[].class) 

        a = Arrays.copyOf(a, n, Object[].class); 

    if (screen && (n == 1 || this.comparator != null)) { 

        for (int i = 0; i < n; ++i) 

            if (a[i] == null

                throw new NullPointerException(); 

    } 

    this.queue = a; 

    this.size = n; 

    if (heapify) 

        heapify(); 

入隊方法

入隊方法,下面可以看到 put 方法最終會調用 offer 方法,所以我們只看 offer 方法即可。

offer(E e)

public void put(E e) { 

    offer(e); // never need to block 

 

public boolean offer(E e) { 

    //判斷是否為空 

    if (e == null

        throw new NullPointerException(); 

    //顯示鎖 

    final ReentrantLock lock = this.lock; 

    lock.lock(); 

    //定義臨時對象 

    int n, cap; 

    Object[] array; 

    //判斷數(shù)組是否滿了 

    while ((n = size) >= (cap = (array = queue).length)) 

        //數(shù)組擴容 

        tryGrow(array, cap); 

    try { 

        //拿到比較器 

        Comparator<? super E> cmp = comparator; 

        //判斷是否有自定義比較器 

        if (cmp == null

            //堆上浮 

            siftUpComparable(n, e, array); 

        else 

            //使用自定義比較器進行堆上浮 

            siftUpUsingComparator(n, e, array, cmp); 

        //隊列長度 +1 

        size = n + 1; 

        //喚醒休眠的出隊線程 

        notEmpty.signal(); 

    } finally { 

        //釋放鎖 

        lock.unlock(); 

    } 

    return true

siftUpComparable(int k, T x, Object[] array)

上浮調整比較器方法的實現(xiàn)

private static <T> void siftUpComparable(int k, T x, Object[] array) { 

        Comparable<? super T> key = (Comparable<? super T>) x; 

        while (k > 0) { 

         //無符號向左移,目的是找到放入位置的父節(jié)點 

            int parent = (k - 1) >>> 1; 

            //拿到父節(jié)點的值 

            Object e = array[parent]; 

            //比較是否大于該元素,不大于就沒比較交換 

            if (key.compareTo((T) e) >= 0) 

                break; 

            //以下都是元素位置交換 

            array[k] = e; 

            k = parent; 

        } 

        array[k] = key

    } 

根據(jù)上面的代碼,可以看出這是完全二叉樹在進行上浮調整。調整入隊的元素,找出最小的,將元素排列有序化。簡單理解就是:父節(jié)點元素值一定要比它的子節(jié)點得小,如果父節(jié)點大于子節(jié)點了,那就兩者位置進行交換。

入隊圖解

例子:85 添加到二叉堆中(大頂堆)

package com.niuh.queue.priority; 

 

import java.util.Comparator; 

import java.util.concurrent.PriorityBlockingQueue; 

 

/** 

 * <p> 

 * PriorityBlockingQueue 簡單演示 demo 

 * </p> 

 */ 

public class TestPriorityBlockingQueue { 

 

    public static void main(String[] args) throws InterruptedException { 

        // 大頂堆 

        PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() { 

            @Override 

            public int compare(Integer o1, Integer o2) { 

                return o2 - o1; 

            } 

        }); 

 

        concurrentLinkedQueue.offer(90); 

        concurrentLinkedQueue.offer(80); 

        concurrentLinkedQueue.offer(70); 

        concurrentLinkedQueue.offer(60); 

        concurrentLinkedQueue.offer(40); 

        concurrentLinkedQueue.offer(30); 

        concurrentLinkedQueue.offer(20); 

        concurrentLinkedQueue.offer(10); 

        concurrentLinkedQueue.offer(50); 

        concurrentLinkedQueue.offer(85); 

        //輸出元素排列 

        concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  ")); 

        //取出元素 

        Integer take = concurrentLinkedQueue.take(); 

        System.out.println(); 

        concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  ")); 

    } 

 阻塞隊列—PriorityBlockingQueue源碼分析

操作的細節(jié)分為兩步:

  • 第一步:首先把新元素插入到堆的尾部再說;(新的元素可能是特別大或者特別小,那么要做的一件事情就是重新維護一下堆的所有元素,把新元素挪到這個堆的相應的位置)
  • 第二步:依次向上調整整個堆的結構,就叫 HeapifyUp

  阻塞隊列—PriorityBlockingQueue源碼分析

85 按照上面講的先插入到堆的尾部,也就是一維數(shù)組的尾部,一維數(shù)組的尾部的話就上圖的位置,因為這是一個完全二叉樹,所以它的尾部就是50后面這個結點。插進來之后這個時候就破壞了堆,它的每一個結點都要大于它的兒子的這種屬性了,接下來要做的事情就是要把 85 依次地向上浮動,怎么浮動?就是 85 大于它的父親結點,那么就和父親結點進行交換,直到走到根如果大于根的話,就和根也進行交換。

阻塞隊列—PriorityBlockingQueue源碼分析

85 再繼續(xù)往前走之后,它要和 80 再進行比較,同理可得:也就是說這個結點每次和它的父親比,如果它大于它的父親的話就交換,直到它不再大于它的父親。

 阻塞隊列—PriorityBlockingQueue源碼分析

出隊方法

入隊列的方法說完后,我們來說說出隊列的方法。PriorityBlockingQueue提供了多種出隊操作的實現(xiàn)來滿足不同情況下的需求,如下:

  • E take();
  • E poll();
  • E poll(long timeout, TimeUnit unit);
  • E peek()

poll 和 peek 與上面類似,這里不做說明

take()

出隊方法,該方法會阻塞

public E take() throws InterruptedException { 

 //顯示鎖 

    final ReentrantLock lock = this.lock; 

    //可中斷鎖 

    lock.lockInterruptibly(); 

    //結果接收對象 

    E result; 

    try { 

     //判斷隊列是否為空 

        while ( (result = dequeue()) == null

         //線程阻塞 

            notEmpty.await(); 

    } finally { 

        lock.unlock(); 

    } 

    return result; 

dequeue()

我們再來看看具體出隊方法的實現(xiàn),dequeue方法

private E dequeue() { 

//長度減少 1 

   int n = size - 1; 

   //判斷隊列中是否有元素 

   if (n < 0) 

       return null

   else { 

    //隊列對象 

       Object[] array = queue; 

       //取出第一個元素 

       E result = (E) array[0]; 

       //拿出最后一個元素 

       E x = (E) array[n]; 

       //置空 

       array[n] = null

       Comparator<? super E> cmp = comparator; 

       if (cmp == null

        //下沉調整 

           siftDownComparable(0, x, array, n); 

       else 

           siftDownUsingComparator(0, x, array, n, cmp); 

       //成功則減少隊列中的元素數(shù)量 

       size = n; 

       return result; 

   } 

總體就是找到父節(jié)點與兩個子節(jié)點中最小的一個節(jié)點,然后進行交換位置,不斷重復,由上而下的交換。

siftDownComparable(int k, T x, Object[] array, int n)

再來看看下沉比較器方法的實現(xiàn)

private static <T> void siftDownComparable(int k, T x, Object[] array, 

                                               int n) { 

    //判斷隊列長度 

    if (n > 0) { 

        Comparable<? super T> key = (Comparable<? super T>)x; 

        //找到隊列最后一個元素的父節(jié)點的索引。 

        int half = n >>> 1;           // loop while a non-leaf 

        while (k < half) { 

         //拿到 k 節(jié)點下的左子節(jié)點 

            int child = (k << 1) + 1; // assume left child is least 

            //取得子節(jié)點對應的值 

            Object c = array[child]; 

            //取得 k 右子節(jié)點的索引 

            int right = child + 1; 

            //比較右節(jié)點的索引是否小于隊列長度和左右子節(jié)點的值進行比較 

            if (right < n && 

                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) 

                c = array[child = right]; 

            //比較父節(jié)點值是否大于子節(jié)點 

            if (key.compareTo((T) c) <= 0) 

                break; 

            //下面都是元素替換 

            array[k] = c; 

            k = child; 

        } 

        array[k] = key

    } 

出隊圖解

將堆尾元素替換到頂部(即堆頂被替代刪除掉)

依次從根部向下調整整個堆的結構(一直到堆尾即可) HeapifyDown

例子:90 從二叉堆中刪除(大頂堆)

阻塞隊列—PriorityBlockingQueue源碼分析

總結

PriorityBlockingQueue 真的是個神奇的隊列,可以實現(xiàn)優(yōu)先出隊。最特別的是它只有一個鎖,入隊操作永遠成功,而出隊只有在空隊列的時候才會進行線程阻塞。可以說有一定的應用場景吧,比如:有任務要執(zhí)行,可以對任務加一個優(yōu)先級的權重,這樣隊列會識別出來,對該任務優(yōu)先進行出隊。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 免费一级特黄特色大片在线 | 欧美成人一区二区 | 激情综合站 | 亚洲AV久久久噜噜噜久久 | 欧美一区二区三区四区五区六区 | 91亚洲精品久久91综合 | 男人桶女下面60分钟视频 | 成人综合婷婷国产精品久久免费 | 视频一区二区国产 | 日韩精品特黄毛片免费看 | 接吻吃胸摸下面啪啪教程 | 黑人巨荃大战乌克兰美女 | 日韩一品在线播放视频一品免费 | 国产一区二区三区高清视频 | 亚洲日韩精品欧美一区二区一 | 日本久久免费大片 | 久久r视频 | 男人j进女屁股视频在线观看 | 二次元美女扒开内裤露尿口 | 免费大片 | 5x社区发源地最新地址 | 日韩香蕉网 | 色悠久久久久综合欧美99 | 精品一卡2卡3卡4卡5卡亚洲 | s0e一923春菜花在线播放 | 99精品在免费线视频 | 日本视频一区在线观看免费 | 91真人毛片一级在线播放 | 免费成年网站 | 久久毛片免费看一区二区三区 | 女人c交zzzooo在线观看 | yellow视频在线观看免费 | 亚洲午夜精品久久久久久成年 | 糖心视频在线观看 | 久久这里只有精品国产精品99 | 久久se视频精品视频在线 | 精品在线91 | 国产福利专区精品视频 | japanesepooping脱粪 | 古装一级无遮挡毛片免费观看 | 夫妻性生活一级黄色片 |