一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - Java并發(fā)編程之Condition源碼分析(推薦)

Java并發(fā)編程之Condition源碼分析(推薦)

2021-07-24 15:21什么都會(huì)一點(diǎn) Java教程

這篇文章主要介紹了Java并發(fā)編程之Condition源碼分析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

condition介紹

上篇文章講了reentrantlock的加鎖和釋放鎖的使用,這篇文章是對(duì)reentrantlock的補(bǔ)充。reentrantlock#newcondition()可以創(chuàng)建condition,在reentrantlock加鎖過程中可以利用condition阻塞當(dāng)前線程并臨時(shí)釋放鎖,待另外線程獲取到鎖并在邏輯后通知阻塞線程"激活"。condition常用在基于異步通信的同步機(jī)制實(shí)現(xiàn)中,比如dubbo中的請(qǐng)求和獲取應(yīng)答結(jié)果的實(shí)現(xiàn)。

常用方法

condition中主要的方法有2個(gè)

  1. (1)await()方法可以阻塞當(dāng)前線程,并釋放鎖。
  2. (2)在獲取鎖后可以調(diào)用signal()通知被await()阻塞的線程"激活"。

這里的await(),signal()必須在reentrantlock#lock()和reentrantlock#unlock()之間調(diào)用。

condition實(shí)現(xiàn)分析

condition的實(shí)現(xiàn)也是利用abstractqueuedsynchronizer隊(duì)列來實(shí)現(xiàn),await()在被調(diào)用后先將當(dāng)前線程加入到等待隊(duì)列中,然后釋放鎖,最后阻塞當(dāng)前線程。signal()在被調(diào)用后會(huì)先獲取等待隊(duì)列中第一個(gè)節(jié)點(diǎn),并將這個(gè)節(jié)點(diǎn)轉(zhuǎn)化成reentrantlock中的節(jié)點(diǎn)并加入到同步阻塞隊(duì)列的結(jié)尾,這樣此節(jié)點(diǎn)的上個(gè)節(jié)點(diǎn)線程釋放鎖后會(huì)激活此節(jié)點(diǎn)線程取來獲取鎖。

await()方法源碼分析

await()源碼如下

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void await() throws interruptedexception {
        //判斷是否當(dāng)前線程是否被中斷中斷則拋出中斷異常
      if (thread.interrupted())
        throw new interruptedexception();
        //加入等待隊(duì)列
      node node = addconditionwaiter();
        //釋放當(dāng)前線程鎖
      int savedstate = fullyrelease(node);
      int interruptmode = 0;
        //判斷是否在同步阻塞隊(duì)列,如果不在一直循環(huán)到被加入
      while (!isonsyncqueue(node)) {
        //阻塞當(dāng)前線程
        locksupport.park(this);
        //判斷是否被中斷
        if ((interruptmode = checkinterruptwhilewaiting(node)) != 0)
          break;
      }
        //獲取鎖,如果獲取中被中斷則設(shè)置中斷狀態(tài)
      if (acquirequeued(node, savedstate) && interruptmode != throw_ie)
        interruptmode = reinterrupt;
        //清除等待隊(duì)列中被"激活"的節(jié)點(diǎn)
      if (node.nextwaiter != null) // clean up if cancelled
        unlinkcancelledwaiters();
        //如果當(dāng)前線程被中斷,處理中斷邏輯
      if (interruptmode != 0)
        reportinterruptafterwait(interruptmode);
    }

主要分以下幾步

  1. (1)先判斷是否當(dāng)前線程是否被中斷中斷則拋出中斷異常如果未中斷調(diào)用addconditionwaiter()加入等待隊(duì)列
  2. (2)調(diào)用fullyrelease(node)釋放鎖使同步阻塞隊(duì)列的下個(gè)節(jié)點(diǎn)線程能獲取鎖。
  3. (3)調(diào)用isonsyncqueue(node)判斷是否在同步阻塞隊(duì)列,這里的加入同步阻塞隊(duì)列操作是在另一個(gè)線程調(diào)用signal()后加入,如果不在同步阻塞隊(duì)列會(huì)進(jìn)行阻塞直到被激活。
  4. (4)如果被激活然后調(diào)用checkinterruptwhilewaiting(node)判斷是否被中斷并獲取中斷模式。
  5. (5)繼續(xù)調(diào)用isonsyncqueue(node)判斷是否在同步阻塞隊(duì)列。
  6. (6)是則調(diào)用acquirequeued(node, savedstate) 獲取鎖,這里如果獲取不到也會(huì)被阻塞,獲取不到原因是在第一次調(diào)用isonsyncqueue(node)前,可能另一個(gè)線程已經(jīng)調(diào)用signal()后加入到同步阻塞隊(duì)列,然后調(diào)用acquirequeued(node, savedstate) 獲取不到鎖并阻塞。acquirequeued(node, savedstate)也會(huì)返回當(dāng)前線程是否被中斷,如果被中斷設(shè)置中斷模式。
  7. (7)在激活后調(diào)用unlinkcancelledwaiters()清理等待隊(duì)列的已經(jīng)被激活的節(jié)點(diǎn)。
  8. (8)最后判斷當(dāng)前線程是否被中斷,如果被中斷則對(duì)中斷線程做處理。

