一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|

服務(wù)器之家 - 編程語言 - JAVA教程 - Java 高并發(fā)六:JDK并發(fā)包2詳解

Java 高并發(fā)六:JDK并發(fā)包2詳解

2020-06-14 11:36Hosee JAVA教程

本文主要介紹Java高并發(fā)這里整理了詳細(xì)資料,并講解了 1. 線程池的基本使用 2. 擴(kuò)展和增強(qiáng)線程池 3. ForkJoin的知識,有興趣的小伙伴可以參考下

1. 線程池的基本使用

1.1.為什么需要線程池

平時(shí)的業(yè)務(wù)中,如果要使用多線程,那么我們會在業(yè)務(wù)開始前創(chuàng)建線程,業(yè)務(wù)結(jié)束后,銷毀線程。但是對于業(yè)務(wù)來說,線程的創(chuàng)建和銷毀是與業(yè)務(wù)本身無關(guān)的,只關(guān)心線程所執(zhí)行的任務(wù)。因此希望把盡可能多的cpu用在執(zhí)行任務(wù)上面,而不是用在與業(yè)務(wù)無關(guān)的線程創(chuàng)建和銷毀上面。而線程池則解決了這個(gè)問題,線程池的作用就是將線程進(jìn)行復(fù)用。

1.2.JDK為我們提供了哪些支持

Java 高并發(fā)六:JDK并發(fā)包2詳解

 JDK中的相關(guān)類圖如上圖所示。

其中要提到的幾個(gè)特別的類。

Callable類和Runable類相似,但是區(qū)別在于Callable有返回值。

ThreadPoolExecutor是線程池的一個(gè)重要實(shí)現(xiàn)。

而Executors是一個(gè)工廠類。

1.3.線程池的使用

1.3.1.線程池的種類

  1. new FixedThreadPool 固定數(shù)量的線程池,線程池中的線程數(shù)量是固定的,不會改變。
  2. new SingleThreadExecutor 單一線程池,線程池中只有一個(gè)線程。
  3. new CachedThreadPool 緩存線程池,線程池中的線程數(shù)量不固定,會根據(jù)需求的大小進(jìn)行改變。
  4. new ScheduledThreadPool 計(jì)劃任務(wù)調(diào)度的線程池,用于執(zhí)行計(jì)劃任務(wù),比如每隔5分鐘怎么樣,
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
     0L, TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<Runnable>());
}
 
 
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
  (new ThreadPoolExecutor(1, 1,
     0L, TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<Runnable>()));
}
 
 
public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
     60L, TimeUnit.SECONDS,
     new SynchronousQueue<Runnable>());
}

從方法上來看,顯然 FixedThreadPool,SingleThreadExecutor,CachedThreadPool都是ThreadPoolExecutor的不同實(shí)例,只是參數(shù)不同。

?
1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue) {
 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  Executors.defaultThreadFactory(), defaultHandler);
}

下面來簡述下 ThreadPoolExecutor構(gòu)造函數(shù)中參數(shù)的含義。

  1. corePoolSize 線程池中核心線程數(shù)的數(shù)目
  2. maximumPoolSize 線程池中最多能容納多少個(gè)線程
  3. keepAliveTime 當(dāng)現(xiàn)在線程數(shù)目大于corePoolSize時(shí),超過keepAliveTime時(shí)間后,多出corePoolSize的那些線程將被終結(jié)。
  4. unit keepAliveTime的單位
  5. workQueue 當(dāng)任務(wù)數(shù)量很大,線程池中線程無法滿足時(shí),提交的任務(wù)會被放到阻塞隊(duì)列中,線程空閑下來則會不斷從阻塞隊(duì)列中取數(shù)據(jù)。

這樣在來看上面所說的FixedThreadPool,它的線程的核心數(shù)目和最大容納數(shù)目都是一樣的,以至于在工作期間,并不會創(chuàng)建和銷毀線程。當(dāng)任務(wù)數(shù)量很大,線程池中的線程無法滿足時(shí),任務(wù)將被保存到LinkedBlockingQueue中,而LinkedBlockingQueue的大小是Integer.MAX_VALUE。這就意味著,任務(wù)不斷地添加,會使內(nèi)存消耗越來越大。

