CountDownLatch 源碼解析—— countDown()
上一篇文章從源碼層面說(shuō)了一下CountDownLatch 中 await() 的原理。這篇文章說(shuō)一下countDown() 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public void countDown() { //CountDownLatch sync.releaseShared( 1 ); } ↓ public final boolean releaseShared( int arg) { //AQS if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } ↓ protected boolean tryReleaseShared( int releases) { //CountDownLatch.Sync // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c- 1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } |
通過(guò)構(gòu)造器 CountDownLatch end = new CountDownLatch(2); state 被設(shè)置為2,所以c == 2,nextc = 2-1,
然后通過(guò)下面這個(gè)CAS操作將state設(shè)置為1。
1
2
3
4
|
protected final boolean compareAndSetState( int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt( this , stateOffset, expect, update); } |
此時(shí)nextc還不為0,返回false。一直等到countDown() 方法被調(diào)用兩次,state == 0,nextc ==0,此時(shí)返回true。
進(jìn)入doReleaseShared()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
doReleaseShared(); ↓ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; // loop on failed CAS } if (h == head) // loop if head changed break ; } } |
回顧一下此時(shí)的等待隊(duì)列模型。
1
2
3
|
+--------------------------+ prev +------------------+ head | waitStatus = Node.SIGNAL | <---- node(tail) | currentThread | +--------------------------+ +------------------+ |
此時(shí)head 不為null,也不為tail,waitStatus == Node.SIGNAL,所以進(jìn)入 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 這個(gè)判斷。
1
2
3
4
5
6
7
8
9
10
11
|
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) ↓ /** * CAS waitStatus field of a node. */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } |
這個(gè)CAS 操作將 state 設(shè)置為 0 ,也就是說(shuō)此時(shí)Head 中的 waitStatus 是0.此時(shí)隊(duì)列模型如下所示
1
2
3
|
+----------------+ prev +------------------+ head | waitStatus = 0 | <---- node(tail) | currentThread | +----------------+ +------------------+ |
該方法返回true。進(jìn)入unparkSuccessor(h);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
unparkSuccessor(h); ↓ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); } |
s 就是head的后繼結(jié)點(diǎn),也就是裝有當(dāng)前線程的結(jié)點(diǎn)。s != null ,并且s.waitStatus ==0 ,所以進(jìn)入 LockSupport.unpark(s.thread);
1
2
3
4
|
public static void unpark(Thread thread) { if (thread != null ) UNSAFE.unpark(thread); } |
也就是unlock 被阻塞的線程。裁判被允許吹哨了!
countDown() 的原理就此就非常清晰了。
每執(zhí)行一次countDown() 方法,state 就是減1,直到state == 0,則開(kāi)始釋放被阻塞在隊(duì)列中的線程,根據(jù)前驅(qū)結(jié)點(diǎn)中waitStatus的狀態(tài),釋放后續(xù)結(jié)點(diǎn)中的線程。
OK,回到上一篇文章的問(wèn)題,什么時(shí)候跳出下面這個(gè)循環(huán)(await方法中的循環(huán))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; // help GC failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } |
此時(shí)state == 0,所以進(jìn)入 setHeadAndPropagate 方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
setHeadAndPropagate(node, r); ↓ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } ↓ private void setHead(Node node) { head = node; node.thread = null ; node.prev = null ; } |
這個(gè)方法將head 的后繼結(jié)點(diǎn)變?yōu)閔ead。該方法過(guò)后,又將node的next結(jié)點(diǎn)設(shè)置為null,模型變成下圖
1
2
3
|
prev +---------+ next null <---- node(tail/head) | null | ----> null +---------+ |
也就是node head tail 什么的都被置為null,等待GC回收了,這個(gè)時(shí)候return,跳出了for循環(huán),隊(duì)列被清空。
下面演示一下整個(gè)過(guò)程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
setHeadAndPropagate(node, r); +----------------+ head(tail) | waitStatus= 0 | | thread = null | +----------------+ ↓ +----------------+ +----------------+ | waitStatus= 0 | prev | waitStatus= 0 | head(tail) | thread = null | <---- node | currentThread | +----------------+ +----------------+ ↓ +----------------+ +----------------+ | waitStatus= 0 | prev | waitStatus= 0 | head | thread = null | <---- node(tail) | currentThread | +----------------+ +----------------+ ↓ +----------------+ +----------------+ | Node.SIGNAL | prev | waitStatus= 0 | head | thread = null | <---- node(tail) | currentThread | +----------------+ +----------------+ ↓ +----------------+ +----------------+ | waitStatus= 0 | prev | waitStatus= 0 | head | thread = null | <---- node(tail) | currentThread | +----------------+ +----------------+ ↓ +----------------+ prev | waitStatus= 0 | next null <---- node(tail/head) | null | ----> null +----------------+ |
CountDownLatch 的核心就是一個(gè)阻塞線程隊(duì)列,這是由鏈表構(gòu)造而成的隊(duì)列,里面包含thread 和 waitStatus,其中waitStatus說(shuō)明了后繼結(jié)點(diǎn)線程狀態(tài)。
state 是一個(gè)非常重要的標(biāo)志,構(gòu)造時(shí),設(shè)置為對(duì)應(yīng)的n值,如果n != 0,阻塞隊(duì)列將一直阻塞,除非中斷線程。
每次調(diào)用countDown() 方法,就是將state-1,而調(diào)用await() 方法就是將調(diào)用該方法的線程加入到阻塞隊(duì)列,直到state==0,才能釋放線程。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://www.cnblogs.com/cuglkb/p/8686415.html