日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析

發布時間:2024/1/17 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

ThreadPoolExecutor是Executor執行框架最重要的一個實現類,提供了線程池管理和任務管理是兩個最基本的能力。這篇通過分析ThreadPoolExecutor的源碼來看看如何設計和實現一個基于生產者消費者模型的執行器。

?

生產者消費者模型

生產者消費者模型包含三個角色:生產者,工作隊列,消費者。對于ThreadPoolExecutor來說,

1. 生產者是任務的提交者,是外部調用ThreadPoolExecutor的線程

2. 工作隊列是一個阻塞隊列的接口,具體的實現類可以有很多種。BlockingQueue<Runnable> workQueue;

3. 消費者是封裝了線程的Worker類的集合。HashSet<Worker> workers = new HashSet<Worker>();

?

?

主要屬性

明確了ThreadPoolExecutor的基本執行模型之后,來看下它的幾個主要屬性:

1.?private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));??? 一個32位的原子整形作為線程池的狀態控制描述符。低29位作為工作者線程的數量。所以工作者線程最多有2^29 -1個。高3位來保持線程池的狀態。ThreadPoolExecutor總共有5種狀態:

???? *?? RUNNING:? 可以接受新任務并執行
???? *?? SHUTDOWN: 不再接受新任務,但是仍然執行工作隊列中的任務
???? *?? STOP:???? 不再接受新任務,不執行工作隊列中的任務,并且中斷正在執行的任務
???? *?? TIDYING:? 所有任務被終止,工作線程的數量為0,會去執行terminated()鉤子方法
???? *?? TERMINATED: terminated()執行結束

?

下面是一系列ctl這個變量定義和工具方法

?

