一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - 阻塞隊列—ArrayBlockingQueue源碼分析

阻塞隊列—ArrayBlockingQueue源碼分析

2020-11-19 22:51今日頭條一角錢技術(shù) Java教程

ArrayBlockingQueue是一個阻塞隊列,內(nèi)部由ReentrantLock來實現(xiàn)線程安全,由Condition的await和signal來實現(xiàn)等待喚醒的功能。它的數(shù)據(jù)結(jié)構(gòu)是數(shù)組,準(zhǔn)確地說是一個循環(huán)數(shù)組(可以類比一個圓環(huán)),所有的下標(biāo)在到達(dá)最大長度時自動從0繼續(xù)開

阻塞隊列—ArrayBlockingQueue源碼分析

前言

阻塞隊列—ArrayBlockingQueue源碼分析

ArrayBlockingQueue 由數(shù)組支持的有界阻塞隊列,隊列基于數(shù)組實現(xiàn),容量大小在創(chuàng)建 ArrayBlockingQueue 對象時已經(jīng)定義好。 此隊列按照先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序。支持公平鎖和非公平鎖,默認(rèn)采用非公平鎖。其數(shù)據(jù)結(jié)構(gòu)如下圖:

阻塞隊列—ArrayBlockingQueue源碼分析
  • 注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程和請求一定先被滿足,那么這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當(dāng)前等待時間最長的線程先獲取鎖

隊列創(chuàng)建

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5); 

應(yīng)用場景

在線程池中有比較多的應(yīng)用,生產(chǎn)者消費者場景。

  • 先進(jìn)先出隊列(隊列頭的是最先進(jìn)隊的元素;隊列尾的是最后進(jìn)隊的元素)
  • 有界隊列(即初始化時指定的容量,就是隊列最大的容量,不會出現(xiàn)擴(kuò)容,容量滿,則阻塞進(jìn)隊操作;容量空,則阻塞出隊操作)
  • 隊列不支持空元素
  • 公平性 (fairness)可以在構(gòu)造函數(shù)中指定。

此類支持對等待的生產(chǎn)者線程和使用者線程進(jìn)行排序的可選公平策略。默認(rèn)情況下,不保證是這種排序。然而,通過在構(gòu)造函數(shù)將公平性 (fairness) 設(shè)置為 true 而構(gòu)造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。

工作原理

ArrayBlockingQueue是對BlockingQueue的一個數(shù)組實現(xiàn),它使用一把全局的鎖并行對queue的讀寫操作,同時使用兩個Condition阻塞容量為空時的取操作和容量滿時的寫操作。

基于 ReentrantLock 保證線程安全,根據(jù) Condition 實現(xiàn)隊列滿時的阻塞。

final ReentrantLock lock; 

 

private final Condition notEmpty; 

 

private final Condition notFull; 

Lock的作用是提供獨占鎖機(jī)制,來保護(hù)競爭資源;而Condition是為了更加精細(xì)地對鎖進(jìn)行控制,它依賴于Lock,通過某個條件對多線程進(jìn)行控制。

notEmpty表示“鎖的非空條件”。當(dāng)某線程想從隊列中取數(shù)據(jù)時,而此時又沒有數(shù)據(jù),則該線程通過notEmpty.await()進(jìn)行等待;當(dāng)其它線程向隊列中插入了元素之后,就調(diào)用notEmpty.signal()喚醒“之前通過notEmpty.await()進(jìn)入等待狀態(tài)的線程”。 同理,notFull表示“鎖的滿條件”。當(dāng)某線程想向隊列中插入元素,而此時隊列已滿時,該線程等待;當(dāng)其它線程從隊列中取出元素之后,就喚醒該等待的線程。

  • 試圖向已滿隊列中放入元素會導(dǎo)致放入操作受阻塞,直到BlockingQueue里有新的喚空間才會被醒繼續(xù)操作; 試圖從空隊列中檢索元素將導(dǎo)致類似阻塞,直到BlocingkQueue進(jìn)了新貨才會被喚醒。

