一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java源碼重讀之ConcurrentHashMap詳解

Java源碼重讀之ConcurrentHashMap詳解

2023-05-09 01:04未知服務器之家 Java教程

目錄 0. 第一個屬性 serialPersistentFields 1. spread() 2. tabAt()、casTabAt()、setTabAt() 3. counterCells 4. keySet、values、entrySet 5. 構造方法 6. putAll() 7. tryPresize() 8. resizeStamp() 9.transfer() 10.putVal() 11.initTable() 12.helpTransfer() 13.addCount() 14.get()

目錄
  • 0. 第一個屬性 serialPersistentFields
  • 1. spread()
  • 2. tabAt()、casTabAt()、setTabAt()
  • 3. counterCells
  • 4. keySet、values、entrySet
  • 5. 構造方法
  • 6. putAll()
  • 7. tryPresize()
  • 8. resizeStamp()
  • 9.transfer()
  • 10.putVal()
  • 11.initTable()
  • 12.helpTransfer()
  • 13.addCount()
  • 14.get()

如果你沒有閱讀過 Java 源碼重讀系列之 HashMap 這篇文章的話,建議你先讀一下。以下所有內容的前提是你已閱讀過以上的文章。

另外,凡是涉及到多線程、并發的東西從來就沒有簡單的,所以這次我們很難講清楚 ConcurrentHashMap 中的所有內容,只能聚焦到以下幾個內容

  • ConcurrentHashMap 的 get 操作
  • ConcurrentHashMap 的 put 操作
  • ConcurrentHashMap 的 resize() 操作
  • ConcurrentHashMap 是如何保證線程安全的

如果你想要了解的內容不在以上范圍內,那就不用繼續閱讀了,以免浪費時間~

0. 第一個屬性 serialPersistentFields

因為 ConcurrentHashMap 的邏輯比較復雜,所以我們直接從 serialPersistentFields 屬性說起,它之前的這些屬性等用到的時候我們再看就好了,你只要知道 這個屬性之前還有一堆固定的屬性就好了。

serialPersistentFields 屬性是一個 ObjectStreamField 的數組,而且默認添加了三個元素。

    private static final ObjectStreamField[] serialPersistentFields = {
        new ObjectStreamField("segments", Segment[].class),
        new ObjectStreamField("segmentMask", Integer.TYPE),
        new ObjectStreamField("segmentShift", Integer.TYPE)
    };

我們點到 ObjectStreamField 類中去,它的類頭有一段這樣的描述:

?* A description of a Serializable field from a Serializable class. ?An array
?* of ObjectStreamFields is used to declare the Serializable fields of a class.

簡單翻譯一下就是,一個序列化類中可以序列化屬性的描述。ObjectStreamFields 數組聲明了這個類的可序列化的字段。

好了,這個類我們看到這就可以了,而且也知道了 ConcurrentHashMap 中 serialPersistentFields 屬性的作用。就是聲明了一下 ConcurrentHashMap 里有三個 屬性可以被序列化。這三個屬性分別是segments、segmentMask、segmentShift 。結束~

1. spread()

繼續往下是 Node 類的定義,沒什么好說的,我們遇到了第一個方法。

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

都是一些位運算。解釋一下,這個方法會將 h 和 h 右移 16 位的數值進行異或(^)操作,得到的結果與 HASH_BITS 進行與(&)操作。和 HASH_BITS 進行與(&)操作,作用就是保證返回的結果的最高位一定是 0,也就是說,返回的結果一定是正數。(如果你對位運算沒有什么概念的話,也可以不用糾結這個方法,這個方法的作用就是,給一個數,返回另外一個數。)

2. tabAt()、casTabAt()、setTabAt()

    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

這幾個是其實是 ConcurrentHashMap 的核心操作方法。tabAt() 是獲取,casTabAt() 是更新,并且是基于 CAS 方式實現的更新。setTabAt() 是插入。這些實現都使用了大名鼎鼎的 sun.misc.Unsafe 類。

如果你對這個類不熟悉的話,其實可以簡單理解,這個類里的一些方法都是線程的。因為這個類提供的是一些硬件級別的原子操作。簡單來說,sun.misc.Unsafe 類提供的方法都是線程安全的。理解到這里就可以了,再深入的內容,就不再本文范圍內了。繼續往下。

3. counterCells

繼續往下的話,就看到了 tablenextTable,沒什么說的,這個就是存儲數據的數組了,至于 nextTable,通過注釋可以看到,這個變量應該是只在擴容時使用到了,等用到的時候再說。

繼續往下呢就是一些int 類型的值了,通過名字和注釋也看不出來什么,直接跳過。等用到的時候再說。繼續往下的話我們就看到了一個 CounterCell[] 數組了,點到這個類的定義,可以看到以下代碼。

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

