Java:ThreadPoolExecutor解析
目錄
?
功能介紹
線程池相關類圖
源碼解析
基本概念
字段域
常量
線程構造重要字段
線程控制重要字段
方法
執行任務
ThreadPoolExecutor的關閉
功能介紹
線程池,顧名思義一個線程的池子,池子里存放了很多可以復用的線程,如果不用線程池類似的容器,每當我們需要創建新的線程時都需要去new Thread(),用完之后就被回收了,線程的啟動回收都需要用戶態到內核態的交互,頻繁的創建開銷比較大。而且隨著線程數的增加,會引起CPU頻繁的上下文切換嚴重影響性能。這時候線程池類似的容器就發揮出了作用。線程池里面的線程不但可以復用,而且還可以控制線程并發的數量,是CPU的性能達到最優。
線程池相關類圖
源碼解析
基本概念
工作線程:即用于執行任務的線程。
任務:將要執行的任務
緩存隊列:工作線程全部被占用時,緩存任務的隊列。
字段域
常量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; //0001 1111 1111 1111 1111 1111 1111 1111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //1110 0000 0000 0000 0000 0000 0000 0000 //能接受新任務,隊列中的任務可繼續運行 private static final int RUNNING = -1 << COUNT_BITS; //0000 0000 0000 0000 0000 0000 0000 0000 //不再接受新任務,隊列中的任務仍可繼續執行 private static final int SHUTDOWN = 0 << COUNT_BITS; //0010 0000 0000 0000 0000 0000 0000 0000 //不再接受新任務,不再執行隊列中的任務,中斷所有執行中的任務(發中斷消息) private static final int STOP = 1 << COUNT_BITS; //0100 0000 0000 0000 0000 0000 0000 0000 //所有任務均已終止,workerCount的值為0,轉到TIDYING狀態的線程即將要執行terminated()鉤子方法. private static final int TIDYING = 2 << COUNT_BITS; //0110 0000 0000 0000 0000 0000 0000 0000 //terminated()方法執行結束 private static final int TERMINATED = 3 << COUNT_BITS;?線程池的狀態控制由AtomicInteger類型變量ctl 控制,其值為32位整形,其中前3位用于線程池狀態,后29位用于線程數量控制。
線程池具有5個狀態:
RUNNING = 111 SHUTDOWN = 000 STOP = 001 TIDYING = 010 TERMINATED = 011?各狀態之間可能的轉變有以下幾種:
RUNNING -> SHUTDOWN調用了shutdown方法,線程池實現了finalize方法。在finalize內調用了shutdown方法。因此shutdown可能是在finalize中被隱式調用的 (RUNNING or SHUTDOWN) -> STOP調用了shutdownNow方法 SHUTDOWN -> TIDYING當隊列和線程池均為空的時候 STOP -> TIDYING當線程池為空的時候 TIDYING -> TERMINATEDterminated()鉤子方法調用完畢?狀態的解析
//初始化線程數 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //獲取當前線程的運行狀態 //~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取當前的線程數 //CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111 private static int workerCountOf(int c) { return c & CAPACITY; } //或操作 // 111x xxxx xxxx xxxx xxxx xxxx xxxx xxxx | 0000 0000 0000 0000 0000 0000 0000 0100 // = 1110 0000 0000 0000 0000 0000 0000 0100 private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}?狀態的設計理念
- 將運行狀態設置為小于0的數,便于判斷當前線程池是否處于running狀態;
- 僅使用32位存儲線程池狀態與線程池內的線程數,若需要獲取線程池信息,只需要一個int即可
- 采用位運算,提高了執行效率;
- 同時,線程池狀態還有擴增空間(23=8,目前只有5種狀態),而線程池最大容量2^29,也可保證在絕大部分應用中是不會溢出的。而在源碼中也聲明了:如果在未來這個也成為一個問題,那么可以擴增為AtomicLong。
線程構造重要字段
- corePoolSize:核心池的大小(即任務線程的個數),這個參數與后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;緩存隊列即可以是有界的,也可以是無界的,僅當緩存隊列不可加入任務,并且任務線程數不大于maximumPoolSize時,才又創建任務線程。
- maximumPoolSize:線程池最大線程數,它表示在線程池中最多能創建多少個線程;
- keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大于corePoolSize:即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize;但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
- unit:參數keepAliveTime的時間單位
線程控制重要字段
//待執行線程隊列 private final BlockingQueue<Runnable> workQueue; //鎖,基于重入鎖,線程池核心之一 private final ReentrantLock mainLock = new ReentrantLock(); //線程隊列,這是該線程池內已有線程 //注意與workQueue的區別 private final HashSet<Worker> workers = new HashSet<Worker>(); //多線程協調通信 private final Condition termination = mainLock.newCondition(); //拒絕handler,用于線程池不接受新加線程時的處理方式 //分為系統拒絕(線程池要關閉等),與線程池飽和(已達線程池最大容量) private volatile RejectedExecutionHandler handler; //線程工廠,新建線程池時帶入 private volatile ThreadFactory threadFactory; //默認拒絕向線程池中新加線程的方式:丟棄 private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();方法
執行任務
使用ThreadPoolExecutor執行任務的時候,可以使用execute或submit方法,submit方法如下:
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}submit方法內部使用了execute方法,而且submit方法是有返回值的。在調用execute方法之前,使用FutureTask包裝一個Runnable,這個FutureTask就是返回值。
execute()方法
源碼如下:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) { // 如果工作線程的數量小于corePoolSize,表示可以創建線程直接用于運行任務。if (addWorker(command, true)) return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) { //工作線程的數量>=corePoolSize,則不能再創建工作線程了,需要把任務加入緩存隊列中去。int recheck = ctl.get();//檢查當前線程池狀態是否不在Running狀態了//若是,將線程cmd從等待隊列內移除//這個時候存在一種case,線程池不處于running狀態//但是remove失敗了,這個時候看具體的queue處理了//線程池還是很忠實的去嘗試interruptif (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果緩存隊列加入不了任務了,則又可以創建新的工作線程了。else if (!addWorker(command, false)) //創建工作線程失敗,則執行reject策略。reject(command); }提交一個新的任務的3種情況如下:
- 如果當前正在執行的Worker數量比corePoolSize(基本大小)要小。直接創建一個新的Worker執行任務,會調用addWorker方法
- 如果當前正在執行的Worker數量大于等于corePoolSize(基本大小)。將任務放到阻塞隊列里,如果阻塞隊列沒滿并且狀態是RUNNING的話,直接丟到阻塞隊列,否則執行第3步。丟到阻塞隊列之后,還需要再做一次驗證(丟到阻塞隊列之后可能另外一個線程關閉了線程池或者剛剛加入到隊列的線程死了)。如果這個時候線程池不在RUNNING狀態,把剛剛丟入隊列的任務remove掉,調用reject方法,否則查看Worker數量,如果Worker數量為0,起一個新的Worker去阻塞隊列里拿任務執行
- 丟到阻塞失敗的話,會調用addWorker方法嘗試起一個新的Worker去阻塞隊列拿任務并執行任務,如果這個新的Worker創建失敗,調用reject方法
注意:
每次判斷狀態時,都必須重新獲取狀態。
addWorker()方法
addWorker關系著如何起一個線程,Worker是一個AQS的實現類(參考:AbstractQueuedSynchronizer源碼解析),同時也是一個實現Runnable的類,使用獨占鎖,它的構造函數只接受一個Runnable參數,內部保存著這個Runnable屬性,還有一個thread線程屬性用于包裝這個Runnable(這個thread屬性使用ThreadFactory構造,在構造函數內完成thread線程的構造),另外還有一個completedTasks計數器表示這個Worker完成的任務數。Worker類復寫了run方法,使用ThreadPoolExecutor的runWorker方法(在addWorker方法里調用),直接啟動Worker的話,會調用ThreadPoolExecutor的runWork方法。需要特別注意的是這個Worker是實現了Runnable接口的,thread線程屬性使用ThreadFactory構造Thread的時候,構造的Thread中使用的Runnable其實就是Worker。下面的Worker的源碼:
private final class Worker extends AbstractQueuedSynchronizerimplements Runnable {private static final long serialVersionUID = 6138294804551838833L;/** worker綁定的線程.null表示失敗 */final Thread thread;/** 初始化時指定的任務,可為null. */Runnable firstTask;/** 完成任務數 */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {//把狀態位設置成-1,這樣任何線程都不能得到Worker的鎖,除非調用了unlock方法。//這個unlock方法會在runWorker方法中一開始就調用,這是為了確保Worker構造出來之后,沒有任何線程能夠得到它的鎖,//除非調用了runWorker之后,其他線程才能獲得Worker的鎖setState(-1); this.firstTask = firstTask;// 使用ThreadFactory構造Thread,這個構造的Thread內部的Runnable就是本身,也就是Worker。//所以得到Worker的thread并start的時候,會執行Worker的run方法,也就是執行ThreadPoolExecutor的runWorker方法this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}//0表示unlock,1表示lockprotected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}} }addWork代碼
//嘗試向線程池內新增一個線程 private boolean addWorker(Runnable firstTask, boolean core) {//用于跳出外層循環的標簽retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//若線程池處于非運行狀態//且//或rs不處于SHUTDOWN狀態(STOP、TIDYING、TERMINATED 之一)//或firstTask不為空 (非運行狀態,不可以再增加Task了,所以firstTask不為空要返回false)//或緩沖隊列為空 (size== 0,表示不可以插入元素了)//那么返回false,表明新增一個線程失敗(執行firstTask 也失敗)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//此時線程池處于running狀態,firstTask不為空//且緩沖隊列不為空,此時需要新增一個線程for (;;) {//獲取線程池當前線程數量int wc = workerCountOf(c);//若線程池超過最大容量,或大于設定的容量//corePoolSize與maximumPoolSize均為傳入的參數//那么直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//線程池未過限,那么采用cas機制,將線程池計數器擴增1,跳出標簽if (compareAndIncrementWorkerCount(c))break retry;//獲取當前線程池信息c = ctl.get(); // Re-read ctl//若線程池狀態有變更,從標簽處重新循環if (runStateOf(c) != rs)continue retry;//若線程池狀態未變化,繼續內層的for循環}}//上面若將線程池計數器加1了//這里就要對線程池擴增了(即增加一個工作線程)boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//創建一個線程實例w = new Worker(firstTask);//獲取線程//不在創建實例時直接run該線程,是避免構造函數未執行完,就run導致的異常final Thread t = w.thread;if (t != null) {//重入鎖final ReentrantLock mainLock = this.mainLock;//鎖上,走起mainLock.lock();try {//獲取線程池狀態int rs = runStateOf(ctl.get());//若線程池狀態為running狀態//或為SHUTDOWN且傳入的線程為空if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//若新起的線程狀態為活躍狀態if (t.isAlive()) // precheck that t is startable//拋異常throw new IllegalThreadStateException();//否則向運行線程池內加上該線程workers.add(w);int s = workers.size();//判斷當前線程池是否為線程池最大//是的話交換,做統計用if (s > largestPoolSize)largestPoolSize = s;//新增標志位置為成功workerAdded = true;}} finally {//重入鎖解鎖mainLock.unlock();}//判斷線程是否加入成功if (workerAdded) {//加入成功//啟動當前線程,將當前線程交有os管理t.start();//設置標志位workerStarted = true;}}} finally {//若未啟動成功if (! workerStarted)//回滾當前新起線程操作//移除當前新增失敗的線程//將線程池計數器減1//嘗試中斷線程池或者中斷當前線程addWorkerFailed(w);}//返回標志位,是否新增線程成功return workerStarted; }邏輯圖如下:
上圖來源于:https://blog.csdn.net/varyall/article/details/82392048
runWorker()方法
Worker中的線程start的時候,調用Worker本身run方法,又調用外部類ThreadPoolExecutor的runWorker方法,runWorker方法代碼如下:
final void runWorker(Worker w) {Thread wt = Thread.currentThread(); // 得到當前線程Runnable task = w.firstTask; // 得到Worker中的任務task,也就是用戶傳入的taskw.firstTask = null; // 將Worker中的任務置空w.unlock(); // allow interrupts。 boolean completedAbruptly = true;try {// 如果worker中的任務不為空,繼續執行,//如果worker中的任務為空,則使用getTask獲得任務。//一直死循環,除非得到的任務為空才退出while (task != null || (task = getTask()) != null) {w.lock(); // 在執行任務之前先做一些處理。 //1. 如果線程池已經處于STOP狀態并且當前線程沒有被中斷,中斷線程 //2. 如果線程池還處于RUNNING或SHUTDOWN狀態,并且當前線程已經被中斷了,重新檢查一下線程池狀態,如果處于STOP狀態并且沒有被中斷,那么中斷線程if ((runStateAtLeast(ctl.get(), STOP) //如果線程池已經處于STOP狀態或者之后的狀態||(Thread.interrupted() //本線程已處于中斷狀態&&runStateAtLeast(ctl.get(), STOP) //再檢查一次)) &&!wt.isInterrupted() //worker未狀態)wt.interrupt();try {beforeExecute(wt, task); // 任務執行前需要做什么,ThreadPoolExecutor是個空實現Throwable thrown = null;try {task.run(); // 真正的開始執行任務,調用的是run方法,而不是start方法。這里run的時候可能會被中斷,比如線程池調用了shutdownNow方法} catch (RuntimeException x) { // 任務執行發生的異常全部拋出,不在runWorker中處理thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); // 任務執行結束需要做什么,ThreadPoolExecutor是個空實現}} finally {task = null;w.completedTasks++; // 記錄執行任務的個數w.unlock(); // 執行完任務之后,解鎖,Worker變成閑置Worker}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly); // 回收Worker方法} }getTask()方法:
用于緩存隊列中獲取一個Task。阻塞隊列參考:Java并發包--阻塞隊列(BlockingQueue)
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);//如果線程池處于shutdown狀態,//并且隊列為空,或者線程池處于stop或者terminate狀態,//則:線程池數量-1,返回null,回收線程if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//標識當前線程在空閑時,是否應該超時回收boolean timed; for (;;) {int wc = workerCountOf(c);//如果allowCoreThreadTimeOut 為ture//或者當前線程數量大于核心線程池數目,//則需要超時回收timed = allowCoreThreadTimeOut || wc > corePoolSize;//(1)//如果線程數目小于最大線程數目,//且不允許超時回收或者未超時,//則跳出循環,繼續去阻塞隊列中取任務(2)if (wc <= maximumPoolSize && ! (timedOut && timed))break;//如果上面if沒有成立,則當前線程數-1,返回null,回收該線程if (compareAndDecrementWorkerCount(c))return null;//如果上面if沒有成立,則CAS修改ctl失敗,重讀,cas循環重新嘗試修改c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}(2)try {//如果允許空閑回收,則調用阻塞隊列的poll,//否則take,一直等到隊列中有可取任務Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//取到任務,返回任務,//否則超時timedOut = true;進入下一個循環,//并且在(1)處會不成立,進而進入到cas修改ctl的程序中if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }processWorkerExit()方法
如果getTask返回的是null,那說明阻塞隊列已經沒有任務并且當前調用getTask的Worker需要被回收,那么會調用processWorkerExit方法進行回收:
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 如果Worker沒有正常結束流程調用processWorkerExit方法,worker數量減一。如果是正常結束的話,在getTask方法里worker數量已經減一了decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 加鎖,防止并發問題try {completedTaskCount += w.completedTasks; // 記錄總的完成任務數workers.remove(w); // 線程池的worker集合刪除掉需要回收的Worker} finally {mainLock.unlock(); // 解鎖}tryTerminate(); // 嘗試結束線程池int c = ctl.get();if (runStateLessThan(c, STOP)) { // 如果線程池還處于RUNNING或者SHUTDOWN狀態// Worker是正常結束流程的話if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // 不需要新開一個Worker}// 新開一個Worker代替原先的Worker// 新開一個Worker需要滿足以下3個條件中的任意一個:// 1. 用戶執行的任務發生了異常// 2. Worker數量比線程池基本大小要小// 3. 阻塞隊列不空但是沒有任何Worker在工作addWorker(null, false);} }tryTerminate()方法
在回收Worker的時候線程池會嘗試結束自己的運行,tryTerminate方法:
final void tryTerminate() {for (;;) {int c = ctl.get();// 滿足3個條件中的任意一個,不終止線程池// 1. 線程池還在運行,不能終止// 2. 線程池處于TIDYING或TERMINATED狀態,說明已經在關閉了,不允許繼續處理// 3. 線程池處于SHUTDOWN狀態并且阻塞隊列不為空,這時候還需要處理阻塞隊列的任務,不能終止線程池if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 走到這一步說明線程池已經不在運行,阻塞隊列已經沒有任務,但是還要回收正在工作的Workerif (workerCountOf(c) != 0) {// 由于線程池不運行了,調用了線程池的關閉方法,在解釋線程池的關閉原理的時候會說道這個方法interruptIdleWorkers(ONLY_ONE); // 中斷閑置Worker,直到回收全部的Worker。這里沒有那么暴力,只中斷一個,中斷之后退出方法,中斷了Worker之后,Worker會回收,然后還是會調用tryTerminate方法,如果還有閑置線程,那么繼續中斷return;}// 走到這里說明worker已經全部回收了,并且線程池已經不在運行,阻塞隊列已經沒有任務。可以準備結束線程池了final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 加鎖,防止并發try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // cas操作,將線程池狀態改成TIDYINGtry {terminated(); // 調用terminated方法} finally {ctl.set(ctlOf(TERMINATED, 0)); // terminated方法調用完畢之后,狀態變為TERMINATEDtermination.signalAll();}return;}} finally {mainLock.unlock(); // 解鎖}// else retry on failed CAS} }ThreadPoolExecutor的關閉
shutdown方法,關閉線程池,關閉之后阻塞隊列里的任務不受影響,會繼續被Worker處理,但是新的任務不會被接受:
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 關閉的時候需要加鎖,防止并發try {checkShutdownAccess(); // 檢查關閉線程池的權限advanceRunState(SHUTDOWN); // 把線程池狀態更新到SHUTDOWNinterruptIdleWorkers(); // 中斷閑置的WorkeronShutdown(); // 鉤子方法,默認不處理。ScheduledThreadPoolExecutor會做一些處理} finally {mainLock.unlock(); // 解鎖}tryTerminate(); // 嘗試結束線程池,上面已經分析過了 }interruptIdleWorkers方法,注意,這個方法中斷的是閑置Worker,中斷閑置Worker之后,getTask方法會返回null,然后Worker會被回收。那什么是閑置Worker呢?
閑置Worker是這樣解釋的:Worker運行的時候會去阻塞隊列拿數據(getTask方法),拿的時候如果沒有設置超時時間,那么會一直阻塞直到等待阻塞隊列進數據,即worker沒有獲取到任務,這樣的Worker就被稱為閑置Worker。由于Worker也是一個AQS,在runWorker方法里會有一對lock和unlock操作,這對lock操作是為了確保Worker不是一個閑置Worker。
所以Worker被設計成一個AQS是為了根據Worker的鎖來判斷是否是閑置線程,是否可以被強制中斷。
下面我們看下interruptIdleWorkers方法:
// 調用他的一個重載方法,傳入了參數false,表示要中斷所有的正在運行的閑置Worker,如果為true表示只打斷一個閑置Worker private void interruptIdleWorkers() {interruptIdleWorkers(false); }private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 中斷閑置Worker需要加鎖,防止并發try {for (Worker w : workers) { Thread t = w.thread; // 拿到worker中的線程if (!t.isInterrupted() // Worker中的線程沒有被中斷 && w.tryLock() //并且Worker可以獲取鎖,這里Worker能獲取鎖說明Worker是個閑置Worker,在阻塞隊列里拿數據一直被阻塞,沒有數據進來。如果沒有獲取到Worker鎖,說明Worker還在執行任務,不進行中斷(shutdown方法不會中斷正在執行的任務) ) { try {t.interrupt(); // 中斷Worker線程} catch (SecurityException ignore) {} finally {w.unlock(); // 釋放Worker鎖}}if (onlyOne) // 如果只打斷1個Worker的話,直接break退出,否則,遍歷所有的Workerbreak;}} finally {mainLock.unlock(); // 解鎖} }shutdown方法
將線程池狀態改成SHUTDOWN,線程池還能繼續處理阻塞隊列里的任務,并且會回收一些閑置的Worker。
但是shutdownNow方法不一樣,它會把線程池狀態改成STOP狀態,這樣不會處理阻塞隊列里的任務,也不會處理新的任務:
// shutdownNow方法會有返回值的,返回的是一個任務列表,而shutdown方法沒有返回值 public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // shutdownNow操作也需要加鎖,防止并發try {checkShutdownAccess(); // 檢查關閉線程池的權限advanceRunState(STOP); // 把線程池狀態更新到STOPinterruptWorkers(); // 中斷Worker的運行tasks = drainQueue();} finally {mainLock.unlock(); // 解鎖}tryTerminate(); // 嘗試結束線程池,上面已經分析過了return tasks; }shutdownNow的中斷和shutdown方法不一樣,調用的是interruptWorkers方法:
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 中斷Worker需要加鎖,防止并發try {for (Worker w : workers)w.interruptIfStarted(); // 中斷Worker的執行} finally {mainLock.unlock(); // 解鎖} }Worker的interruptIfStarted方法中斷Worker的執行:
void interruptIfStarted() {Thread t;// Worker無論是否被持有鎖,只要還沒被中斷,那就中斷Workerif (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt(); // 強行中斷Worker的執行} catch (SecurityException ignore) {}} }線程池關閉總結:
線程池的關閉主要是兩個方法,shutdown和shutdownNow方法。
shutdown方法會更新狀態到SHUTDOWN,不會影響阻塞隊列里任務的執行,但是不會執行新進來的任務。同時也會回收閑置的Worker,閑置Worker的定義上面已經說過了。
shutdownNow方法會更新狀態到STOP,會影響阻塞隊列的任務執行,也不會執行新進來的任務。同時會回收所有的Worker。
?
參考:
https://fangjian0423.github.io/2016/03/22/java-threadpool-analysis/
?
總結
以上是生活随笔為你收集整理的Java:ThreadPoolExecutor解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FutureTask源码
- 下一篇: Java:ThreadPoolExecu