java多线程系列:ThreadPoolExecutor源码分析
前言
這篇主要講述ThreadPoolExecutor的源碼分析,貫穿類的創建、任務的添加到線程池的關閉整個流程,讓你知其然所以然。希望你可以通過本篇博文知道ThreadPoolExecutor是怎么添加任務、執行任務的,以及延伸的知識點。那么先來看看ThreadPoolExecutor的繼承關系吧。
繼承關系
Executor接口
public interface Executor {void execute(Runnable command); }Executor接口只有一個方法execute,傳入線程任務參數
ExecutorService接口
public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }ExecutorService接口繼承Executor接口,并增加了submit、shutdown、invokeAll等等一系列方法。
AbstractExecutorService抽象類
public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {...}public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {... }public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {...}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {...}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {...}}AbstractExecutorService抽象類實現ExecutorService接口,并且提供了一些方法的默認實現,例如submit方法、invokeAny方法、invokeAll方法。
像execute方法、線程池的關閉方法(shutdown、shutdownNow等等)就沒有提供默認的實現。
ThreadPoolExecutor
先介紹下ThreadPoolExecutor線程池的狀態吧
線程池狀態
int 是4個字節,也就是32位(注:一個字節等于8位)
//記錄線程池狀態和線程數量(總共32位,前三位表示線程池狀態,后29位表示線程數量) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程數量統計位數29 Integer.SIZE=32 private static final int COUNT_BITS = Integer.SIZE - 3; //容量 000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1;//運行中 111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //關閉 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //停止 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //整理 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //終止 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS;//獲取運行狀態(獲取前3位) private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取線程個數(獲取后29位) private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }- RUNNING:接受新任務并且處理阻塞隊列里的任務
- SHUTDOWN:拒絕新任務但是處理阻塞隊列里的任務
- STOP:拒絕新任務并且拋棄阻塞隊列里的任務同時會中斷正在處理的任務
- TIDYING:所有任務都執行完(包含阻塞隊列里面任務)當前線程池活動線程為0,將要調用terminated方法
- TERMINATED:終止狀態。terminated方法調用完成以后的狀態
線程池狀態轉換
RUNNING -> SHUTDOWN顯式調用shutdown()方法, 或者隱式調用了finalize()方法 (RUNNING or SHUTDOWN) -> STOP顯式調用shutdownNow()方法 SHUTDOWN -> TIDYING當線程池和任務隊列都為空的時候 STOP -> TIDYING當線程池為空的時候 TIDYING -> TERMINATED當 terminated() hook 方法執行完成時候構造函數
有四個構造函數,其他三個都是調用下面代碼中的這個構造函數
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { }參數介紹
| corePoolSize | int | 核心線程數 |
| maximumPoolSize | int | 最大線程數 |
| keepAliveTime | long | 存活時間 |
| unit | TimeUnit | 時間單位 |
| workQueue | BlockingQueue | 存放線程的隊列 |
| threadFactory | ThreadFactory | 創建線程的工廠 |
| handler | RejectedExecutionHandler | 多余的的線程處理器(拒絕策略) |
提交任務
submit
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask; }public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask; }public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask; }流程步驟如下
流程圖如下
execute
public void execute(Runnable command) {//傳進來的線程為null,則拋出空指針異常if (command == null)throw new NullPointerException();//獲取當前線程池的狀態+線程個數變量int c = ctl.get();/*** 3個步驟*///1.判斷當前線程池線程個數是否小于corePoolSize,小于則調用addWorker方法創建新線程運行,且傳進來的Runnable當做第一個任務執行。//如果調用addWorker方法返回false,則直接返回if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//2.如果線程池處于RUNNING狀態,則添加任務到阻塞隊列if (isRunning(c) && workQueue.offer(command)) {//二次檢查int recheck = ctl.get();//如果當前線程池狀態不是RUNNING則從隊列刪除任務,并執行拒絕策略if (! isRunning(recheck) && remove(command))reject(command);//否者如果當前線程池線程空,則添加一個線程else if (workerCountOf(recheck) == 0)addWorker(null, false);}//3.新增線程,新增失敗則執行拒絕策略else if (!addWorker(command, false))reject(command); }其實從上面代碼注釋中可以看出就三個判斷,
然后根據這三個條件進行不同的操作,下圖是Java并發編程的藝術書中的線程池的主要處理流程,或許會比較容易理解些
下面是整個流程的詳細步驟
可能看上面會有點繞,不清楚的可以看下面的流程圖
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 檢查當前線程池狀態是否是SHUTDOWN、STOP、TIDYING或者TERMINATED// 且!(當前狀態為SHUTDOWN、且傳入的任務為null,且隊列不為null)// 條件都成立則返回falseif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//循環for (;;) {int wc = workerCountOf(c);//如果當前的線程數量超過最大容量或者大于(根據傳入的core決定是核心線程數還是最大線程數)核心線程數 || 最大線程數,則返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//CAS增加c,成功則跳出retryif (compareAndIncrementWorkerCount(c))break retry;//CAS失敗執行下面方法,查看當前線程數是否變化,變化則繼續retry循環,沒變化則繼續內部循環c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;}}//CAS成功boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建一個線程w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//加鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//重新檢查線程池狀態//避免ThreadFactory退出故障或者在鎖獲取前線程池被關閉int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 先檢查線程是否是可啟動的throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//判斷worker是否添加成功,成功則啟動線程,然后將workerStarted設置為trueif (workerAdded) {t.start();workerStarted = true;}}} finally {//判斷線程有沒有啟動成功,沒有則調用addWorkerFailed方法if (! workerStarted)addWorkerFailed(w);}return workerStarted; }這里可以將addWorker分為兩部分,第一部分增加線程池個數,第二部分是將任務添加到workder里面并執行。
第一部分主要是兩個循環,外層循環主要是判斷線程池狀態,下面描述來自Java中線程池ThreadPoolExecutor原理探究
rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())展開!運算后等價于
s >= SHUTDOWN &&(rs != SHUTDOWN ||firstTask != null ||workQueue.isEmpty())也就是說下面幾種情況下會返回false:
- 當前線程池狀態為STOP,TIDYING,TERMINATED
- 當前線程池狀態為SHUTDOWN并且已經有了第一個任務
- 當前線程池狀態為SHUTDOWN并且任務隊列為空
內層循環作用是使用cas增加線程個數,如果線程個數超限則返回false,否者進行cas,cas成功則退出雙循環,否者cas失敗了,要看當前線程池的狀態是否變化了,如果變了,則重新進入外層循環重新獲取線程池狀態,否者進入內層循環繼續進行cas嘗試。
到了第二部分說明CAS成功了,也就是說線程個數加一了,但是現在任務還沒開始執行,這里使用全局的獨占鎖來控制workers里面添加任務,其實也可以使用并發安全的set,但是性能沒有獨占鎖好(這個從注釋中知道的)。這里需要注意的是要在獲取鎖后重新檢查線程池的狀態,這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行。
所以這里也將流程圖分為兩部分來描述
第一部分流程圖
第二部分流程圖
Worker對象
Worker是定義在ThreadPoolExecutor中的finnal類,其中繼承了AbstractQueuedSynchronizer類和實現Runnable接口,其中的run方法如下
public void run() {runWorker(this); }線程啟動時調用了runWorker方法,關于類的其他方面這里就不在敘述。
runWorker
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {//循環獲取任務while (task != null || (task = getTask()) != null) {w.lock();// 當線程池是處于STOP狀態或者TIDYING、TERMINATED狀態時,設置當前線程處于中斷狀態// 如果不是,當前線程就處于RUNNING或者SHUTDOWN狀態,確保當前線程不處于中斷狀態// 重新檢查當前線程池的狀態是否大于等于STOP狀態if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//提供給繼承類使用做一些統計之類的事情,在線程運行前調用beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//提供給繼承類使用做一些統計之類的事情,在線程運行之后調用afterExecute(task, thrown);}} finally {task = null;//統計當前worker完成了多少個任務w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//整個線程結束時調用,線程退出操作。統計整個線程池完成的任務個數之類的工作processWorkerExit(w, completedAbruptly);} }getTask
getTask方法的主要作用其實從方法名就可以看出來了,就是獲取任務
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?//循環for (;;) {int c = ctl.get();int rs = runStateOf(c);//線程線程池狀態和隊列是否為空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//線程數量int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//(當前線程數是否大于最大線程數或者)//且(線程數大于1或者任務隊列為空)//這里有個問題(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//獲取任務Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }關閉線程池
shutdown
當調用shutdown方法時,線程池將不會再接收新的任務,然后將先前放在隊列中的任務執行完成。
下面是shutdown方法的源碼
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate(); }shutdownNow
立即停止所有的執行任務,并將隊列中的任務返回
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks; }shutdown和shutdownNow區別
shutdown和shutdownNow這兩個方法的作用都是關閉線程池,流程大致相同,只有幾個步驟不同,如下
總結
線程池可以給我們多線程編碼上提供極大便利,就好像數據庫連接池一樣,減少了線程的開銷,提供了線程的復用。而且ThreadPoolExecutor也提供了一些未實現的方法,供我們來使用,像beforeExecute、afterExecute等方法,我們可以通過這些方法來對線程進行進一步的管理和統計。
在使用線程池上好需要注意,提交的線程任務可以分為CPU 密集型任務和IO 密集型任務,然后根據任務的不同進行分配不同的線程數量。
- CPU密集型任務:
- 應當分配較少的線程,比如 CPU個數相當的大小
- IO 密集型任務:
- 由于線程并不是一直在運行,所以可以盡可能的多配置線程,比如 CPU 個數 * 2
- 混合型任務:
- 可以將其拆分為 CPU 密集型任務以及 IO 密集型任務,這樣來分別配置。
好了,這篇博文到這里就結束了,文中可能會有些紕漏,歡迎留言指正。
如果本文對你有所幫助,給個star唄,謝謝。本文GitHub地址:點這里點這里
參考資料
作者: 云梟zd
Github: Github地址
出處: https://www.cnblogs.com/fixzd/
版權聲明:本文歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則視為侵權。
總結
以上是生活随笔為你收集整理的java多线程系列:ThreadPoolExecutor源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 爬虫基本原理及Request和Respo
- 下一篇: java开源库web3j的以太坊过滤器(