好像也沒有多復雜,就一個使用了 volatile 標記的 數值。至于 sun.misc.Contended 注解,主要是解決 CPU 偽緩存 問題的,提高性能和效率使用的,可以先不用關注。

但是,如果你閱讀一下注釋的話,就會發現這里面大有文章。涉及到兩個非常復雜的東西:LongAdder and Striped64。關于 LongAdder and Striped64 的內容也不在本文范圍內,有興趣的話可以搜一下相關的文章。不了解也沒有關系,不影響閱讀。我們繼續往下看。

4. keySet、values、entrySet

再往下就是幾個 view 變量了。

    // views
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;

看名字也應該能猜出來,這些變量應該是跟 HashMap 的 keySet()、values()、entrySet() 幾個方法的作用類似。如果你點到它的定義就會看到,這幾個類都繼承了 CollectionView 這個類。

abstract static class CollectionView<K,V,E>
        implements Collection<E>, java.io.Serializable {
        private static final long serialVersionUID = 7249069246763182397L;
        final ConcurrentHashMap<K,V> map;
        CollectionView(ConcurrentHashMap<K,V> map)  { this.map = map; }
        //... ...

只看前面幾行就可以了,內部有一個 ConcurrentHashMap 類型的變量,而且 CollectionView 只有一個帶有 ConcurrentHashMap 參數的構造方法。盲猜也能猜到,上面的 xxxView 類內部操作的也都是 ConcurrentHashMap 存儲的數據。了解這些就可以了,我們繼續往下看。

5. 構造方法

第一個是個空構造方法沒有什么好說的,先看第二個。

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

通過注釋和名稱我們應該能夠知道,這個構造方法可以初始化 Map 的容量。有意思的是,計算 cap 的方法。不知道你還記不記得 HashMap 的初始容量的構造方法是怎么計算容量的。代碼在下面

this.threshold = tableSizeFor(initialCapacity);

而 ConcurrentHashMap 則是將 initialCapacity 加上了 initialCapacity 的一半又加了 1 作為 tableSizeFor 的參數。其實就是為了解決 HashMap 存在的可能出現兩次擴容的問題。

注意,這里使用的是 >>>,不是 >>>>> 的含義是無符號右移。它會把最高位表示正負的值也會右移,然后補 0。 所以 >>> 之后,一定是正數。如果 >>> 之前是正數的話,結果跟 >> 一致。如果是負數的話,就會出現一個很奇怪的正數。這是因為最高位表示負數的 1 也跟著右移了。由于代碼里已經判斷了小于 0 ,所以我們目前先按照除 2 理解即可。

還有一個點是,從代碼來看,ConcurrentHashMap 的最大容量 好像 是用 sizeCtl 表示的。但是,如果僅僅是表示最大容量,為什么會定義一個這么奇怪的名字呢? Ctl 的后綴應該是 control 的簡寫。具體是怎么控制的呢?

繼續往下,我們先跳過帶有 Map 參數的構造方法,因為這個涉及到 putAll() 方法。

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

這兩個構造方法其實可以算做一個,我們直接看下面那個復雜的。

先判斷了一下參數的取值,然后更新了一下 initialCapacity 參數,然后根據參數計算 size,考慮到 loadFactor 可能小于 1,導致 int 值越界,所以轉成了 long 類型。

關于 concurrencyLevel,給的注釋是:并發更新線程的預估數量。那上面那段判斷更新就不難理解了。假如我預估會有 20 個線程同時更新這個初始容量為 15 的 Map,這個時候的初始容量會自動的改為 20 。

好像沒有什么問題?有意思的是, loadFactor 這個參數竟然沒有保存!! 加載因子沒有保存,那什么時候觸發擴容呢?我們繼續往下看。

6. putAll()

回到帶有 Map 參數的構造方法。

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

沒有什么復雜的,指定了下默認的初始容量(16)就直接 putAll(m); 了。

    public void putAll(Map<? extends K, ? extends V> m) {
        tryPresize(m.size());
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            putVal(e.getKey(), e.getValue(), false);
    }

好像也不難,先執行 tryPresize(m.size()); 應該是初始擴容, 然后再 for 循環進行 putVal() 操作。

7. tryPresize()

先看下方法名。嘗試并行重置容量。里面的 P 應該是 parallel(并行) 的縮寫。

    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc ==   + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

首先計算了下變量 c,這個是保存入參 size 個元素時需要的最大容量。

然后是一個 whlie 循環,因為我們是通過構造方法進來的,所以 sizeCtl 的值現在是默認值 16 ,table 現在是 null。這個時候就進入到了 if 的代碼里了。

if 的條件里是判斷了 compareAndSwapInt() 的結果。這里需要說一下,compareAndSwapInt 方法是 CAS 的一種實現。這個方法內部做了兩件事情,首先是比較 this 這個對象的 SIZECTL 值是否跟 sc 相等,相等的話,把 SIZECTL 的值 改為 -1。而且 Unsafe 類還保證了線程的安全。如果有多個線程同時執行這個方法的話,只會有一個線程成功。不會出現兩個線程都比較通過了,然后在賦值的時候產生覆蓋的問題。

好像也不難理解,其實就是把 sizeCtl 值改成了 -1,而且只有一個線程會成功。這里的 sizeCtl 更像是一把鎖,哪個線程改成了 -1 ,哪個線程就獲取到了鎖,那它就可以執行后面的流程了。

繼續,因為上面已經對 tab = table 賦值了,所以下面的判斷也能通過。然后,就看到了數組初始化的過程了。直接 new 了一個長度為 n 的 Node[]。并賦值給了 table。如果你往上追一下 n 的賦值,就會知道,現在的 n 正好是 c。就是方法一開始計算的值。

table 數組都已經初始化了,是不是結束了?并沒有。這個時候更新了一下 sc。 >>> 2 相當于除 4,其實就是 sc 現在的值是 n 的 3/4 。而且在 finally 塊中,更新了 sizeCtl。這個時候 sizeCtl 就不是 -1 了。根據我們之前的理解,這里更新 sizeCtl,應該是在釋放鎖。

然后,第一次 while 循環就結束了。再次進入 while 循環,這次 sc 是 n 的 3/4 了,上一次循環已經更新了 sizeCtl

這次 table 就不等于 null 了。而且根據我們之前的推斷,現在的 sc 應該等于 n 的 3/4 ,而 n 之前又等于 c。所以, c <= sc 這個條件也不成立。

而且 n >= MAXIMUM_CAPACITY 這個條件大概率是在擴容到最大的時候才會成立。所以,就走到了最后一個條件分支里了。因為 sc 現在是大于 0 的,所以直接走到了最后一個分支。

PS: if (sc < 0) 這個分支好像永遠不會執行,因為 while 進入的條件要求 sc >= 0,而在判斷sc < 0 之前又沒有任何地方會把 sc 更新為負數,所以好像一直都不會執行。如果我理解錯了,希望有人能解惑一下~

8. resizeStamp()

首先執行了一下 resizeStamp() 方法。這個方法也是一個位運算的方法。你可以直接使用 main() 方法跑一下,會返回一個很難理解的正數。簡單說一下這個數是怎么得出來的。

    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

首先, numberOfLeadingZeros() 會返回 n 的二進制串中從最左邊算起連續的“0”的總個數。然后再跟 1 左移 15 位的值按位或(|)操作。 最終得到的就是一個在二進制中,第 16 位為 1 的正數。

繼續回到代碼,因為現在已經確定 sc 是 n 的 3/4 了(PS:如果這個 3/4 不理解,那換成 0.75 是不是會好點 ,好像跟 HashMap 的擴容因子一樣,其實 sc 的值就是擴容閾值,這個后面會提到,現在不理解沒關系),所以也是大于 0 的。這里又進行了一次 compareAndSwapInt()。這個時候賦值的是把 rs 左移了 16 位。 還記得 resizeStamp() 返回的結果么,第 16 位是 1。所以 rs 右移 16 位之后,最高位就是 1 了,在 int 類型里,二進制的最高位表示正負,1 表示負數。

所以,這個時候,又把 sizeCtl 更新成負值了。根據我們之前的理解,這里應該還是獲取鎖的操作。獲取到鎖之后,一般就是需要操作資源了。但是 table 我們不是已經初始化好了嗎?這次又要初始什么呢?

記住,現在 sizeCtl 是一個負值,并且 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) 后面要用到!