下面來看下addconditionwaiter()實(shí)現(xiàn)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private node addconditionwaiter() {
        //獲取等待隊(duì)列尾部節(jié)點(diǎn)
      node t = lastwaiter;
      //如果尾部狀態(tài)不為condition,如果已經(jīng)被"激活",清理之,然后重新獲取尾部節(jié)點(diǎn)
      if (t != null && t.waitstatus != node.condition) {
        unlinkcancelledwaiters();
        t = lastwaiter;
      }
        //創(chuàng)建以當(dāng)前線程為基礎(chǔ)的節(jié)點(diǎn),并將節(jié)點(diǎn)模式設(shè)置成condition
      node node = new node(thread.currentthread(), node.condition);
        //如果尾節(jié)點(diǎn)不存在,說明隊(duì)列為空,將頭節(jié)點(diǎn)設(shè)置成當(dāng)前節(jié)點(diǎn)
      if (t == null)
        firstwaiter = node;
        //如果尾節(jié)點(diǎn)存在,將此節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn)的下個(gè)節(jié)點(diǎn)
      else
        t.nextwaiter = node;
        //將尾節(jié)點(diǎn)設(shè)置成當(dāng)前節(jié)點(diǎn)
      lastwaiter = node;
      return node;
    }

addconditionwaiter()的邏輯很簡(jiǎn)單,就是創(chuàng)建以當(dāng)前線程為基礎(chǔ)的節(jié)點(diǎn)并把節(jié)點(diǎn)加入等待隊(duì)列的尾部待其他線程處理。

下面來看下fullyrelease(node node)實(shí)現(xiàn)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final int fullyrelease(node node) {
    boolean failed = true;
    try {
        //獲取阻塞隊(duì)列中當(dāng)前線程節(jié)點(diǎn)的鎖狀態(tài)值
      int savedstate = getstate();
        //釋放當(dāng)前線程節(jié)點(diǎn)鎖
      if (release(savedstate)) {
        failed = false;
        return savedstate;
      } else {
        throw new illegalmonitorstateexception();
      }
    } finally {
        //釋放失敗講節(jié)點(diǎn)等待狀態(tài)設(shè)置成關(guān)閉
      if (failed)
        node.waitstatus = node.cancelled;
    }
  }

調(diào)用getstate()先獲取阻塞隊(duì)列中當(dāng)前線程節(jié)點(diǎn)的鎖狀態(tài)值,這個(gè)值可能大于1表示多次重入,然后調(diào)用release(savedstate)釋放所有鎖,如果釋放成功返回鎖狀態(tài)值。

下面來看下isonsyncqueue(node node)實(shí)現(xiàn)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean isonsyncqueue(node node) {
        //判斷當(dāng)前節(jié)點(diǎn)是否是condition或者前置節(jié)點(diǎn)是否為空如果為空直接返回false
    if (node.waitstatus == node.condition || node.prev == null)
      return false;
        //如果下個(gè)節(jié)點(diǎn)存在,則在同步阻塞隊(duì)列中返回true
    if (node.next != null) // if has successor, it must be on queue
      return true;
        //遍歷查找當(dāng)前節(jié)點(diǎn)是否在同步阻塞隊(duì)列中
    return findnodefromtail(node);
  }
  private boolean findnodefromtail(node node) {
    node t = tail;
    for (;;) {
      if (t == node)
        return true;
      if (t == null)
        return false;
      t = t.prev;
    }
  }

此方法的功能是查找當(dāng)前節(jié)點(diǎn)是否在同步阻塞隊(duì)列中,方法先是快速判斷,判斷不了再進(jìn)行遍歷查找。

  1. (1)第一步先判斷次節(jié)點(diǎn)是否condition狀態(tài)或者前置節(jié)點(diǎn)是否存在,如果是表明不在隊(duì)列中返回false,阻塞隊(duì)列中的狀態(tài)一般是0或者signal狀態(tài)而且如果當(dāng)前如果當(dāng)前節(jié)點(diǎn)在隊(duì)列阻塞中且未被激活前置節(jié)點(diǎn)一定不為空。
  2. (2)第二步判斷節(jié)點(diǎn)的下個(gè)節(jié)點(diǎn)是否存在,如果存在則表明當(dāng)前當(dāng)前節(jié)點(diǎn)已加入到阻塞隊(duì)列中。
  3. (3)如果以上2點(diǎn)都沒法判斷,也有可能剛剛加入到同步阻塞隊(duì)列中,所以調(diào)用findnodefromtail(node node)做最后的遍歷查找。查找從隊(duì)列尾部開始查,從尾部開始查的原因是可能剛剛加入到同步阻塞隊(duì)列中,從尾部能快速定位。