而CachedThreadPool則不同,它的核心線程數(shù)量是0,最大容納數(shù)目是Integer.MAX_VALUE,它的阻塞隊(duì)列是SynchronousQueue,這是一個(gè)特別的隊(duì)列,它的大小是0。由于核心線程數(shù)量是0,所以必然要將任務(wù)添加到SynchronousQueue中,這個(gè)隊(duì)列只有一個(gè)線程在從中添加數(shù)據(jù),同時(shí)另一個(gè)線程在從中獲取數(shù)據(jù)時(shí),才能成功。獨(dú)自往這個(gè)隊(duì)列中添加數(shù)據(jù)會返回失敗。當(dāng)返回失敗時(shí),則線程池開始擴(kuò)展線程,這就是為什么CachedThreadPool的線程數(shù)目是不固定的。當(dāng)60s該線程仍未被使用時(shí),線程則被銷毀。

1.4.線程池使用的小例子

1.4.1.簡單線程池

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ThreadPoolDemo {
 public static class MyTask implements Runnable {
 @Override
 public void run() {
 System.out.println(System.currentTimeMillis() + "Thread ID:"
 + Thread.currentThread().getId());
 try {
 Thread.sleep(1000);
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 }
 
 public static void main(String[] args) {
 MyTask myTask = new MyTask();
 ExecutorService es = Executors.newFixedThreadPool(5);
 for (int i = 0; i < 10; i++) {
 es.submit(myTask);
 }
 }
}

由于使用的newFixedThreadPool(5),但是啟動了10個(gè)線程,所以每次執(zhí)行5個(gè),并且 可以很明顯的看到線程的復(fù)用,ThreadId是重復(fù)的,也就是前5個(gè)任務(wù)和后5個(gè)任務(wù)都是同一批線程去執(zhí)行的。

這里用的是

es.submit(myTask);

還有一種提交方式:

es.execute(myTask);

區(qū)別在于submit會返回一個(gè)Future對象,這個(gè)將在以后介紹。

1.4.2.ScheduledThreadPool

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 
 
public class ThreadPoolDemo {
 public static void main(String[] args) {
 ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
 //如果前面的任務(wù)還未完成,則調(diào)度不會啟動。
 ses.scheduleWithFixedDelay(new Runnable() {
 
 @Override
 public void run() {
 try {
 Thread.sleep(1000);
 System.out.println(System.currentTimeMillis()/1000);
 } catch (Exception e) {
 // TODO: handle exception
 }
 
 }
 }, 0, 2, TimeUnit.SECONDS);//啟動0秒后執(zhí)行,然后周期2秒執(zhí)行一次
 }
}

輸出:

1454832514
1454832517
1454832520
1454832523
1454832526
...

由于任務(wù)執(zhí)行需要1秒,任務(wù)調(diào)度必須等待前一個(gè)任務(wù)完成。也就是這里的每隔2秒的意思是,前一個(gè)任務(wù)完成后2秒再開啟新的一個(gè)任務(wù)。

2. 擴(kuò)展和增強(qiáng)線程池

2.1.回調(diào)接口

線程池中有一些回調(diào)的api來給我們提供擴(kuò)展的操作。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
 new LinkedBlockingQueue<Runnable>()){
 
 @Override
 protected void beforeExecute(Thread t, Runnable r) {
 System.out.println("準(zhǔn)備執(zhí)行");
 }
 
 @Override
 protected void afterExecute(Runnable r, Throwable t) {
 System.out.println("執(zhí)行完成");
 }
 
 @Override
 protected void terminated() {
 System.out.println("線程池退出");
 }
 
 };

我們可以通過實(shí)現(xiàn)ThreadPoolExecutor的子類去覆蓋ThreadPoolExecutor的beforeExecute,afterExecute,terminated方法來實(shí)現(xiàn)在線程執(zhí)行前后,線程池退出時(shí)的日志管理或其他操作。

2.2.拒絕策略

有時(shí)候,任務(wù)非常繁重,導(dǎo)致系統(tǒng)負(fù)載太大。在上面說過,當(dāng)任務(wù)量越來越大時(shí),任務(wù)都將放到FixedThreadPool的阻塞隊(duì)列中,導(dǎo)致內(nèi)存消耗太大,最終導(dǎo)致內(nèi)存溢出。這樣的情況是應(yīng)該要避免的。因此當(dāng)我們發(fā)現(xiàn)線程數(shù)量要超過最大線程數(shù)量時(shí),我們應(yīng)該放棄一些任務(wù)。丟棄時(shí),我們應(yīng)該把任務(wù)記下來,而不是直接丟掉。

ThreadPoolExecutor中還有另一個(gè)構(gòu)造函數(shù)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
 if (corePoolSize < 0 ||
  maximumPoolSize <= 0 ||
  maximumPoolSize < corePoolSize ||
  keepAliveTime < 0)
  throw new IllegalArgumentException();
 if (workQueue == null || threadFactory == null || handler == null)
  throw new NullPointerException();
 this.corePoolSize = corePoolSize;
 this.maximumPoolSize = maximumPoolSize;
 this.workQueue = workQueue;
 this.keepAliveTime = unit.toNanos(keepAliveTime);
 this.threadFactory = threadFactory;
 this.handler = handler;
 }