9.transfer()

        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        //... ...

到這里還是比較好理解的。先初始了一下 striden 這兩個變量。然后,因為我們是初始化進來的,所以 nextTab 一定等于 null。這個時候會初始化 nextTab。在創建數組的時候捕獲了一個異常,這個異常出現的唯一情況就是內存不夠了,分配不了 2 倍的 n 的數組。這個時候,將 sizeCtl 的值改為了 Integer.MAX_VALUE。然后就結束了。如果沒有拋異常,會更新 nextTabletransferIndex 的值。

我們需要回頭看下 tryPresize() 方法。如果在拋異常的時候結束,會出現什么情況。根據代碼,異常結束后,會進入第三次循環,這次循環會進入第二個分支。因為 c <= sc 一定會成立。這里就會結束循環。

到這里,我們已經把 tryPresize() 循環里的三個分支都走完了,下面繼續看 transfer() 這個方法。

nextTab 初始化之后,我們又看到了一個新的 Node 類:

    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

點到這個類的定義里我們會發現,這個類里面只有一個屬性 nextTable 和一個 find() 方法。關于 find() 是在獲取元素時才能用到,我們先不用關注。目前來看 ForwardingNode 其實就是對 nextTab 的一個封裝。然后繼續看 transfer()

兩個boolean 類型的值,默認一個 true,一個 false。