下面看下checkinterruptwhilewaiting(node node)實(shí)現(xiàn)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private int checkinterruptwhilewaiting(node node) {
      return thread.interrupted() ?
        (transferaftercancelledwait(node) ? throw_ie : reinterrupt) :
        0;
    }
 
final boolean transferaftercancelledwait(node node) {
    if (compareandsetwaitstatus(node, node.condition, 0)) {
      enq(node);
      return true;
    }
    while (!isonsyncqueue(node))
      thread.yield();
    return false;
  }

此方法在線程被激活后被調(diào)用,主要功能就是判斷被激活的線程是否被中斷。此方法會(huì)返回2種中斷狀態(tài)throw_ie和reinterrupt,throw_ie是調(diào)用signal()前被中斷返回,reinterrupt在調(diào)用signal()后被中斷返回。 此方法先判斷是否被標(biāo)記中斷,是的話再調(diào)用transferaftercancelledwait(node)取判斷是那種中斷狀態(tài),transferaftercancelledwait(node)方法分2步

  1. (1)用cas方式將節(jié)點(diǎn)狀態(tài)改錯(cuò)等待狀態(tài)改成condition,并加入到同步阻塞隊(duì)列中返回true
  2. (2)如果不能加入到同步阻塞隊(duì)列就自旋一直等待加入

如果使用await()方法上面2步其實(shí)是沒什么作用其最后一定會(huì)返回false,因?yàn)閍wait()被激活只能調(diào)用 signal()方法,而signal()方法肯定已經(jīng)將節(jié)點(diǎn)加入到同步阻塞隊(duì)列中。所以以上邏輯是給await(long time, timeunit unit)等帶超時(shí)激活方法用的。

acquirequeued(node, savedstate)方法再上一章節(jié)已經(jīng)講過這邊就不重復(fù)了,下面分析下unlinkcancelledwaiters()方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void unlinkcancelledwaiters() {
        //獲取等待隊(duì)列頭節(jié)點(diǎn)
      node t = firstwaiter;
      node trail = null;
      while (t != null) {
        //獲取下個(gè)節(jié)點(diǎn)
        node next = t.nextwaiter;
        //如果狀態(tài)不為condition說明已經(jīng)加入阻塞隊(duì)列需要清理掉
        if (t.waitstatus != node.condition) {
          t.nextwaiter = null;
          if (trail == null)
            firstwaiter = next;
          else
            //獲取下個(gè)節(jié)點(diǎn)
            trail.nextwaiter = next;
          if (next == null)
            lastwaiter = trail;
        }
        else
          trail = t;
        t = next;
      }
    }

此方法就是從頭開始查找狀態(tài)不為condition的節(jié)點(diǎn)并清理,狀態(tài)不為condition節(jié)點(diǎn)說明此節(jié)點(diǎn)已經(jīng)加入到阻塞隊(duì)列,已經(jīng)不需要維護(hù)。

下面來看下reportinterruptafterwait(interruptmode)方法

?
1
2
3
4
5
6
7
8
9
private void reportinterruptafterwait(int interruptmode)
      throws interruptedexception {
        //如果是throw_ie模式直接拋出異常
      if (interruptmode == throw_ie)
        throw new interruptedexception();
        //如果是reinterrupt模式標(biāo)記線程中斷由上層處理中斷
      else if (interruptmode == reinterrupt)
        selfinterrupt();
    }

此方法處理中斷邏輯。如果是throw_ie模式直接拋出異常,如果是reinterrupt模式標(biāo)記線程中斷由上層處理中斷。

signal()方法源碼分析

signal()源碼如下

?
1
2
3
4
5
6
7
8
9
public final void signal() {
        //是否當(dāng)前線程持有鎖
      if (!isheldexclusively())
        throw new illegalmonitorstateexception();
      node first = firstwaiter;
        //通知"激活"頭節(jié)點(diǎn)線程
      if (first != null)
        dosignal(first);
    }