源碼分析

以下源碼分析基于JDK1.8

定義

ArrayBlockingQueue的類繼承關(guān)系如下:

阻塞隊列—ArrayBlockingQueue源碼分析

其包含的方法定義如下:

阻塞隊列—ArrayBlockingQueue源碼分析

成員屬性

/** 真正存入數(shù)據(jù)的數(shù)組 */ 

   final Object[] items; 

 

   /** take,poll,peek or remove 的下一個索引 */ 

   int takeIndex; 

 

   /** put,offer,or add 下一個索引 */ 

   int putIndex; 

 

   /** 隊列中元素個數(shù) */ 

   int count

 

   /** 可重入鎖 */ 

   final ReentrantLock lock; 

 

   /** 如果數(shù)組是空的,在該Condition上等待 */ 

   private final Condition notEmpty; 

 

   /** 如果數(shù)組是滿的,在該Condition上等待 */ 

   private final Condition notFull; 

 

   /** 遍歷器實現(xiàn) */ 

   transient Itrs itrs = null

構(gòu)造函數(shù)

/** 

    * 構(gòu)造函數(shù),設(shè)置隊列的初始容量 

    */ 

   public ArrayBlockingQueue(int capacity) { 

       this(capacity, false); 

   } 

 

   /** 

    * 構(gòu)造函數(shù), 

    * capacity and the specified access policy. 

    * 

    * @param capacity 設(shè)置數(shù)組大小 

    * @param fair  設(shè)置是否為公平鎖 

    * @throws IllegalArgumentException if {@code capacity < 1} 

    */ 

   public ArrayBlockingQueue(int capacity, boolean fair) { 

       if (capacity <= 0) 

           throw new IllegalArgumentException(); 

       this.items = new Object[capacity]; 

       // 是否為公平鎖,如果是的話,那么先到的線程先獲得鎖對象 

       // 否則,由操作系統(tǒng)調(diào)度由哪個線程獲得鎖,一般為false,性能會比較高 

       lock = new ReentrantLock(fair);  

       notEmpty = lock.newCondition(); 

       notFull =  lock.newCondition(); 

   } 

 

   /** 

    * 構(gòu)造函數(shù),帶有初始內(nèi)容的隊列 

    */ 

   public ArrayBlockingQueue(int capacity, boolean fair, 

                             Collection<? extends E> c) { 

       this(capacity, fair); 

 

       final ReentrantLock lock = this.lock; 

       //加鎖的目的是為了其他CPU能夠立即看到修改 

       //加鎖和解鎖底層都是CAS,會強(qiáng)制修改寫回主存,對其他CPU可見 

       lock.lock(); // 要給數(shù)組設(shè)置內(nèi)容,先上鎖 

       try { 

           int i = 0; 

           try { 

               for (E e : c) { 

                   checkNotNull(e); 

                   items[i++] = e; // 依次拷貝內(nèi)容 

               } 

           } catch (ArrayIndexOutOfBoundsException ex) { 

               throw new IllegalArgumentException(); 

           } 

           count = i; 

           putIndex = (i == capacity) ? 0 : i; // 如果 putIndex大于數(shù)組大小,那么從0重寫開始 

       } finally { 

           lock.unlock(); // 最后一定要釋放鎖 

       } 

   } 

入隊方法

add / offer / put,這三個方法都是往隊列中添加元素,說明如下:

  • add方法依賴于offer方法,如果隊列滿了則拋出異常,否則添加成功返回true;
  • offer方法有兩個重載版本,只有一個參數(shù)的版本,如果隊列滿了就返回false,否則加入到隊列中,返回true,add方法就是調(diào)用此版本的offer方法;另一個帶時間參數(shù)的版本,如果隊列滿了則等待,可指定等待的時間,如果這期間中斷了則拋出異常,如果等待超時了則返回false,否則加入到隊列中返回true;
  • put方法跟帶時間參數(shù)的offer方法邏輯一樣,不過沒有等待的時間限制,會一直等待直到隊列有空余位置了,再插入到隊列中,返回true