下面的代碼是一個 for 循環,但是這個循環有差不多 100 多行的代碼(如果我在項目里遇到這種代碼估計會罵人的~)。我們一點點看,首先是一個while 循環。

        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, 
                    nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }

首先 i = 0, bound = 0 ,所以,第一次循環不會進入第一個分支。然后,根據之前 transferIndex = n; 的賦值,也不會進入第二個分支。

這樣就來到了第三個分支。compareAndSwapInt 會更新 transferIndex 的值,如果 CPU 的個數是 1,transferIndex 更新成 0 ,否則更新成 nextIndex - stride 。然后更新 bound、i、advance 的值,循環就結束了。

繼續往下,現在 i 的值是 n - 1,所以不會命中 if (i < 0 || i >= n || i + n >= nextn) 條件。

然后,因我們是初始化時進入的,素以 tab 里的所有元素都是 null,第二個條件就通過了。

    else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);

其實就是把 tab 的 i 位置 初始化了一個 fwd 元素。 到這里,第一次 for 循環就結束了。

第二次循環其實也很簡單,首先 advance = false ,不會進入 while 循環,然后就會進入下面的判斷

    else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd);
    else if ((fh = f.hash) == MOVED)
        advance = true; // already processed

首先,獲取了下 tab[i] 的值,因為上次循環已經賦值過了,現在 f = fwd。然后,有意思的來了,先看下 MOVED 的定義

    static final int MOVED     = -1; // hash for forwarding nodes

沒錯,MOVED = -1,按我們正常的理解,一個對象的 hash 值,怎么也不會等于 -1 吧,我們再回頭看下 ForwardingNode 這個類

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        //... ...
    }
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        //... ... 
    }

順便貼了下父類的代碼,主要看構造函數,看到了么? fwd 這個對象在初始化的時候,指定了 hash 值,就是 MOVED。OK,回到之前的循環。

這次循環就會把 advance 改為 true。第二次循環就結束了。

經過上面的兩次循環之后,我們是其實只是執行了一行代碼 tab[n-1] = fwd。現在進入第三次循環,之前 i = n - 1 。現在又執行了一次 if (--i >= bound || finishing),所以現在 i = n - 2 。但是 bound 可能有兩種情況,一種是 bound = n - stride,一種是 bound = 0。我們先假設 bound = n - stride; stride = 16 。所以,第一個條件是成立的,執行 advance = false; ,然后 while 循環結束。

然后,第一個 if (i < 0 || i >= n || i + n >= nextn) 條件不會成立,又執行到了賦值操作里。這時 tab[n-2] = fwd。第三次循環結束~

然后第四次循環又會把 advance 改為 true

好好回味下~~

其實,這一頓操作下來就是在執行 tab[i] = fwd 這一行代碼。搞了這么多東西,其實就是在支持多線程并發擴容,簡單說下過程。

首先,while 循環會確定當前線程擴容的區間 [ bound,nextIndex ) 左開右閉。然后 while 循環下面的代碼其實就是在給 tabnextTab 賦值。設想下,如果 while 循環里的 compareAndSwapInt 執行失敗,會是什么情況?沒錯,會空轉!結束只有兩種情況,一種是 transferIndex = 0。說明已經有其他線程把所有的區間都認領了,另外一種情況是執行 compareAndSwapInt。認領 [ bound,nextIndex ) 的區間,進行擴容。

其實,你可以直接驗證下的,打斷點也好,手寫也罷,假設n = 1024; NCPU = 4。這時 stride = 32。那么第一個線程會先對 tab[1024-32,1024-1]進行賦值。如果這時有其他線程進來,在 while 循環的時候,就會認領 tab[1024-32-32,1024-32-1] 的區間進行賦值。如果有更多的線程進來的話,就會加快這個過程。這個就是所謂的 并發擴容 ,也有叫 輔助擴容 的。

然后,我們來看下通過 synchronized 加鎖的這段代碼。能執行到這里的話只能是 tab[i].hash != MOVED。那就說明這里記錄的是一個正常的數據。

首先判斷了下 if (tabAt(tab, i) == f) 沒什么說的,肯定成立,不成立就結束了,然后判斷了下 if (fh >= 0)。有點奇怪,正常數據的 hash 還能小于 0 ?那我們先看下不成立的情況

 else if (f instanceof TreeBin) 