threadFactory我們在后面再介紹。
而handler就是拒絕策略的實(shí)現(xiàn),它會告訴我們,如果任務(wù)不能執(zhí)行了,該怎么做。

Java 高并發(fā)六:JDK并發(fā)包2詳解

共有以上4種策略。

AbortPolicy:如果不能接受任務(wù)了,則拋出異常。

CallerRunsPolicy:如果不能接受任務(wù)了,則讓調(diào)用的線程去完成。

DiscardOldestPolicy:如果不能接受任務(wù)了,則丟棄最老的一個(gè)任務(wù),由一個(gè)隊(duì)列來維護(hù)。

DiscardPolicy:如果不能接受任務(wù)了,則丟棄任務(wù)。

?
1
2
3
4
5
6
7
8
9
10
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
 new LinkedBlockingQueue<Runnable>(),
 new RejectedExecutionHandler() {
 
 @Override
 public void rejectedExecution(Runnable r,
 ThreadPoolExecutor executor) {
 System.out.println(r.toString() + "is discard");
 }
 });

當(dāng)然我們也可以自己實(shí)現(xiàn)RejectedExecutionHandler接口來自己定義拒絕策略。

2.3.自定義ThreadFactory

剛剛已經(jīng)看到了,在ThreadPoolExecutor的構(gòu)造函數(shù)中可以指定threadFactory。

線程池中的線程都是由線程工廠創(chuàng)建出來,我們可以自定義線程工廠。

默認(rèn)的線程工廠:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static class DefaultThreadFactory implements ThreadFactory {
 private static final AtomicInteger poolNumber = new AtomicInteger(1);
 private final ThreadGroup group;
 private final AtomicInteger threadNumber = new AtomicInteger(1);
 private final String namePrefix;
 
 DefaultThreadFactory() {
  SecurityManager s = System.getSecurityManager();
  group = (s != null) ? s.getThreadGroup() :
     Thread.currentThread().getThreadGroup();
  namePrefix = "pool-" +
    poolNumber.getAndIncrement() +
    "-thread-";
 }
 
 public Thread newThread(Runnable r) {
  Thread t = new Thread(group, r,
     namePrefix + threadNumber.getAndIncrement(),
     0);
  if (t.isDaemon())
  t.setDaemon(false);
  if (t.getPriority() != Thread.NORM_PRIORITY)
  t.setPriority(Thread.NORM_PRIORITY);
  return t;
 }
 }

3. ForkJoin

3.1.思想

Java 高并發(fā)六:JDK并發(fā)包2詳解

就是分而治之的思想。

fork/join類似MapReduce算法,兩者區(qū)別是:Fork/Join 只有在必要時(shí)如任務(wù)非常大的情況下才分割成一個(gè)個(gè)小任務(wù),而 MapReduce總是在開始執(zhí)行第一步進(jìn)行分割。看來,F(xiàn)ork/Join更適合一個(gè)JVM內(nèi)線程級別,而MapReduce適合分布式系統(tǒng)。

4.2.使用接口

RecursiveAction:無返回值
RecursiveTask:有返回值

4.3.簡單例子

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
 
 
public class CountTask extends RecursiveTask<Long>{
 