/** 

     * 添加一個元素,其實super.add里面調(diào)用了offer方法 

     */ 

    public boolean add(E e) { 

        return super.add(e); 

    } 

 

    /** 

     * 加入成功返回 true,否則返回 false 

     */ 

    public boolean offer(E e) { 

     // 創(chuàng)建插入的元素是否為null,是的話拋出NullPointerException異常 

        checkNotNull(e); 

        // 獲取“該阻塞隊列的獨占鎖” 

        final ReentrantLock lock = this.lock; 

        lock.lock(); // 上鎖 

        try { 

         // 如果隊列已滿,則返回false。 

            if (count == items.length) // 超過數(shù)組的容量 

                return false

            else { 

             // 如果隊列未滿,則插入e,并返回true。 

                enqueue(e);  

                return true

            } 

        } finally { 

         // 釋放鎖 

            lock.unlock(); 

        } 

    } 

 

    /** 

     * 如果隊列已滿的話,就會等待 

     */ 

    public void put(E e) throws InterruptedException { 

        checkNotNull(e); 

        final ReentrantLock lock = this.lock; 

        lock.lockInterruptibly(); //和lock方法的區(qū)別是讓它在阻塞時可以拋出異常跳出 

        try { 

            while (count == items.length) 

                notFull.await(); // 這里就是阻塞了,要注意:如果運行到這里,那么它會釋放上面的鎖,一直等到 notify 

            enqueue(e); 

        } finally { 

            lock.unlock(); 

        } 

    } 

 

    /** 

     * 帶有超時事件的插入方法,unit 表示是按秒、分、時哪一種 

     */ 

    public boolean offer(E e, long timeout, TimeUnit unit) 

        throws InterruptedException { 

 

        checkNotNull(e); 

        long nanos = unit.toNanos(timeout); 

        final ReentrantLock lock = this.lock; 

        lock.lockInterruptibly(); 

        try { 

            while (count == items.length) { 

                if (nanos <= 0) 

                    return false

                nanos = notFull.awaitNanos(nanos); // 帶有超時等待的阻塞方法 

            } 

            enqueue(e); // 入隊 

            return true

        } finally { 

            lock.unlock(); 

        } 

    } 

出隊方法

poll / take / peek,這幾個方法都是獲取隊列頂?shù)脑兀唧w說明如下:

  • poll方法有兩個重載版本,第一個版本,如果隊列是空的,返回null,否則移除并返回隊列頭部元素;另一個帶時間參數(shù)的版本,如果棧為空則等待,可以指定等待的時間,如果等待超時了則返回null,如果被中斷了則拋出異常,否則移除并返回棧頂元素
  • take方法同帶時間參數(shù)的poll方法,但是不能指定等待時間,會一直等待直到隊列中有元素為止,然后移除并返回棧頂元素
  • peek方法只是返回隊列頭部元素,不移除

// 實現(xiàn)的方法,如果當(dāng)前隊列為空,返回null 

   public E poll() { 

       final ReentrantLock lock = this.lock; 

       lock.lock(); 

       try { 

           return (count == 0) ? null : dequeue(); 

       } finally { 

           lock.unlock(); 

       } 

   } 

// 實現(xiàn)的方法,如果當(dāng)前隊列為空,一直阻塞 

   public E take() throws InterruptedException { 

       final ReentrantLock lock = this.lock; 

       lock.lockInterruptibly(); 

       try { 

           while (count == 0) 

               notEmpty.await(); // 隊列為空,阻塞方法 

           return dequeue(); 

       } finally { 

           lock.unlock(); 

       } 

   } 