明白了吧,當發生 hash 沖突時,并且鏈表已經轉成紅黑樹了,這時 tab[i] = TreeBin,那我們看下 TreeBin 的定義。

static final class TreeBin<K,V> extends Node<K,V> {
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        //... ...
    }
}
 static final int TREEBIN   = -2; // hash for roots of trees

真相了,TreeBin 的 hash 值是 -2,就小于 0。后面的代碼我們就不說了,其實跟HashMap是一樣的,如果當前節點是鏈表,那就采用高低位鏈表的形式對 nextTab 賦值,如果是 TreeBin 那就采用紅黑樹的方式進行賦值。而且,我們對 tab[i] 加了 synchronized 鎖,也不會有線程競爭,老老實實賦值就好了。

最后,transfer 里的代碼基本上都看完了,就剩下面這段了

    if (i < 0 || i >= n || i + n >= nextn) {
        int sc;
        if (finishing) {
            nextTable = null;
            table = nextTab;
            sizeCtl = (n << 1) - (n >>> 1);
            return;
        }
        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
            if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                return;
            finishing = advance = true;
            i = n; // recheck before commit
        }
    }

while 循環里,有這么一行代碼 i = -1; 執行了這個之后,就會進入上面的代碼里。其實就是 tab 初始化完成之后,即 nextIndex = 0 的時候,就會執行 i = -1; ,然后就會進入上面的代碼了。我們看下上面的代碼。

首先,finishing 現在應該是等于 false 的,直接進入第二個 if。這個也很簡單,首先 sc = sizeCtl,賦值,然后通過 CAS 將 sizeCtl 的值改為 sc - 1。還記得 sizeCtl 的值是多少么?? 我直接粘貼一下 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)。是不是跟 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 這個判斷邏輯一致?如果不相等,說明有其他線程修改了sizeCtl,這同時說明有其他線程還在執行擴容的動作,即還在執行 tryPresize() 或者是 transfer() 方法。那么,因為當前線程已經執行完了,所以直接 return; 結束,讓其他線程繼續執行就好了。

如果相等,執行 finishing = advance = true; i = n。進入下一次 for 循環。

一頓判斷之后,你會發現,還是會進入到上面的代碼,而這時,finishing == true 了!下面的代碼就不難理解了吧,更新 table ,相當于使用了新的數組了,而 sizeCtl 也更新了一下。都是位運算,如果你看不明白,可以用 main 方法跑一下。我們假設 n = 1024,那么 table 現在的大小也就是之前 nextTab 的大小,就是2048,然后,我們用 main 跑一下 sizeCtl 的值,不出意外的話應該等于 1536 。如果還看不明白,那么你再計算下 1536 / 2048。結果是 0.75 ,這個數字熟悉吧? HashMap 的默認加載因子!。沒錯,sizeCtl 其實就是下次需要擴容的閾值。

到這里,我們就把 transfer() 方法看完了。然后,我們重點總結下 sizeCtl 這個屬性,不得不承認,這個設計非常的巧妙!

首先,通常情況下 sizeCtl 應該是等于下次的擴容閾值的。但是在擴容期間,有兩個狀態,一個是 -1,一個是非常大的一個負值。等于 -1 很好理解,相當于是一個鎖,哪個線程更新成功,就可以進行數組的初始化動作。那么,就剩最后一種情況了。直接用下面的 main() 方法跑一下

    public static void main(String[] args) throws IllegalAccessException {
        int rs = Integer.numberOfLeadingZeros(1024) | (1 << (16 - 1));
        int sizeCtl = (rs << 16) + 2;
        System.out.println(sizeCtl);
        System.out.println((sizeCtl<<16)>>16);
    }

會得到下面的結果

-2146107390
2

有意思了,(sizeCtl<<16)>>16,這個操作是先左移 16 位,再右移 16 位,相當于把 sizeCtl 的高 16 位都置為 0 了。得到了一個 2,其實,這個 2 就是說當前有 (2 - 1) 個線程在進行擴容操作。(PS: sizeCtl 注釋里有寫~)。具體是為什么,我們繼續往下看。

transfer() 執行完,就回到了 tryPresize()。然后繼續返回就到了 putAll()。繼續往下執行就是 putVal() 方法了。

10.putVal()

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
        //... ...

首先判斷了下 null,然后計算了下 hash 哈希值。就進入 for 循環了。首先是第一個分支。其實就是 tab 還沒有初始化的時候會進入這個分支。

11.initTable()

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