 private static final int THRESHOLD = 10000;
 private long start;
 private long end;
 
 public CountTask(long start, long end) {
 super();
 this.start = start;
 this.end = end;
 }
 
 @Override
 protected Long compute() {
 long sum = 0;
 boolean canCompute = (end - start) < THRESHOLD;
 if(canCompute)
 {
 for (long i = start; i <= end; i++) {
 sum = sum + i;
 }
 }else
 {
 //分成100個(gè)小任務(wù)
 long step = (start + end)/100;
 ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
 long pos = start;
 for (int i = 0; i < 100; i++) {
 long lastOne = pos + step;
 if(lastOne > end )
 {
 lastOne = end;
 }
 CountTask subTask = new CountTask(pos, lastOne);
 pos += step + 1;
 subTasks.add(subTask);
 subTask.fork();//把子任務(wù)推向線程池
 }
 for (CountTask t : subTasks) {
 sum += t.join();//等待所有子任務(wù)結(jié)束
 }
 }
 return sum;
 }
 
 public static void main(String[] args) {
 ForkJoinPool forkJoinPool = new ForkJoinPool();
 CountTask task = new CountTask(0, 200000L);
 ForkJoinTask<Long> result = forkJoinPool.submit(task);
 try {
 long res = result.get();
 System.out.println("sum = " + res);
 } catch (Exception e) {
 // TODO: handle exception
 e.printStackTrace();
 }
 }
 
}

上述例子描述了一個(gè)累加和的任務(wù)。將累加任務(wù)分成100個(gè)任務(wù),每個(gè)任務(wù)只執(zhí)行一段數(shù)字的累加和,最后join后,把每個(gè)任務(wù)計(jì)算出的和再累加起來。

4.4.實(shí)現(xiàn)要素

4.4.1.WorkQueue與ctl

每一個(gè)線程都會有一個(gè)工作隊(duì)列

static final class WorkQueue

在工作隊(duì)列中,會有一系列對線程進(jìn)行管理的字段

volatile int eventCount;   // encoded inactivation count; < 0 if inactive
        int nextWait;              // encoded record of next event waiter
        int nsteals;               // number of steals
        int hint;                  // steal index hint
        short poolIndex;           // index of this queue in pool
        final short mode;          // 0: lifo, > 0: fifo, < 0: shared
        volatile int qlock;        // 1: locked, -1: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        ForkJoinTask<?> currentSteal; // current non-local task being executed

這里要注意的是,JDK7和JDK8在ForkJoin的實(shí)現(xiàn)上有了很大的差別。我們這里介紹的是JDK8中的。 在線程池中,有時(shí)不是所有的線程都在執(zhí)行的,部分線程會被掛起,那些掛起的線程會被存放到一個(gè)棧中。內(nèi)部通過一個(gè)鏈表表示。
nextWait會指向下一個(gè)等待的線程。

poolIndex線程在線程池中的下標(biāo)索引。

eventCount 在初始化時(shí),eventCount與poolIndex有關(guān)。總共32位,第一位表示是否被激活,15位表示被掛起的次數(shù)

eventCount,剩下的表示poolIndex。用一個(gè)字段來表示多個(gè)意思。

工作隊(duì)列WorkQueue用ForkJoinTask<?>[] array來表示。而top,base來表示隊(duì)列的兩端,數(shù)據(jù)在這兩者之間。

在ForkJoinPool中維護(hù)著ctl(64位long型)

volatile long ctl;

* Field ctl is a long packed with:
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: poolIndex of top of Treiber stack of waiters (16 bits)

AC表示活躍的線程數(shù)減去并行度(大概就是CPU個(gè)數(shù))

TC表示總的線程數(shù)減去并行度

ST表示線程池本身是否是激活的

EC表示頂端等待線程的掛起數(shù)

ID表示頂端等待線程的poolIndex

很明顯ST+EC+ID就是我們剛剛所說的 eventCount 。

那么為什么明明5個(gè)變量,非要合成一個(gè)變量呢。其實(shí)用5個(gè)變量占用容量也差不多。

用一個(gè)變量代碼的可讀性上會差很多。

