聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析
ThreadPoolExecutor是Executor執行框架最重要的一個實現類,提供了線程池管理和任務管理是兩個最基本的能力。這篇通過分析ThreadPoolExecutor的源碼來看看如何設計和實現一個基于生產者消費者模型的執行器。
?
生產者消費者模型
生產者消費者模型包含三個角色:生產者,工作隊列,消費者。對于ThreadPoolExecutor來說,
1. 生產者是任務的提交者,是外部調用ThreadPoolExecutor的線程
2. 工作隊列是一個阻塞隊列的接口,具體的實現類可以有很多種。BlockingQueue<Runnable> workQueue;
3. 消費者是封裝了線程的Worker類的集合。HashSet<Worker> workers = new HashSet<Worker>();
?
?
主要屬性
明確了ThreadPoolExecutor的基本執行模型之后,來看下它的幾個主要屬性:
1.?private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));??? 一個32位的原子整形作為線程池的狀態控制描述符。低29位作為工作者線程的數量。所以工作者線程最多有2^29 -1個。高3位來保持線程池的狀態。ThreadPoolExecutor總共有5種狀態:
???? *?? RUNNING:? 可以接受新任務并執行
???? *?? SHUTDOWN: 不再接受新任務,但是仍然執行工作隊列中的任務
???? *?? STOP:???? 不再接受新任務,不執行工作隊列中的任務,并且中斷正在執行的任務
???? *?? TIDYING:? 所有任務被終止,工作線程的數量為0,會去執行terminated()鉤子方法
???? *?? TERMINATED: terminated()執行結束
?
下面是一系列ctl這個變量定義和工具方法
?
?private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
2. private final BlockingQueue<Runnable> workQueue; 工作隊列,采用了BlockingQueue阻塞隊列的接口,具體實現類可以按照不同的策略來選擇,比如有邊界的ArrayBlockingQueue,無邊界的LinkedBlockingQueue。
?
3. private final ReentrantLock mainLock = new ReentrantLock();? 控制ThreadPoolExecutor的全局可重入鎖,所有需要同步的操作都要被這個鎖保護
4. private final Condition termination = mainLock.newCondition(); mainLock的條件隊列,來進行wait()和notify()等條件操作
5. private final HashSet<Worker> workers = new HashSet<Worker>();? 工作線程集合
6. private volatile ThreadFactory threadFactory; 創建線程的工廠,可以自定義線程創建的邏輯
7. private volatile RejectedExecutionHandler handler;? 拒絕執行任務的處理器,可以自定義拒絕的策略
8. private volatile long keepAliveTime;?? 空閑線程的存活時間。可以根據這個存活時間來判斷空閑線程是否等待超時,然后采取相應的線程回收操作
9. private volatile boolean allowCoreThreadTimeOut;? 是否允許coreThread線程超時回收
10. private volatile int corePoolSize;? 可存活的線程的最小值。如果設置了allowCoreThreadTimeOut, 那么corePoolSize的值可以為0。
11. private volatile int maximumPoolSize;? 可存活的線程的最大值
?
工作線程創建和回收策略
ThreadPoolExecutor通過corePoolSize,maximumPoolSize, allowCoreThreadTimeOut,keepAliveTime等幾個參數提供一個靈活的工作線程創建和回收的策略。
創建策略:
1. 當工作線程數量小于corePoolSize時,不管其他線程是否空閑,都創建新的工作線程來處理新加入的任務
2. 當工作線程數量大于corePoolSize,小于maximumPoolSize時,只有當工作隊列滿了,才會創建新的工作線程來處理新加入的任務。當工作隊列有空余時,只把新任務加入隊列
3. 把corePoolSize和maximumPoolSize 設置成相同的值時,線程池就是一個固定(fixed)工作線程數的線程。
回收策略:
1. keepAliveTime變量設置了空閑工作線程超時的時間,當工作線程數量超過了corePoolSize后,空閑的工作線程等待超過了keepAliveTime后,會被回收。后面會說怎么確定一個工作線程是否“空閑”。
2. 如果設置了allowCoreThreadTimeOut,那么core Thread也可以被回收,即當core thread也空閑時,也可以被回收,直到工作線程集合為0。
工作隊列策略
?
工作隊列BlockingQueue<Runnable> workQueue 是用來存放提交的任務的。它有4個基本的策略,并且根據不同的阻塞隊列的實現類可以引入更多的工作隊列的策略。
4個基本策略:
1. 當工作線程數量小于corePoolSize時,新提交的任務總是會由新創建的工作線程執行,不入隊列
2. 當工作線程數量大于corePoolSize,如果工作隊列沒滿,新提交的任務就入隊列
3. 當工作線程數量大于corePoolSize,小于MaximumPoolSize時,如果工作隊列滿了,新提交的任務就交給新創建的工作線程,不入隊列
4. 當工作線程數量大于MaximumPoolSize,并且工作隊列滿了,那么新提交的任務會被拒絕執行。具體看采用何種拒絕策略
根據不同的阻塞隊列的實現類,又有幾種額外的策略
1. 采用SynchronousQueue直接將任務傳遞給空閑的線程執行,不額外存儲任務。這種方式需要無限制的MaximumPoolSize,可以創建無限制的工作線程來處理提交的任務。這種方式的好處是任務可以很快被執行,適用于任務到達時間大于任務處理時間的情況。缺點是當任務量很大時,會占用大量線程
2. 采用無邊界的工作隊列LinkedBlockingQueue。這種情況下,由于工作隊列永遠不會滿,那么工作線程的數量最大就是corePoolSize,因為當工作線程數量達到corePoolSize時,只有工作隊列滿的時候才會創建新的工作線程。這種方式好處是使用的線程數量是穩定的,當內存足夠大時,可以處理足夠多的請求。缺點是如果任務直接有依賴,很有可能形成死鎖,因為當工作線程被消耗完時,不會創建新的工作現場,只會把任務加入工作隊列。并且可能由于內存耗盡引發內存溢出OOM
3. 采用有界的工作隊列AraayBlockingQueue。這種情況下對于內存資源是可控的,但是需要合理調節MaximumPoolSize和工作隊列的長度,這兩個值是相互影響的。當工作隊列長度比較小的時,必定會創建更多的線程。而更多的線程會引起上下文切換等額外的消耗。當工作隊列大,MaximumPoolSize小的時候,會影響吞吐量,并且會觸發拒絕機制。
?
拒絕執行策略
當Executor處于shutdown狀態或者工作線程超過MaximumPoolSize并且工作隊列滿了之后,新提交的任務將會被拒絕執行。RejectedExecutionHandler接口定義了拒絕執行的策略。具體的策略有
CallerRunsPolicy:由調用者線程來執行被拒絕的任務,屬于同步執行
AbortPolicy:中止執行,拋出RejectedExecutionException異常
DiscardPolicy:丟棄任務
DiscardOldestPolicy:丟棄最老的任務
?
?public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
?
工作線程Worker的設計
工作線程沒有直接使用Thread,而是采用了Worker類封裝了Thread,目的是更好地進行中斷控制。Worker直接繼承了AbstractQueuedSynchronizer來進行同步操作,它實現了一個不可重入的互斥結構。當它的state屬性為0時表示unlock,state為1時表示lock。任務執行時必須在lock狀態的保護下,防止出現同步問題。因此當Worker處于lock狀態時,表示它正在運行,當它處于unlock狀態時,表示它“空閑”。當它空閑超過keepAliveTime時,就有可能被回收。
Worker還實現了Runnable接口, 執行它的線程是Worker包含的Thread對象,在Worker的構造函數可以看到Thread創建時,把Worker對象傳遞給了它。
?
?private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 把Worker對象作為Runnable的實例傳遞給了新創建Thread對象
?this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker被它的線程執行時,run方法調用了ThreadPoolExecutor的runWorker方法。
1. wt指向當前執行Worker的run方法的線程,也就是指向了Worker包含的工作線程對象
2. task指向Worker包含的firstTask對象,表示當前要執行的任務
3. 當task不為null或者從工作隊列中取到了新任務,那么先加鎖w.lock表示正在運行任務。在真正開始執行task.run()之前,先判斷線程池的狀態是否已經STOP,如果是,就中斷Worker的線程。
4. 一旦判斷當前線程不是STOP并且工作線程沒有中斷。那么就開始執行task.run()了。Worker的interruptIfStarted方法可以中斷這個Worker的線程,從而中斷正在執行任務。
5.?beforeExecute(wt, task)和afterExecute(wt,task)是兩個鉤子方法,支持在任務真正開始執行前就行擴展。
?
?final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
工作線程Worker創建和回收的源碼
首先看一下ThreadPoolExecutor的execute方法,這個方式是任務提交的入口。可以看到它的邏輯符合之前說的工作線程創建的基本策略
1. 當工作線程數量小于corePoolSize時,通過addWorker(command,true)來新建工作線程處理新建的任務,不入工作隊列
2. 當工作線程數量大于等于corePoolSize時,先入隊列,使用的是BlockingQueue的offer方法。當工作線程數量為0時,還會通過addWorker(null, false)添加一個新的工作線程
3. 當工作隊列滿了并且工作線程數量在corePoolSize和MaximumPoolSize之間,就創建新的工作線程去執行新添加的任務。當工作線程數量超過了MaximumPoolSize,就拒絕任務。
?
?public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到addWorker方法是創建Worker工作線程的所在。
1. retry這個循環判斷線程池的狀態和當前工作線程數量的邊界。如果允許創建工作現場,首先修改ctl變量表示的工作線程的數量
2. 把工作線程添加到workers集合中的操作要在mainLock這個鎖的保護下進行。所有和ThreadPoolExecutor狀態相關的操作都要在mainLock鎖的保護下進行
3. w = new Worker(firstTask); 創建Worker實例,把firstTask作為它當前的任務。firstTask為null時表示先只創建Worker線程,然后去工作隊列中取任務執行
4. 把新創建的Worker實例加入到workers集合,修改相關統計變量。
5. 當加入集合成功后,開始啟動這個Worker實例。啟動的方法是調用Worker封裝的Thread的start()方法。之前說了,這個Thread對應的Runnable是Worker本身,會去調用Worker的run方法,然后調用ThreadPoolExecutor的runWorker方法。在runWorker方法中真正去執行任務。
?
?private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
工作線程回收的方法是processWorkerExit(),它在runWorker方法執行結束的時候被調用。之前說了空閑的工作線程可能會在keepAliveTime時間之后被回收。這個邏輯隱含在runWorker方法和getTask方法中,會在下面說如何從工作隊列取任務時說明。processWorkerExit方法單純只是處理工作線程的回收。
1. 結合runWorker方法看,如果Worker執行task.run()的時候拋出了異常,那么completedAbruptly為true,需要從workers集合中把這個工作線程移除掉。
2. 如果是completedAbruptly為true,并且線程池不是STOP狀態,那么就創建一個新的Worker工作線程
3. 如果是completedAbruptly為false,并且線程池不是STOP狀態,首先檢查是否allowCoreThreadTimeout,如果運行,那么最少線程數可以為0,否則是corePoolSize。如果最少線程數為0,并且工作隊列不為空,那么最小值為1。最后檢查當前的工作線程數量,如果小于最小值,就創建新的工作線程。
?
?private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
任務的獲取
工作線程從工作隊列中取任務的代碼在getTask方法中
1. timed變量表示是否要計時,當計時超過keepAliveTime后還沒取到任務,就返回null。結合runWorker方法可以知道,當getTask返回null時,該Worker線程會被回收,這就是如何回收空閑工作線程的方法。
timed變量當allowCoreThreadTimeout為true或者當工作線程數大于corePoolSize時為true。
2. 如果timed為true,就用BlockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法來計時從隊頭取任務,否則直接用take()方法從隊頭取任務
?
?private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
線程池的關閉
線程池有SHUTDOWN, STOP, TIDYING, TERMINATED這幾個狀態和線程池關閉相關。通常我們把關閉分為優雅的關閉和強制立刻關閉。
所謂優雅的關閉就是調用shutdown()方法,線程池進入SHUTDOWN狀態,不在接收新的任務,會把工作隊列的任務執行完畢后再結束。
強制立刻關閉就是調用shutdownNow()方法,線程池直接進入STOP狀態,會中斷正在執行的工作線程,清空工作隊列。
1. 在shutdown方法中,先設置線程池狀態為SHUTDOWN,然后先去中斷空閑的工作線程,再調用onShutdown鉤子方法。最后tryTerminate()
2. 在shutdownNow方法中,先設置線程池狀態為STOP,然后先中斷所有的工作線程,再清空工作隊列。最后tryTerminate()。這個方法會把工作隊列中的任務返回給調用者處理。
?
?public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
? public List<Runnable> shutdownNow() {
??????? List<Runnable> tasks;
??????? final ReentrantLock mainLock = this.mainLock;
??????? mainLock.lock();
??????? try {
??????????? checkShutdownAccess();
??????????? advanceRunState(STOP);
??????????? interruptWorkers();
??????????? tasks = drainQueue();
??????? } finally {
??????????? mainLock.unlock();
??????? }
??????? tryTerminate();
??????? return tasks;
??? }
interruptIdleWorkers方法會去中斷空閑的工作線程,所謂空閑的工作線程即沒有上鎖的Worker。
而interruptWorkers方法直接去中斷所有的Worker,調用Worker.interruptIfStarted()方法
?
?private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
?private void interruptWorkers() {
??????? final ReentrantLock mainLock = this.mainLock;
??????? mainLock.lock();
??????? try {
??????????? for (Worker w : workers)
??????????????? w.interruptIfStarted();
??????? } finally {
??????????? mainLock.unlock();
??????? }
??? }
? void interruptIfStarted() {
??????????? Thread t;
??????????? if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
??????????????? try {
??????????????????? t.interrupt();
??????????????? } catch (SecurityException ignore) {
??????????????? }
??????????? }
??????? }
tryTerminate方法會嘗試終止線程池,根據線程池的狀態,在相應狀態會中斷空閑工作線程,調用terminated()鉤子方法,設置狀態為TERMINATED。
?
?final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
最后說明一下,JVM的守護進程只有當所有派生出來的線程都結束后才會退出,使用ThreadPoolExecutor線程池時,如果有的任務一直執行,并且不響應中斷,那么會一直占用線程,那么JVM也會一直工作,不會退出。
總結
以上是生活随笔為你收集整理的聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊聊高并发(三十六)Java内存模型那些
- 下一篇: Breeze库API总结(Spark线性