好像也沒有多少復雜的,因為我們之前已經對 sizeCtl 做了充分的解釋,這里如果 sc < 0 的話,說明在擴容或者是初始化中,然后當前線程直接執行了 Thread.yield();,就是放棄 CPU 執行權等待下次分配 CPU 時間片,如果不小于 0 ,并且 tab = null ,那說明現在還沒有線程執行擴容,那當前線程就會更新 sizeCtl,然后自己開始執行初始化動作,初始化好后直接返回 tab

繼續回到 putVal() 方法,如果執行第二個分支,說明 tab[i] == null,這個位置還沒有元素,直接通過 casTabAt() 方法進行賦值。如果這個位置有值,并且 (fh = f.hash) == MOVED 說明在擴容或者是在初始化,這個時候當前線程會進行 輔助擴容

12.helpTransfer()

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

首先在 if 條件里獲取到 nextTab 如果不為 null,就會進入 while 循環,首先 sc < 0 說還在擴容或者是初始化中,while 循環里的第一個分支是不需要輔助擴容或者是已經達到最大的輔助線程數量,或者是已經剩最后一個線程在擴容了,其他的線程都結束了。所以直接 break; 就可以了。

第二個分支,首先會執行 sizeCtl + 1 的操作,執行成功就會執行 transfer() 方法,這個方法我們之前已經看過了,就不看了,需要注意的是,我們之前說過,sizeCtl 的低 16 位代表目前正在擴容的線程數減一。因為這里新加入一個線程參與擴容,所以對 sizeCtl 進行了加一的操作。如果還有線程進來,那么 sizeCtl 還會加一。這里就解釋清楚 sizeCtl 的另外的一個用法了。擴容結束直接 break;。繼續回到 putVal()

繼續下一次 for,如果 tab[i] 還不等于 null,那就說明發生哈希沖突了,并且當前已經不在擴容了。就走到了最后一個分支,使用 synchronized 加鎖的這一段代碼里,這段代碼其實并不復雜,發生沖突之后無非就兩種情況,鏈表或者是紅黑樹。你可以看下 TreeBin 的構造方法。它的哈希值是 -2。

    static final int TREEBIN   = -2; // hash for roots of trees
    //... ...
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        //... ... 
    }

所以才有了 if (fh >= 0) 的判斷,如果首節點的哈希值大于 0 ,那一定是鏈表。

最后還有一段進行樹化的判斷操作。

    static final int TREEIFY_THRESHOLD = 8;
    //... ...
    if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
        if (oldVal != null)
            return oldVal;
        break;
    }

鏈表的節點數超過 8 就會進行樹化操作。到這里其實 putVal() 的相關操作基本上已經結束了,就剩最后一個 addCount() 方法了,看名稱也知道這個是更新計數器的,盲猜也能猜到應該跟元素數量有關系。不過,好像有點問題啊,你有沒有發現,在整個 putVal() 方法里面,好像沒有觸發擴容的邏輯!!

13.addCount()

其實這個方法除了操作我們之前見到的 counterCells 屬性外,還會判斷是否需要進行擴容。因為只有在知道具體的元素數量后,才能判斷出是否需要擴容。我們先看這個方法的第一段代碼。

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //... ...

首先,我們假設目前是第一次執行這個方法,那么 counterCells == null,然后就會使用 CAS 執行 baseCount = b + x失敗之后,就開始執行 fullAddCount() 方法了,因為現在 as == null 是成立的。

fullAddCount() 方法與 Striped64.longAccumulate() 方法基本上是一模一樣的,因為之前已經跳過了 Striped64,所以這里也不打算去細看。直接總結下 counterCells 的作用。其實 counterCells 就是在多個線程同時更新 baseCount 失敗時記錄下新增的元素數量。

舉個例子就明白了,假設 ConcurrentHashMap 初始化完成之后,有 2 個線程,同時執行了 addCount(),那么 baseCount 會更新成 1,counterCells 會初始化為一個大小為 2 的數組,且一個元素是 null,另外一個元素的 counterCells[i].value 值是 1。

如果這個時候又來了一個線程,會有 3 種情況,

  • CAS 更新 baseCount 成功,baseCount = 2。第三個線程結束
  • CAS 失敗且 counterCells[ThreadLocalRandom.getProbe() & m] == null。繼續初始化 counterCells 的另一個為 null 的元素,值為 1。
  • CAS 失敗且 counterCells[ThreadLocalRandom.getProbe() & m] != null,那么 counterCells[i].value 值更新成 2。

ThreadLocalRandom.getProbe() 方法其實就是取了個隨機值。就是說,如果有多個線程同時更新的話,失敗的線程會隨機的從 counterCells 取一個元素,將新增的數量保存進去。