那么為什么用一個(gè)變量呢?其實(shí)這點(diǎn)才是最巧妙的地方,因?yàn)檫@5個(gè)變量是一個(gè)整體,在多線程中,如果用5個(gè)變量,那么當(dāng)修改其中一個(gè)變量時(shí),如何保證5個(gè)變量的整體性。那么用一個(gè)變量則就解決了這個(gè)問題。如果用鎖解決,則會降低性能。

用一個(gè)變量則保證了數(shù)據(jù)的一致性和原子性。

在ForkJoin中隊(duì)ctl的更改都是使用CAS操作,在前面系列的文章中已經(jīng)介紹過,CAS是無鎖的操作,性能很好。

由于CAS操作也只能針對一個(gè)變量,所以這種設(shè)計(jì)是最優(yōu)的。

4.4.2.工作竊取

接下來要介紹下整個(gè)線程池的工作流程。

每個(gè)線程都會調(diào)用runWorker

?
1
2
3
4
5
6
final void runWorker(WorkQueue w) {
 w.growArray(); // allocate queue
 for (int r = w.hint; scan(w, r) == 0; ) {
  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
 }
 }

scan()函數(shù)是掃描是否有任務(wù)要做。

r是一個(gè)相對隨機(jī)的數(shù)字。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private final int scan(WorkQueue w, int r) {
 WorkQueue[] ws; int m;
 long c = ctl;    // for consistency check
 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
  for (int j = m + m + 1, ec = w.eventCount;;) {
  WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
  if ((q = ws[(r - j) & m]) != null &&
   (b = q.base) - q.top < 0 && (a = q.array) != null) {
   long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
   if ((t = ((ForkJoinTask<?>)
    U.getObjectVolatile(a, i))) != null) {
   if (ec < 0)
    helpRelease(c, ws, w, q, b);
   else if (q.base == b &&
     U.compareAndSwapObject(a, i, t, null)) {
    U.putOrderedInt(q, QBASE, b + 1);
    if ((b + 1) - q.top < 0)
    signalWork(ws, q);
    w.runTask(t);
   }
   }
   break;
  }
  else if (--j < 0) {
   if ((ec | (e = (int)c)) < 0) // inactive or terminating
   return awaitWork(w, c, ec);
   else if (ctl == c) {  // try to inactivate and enqueue
   long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
   w.nextWait = e;
   w.eventCount = ec | INT_SIGN;
   if (!U.compareAndSwapLong(this, CTL, c, nc))
    w.eventCount = ec; // back out
   }
   break;
  }
  }
 }
 return 0;
 }

我們接下來看看scan方法,scan的一個(gè)參數(shù)是WorkQueue,上面已經(jīng)說過,每個(gè)線程都會擁有一個(gè)WorkQueue,那么多個(gè)線程的WorkQueue就會保存在workQueues里面,r是一個(gè)隨機(jī)數(shù),通過r來找到某一個(gè) WorkQueue,在WorkQueue里面有所要做的任務(wù)。

然后通過WorkQueue的base,取得base的偏移量。


b = q.base
..
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
..

然后通過偏移量得到最后一個(gè)的任務(wù),運(yùn)行這個(gè)任務(wù)

t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))
..
w.runTask(t);
..

通過這個(gè)大概的分析理解了過程,我們發(fā)現(xiàn),當(dāng)前線程調(diào)用scan方法后,不會執(zhí)行當(dāng)前的WorkQueue中的任務(wù),而是通過一個(gè)隨機(jī)數(shù)r,來得到其他 WorkQueue的任務(wù)。這就是ForkJoinPool的主要的一個(gè)機(jī)理。
當(dāng)前線程不會只著眼于自己的任務(wù),而是優(yōu)先完成其他任務(wù)。這樣做來,防止了饑餓現(xiàn)象的發(fā)生。這樣就預(yù)防了某些線程因?yàn)榭ㄋ阑蛘咂渌蚨鵁o法及時(shí)完成任務(wù),或者某個(gè)線程的任務(wù)量很大,其他線程卻沒事可做。