// 帶有超時事件的取元素方法,否則返回null 

   public E poll(long timeout, TimeUnit unit) throws InterruptedException { 

       long nanos = unit.toNanos(timeout); 

       final ReentrantLock lock = this.lock; 

       lock.lockInterruptibly(); 

       try { 

           while (count == 0) { 

               if (nanos <= 0) 

                   return null

               nanos = notEmpty.awaitNanos(nanos); // 超時等待 

           } 

           return dequeue(); // 取得元素 

       } finally { 

           lock.unlock(); 

       } 

   } 

 

   // 只是看一個隊列最前面的元素,取出是不擅長隊列中原來的元素,隊列為空時返回null 

   public E peek() { 

       final ReentrantLock lock = this.lock; 

       lock.lock(); 

       try { 

           return itemAt(takeIndex); // 隊列為空時返回null 

       } finally { 

           lock.unlock(); 

       } 

   } 

刪除元素方法

remove / clear /drainT,這三個方法用于從隊列中移除元素,具體說明如下:

  • remove方法用于移除某個元素,如果棧為空或者沒有找到該元素則返回false,否則從棧中移除該元素;移除時,如果該元素位于棧頂則直接移除,如果位于棧中間,則需要將該元素后面的其他元素往前面挪動,移除后需要喚醒因為棧滿了而阻塞的線程
  • clear方法用于整個棧,同時將takeIndex置為putIndex,保證棧中的元素先進(jìn)先出;最后會喚醒最多count個線程,因為正常一個線程插入一個元素,如果喚醒超過count個線程,可能導(dǎo)致部分線程因為棧滿了又再次被阻塞
  • drainTo方法有兩個重載版本,一個是不帶個數(shù),將所有的元素都移除并拷貝到指定的集合中;一個帶個數(shù),將指定個數(shù)的元素移除并拷貝到指定的集合中,兩者的底層實現(xiàn)都是同一個方法。移除后需要重置takeIndex和count,并喚醒最多移除個數(shù)的因為棧滿而阻塞的線程。