其實很簡單,能進入到 fullAddCount() 方法的條件只有一種,counterCells == null 并且 CAS 更新 baseCount 失敗,這種情況就是有多個線程同時執行了 addCount() 方法,比如,有兩線程同時執行 putVal(),那么必然有一個線程在 CAS 更新 baseCount 時會失敗,這個時候就進入到 fullAddCount() 方法。這個方法內部就是在操作 counterCells 數組。操作的行為基本上就是下面這幾種

要么是初始化 counterCells 數組

if (counterCells == as) {
    CounterCell[] rs = new CounterCell[2];
    rs[h & 1] = new CounterCell(x);
    counterCells = rs;
    init = true;
}

要么就是初始化 counterCells 數組元素

    CounterCell r = new CounterCell(x); // Optimistic create
    if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
        boolean created = false;
        try {               // Recheck under lock
            CounterCell[] rs; int m, j;
            if ((rs = counterCells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                rs[j] = r;
                created = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (created)
            break;
        continue;           // Slot is now non-empty
    }

要么就是更新 counterCells 數組元素的值

    else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
        break;

還有一種操作就是 擴容 ,對 counterCells 進行擴容。

    if (cellsBusy == 0 &&
            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
        try {
            if (counterCells == as) {// Expand table unless stale
                CounterCell[] rs = new CounterCell[n << 1];
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                counterCells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        continue;                   // Retry with expanded table
    }

fullAddCount() 方法里,每次循環都會重新隨機取元素 h = ThreadLocalRandom.advanceProbe(h);。如果執行循環了多次,都沒有保存成功,說明 counterCells 的容量不夠用了,就會觸發擴容。從上面的代碼里也能看到,counterCells 的擴容非常簡單,數組直接翻倍,元素直接賦值到新數組里,位置都沒有變。

繼續回到 addCount() 方法,之后的邏輯就是判斷了下 check 參數。其實這里的邏輯是,如果有多個線程同時操作,只要沒有發生哈希沖突,就不進行擴容檢查。你往回翻一下就可以看到 check 參數其實就是 tab[i] 位置的元素數量。

如果發生了哈希沖突,或者說沒有多個線程同時操作(這個時候就進入不了當前的分支,更新 baseCount 不會失敗),就會執行 s = sumCount(); 這個方法非常簡單,就是對 baseCountcounterCells 里的數值進行了一下求和,然后就開始執行下面的代碼。

    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }

首先 while 循環判斷了下當前的元素數量是否超過了 sizeCtl,即便是在擴容期間,sizeCtl 小于 0 的時候,也算成立。然后判斷了下 tab ,基本上也會成立。直接進入循環內部

第一個分支 if (sc < 0) 說明已經有線程開始執行擴容動作了,這個時候更新 sizeCtl 的值加一,當前線程參與 輔助擴容

第二個分支是目前還沒有線程進行擴容操作,那么當前線程開始執行擴容,(rs << RESIZE_STAMP_SHIFT) + 2) 這個數值我們之前已經看到過了,就不再贅述了。

循環最后重新計算 s ,擴容結束后 s 就不會小于 sizeCtl,方法結束。

好了,到這里我們基本上就把 ConcurrentHashMap 的 put 操作的邏輯看完了。其實整體上跟 HashMap還是比較類似的,基本上就是把所有對 tab 的操作都使用 Unsafe 包裝了一下,解決多線程操作的問題,而發生哈希沖突時也是使用了 synchronized 進行了加鎖,解決了多線程操作鏈表的覆蓋問題。比較難的反而是元素數量的問題。因為 ConcurrentHashMap 一定要保證元素保存到 tab 成功后,元素數量一定也要加成功!不能因為元素數量的值更新失敗了,再把保存到 tab 里的元素刪除掉吧。所以呢 ConcurrentHashMap 就使用 counterCells 數組來保存那些更新 baseCount 失敗的數量。

14.get()

下面我們看下 get() 方法

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

好像也不復雜,第一個分支是一次就從 tab[i] 位置找到了元素,直接返回。最后一個 while 循環是 tab[i] 位置發生了哈希沖突,且當前位置是鏈表,通過 while 循環遍歷尋找。

重點說一下第二個分支吧 else if (eh < 0) 有兩種情況,一種是 tab[i] 位置發生了哈希沖突,且當前位置是紅黑樹,這時 eTreeBin 類型的,因為涉及到紅黑樹,我們直接跳過,有興趣的可以自己研究下。另外一種情況是在擴容期間,當前元素已經轉移到新的 nextTable 上了,這時 e 的類型是 ForwardingNode 類型,我們直接看下 ForwardingNode 類的 find() 代碼

    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }

也不復雜,就是直接在 nextTable 找元素,如果 nextTable[i] 位置為 null 直接返回,否則就進入了 for (;;) 循環里,跟之前類似,第一個分支里是直接找到了元素,而 if (eh < 0) 也有兩種情況,一個是擴容時轉移到新的 nextTable,一個就是紅黑樹。最后就是鏈表了。

好了,到這里基本上所有的內容都結束了,最后還剩一點有意思的東西,就是 Traverser 類,這個類其實實現了在擴容期間,也能使 ConcurrentHashMap 可以高效的(不使用鎖)遍歷。代碼不多,有興趣的話可以讀一下~

以下是遺留的一些內容,有機會再補吧

  • sun.misc.Unsafe 類。
  • LongAdder and Striped64
  • sun.misc.Contended 注解
  • TreeBin

以上就是Java源碼重讀之ConcurrentHashMap詳解的詳細內容,更多關于Java ConcurrentHashMap的資料請關注其它相關文章!

原文地址:https://juejin.cn/post/7230450989591986236

延伸 · 閱讀

精彩推薦
  • Java教程淺談Java多線程編程中Boolean常量的同步問題

    淺談Java多線程編程中Boolean常量的同步問題

    這篇文章主要介紹了淺談Java多線程編程中Boolean常量的同步問題,主要針對線程之間同步了不同的布爾對象的問題,需要的朋友可以參考下 ...

    johnshen07084692020-01-12
  • Java教程SpringBoot 簽到獎勵實現方案的示例代碼

    SpringBoot 簽到獎勵實現方案的示例代碼

    這篇文章主要介紹了SpringBoot 簽到獎勵實現方案的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的...

    當年的春天3592020-08-31
  • Java教程聊聊Springboot2.x的session和cookie有效期

    聊聊Springboot2.x的session和cookie有效期

    這篇文章主要介紹了Springboot2.x的session和cookie有效期,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教...

    Agly_Charlie8092022-01-06
  • Java教程java實現簡單的給sql語句賦值的示例

    java實現簡單的給sql語句賦值的示例

    這篇文章主要介紹了java實現簡單的給sql語句賦值的示例,需要的朋友可以參考下 ...

    Java教程網5002019-11-22
  • Java教程MyBatis快速入門(簡明淺析易懂)

    MyBatis快速入門(簡明淺析易懂)

    MyBatis是支持普通SQL查詢,存儲過程和高級映射的優秀持久層框架。mybatis的學習是程序員的必修課。今天小編通過分享本教程幫助大家快速入門mybatis,對...

    夏中偉3552020-07-02
  • Java教程拉鉤網java筆試題分享

    拉鉤網java筆試題分享

    這篇文章主要介紹了拉鉤網java筆試題分享,下面是題目和實現示例,需要的朋友可以參考下 ...

    Java教程網3142019-11-22
  • Java教程Mybatis批量更新三種方式的實現

    Mybatis批量更新三種方式的實現

    這篇文章主要介紹了Mybatis批量更新三種方式的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下...

    僵尸吃過跳跳糖7602021-07-14
  • Java教程spring整合cxf框架實例

    spring整合cxf框架實例

    下面小編就為大家帶來一篇spring整合cxf框架實例。小編覺得挺不錯的。現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧...

    Java教程網3962020-11-22
主站蜘蛛池模板: 性白俄罗斯高清xxxxx | 好大好深好涨好烫还要 | a级黄色视屏 | 日本加勒比无码av | 亚洲狠狠综合久久 | 成人伊人青草久久综合网破解版 | 边摸边吃奶玩乳尖视频 | 无遮免费网站在线入口 | 欧美三级免费观看 | 91精品国产免费久久国语蜜臀 | 日本 视频 在线 | 国产乱码免费卡1卡二卡3卡四 | 久久综合色超碰人人 | 欧美一卡二卡科技有限公司 | 天天做日日爱 | chinese军人@gay | 催眠白丝舞蹈老师小说 | 射综合网 | 免费观看一级欧美在线视频 | 暖暖的视频完整视频韩国免费 | 亚洲欧美日韩天堂在线观看 | 69日本人 | 华人在线京东热 | 给我一个黄色网址 | 久久国产精品福利影集 | 国产精品视频自拍 | 亚洲卡一卡2卡三卡4卡无卡三 | 色综合久久中文字幕综合网 | 国产aaaaa一级毛片 | 国产一区二区三区高清 | 91九色最新地址 | 国产一级片免费视频 | 亚洲精品欧洲久久婷婷99 | 日老逼| 九色PORNY蝌蚪视频首页 | 成人久久网站 | 男人使劲躁女人小视频 | 无套日出白浆在线播放 | 免费看视频高清在线观看 | 亚洲高清视频免费 | 国产自产在线 |