?
  • private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  • private static final int COUNT_BITS = Integer.SIZE - 3;

  • private static final int CAPACITY = (1 << COUNT_BITS) - 1;

  • ?
  • // runState is stored in the high-order bits

  • private static final int RUNNING = -1 << COUNT_BITS;

  • private static final int SHUTDOWN = 0 << COUNT_BITS;

  • private static final int STOP = 1 << COUNT_BITS;

  • private static final int TIDYING = 2 << COUNT_BITS;

  • private static final int TERMINATED = 3 << COUNT_BITS;

  • ?
  • // Packing and unpacking ctl

  • private static int runStateOf(int c) { return c & ~CAPACITY; }

  • private static int workerCountOf(int c) { return c & CAPACITY; }

  • private static int ctlOf(int rs, int wc) { return rs | wc; }

  • ?
  • private static boolean runStateLessThan(int c, int s) {

  • return c < s;

  • }

  • ?
  • private static boolean runStateAtLeast(int c, int s) {

  • return c >= s;

  • }

  • ?
  • private static boolean isRunning(int c) {

  • return c < SHUTDOWN;

  • }

  • ?
  • private boolean compareAndIncrementWorkerCount(int expect) {

  • return ctl.compareAndSet(expect, expect + 1);

  • }

  • ?
  • private boolean compareAndDecrementWorkerCount(int expect) {

  • return ctl.compareAndSet(expect, expect - 1);

  • }

  • ?
  • private void decrementWorkerCount() {

  • do {} while (! compareAndDecrementWorkerCount(ctl.get()));

  • }


  • 2. private final BlockingQueue<Runnable> workQueue; 工作隊列,采用了BlockingQueue阻塞隊列的接口,具體實現類可以按照不同的策略來選擇,比如有邊界的ArrayBlockingQueue,無邊界的LinkedBlockingQueue。

    ?

    3. private final ReentrantLock mainLock = new ReentrantLock();? 控制ThreadPoolExecutor的全局可重入鎖,所有需要同步的操作都要被這個鎖保護

    4. private final Condition termination = mainLock.newCondition(); mainLock的條件隊列,來進行wait()和notify()等條件操作

    5. private final HashSet<Worker> workers = new HashSet<Worker>();? 工作線程集合

    6. private volatile ThreadFactory threadFactory; 創建線程的工廠,可以自定義線程創建的邏輯

    7. private volatile RejectedExecutionHandler handler;? 拒絕執行任務的處理器,可以自定義拒絕的策略

    8. private volatile long keepAliveTime;?? 空閑線程的存活時間。可以根據這個存活時間來判斷空閑線程是否等待超時,然后采取相應的線程回收操作

    9. private volatile boolean allowCoreThreadTimeOut;? 是否允許coreThread線程超時回收

    10. private volatile int corePoolSize;? 可存活的線程的最小值。如果設置了allowCoreThreadTimeOut, 那么corePoolSize的值可以為0。

    11. private volatile int maximumPoolSize;? 可存活的線程的最大值

    ?

    工作線程創建和回收策略

    ThreadPoolExecutor通過corePoolSize,maximumPoolSize, allowCoreThreadTimeOut,keepAliveTime等幾個參數提供一個靈活的工作線程創建和回收的策略。

    創建策略:

    1. 當工作線程數量小于corePoolSize時,不管其他線程是否空閑,都創建新的工作線程來處理新加入的任務

    2. 當工作線程數量大于corePoolSize,小于maximumPoolSize時,只有當工作隊列滿了,才會創建新的工作線程來處理新加入的任務。當工作隊列有空余時,只把新任務加入隊列

    3. 把corePoolSize和maximumPoolSize 設置成相同的值時,線程池就是一個固定(fixed)工作線程數的線程。

    回收策略:

    1. keepAliveTime變量設置了空閑工作線程超時的時間,當工作線程數量超過了corePoolSize后,空閑的工作線程等待超過了keepAliveTime后,會被回收。后面會說怎么確定一個工作線程是否“空閑”。

    2. 如果設置了allowCoreThreadTimeOut,那么core Thread也可以被回收,即當core thread也空閑時,也可以被回收,直到工作線程集合為0。

    工作隊列策略

    ?

    工作隊列BlockingQueue<Runnable> workQueue 是用來存放提交的任務的。它有4個基本的策略,并且根據不同的阻塞隊列的實現類可以引入更多的工作隊列的策略。

    4個基本策略:

    1. 當工作線程數量小于corePoolSize時,新提交的任務總是會由新創建的工作線程執行,不入隊列

    2. 當工作線程數量大于corePoolSize,如果工作隊列沒滿,新提交的任務就入隊列

    3. 當工作線程數量大于corePoolSize,小于MaximumPoolSize時,如果工作隊列滿了,新提交的任務就交給新創建的工作線程,不入隊列

    4. 當工作線程數量大于MaximumPoolSize,并且工作隊列滿了,那么新提交的任務會被拒絕執行。具體看采用何種拒絕策略

    根據不同的阻塞隊列的實現類,又有幾種額外的策略

    1. 采用SynchronousQueue直接將任務傳遞給空閑的線程執行,不額外存儲任務。這種方式需要無限制的MaximumPoolSize,可以創建無限制的工作線程來處理提交的任務。這種方式的好處是任務可以很快被執行,適用于任務到達時間大于任務處理時間的情況。缺點是當任務量很大時,會占用大量線程

    2. 采用無邊界的工作隊列LinkedBlockingQueue。這種情況下,由于工作隊列永遠不會滿,那么工作線程的數量最大就是corePoolSize,因為當工作線程數量達到corePoolSize時,只有工作隊列滿的時候才會創建新的工作線程。這種方式好處是使用的線程數量是穩定的,當內存足夠大時,可以處理足夠多的請求。缺點是如果任務直接有依賴,很有可能形成死鎖,因為當工作線程被消耗完時,不會創建新的工作現場,只會把任務加入工作隊列。并且可能由于內存耗盡引發內存溢出OOM

    3. 采用有界的工作隊列AraayBlockingQueue。這種情況下對于內存資源是可控的,但是需要合理調節MaximumPoolSize和工作隊列的長度,這兩個值是相互影響的。當工作隊列長度比較小的時,必定會創建更多的線程。而更多的線程會引起上下文切換等額外的消耗。當工作隊列大,MaximumPoolSize小的時候,會影響吞吐量,并且會觸發拒絕機制。

    ?

    拒絕執行策略

    當Executor處于shutdown狀態或者工作線程超過MaximumPoolSize并且工作隊列滿了之后,新提交的任務將會被拒絕執行。RejectedExecutionHandler接口定義了拒絕執行的策略。具體的策略有

    CallerRunsPolicy:由調用者線程來執行被拒絕的任務,屬于同步執行

    AbortPolicy:中止執行,拋出RejectedExecutionException異常

    DiscardPolicy:丟棄任務

    DiscardOldestPolicy:丟棄最老的任務

    ?

    ?
  • public static class CallerRunsPolicy implements RejectedExecutionHandler {

  • /**

  • * Creates a {@code CallerRunsPolicy}.

  • */

  • public CallerRunsPolicy() { }

  • ?
  • /**

  • * Executes task r in the caller's thread, unless the executor

  • * has been shut down, in which case the task is discarded.

  • *

  • * @param r the runnable task requested to be executed

  • * @param e the executor attempting to execute this task

  • */

  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  • if (!e.isShutdown()) {

  • r.run();

  • }

  • }

  • }

  • ?
  • /**

  • * A handler for rejected tasks that throws a

  • * {@code RejectedExecutionException}.

  • */

  • public static class AbortPolicy implements RejectedExecutionHandler {

  • /**

  • * Creates an {@code AbortPolicy}.

  • */

  • public AbortPolicy() { }

  • ?
  • /**

  • * Always throws RejectedExecutionException.

  • *

  • * @param r the runnable task requested to be executed

  • * @param e the executor attempting to execute this task

  • * @throws RejectedExecutionException always.

  • */

  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  • throw new RejectedExecutionException("Task " + r.toString() +

  • " rejected from " +

  • e.toString());

  • }

  • }

  • ?
  • /**

  • * A handler for rejected tasks that silently discards the

  • * rejected task.

  • */

  • public static class DiscardPolicy implements RejectedExecutionHandler {

  • /**

  • * Creates a {@code DiscardPolicy}.

  • */

  • public DiscardPolicy() { }

  • ?
  • /**

  • * Does nothing, which has the effect of discarding task r.

  • *

  • * @param r the runnable task requested to be executed

  • * @param e the executor attempting to execute this task

  • */

  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  • }

  • }

  • ?
  • /**

  • * A handler for rejected tasks that discards the oldest unhandled

  • * request and then retries {@code execute}, unless the executor

  • * is shut down, in which case the task is discarded.

  • */

  • public static class DiscardOldestPolicy implements RejectedExecutionHandler {

  • /**

  • * Creates a {@code DiscardOldestPolicy} for the given executor.

  • */

  • public DiscardOldestPolicy() { }

  • ?
  • /**

  • * Obtains and ignores the next task that the executor

  • * would otherwise execute, if one is immediately available,

  • * and then retries execution of task r, unless the executor

  • * is shut down, in which case task r is instead discarded.

  • *

  • * @param r the runnable task requested to be executed

  • * @param e the executor attempting to execute this task

  • */

  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  • if (!e.isShutdown()) {

  • e.getQueue().poll();

  • e.execute(r);

  • }

  • }

  • }

  • ?

    工作線程Worker的設計

    工作線程沒有直接使用Thread,而是采用了Worker類封裝了Thread,目的是更好地進行中斷控制。Worker直接繼承了AbstractQueuedSynchronizer來進行同步操作,它實現了一個不可重入的互斥結構。當它的state屬性為0時表示unlock,state為1時表示lock。任務執行時必須在lock狀態的保護下,防止出現同步問題。因此當Worker處于lock狀態時,表示它正在運行,當它處于unlock狀態時,表示它“空閑”。當它空閑超過keepAliveTime時,就有可能被回收。

    Worker還實現了Runnable接口, 執行它的線程是Worker包含的Thread對象,在Worker的構造函數可以看到Thread創建時,把Worker對象傳遞給了它。

    ?

    ?
  • private final class Worker

  • extends AbstractQueuedSynchronizer

  • implements Runnable

  • {

  • ?
  • /** Thread this worker is running in. Null if factory fails. */

  • final Thread thread;

  • /** Initial task to run. Possibly null. */

  • Runnable firstTask;

  • /** Per-thread task counter */

  • volatile long completedTasks;

  • ?
  • Worker(Runnable firstTask) {

  • setState(-1); // inhibit interrupts until runWorker

  • this.firstTask = firstTask;

  • ?
  • // 把Worker對象作為Runnable的實例傳遞給了新創建Thread對象

  • ?this.thread = getThreadFactory().newThread(this);

  • }

  • ?
  • public void run() {

  • runWorker(this);

  • }

  • ?
  • // Lock methods

  • //

  • // The value 0 represents the unlocked state.

  • // The value 1 represents the locked state.

  • ?
  • protected boolean isHeldExclusively() {

  • return getState() != 0;

  • }

  • ?
  • protected boolean tryAcquire(int unused) {

  • if (compareAndSetState(0, 1)) {

  • setExclusiveOwnerThread(Thread.currentThread());

  • return true;

  • }

  • return false;

  • }

  • ?
  • protected boolean tryRelease(int unused) {

  • setExclusiveOwnerThread(null);

  • setState(0);

  • return true;

  • }

  • ?
  • public void lock() { acquire(1); }

  • public boolean tryLock() { return tryAcquire(1); }

  • public void unlock() { release(1); }

  • public boolean isLocked() { return isHeldExclusively(); }

  • ?
  • void interruptIfStarted() {

  • Thread t;

  • if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

  • try {

  • t.interrupt();

  • } catch (SecurityException ignore) {

  • }

  • }

  • }

  • }


  • Worker被它的線程執行時,run方法調用了ThreadPoolExecutor的runWorker方法。

    1. wt指向當前執行Worker的run方法的線程,也就是指向了Worker包含的工作線程對象

    2. task指向Worker包含的firstTask對象,表示當前要執行的任務

    3. 當task不為null或者從工作隊列中取到了新任務,那么先加鎖w.lock表示正在運行任務。在真正開始執行task.run()之前,先判斷線程池的狀態是否已經STOP,如果是,就中斷Worker的線程。

    4. 一旦判斷當前線程不是STOP并且工作線程沒有中斷。那么就開始執行task.run()了。Worker的interruptIfStarted方法可以中斷這個Worker的線程,從而中斷正在執行任務。

    5.?beforeExecute(wt, task)和afterExecute(wt,task)是兩個鉤子方法,支持在任務真正開始執行前就行擴展。

    ?

    ?
  • final void runWorker(Worker w) {

  • Thread wt = Thread.currentThread();

  • Runnable task = w.firstTask;

  • w.firstTask = null;

  • w.unlock(); // allow interrupts

  • boolean completedAbruptly = true;

  • try {

  • while (task != null || (task = getTask()) != null) {

  • w.lock();

  • // If pool is stopping, ensure thread is interrupted;

  • // if not, ensure thread is not interrupted. This

  • // requires a recheck in second case to deal with

  • // shutdownNow race while clearing interrupt

  • if ((runStateAtLeast(ctl.get(), STOP) ||

  • (Thread.interrupted() &&

  • runStateAtLeast(ctl.get(), STOP))) &&

  • !wt.isInterrupted())

  • wt.interrupt();

  • try {

  • beforeExecute(wt, task);

  • Throwable thrown = null;

  • try {

  • task.run();

  • } catch (RuntimeException x) {

  • thrown = x; throw x;

  • } catch (Error x) {

  • thrown = x; throw x;

  • } catch (Throwable x) {

  • thrown = x; throw new Error(x);

  • } finally {

  • afterExecute(task, thrown);

  • }

  • } finally {

  • task = null;

  • w.completedTasks++;

  • w.unlock();

  • }

  • }

  • completedAbruptly = false;

  • } finally {

  • processWorkerExit(w, completedAbruptly);

  • }

  • }


  • 工作線程Worker創建和回收的源碼

    首先看一下ThreadPoolExecutor的execute方法,這個方式是任務提交的入口。可以看到它的邏輯符合之前說的工作線程創建的基本策略

    1. 當工作線程數量小于corePoolSize時,通過addWorker(command,true)來新建工作線程處理新建的任務,不入工作隊列

    2. 當工作線程數量大于等于corePoolSize時,先入隊列,使用的是BlockingQueue的offer方法。當工作線程數量為0時,還會通過addWorker(null, false)添加一個新的工作線程

    3. 當工作隊列滿了并且工作線程數量在corePoolSize和MaximumPoolSize之間,就創建新的工作線程去執行新添加的任務。當工作線程數量超過了MaximumPoolSize,就拒絕任務。

    ?

    ?
  • public void execute(Runnable command) {

  • if (command == null)

  • throw new NullPointerException();

  • ?
  • int c = ctl.get();

  • if (workerCountOf(c) < corePoolSize) {

  • if (addWorker(command, true))

  • return;

  • c = ctl.get();

  • }

  • if (isRunning(c) && workQueue.offer(command)) {

  • int recheck = ctl.get();

  • if (! isRunning(recheck) && remove(command))

  • reject(command);

  • else if (workerCountOf(recheck) == 0)

  • addWorker(null, false);

  • }

  • else if (!addWorker(command, false))

  • reject(command);

  • }


  • 可以看到addWorker方法是創建Worker工作線程的所在。

    1. retry這個循環判斷線程池的狀態和當前工作線程數量的邊界。如果允許創建工作現場,首先修改ctl變量表示的工作線程的數量

    2. 把工作線程添加到workers集合中的操作要在mainLock這個鎖的保護下進行。所有和ThreadPoolExecutor狀態相關的操作都要在mainLock鎖的保護下進行

    3. w = new Worker(firstTask); 創建Worker實例,把firstTask作為它當前的任務。firstTask為null時表示先只創建Worker線程,然后去工作隊列中取任務執行

    4. 把新創建的Worker實例加入到workers集合,修改相關統計變量。

    5. 當加入集合成功后,開始啟動這個Worker實例。啟動的方法是調用Worker封裝的Thread的start()方法。之前說了,這個Thread對應的Runnable是Worker本身,會去調用Worker的run方法,然后調用ThreadPoolExecutor的runWorker方法。在runWorker方法中真正去執行任務。

    ?

    ?
  • private boolean addWorker(Runnable firstTask, boolean core) {

  • retry:

  • for (;;) {

  • int c = ctl.get();

  • int rs = runStateOf(c);

  • ?
  • // Check if queue empty only if necessary.

  • if (rs >= SHUTDOWN &&

  • ! (rs == SHUTDOWN &&

  • firstTask == null &&

  • ! workQueue.isEmpty()))

  • return false;

  • ?
  • for (;;) {

  • int wc = workerCountOf(c);

  • if (wc >= CAPACITY ||

  • wc >= (core ? corePoolSize : maximumPoolSize))

  • return false;

  • if (compareAndIncrementWorkerCount(c))

  • break retry;

  • c = ctl.get(); // Re-read ctl

  • if (runStateOf(c) != rs)

  • continue retry;

  • // else CAS failed due to workerCount change; retry inner loop

  • }

  • }

  • ?
  • boolean workerStarted = false;

  • boolean workerAdded = false;

  • Worker w = null;

  • try {

  • final ReentrantLock mainLock = this.mainLock;

  • w = new Worker(firstTask);

  • final Thread t = w.thread;

  • if (t != null) {

  • mainLock.lock();

  • try {

  • // Recheck while holding lock.

  • // Back out on ThreadFactory failure or if

  • // shut down before lock acquired.

  • int c = ctl.get();

  • int rs = runStateOf(c);

  • ?
  • if (rs < SHUTDOWN ||

  • (rs == SHUTDOWN && firstTask == null)) {

  • if (t.isAlive()) // precheck that t is startable

  • throw new IllegalThreadStateException();

  • workers.add(w);

  • int s = workers.size();

  • if (s > largestPoolSize)

  • largestPoolSize = s;

  • workerAdded = true;

  • }

  • } finally {

  • mainLock.unlock();

  • }

  • if (workerAdded) {

  • t.start();

  • workerStarted = true;

  • }

  • }

  • } finally {

  • if (! workerStarted)

  • addWorkerFailed(w);

  • }

  • return workerStarted;

  • }


  • 工作線程回收的方法是processWorkerExit(),它在runWorker方法執行結束的時候被調用。之前說了空閑的工作線程可能會在keepAliveTime時間之后被回收。這個邏輯隱含在runWorker方法和getTask方法中,會在下面說如何從工作隊列取任務時說明。processWorkerExit方法單純只是處理工作線程的回收。

    1. 結合runWorker方法看,如果Worker執行task.run()的時候拋出了異常,那么completedAbruptly為true,需要從workers集合中把這個工作線程移除掉。

    2. 如果是completedAbruptly為true,并且線程池不是STOP狀態,那么就創建一個新的Worker工作線程

    3. 如果是completedAbruptly為false,并且線程池不是STOP狀態,首先檢查是否allowCoreThreadTimeout,如果運行,那么最少線程數可以為0,否則是corePoolSize。如果最少線程數為0,并且工作隊列不為空,那么最小值為1。最后檢查當前的工作線程數量,如果小于最小值,就創建新的工作線程。

    ?

    ?
  • private void processWorkerExit(Worker w, boolean completedAbruptly) {

  • if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

  • decrementWorkerCount();

  • ?
  • final ReentrantLock mainLock = this.mainLock;

  • mainLock.lock();

  • try {

  • completedTaskCount += w.completedTasks;

  • workers.remove(w);

  • } finally {

  • mainLock.unlock();

  • }

  • ?
  • tryTerminate();

  • ?
  • int c = ctl.get();

  • if (runStateLessThan(c, STOP)) {

  • if (!completedAbruptly) {

  • int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

  • if (min == 0 && ! workQueue.isEmpty())

  • min = 1;

  • if (workerCountOf(c) >= min)

  • return; // replacement not needed

  • }

  • addWorker(null, false);

  • }

  • }


  • 任務的獲取

    工作線程從工作隊列中取任務的代碼在getTask方法中

    1. timed變量表示是否要計時,當計時超過keepAliveTime后還沒取到任務,就返回null。結合runWorker方法可以知道,當getTask返回null時,該Worker線程會被回收,這就是如何回收空閑工作線程的方法。

    timed變量當allowCoreThreadTimeout為true或者當工作線程數大于corePoolSize時為true。

    2. 如果timed為true,就用BlockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法來計時從隊頭取任務,否則直接用take()方法從隊頭取任務

    ?

    ?
  • private Runnable getTask() {

  • boolean timedOut = false; // Did the last poll() time out?

  • ?
  • retry:

  • for (;;) {

  • int c = ctl.get();

  • int rs = runStateOf(c);

  • ?
  • // Check if queue empty only if necessary.

  • if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

  • decrementWorkerCount();

  • return null;

  • }

  • ?
  • boolean timed; // Are workers subject to culling?

  • ?
  • for (;;) {

  • int wc = workerCountOf(c);

  • timed = allowCoreThreadTimeOut || wc > corePoolSize;

  • ?
  • if (wc <= maximumPoolSize && ! (timedOut && timed))

  • break;

  • if (compareAndDecrementWorkerCount(c))

  • return null;

  • c = ctl.get(); // Re-read ctl

  • if (runStateOf(c) != rs)

  • continue retry;

  • // else CAS failed due to workerCount change; retry inner loop

  • }

  • ?
  • try {

  • Runnable r = timed ?

  • workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

  • workQueue.take();

  • if (r != null)

  • return r;

  • timedOut = true;

  • } catch (InterruptedException retry) {

  • timedOut = false;

  • }

  • }

  • }


  • 線程池的關閉

    線程池有SHUTDOWN, STOP, TIDYING, TERMINATED這幾個狀態和線程池關閉相關。通常我們把關閉分為優雅的關閉和強制立刻關閉。

    所謂優雅的關閉就是調用shutdown()方法,線程池進入SHUTDOWN狀態,不在接收新的任務,會把工作隊列的任務執行完畢后再結束。

    強制立刻關閉就是調用shutdownNow()方法,線程池直接進入STOP狀態,會中斷正在執行的工作線程,清空工作隊列。

    1. 在shutdown方法中,先設置線程池狀態為SHUTDOWN,然后先去中斷空閑的工作線程,再調用onShutdown鉤子方法。最后tryTerminate()

    2. 在shutdownNow方法中,先設置線程池狀態為STOP,然后先中斷所有的工作線程,再清空工作隊列。最后tryTerminate()。這個方法會把工作隊列中的任務返回給調用者處理。

    ?

    ?
  • public void shutdown() {

  • final ReentrantLock mainLock = this.mainLock;

  • mainLock.lock();

  • try {

  • checkShutdownAccess();

  • advanceRunState(SHUTDOWN);

  • interruptIdleWorkers();

  • onShutdown(); // hook for ScheduledThreadPoolExecutor

  • } finally {

  • mainLock.unlock();

  • }

  • tryTerminate();

  • }

  • ?
  • ? public List<Runnable> shutdownNow() {

  • ??????? List<Runnable> tasks;

  • ??????? final ReentrantLock mainLock = this.mainLock;

  • ??????? mainLock.lock();

  • ??????? try {

  • ??????????? checkShutdownAccess();

  • ??????????? advanceRunState(STOP);

  • ??????????? interruptWorkers();

  • ??????????? tasks = drainQueue();

  • ??????? } finally {

  • ??????????? mainLock.unlock();

  • ??????? }

  • ??????? tryTerminate();

  • ??????? return tasks;

  • ??? }


  • interruptIdleWorkers方法會去中斷空閑的工作線程,所謂空閑的工作線程即沒有上鎖的Worker。

    而interruptWorkers方法直接去中斷所有的Worker,調用Worker.interruptIfStarted()方法

    ?

    ?
  • private void interruptIdleWorkers(boolean onlyOne) {

  • final ReentrantLock mainLock = this.mainLock;

  • mainLock.lock();

  • try {

  • for (Worker w : workers) {

  • Thread t = w.thread;

  • if (!t.isInterrupted() && w.tryLock()) {

  • try {

  • t.interrupt();

  • } catch (SecurityException ignore) {

  • } finally {

  • w.unlock();

  • }

  • }

  • if (onlyOne)

  • break;

  • }

  • } finally {

  • mainLock.unlock();

  • }

  • }

  • ?
  • ?private void interruptWorkers() {

  • ??????? final ReentrantLock mainLock = this.mainLock;

  • ??????? mainLock.lock();

  • ??????? try {

  • ??????????? for (Worker w : workers)

  • ??????????????? w.interruptIfStarted();

  • ??????? } finally {

  • ??????????? mainLock.unlock();

  • ??????? }

  • ??? }

  • ?
  • ? void interruptIfStarted() {

  • ??????????? Thread t;

  • ??????????? if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

  • ??????????????? try {

  • ??????????????????? t.interrupt();

  • ??????????????? } catch (SecurityException ignore) {

  • ??????????????? }

  • ??????????? }

  • ??????? }


  • tryTerminate方法會嘗試終止線程池,根據線程池的狀態,在相應狀態會中斷空閑工作線程,調用terminated()鉤子方法,設置狀態為TERMINATED。

    ?

    ?
  • final void tryTerminate() {

  • for (;;) {

  • int c = ctl.get();

  • if (isRunning(c) ||

  • runStateAtLeast(c, TIDYING) ||

  • (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

  • return;

  • if (workerCountOf(c) != 0) { // Eligible to terminate

  • interruptIdleWorkers(ONLY_ONE);

  • return;

  • }

  • ?
  • final ReentrantLock mainLock = this.mainLock;

  • mainLock.lock();

  • try {

  • if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

  • try {

  • terminated();

  • } finally {

  • ctl.set(ctlOf(TERMINATED, 0));

  • termination.signalAll();

  • }

  • return;

  • }

  • } finally {

  • mainLock.unlock();

  • }

  • // else retry on failed CAS

  • }

  • }


  • 最后說明一下,JVM的守護進程只有當所有派生出來的線程都結束后才會退出,使用ThreadPoolExecutor線程池時,如果有的任務一直執行,并且不響應中斷,那么會一直占用線程,那么JVM也會一直工作,不會退出。

    總結

    以上是生活随笔為你收集整理的聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。