/** 

    * 從隊列中刪除一個元素的方法。刪除成功返回true,否則返回false 

    */ 

   public boolean remove(Object o) { 

       if (o == nullreturn false

       final Object[] items = this.items; 

       final ReentrantLock lock = this.lock; 

       lock.lock(); 

       try { 

           if (count > 0) { 

               final int putIndex = this.putIndex; 

               int i = takeIndex; 

               //從takeIndex開始往后遍歷直到等于putIndex 

               do { 

                   if (o.equals(items[i])) { 

                       removeAt(i); // 真正刪除的方法 

                       return true

                   } 

                   //走到數(shù)組末尾了又從頭開始,put時也按照這個規(guī)則來 

                   if (++i == items.length) 

                       i = 0; 

               } while (i != putIndex); // 一直不斷的循環(huán)取出來做判斷 

           } 

           //如果數(shù)組為空,返回false 

           return false

       } finally { 

           lock.unlock(); 

       } 

   } 

 

/** 

    * 指定刪除索引上的元素. 

    */ 

   void removeAt(final int removeIndex) { 

       // assert lock.getHoldCount() == 1; 

       // assert items[removeIndex] != null

       // assert removeIndex >= 0 && removeIndex < items.length; 

       final Object[] items = this.items; 

       if (removeIndex == takeIndex) { 

           //如果移除的就是棧頂?shù)脑?nbsp;

           items[takeIndex] = null

           if (++takeIndex == items.length) 

               takeIndex = 0; 

           //元素個數(shù)減1 

           count--; 

           if (itrs != null

               itrs.elementDequeued(); 

       } else { 

           // an "interior" remove 

 

           // 如果移除的是棧中間的某個元素,需要將該元素后面的元素往前挪動 

           final int putIndex = this.putIndex; 

           for (int i = removeIndex;;) { 

               int next = i + 1; 

               //到數(shù)組末尾了,從頭開始 

               if (next == items.length) 

                   next = 0; 

               if (next != putIndex) { 

                //將后面一個元素復(fù)制到前面來 

                   items[i] = items[next]; 

                   i = next

               } else { 

                //如果下一個元素的索引等于putIndex,說明i就是棧中最后一個元素了,直接將該元素置為null 

                   items[i] = null

                   //重置putIndex為i 

                   this.putIndex = i; 

                   break; 

               } 

           } 

           count--; 

           if (itrs != null

            //通知itrs節(jié)點移除了 

               itrs.removedAt(removeIndex); 

       } 

       //喚醒因為棧滿了而等待的線程 

       notFull.signal(); // 有一個元素刪除成功,那肯定隊列不滿 

   } 

 

/** 

    * 清空隊列 

    */ 

   public void clear() { 

       final Object[] items = this.items; 

       final ReentrantLock lock = this.lock; 

       lock.lock(); 

       try { 

           int k = count

           if (k > 0) { 

               final int putIndex = this.putIndex; 

               int i = takeIndex; 

               //從takeIndex開始遍歷直到i等于putIndex,將數(shù)組元素置為null 

               do { 

                   items[i] = null

                   if (++i == items.length) 

                       i = 0; 

               } while (i != putIndex); 

               //注意此處沒有將這兩個index置為0,只是讓他們相等,因為只要相等就可以實現(xiàn)棧先進(jìn)先出了 

               takeIndex = putIndex; 

               count = 0; 

               if (itrs != null

                   itrs.queueIsEmpty(); 

               //如果有因為棧滿了而等待的線程,則將其喚醒 

               //注意這里沒有使用signalAll而是通過for循環(huán)來signal多次,單純從喚醒線程來看是可以使用signalAll的,效果跟這里的for循環(huán)是一樣的 

               //如果有等待的線程,說明count就是當(dāng)前線程的最大容量了,這里清空了,最多只能put count次,一個線程只能put 1次,只喚醒最多count個線程就避免了 

               //線程被喚醒后再次因為棧滿了而阻塞 

               for (; k > 0 && lock.hasWaiters(notFull); k--) 

                   notFull.signal(); 

           } 

       } finally { 

           lock.unlock(); 

       } 

   } 

 

   /** 

    * 取出所有元素到集合 

    */ 

   public int drainTo(Collection<? super E> c) { 

       return drainTo(c, Integer.MAX_VALUE); 

   } 

 

   /** 

    * 取出所有元素到集合 

    */ 

   public int drainTo(Collection<? super E> c, int maxElements) { 

       //校驗參數(shù)合法 

       checkNotNull(c); 

       if (c == this) 

           throw new IllegalArgumentException(); 

       if (maxElements <= 0) 

           return 0; 

       final Object[] items = this.items; 

       final ReentrantLock lock = this.lock; 

       lock.lock(); 

       try { 

        //取兩者之間的最小值 

           int n = Math.min(maxElements, count); 

           int take = takeIndex; 

           int i = 0; 

           try { 

            //從takeIndex開始遍歷,取出元素然后添加到c中,直到滿足個數(shù)要求為止 

               while (i < n) { 

                   @SuppressWarnings("unchecked"

                   E x = (E) items[take]; 

                   c.add(x); 

                   items[take] = null

                   if (++take == items.length) 

                       take = 0; 

                   i++; 

               } 

               return n; 

           } finally { 

               // Restore invariants even if c.add() threw 

               if (i > 0) { 

                //取完了,修改count減去i 

                   count -= i; 

                   takeIndex = take; 

                   if (itrs != null) { 

                       if (count == 0) 

                        //通知itrs 棧空了 

                           itrs.queueIsEmpty(); 

                       else if (i > take) 

                        //說明take中間變成0了,通知itrs 

                           itrs.takeIndexWrapped(); 

                   } 

                   //喚醒在因為棧滿而等待的線程,最多喚醒i個,同上避免線程被喚醒了因為棧又滿了而阻塞 

                   for (; i > 0 && lock.hasWaiters(notFull); i--) 

                       notFull.signal(); 

               } 

           } 

       } finally { 

           lock.unlock(); 

       } 

   } 

iterator / Itr / Itrs

Itr和Itrs都是ArrayBlockingQueue的兩個內(nèi)部類,如下:

阻塞隊列—ArrayBlockingQueue源碼分析

iterator方法返回一個迭代器實例,用于實現(xiàn)for循環(huán)遍歷和部分Collection接口,該方法的實現(xiàn)如下:

public Iterator<E> iterator() { 

 return new Itr(); 

 

Itr() { 

 // assert lock.getHoldCount() == 0; 

 lastRet = NONE; 

 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 

    lock.lock(); 

    try { 

     if (count == 0) { 

         //NONE和DETACHED都是常量 

            cursor = NONE; 

            nextIndex = NONE; 

            prevTakeIndex = DETACHED; 

        } else { 

         //初始化各屬性 

            final int takeIndex = ArrayBlockingQueue.this.takeIndex; 

            prevTakeIndex = takeIndex; 

            nextItem = itemAt(nextIndex = takeIndex); 

            cursor = incCursor(takeIndex); 

            if (itrs == null) { 

             itrs = new Itrs(this); 

            } else { 

             //初始化Itrs,將當(dāng)前線程注冊到Itrs 

                itrs.register(this); // in this order 

                itrs.doSomeSweeping(false); 

            } 

            prevCycles = itrs.cycles; 

            // assert takeIndex >= 0; 

            // assert prevTakeIndex == takeIndex; 

            // assert nextIndex >= 0; 

            // assert nextItem != null

     } 

 } finally { 

    lock.unlock(); 

    } 

 

Itrs(Itr initial) { 

 register(initial); 

 

//根據(jù)index計算cursor 

private int incCursor(int index) { 

 // assert lock.getHoldCount() == 1; 

 if (++index == items.length) 

  index = 0; 

 if (index == putIndex) 

  index = NONE; 

 return index

 

/** 

* 創(chuàng)建一個新的Itr實例時,會調(diào)用此方法將該實例添加到Node鏈表中 

*/ 

void register(Itr itr) { 

 //創(chuàng)建一個新節(jié)點將其插入到head節(jié)點的前面 

 head = new Node(itr, head); 

小結(jié)

ArrayBlockingQueue是一個阻塞隊列,內(nèi)部由ReentrantLock來實現(xiàn)線程安全,由Condition的await和signal來實現(xiàn)等待喚醒的功能。它的數(shù)據(jù)結(jié)構(gòu)是數(shù)組,準(zhǔn)確地說是一個循環(huán)數(shù)組(可以類比一個圓環(huán)),所有的下標(biāo)在到達(dá)最大長度時自動從0繼續(xù)開始。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲精品高清中文字幕完整版 | 国产91视频网 | 国产亚洲成归v人片在线观看 | 国产精品久久毛片蜜月 | 美女国内精品自产拍在线播放 | 国产精品亚洲片在线观看麻豆 | 成人精品视频一区二区在线 | 欧美特一级 | 接吻吃胸摸下面啪啪教程 | 亚洲精品高清中文字幕完整版 | 无限时间看片在线观看 | 免费aⅴ片| 日本javhd| 嫩模被黑人粗大挺进 | 欧美一级特黄刺激大片视频 | 叉逼视频 | 国产精品怡红院在线观看 | 性做久久久久久久久老女人 | 亚洲国产精品婷婷久久久久 | 国产日韩精品欧美一区 | 亚洲同性男男gay1069 | 成年人视频在线播放 | 国产日韩欧美不卡www | 亚洲AV福利天堂一区二区三 | 国产亚洲人成网站在线观看不卡 | 小柔的性放荡羞辱日记 | 天天狠天天天天透在线 | 国产精品合集一区二区 | 午夜精品久久久久久久99蜜桃 | 午夜影院0606免费 | 无人影院在线播放视频 | kayden kross喷水 | 国产梦呦精品 | 好大用力深一点女公交车 | 天堂在线中文无弹窗全文阅读 | 羞羞答答影院在线 | 国产探花在线视频 | 国产亚洲精品久久yy5099 | 日本漫画无翼乌 | 日本人成在线视频免费播放 | 欧美特黄三级在线观看 |