先調(diào)用isheldexclusively()判斷鎖是否被當(dāng)前線程持有,然后檢查等待隊(duì)列是否為空,不為空就是可以取第一個(gè)節(jié)點(diǎn)調(diào)用dosignal(first)去"激活",這里激活不是真正的激活而只是將節(jié)點(diǎn)加入到同步阻塞隊(duì)列尾部,所以上下文中帶""的激活都是這種解釋。

下面看下isheldexclusively()實(shí)現(xiàn)

?
1
2
3
protected final boolean isheldexclusively() {
     return getexclusiveownerthread() == thread.currentthread();
   }

實(shí)現(xiàn)就是比較下當(dāng)前線程和持有鎖的線程是否同一個(gè)

下面看下dosignal(first)的實(shí)現(xiàn)

?
1
2
3
4
5
6
7
8
9
10
11
private void dosignal(node first) {
      do {
        //頭指頭后移一位,如果后面的節(jié)點(diǎn)為空,則將尾指頭也指向空,說明隊(duì)列為空了
        if ( (firstwaiter = first.nextwaiter) == null)
          lastwaiter = null;
        //清空頭節(jié)點(diǎn)的下個(gè)節(jié)點(diǎn)
        first.nextwaiter = null;
        //如果"激活"失敗者取下個(gè)繼續(xù),直到成功或者遍歷完
      } while (!transferforsignal(first) &&
           (first = firstwaiter) != null);
    }

 此方法就是取當(dāng)前頭節(jié)點(diǎn)一直去嘗試"激活",直到成功或者遍歷完。

下面來看下transferforsignal(first)方法

?
1
2
3
4
5
6
7
8
9
10
11
12
final boolean transferforsignal(node node) {
        //將condition狀態(tài)設(shè)置成0
    if (!compareandsetwaitstatus(node, node.condition, 0))
      return false;
        //加入到同步阻塞隊(duì)列
    node p = enq(node);
    int ws = p.waitstatus;
        //狀態(tài)異常直接激活
    if (ws > 0 || !compareandsetwaitstatus(p, ws, node.signal))
      locksupport.unpark(node.thread);
    return true;
  }

(1)此方法先先將condition狀態(tài)設(shè)置成0,因?yàn)槿绻莄ondition狀態(tài)加入到同步阻塞隊(duì)列,激活的時(shí)候是不識(shí)別的。
(2)加入到同步阻塞隊(duì)列的尾部。所以同步阻塞隊(duì)列中前面如果有多個(gè)在排隊(duì),調(diào)用unlock()不會(huì)馬上激活此節(jié)點(diǎn)。
(3)狀態(tài)異常直接調(diào)用unpark激活,這邊按理說如果狀態(tài)異常情況下激活,await()在調(diào)用unlock()被激活后會(huì)進(jìn)行相應(yīng)的異常處理,但看await()代碼沒有處理則是正常執(zhí)行。

這個(gè)方法主要就是把節(jié)點(diǎn)加入到同步阻塞隊(duì)列的,真正的激活則是調(diào)用unlock()去處理。

以上所述是小編給大家介紹的java并發(fā)編程之condition源碼分析詳解整合,希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)服務(wù)器之家網(wǎng)站的支持!

原文鏈接:https://my.oschina.net/u/945573/blog/2995600

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 成人免费观看一区二区 | 公交车揉捏大乳呻吟喘娇 | 午夜一区二区免费视频 | 四虎影视在线观看永久地址 | 天天做天天爽 | 亚洲 欧美 另类 中文 在线 | 97综合| 成人资源影音先锋久久资源网 | 欧美高清3dfreexxxx性 | 人成午夜免费大片在线观看 | 韩国伊人 | 免费观看欧美一级高清 | ova催眠性指导5最新在线 | 日韩美女强理论片 | 国产综合亚洲专区在线 | 紧身牛仔裤美女被啪啪久久网 | k逼| 猫扑俩性 | 国产毛片一级aaaaa片 | 成人在线观看网站 | 四虎影视4hutv最新地址在线 | 女同69式互添在线观看免费 | 鬼畜重口高h合集长短篇 | 精品久久香蕉国产线看观看亚洲 | 日韩日韩日韩手机看片自拍 | 乌克兰少妇大胆大BBW | 禁止的爱善良的未删减版hd | 亚洲国内精品久久 | 大又大又黄又爽免费毛片 | 女人狂吮男人命根gif视频 | 国产成人cao在线 | 国产精品久久久久毛片真精品 | 日本中文字幕二区三区 | nxgx国产| chinese帅男gay野外性 | 亚洲精品一二区 | 欧美成人免费一区在线播放 | 成人伊在线影院 | 继攵催眠女乱h调教 | 精品久久亚洲 | 1313午夜精品理伦片 |