然后來看看runTask方法

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final void runTask(ForkJoinTask<?> task) {
  if ((currentSteal = task) != null) {
  ForkJoinWorkerThread thread;
  task.doExec();
  ForkJoinTask<?>[] a = array;
  int md = mode;
  ++nsteals;
  currentSteal = null;
  if (md != 0)
   pollAndExecAll();
  else if (a != null) {
   int s, m = a.length - 1;
   ForkJoinTask<?> t;
   while ((s = top - 1) - base >= 0 &&
    (t = (ForkJoinTask<?>)U.getAndSetObject
    (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
   top = s;
   t.doExec();
   }
  }
  if ((thread = owner) != null) // no need to do in finally clause
   thread.afterTopLevelExec();
  }
 }

有一個(gè)有趣的命名:currentSteal,偷得的任務(wù),的確是剛剛解釋的那樣。

task.doExec();

將會完成這個(gè)任務(wù)。

完成了別人的任務(wù)以后,將會完成自己的任務(wù)。

通過得到top來獲得自己任務(wù)第一個(gè)任務(wù)

?
1
2
3
4
5
while ((s = top - 1) - base >= 0 && (t = (ForkJoinTask<?>)U.getAndSetObject(a, ((m & s) << ASHIFT) + ABASE, null)) != null)
{
 top = s;
 t.doExec();
}

接下來,通過一個(gè)圖來總結(jié)下剛剛線程池的流程

Java 高并發(fā)六:JDK并發(fā)包2詳解

比如有T1,T2兩個(gè)線程,T1會通過T2的base來獲得T2的最后一個(gè)任務(wù)(當(dāng)然實(shí)際上是通過一個(gè)隨機(jī)數(shù)r來取得某個(gè)線程最后一個(gè)任務(wù)),T1也會通過自己的top來執(zhí)行自己的第一個(gè)任務(wù)。反之,T2也會如此。
拿其他線程的任務(wù)都是從base開始拿的,自己拿自己的任務(wù)是從top開始拿的。這樣可以減少沖突

如果沒有找到其他任務(wù)

?
1
2
3
4
5
6
7
8
9
10
11
12
else if (--j < 0) {
   if ((ec | (e = (int)c)) < 0) // inactive or terminating
   return awaitWork(w, c, ec);
   else if (ctl == c) {  // try to inactivate and enqueue
   long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
   w.nextWait = e;
   w.eventCount = ec | INT_SIGN;
   if (!U.compareAndSwapLong(this, CTL, c, nc))
    w.eventCount = ec; // back out
   }
   break;
  }

那么首先會通過一系列運(yùn)行來改變ctl的值,獲得了nc,然后用CAS將新的值賦值。然后就調(diào)用awaitWork()將線程進(jìn)入等待狀態(tài)(調(diào)用的 前面系列文章中提到的unsafe的park方法)。
這里要說明的是改變ctl值這里,首先是將ctl中的AC-1,AC是占ctl的前16位,所以不能直接-1,而是通過AC_UNIT(0x1000000000000)來達(dá)到使ctl的前16位-1的效果。

前面說過eventCount中有保存poolIndex,通過poolIndex以及WorkQueue中的nextWait,就能遍歷所有的等待線程。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲AV无码国产精品色在线看 | 操儿子 | gayrb免费漫画入口 | 无码区国产区在线播放 | 亚洲爱v | 国产精品一区二区在线观看完整版 | 99国产精品热久久久久久夜夜嗨 | 韩国悲惨事件30无删减在线 | 99热久久这里只有精品23 | 成人1234 | 亚洲视频在线一区二区三区 | 美女张开腿让我了一夜 | 国产综合色在线视频区色吧图片 | 男人天堂资源网 | 美式禁忌在线 | 99热久久这里只有精品6国产网 | 日日操天天射 | 免费在线观看a | 涩涩屋在线播放 | 日本免费观看95视频网站 | 九九免费精品视频 | 成人性生交小说免费看 | 草馏社区最新1024 | 国产一卡二卡3卡4卡四卡在线视频 | 欧美综合另类 | 亚洲成人网导航 | 美女被免费视频 | 青青91 | 亚洲AV福利天堂一区二区三 | jzz大全部 | 天堂中文在线免费观看 | 青青青国产成人久久111网站 | 香蕉成人999视频 | 精品一区二区三区视频日产 | 色花堂中文字幕98堂网址 | 四影虎库最新2021 | 亚洲 日韩 在线 国产 视频 | 精品国产一区二区 | bt国产| 日韩欧美推理片免费看完整版 | 性满足久久久久久久久 |