日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【多线程】ThreadPoolExecutor类万字源码解析(注解超级详细)

發布時間:2025/5/22 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【多线程】ThreadPoolExecutor类万字源码解析(注解超级详细) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

線程池

線程池初始化時是沒有創建線程的,線程池里的線程的初始化與其他線程一樣,但是在完成任務以后,該線程不會自行銷毀,而是以掛起的狀態返回到線程池。直到應用程序再次向線程池發出請求時,線程池里掛起的線程就會再度激活執行任務。這樣既節省了建立線程所造成的性能損耗,也可以讓多個任務反復重用同一線程,從而在應用程序生存期內節約大量開銷

public ThreadPoolExecutor(// 線程池核心線程數int corePoolSize, // 線程池最大數int maximumPoolSize, // 空閑線程存活時間long keepAliveTime, // 時間單位TimeUnit unit,// 線程池所使用的緩沖隊列BlockingQueue<Runnable> workQueue,// 線程池創建線程使用的工廠ThreadFactory threadFactory,// 線程池對拒絕任務的處理策略RejectedExecutionHandler handler)

ThreadPoolExecutor 源碼分析

線程池狀態

ThreadPoolExecutor有一個AtomicInteger變量,叫ctl(control的簡寫),一共32位,高3位存線程池狀態runState(一共5種狀態:Running,Shutdown,Stop,Tidying,Terminate),低29位存當前有效線程數workerCount

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//COUNT_BITS = 29private static final int COUNT_BITS = Integer.SIZE - 3;// 線程池最大線程數 = 536870911(2^29-1)// 將 1 的二進制向右位移 29 位,再減 1 表示最大線程容量// CAPACITY :00011111111111111111111111111111// 高3位000,低29位全為1private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 運行狀態保存在 int 值的高 3 位 (所有數值左移 29 位)// RUNNING :11100000000000000000000000000000// 高3位111,低29位全為0private static final int RUNNING = -1 << COUNT_BITS;// SHUTDOWN :00000000000000000000000000000000// 高3位000,低29位全為0private static final int SHUTDOWN = 0 << COUNT_BITS;// STOP :00100000000000000000000000000000// 高3位001,低29位全為0private static final int STOP = 1 << COUNT_BITS;// TIDYING :01000000000000000000000000000000// 高3位010,低29位全為0private static final int TIDYING = 2 << COUNT_BITS;// TERMINATED:01100000000000000000000000000000// 高3位011,低29位全為0private static final int TERMINATED = 3 << COUNT_BITS;

分析:

事實上COUNT_BITS = 29,而上面的5重線程狀態實際上是使用32位中的高3位來表示,低29位存線程數,這樣線程池的狀態和線程數量就由一個變量存儲,即:

  • RUNNING = 111: 線程池正常運行,可以接受新的任務并處理隊列中的任務。
  • SHUTDOWN = 000:關閉狀態,不再接受新的任務,但是會執行隊列中的任務。在線程池處于 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程中也會調用shutdown()方法進入該狀態);
  • STOP = 001:不再接受新任務,不處理隊列中的任務,中斷進行中的任務。在線程池處于 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態;
  • TIDYING = 010:所有任務已經終止,workerCount為0,線程狀態轉換到TIDYING,線程池進入該狀態后會調用 terminated() 方法進入TERMINATED 狀態。
  • TERMINATED = 011:terminate()函數執行完成后進入該狀態。
// Packing and unpacking ctl/*** 這個方法用于取出runState的值 因為CAPACITY值為:00011111111111111111111111111111* ~為按位取反操作,則~CAPACITY值為:11100000000000000000000000000000* 再同參數做 & 操作,就將低29位置0了,而高3位還是保持原先的值,也就是runState的值* * @param c 該參數為存儲runState和workerCount的int值* @return runState的值*/private static int runStateOf(int c) { return c & ~CAPACITY; }/*** 這個方法用于取出workerCount的值* 因為CAPACITY值為:00011111111111111111111111111111,所以&操作將參數的高3位置0了* 保留參數的低29位,也就是workerCount的值* * @param c ctl, 存儲runState和workerCount的int值* @return workerCount的值*/private static int workerCountOf(int c) { return c & CAPACITY; }/*** 將runState和workerCount存到同一個int中* * @param rs runState移位過后的值,負責填充返回值的高3位* @param wc workerCount移位過后的值,負責填充返回值的低29位* @return 兩者或運算過后的值*/private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}// 只有 RUNNING 狀態會小于 0private static boolean isRunning(int c) {return c < SHUTDOWN;}

execute 方法

public void execute(Runnable command) {/*** 源代碼閱讀說明:* 1、這里的command就是task;因為這里使用了命令模式,所以用command命名;執行task,task被包裝成了一個work。* 2、isRunning檢查當前ThreadPoolExecutor是否是運行狀態* 3、workerCountOf會根據ThreadPoolExecutor的狀態值獲得當前正的正在執行的和等待的work數量。* 這里先不用深究isRunning和workerCounterOf方法的實現。* 4、addWork方法會新增一個線程,返回true表示新增成功,返回false表示新增失敗*/if (command == null)// 如果任務為null,則拋出 NullPointerException 異常throw new NullPointerException();// 獲取當前線程池的狀態 + 線程個數變量的組合值int c = ctl.get();// 當前線程池線程個數是否小于 corePoolSize,小于則開啟新線程運行if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 如果線程池處于RUNNING狀態,核心池已滿,但任務隊列未滿,則添加任務到阻塞隊列if (isRunning(c) && workQueue.offer(command)) {// 任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了int recheck = ctl.get();// 如果當前線程池狀態不是RUNNING則從隊列刪除任務,并執行拒絕策略if (! isRunning(recheck) && remove(command))reject(command);// 否者如果當前線程池線程空,則添加一個線程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 核心池已滿,隊列已滿,試著創建一個新線程else if (!addWorker(command, false))// 如果創建新線程失敗了,說明線程池被關閉或者線程池完全滿了,拒絕任務reject(command);}

execute 方法 執行流程

1、首先判斷任務是否為空,空則拋出空指針異常

2、不為空則獲取線程池控制狀態,判斷小于corePoolSize(核心線程),添加到worker集合當中執行,

  • 如成功,則返回
  • 失敗的話再接著獲取線程池控制狀態,因為只有狀態變了才會失敗,所以重新獲取

3、判斷線程池是否處于運行狀態,是的話則添加 command 到阻塞隊列,加入時也會再次獲取狀態并且檢測狀態是否不處于運行狀態,不處于的話則將 command 從阻塞隊列移除,并且拒絕任務

4、如果線程池里沒有了線程,則創建新的線程去執行獲取阻塞隊列的任務執行

5、如果以上都沒執行成功,則需要開啟最大線程池里的線程來執行任務,失敗的話就丟棄

addWorker方法

如果工作線程數小于核心線程數的話,會調用 addWorker ,顧名思義,其實就是要創建一個工作線程。我們來看看源碼的實現

源碼比較長,其實就做了兩件事。

  • 才用循環 CAS 操作來將線程數加 1;
  • 新建一個線程并啟用。
private boolean addWorker(Runnable firstTask, boolean core) {// goto 語句,避免死循環retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary./*** 如果線程處于非運行狀態,并且 rs 不等于 SHUTDOWN 且 * firstTask 不等于空且 workQueue 為空,* 直接返回 false(表示不可添加 work 狀態)* 1. 線程池已經 shutdown 后,還要添加新的任務,拒絕* 2. (第二個判斷)SHUTDOWN 狀態不接受新任務,* 但仍然會執行已經加入任務隊列的任務,* 所以當進入 SHUTDOWN 狀態,而傳進來的任務為空,* 并且任務隊列不為空的時候,是允許添加新線程的,* 如果把這個條件取反,就表示不允許添加 worker*/if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 獲得 Worker 工作線程數int wc = workerCountOf(c);// 如果工作線程數大于默認容量大小或者大于核心線程數大小,// 則直接返回 false 表示不能再添加 worker。if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 通過 cas 來增加工作線程數,如果 cas 失敗,則直接重試if (compareAndIncrementWorkerCount(c))break retry;// 再次獲取 ctl 的值c = ctl.get(); // Re-read ctl// 這里如果不想等,說明線程的狀態發生了變化,繼續重試if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}//上面這段代碼主要是對 worker 數量做原子+1 操作,下面的邏輯才是正式構建一個 worker// 工作線程是否啟動的標識boolean workerStarted = false;// 工作線程是否已經添加成功的標識boolean workerAdded = false;Worker w = null;try {// 構建一個 Worker,這個 worker 是什么呢?// 我們可以看到構造方法里面傳入了一個 Runnable 對象w = new Worker(firstTask);// 從 worker 對象中取出線程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// 這里有個重入鎖,避免并發問題mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());//只有當前線程池是正在運行狀態,[或是 SHUTDOWN 且 firstTask 為空],才能添加到 workers 集合中if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//任務剛封裝到 work 里面,還沒 start,你封裝的線程就是 alive,幾個意思?肯定是要拋異常出去的if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 將新創建的 Worker 添加到 workers 集合中workers.add(w);int s = workers.size();// 如果集合中的工作線程數大于最大線程數,// 這個最大線程數表示線程池曾經出現過的最大線程數if (s > largestPoolSize)// 更新線程池出現過的最大線程數largestPoolSize = s;// 表示工作線程創建成功了workerAdded = true;}} finally {// 釋放鎖mainLock.unlock();}// 如果 worker 添加成功if (workerAdded) {// 啟動線程t.start();workerStarted = true;}}} finally {// 如果添加失敗,就需要做一件事,就是遞減實際工作線程數// (還記得我們最開始的時候增加了工作線程數嗎)if (! workerStarted)addWorkerFailed(w);}// 返回結果return workerStarted;}

addWorker方法 執行流程

1、獲取線程池的控制狀態,進行判斷,不符合則返回false,符合則下一步

2、死循環,判斷workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,沒有的話則對workerCount+1操作,

3、如果不符合上述判斷或+1操作失敗,再次獲取線程池的控制狀態,獲取runState與剛開始獲取的runState相比,不一致則跳出內層循環繼續外層循環,否則繼續內層循環

4、+1操作成功后,使用重入鎖ReentrantLock來保證往workers當中添加worker實例,添加成功就啟動該實例。

Worker內部類

可以發現 addWorker 方法只是構造了一個 Worker,并且把 firstTask 封裝到 worker 中,它是做什么的呢?我們來瞧瞧

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** * Thread this worker is running in. Null if factory fails. * 此工作線程正在運行的線程。如果工廠失敗,則為null。*/// 這才是真正執行 task 的線程,從構造函數可知是由ThreadFactury 創建的final Thread thread;/** * Initial task to run. Possibly null. * 要運行的初始任務。可能為空*/// 這就是需要執行的 taskRunnable firstTask;/** * Per-thread task counter * 每線程任務計數器*/// 完成的任務數,用于線程池統計volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*//*** 創建并初始化第一個任務,使用線程工廠來創建線程* 初始化有3步* 1、設置AQS的同步狀態為-1,表示該對象需要被喚醒* 2、初始化第一個任務* 3、調用ThreadFactory來使自身創建一個線程,并賦值給worker的成員變量thread*/Worker(Runnable firstTask) {// 初始狀態 -1,防止在調用 runWorker(),也就是真正執行 task前中斷 thread。setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 創建一個線程this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */// 重寫Runnable的run方法public void run() {// 調用ThreadPoolExecutor的runWorker方法runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.// 代表是否獨占鎖,0-非獨占 1-獨占protected boolean isHeldExclusively() {return getState() != 0;}// 重寫AQS的tryAcquire方法嘗試獲取鎖protected boolean tryAcquire(int unused) {// 嘗試將AQS的同步狀態從0改為1if (compareAndSetState(0, 1)) {// 如果改變成,則將當前獨占模式的線程設置為當前線程并返回truesetExclusiveOwnerThread(Thread.currentThread());// 成功獲得鎖return true;}// 線程進入等待隊列return false;}// 重寫AQS的tryRelease嘗試釋放鎖protected boolean tryRelease(int unused) {// 設置當前獨占模式的線程為nullsetExclusiveOwnerThread(null);// 設置AQS同步狀態為0setState(0);// 返回truereturn true;}// 獲取鎖public void lock() { acquire(1); }// 嘗試獲取鎖public boolean tryLock() { return tryAcquire(1); }// 釋放鎖public void unlock() { release(1); }// 是否被獨占public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

addWorkerFailed方法

addWorker方法添加worker失敗,并且沒有成功啟動任務的時候,就會調用此方法,將任務從workers中移除,并且workerCount做-1操作。

private void addWorkerFailed(Worker w) {// 重入鎖final ReentrantLock mainLock = this.mainLock;// 獲取鎖mainLock.lock();try {// 如果worker不為nullif (w != null)// workers移除workerworkers.remove(w);// 通過CAS操作,workerCount-1decrementWorkerCount();tryTerminate();} finally {// 釋放鎖mainLock.unlock();}}

tryTerminate方法

當對線程池執行了非正常成功邏輯的操作時,都會需要執行tryTerminate嘗試終止線程池

final void tryTerminate() {// 死循環for (;;) {// 獲取線程池控制狀態int c = ctl.get();/** 線程池處于RUNNING狀態* 線程池狀態最小大于TIDYING* 線程池==SHUTDOWN并且workQUeue不為空* 直接return,不能終止*/if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 如果workerCount不為0if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}// 獲取線程池的鎖final ReentrantLock mainLock = this.mainLock;// 獲取鎖mainLock.lock();try {// 通過CAS操作,設置線程池狀態為TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {// 設置線程池的狀態為TERMINATEDctl.set(ctlOf(TERMINATED, 0));// 發送釋放信號給在termination條件上等待的線程termination.signalAll();}return;}} finally {// 釋放鎖mainLock.unlock();}// else retry on failed CAS}}

runWorker方法

該方法的作用就是去執行任務

final void runWorker(Worker w) {// 獲取當前線程Thread wt = Thread.currentThread();// 獲取worker里的任務Runnable task = w.firstTask;// 將worker實例的任務賦值為nullw.firstTask = null;/*** unlock方法會調用AQS的release方法* release方法會調用具體實現類也就是Worker的tryRelease方法* 也就是將AQS狀態置為0,允許中斷*/w.unlock(); // allow interrupts// 是否突然完成boolean completedAbruptly = true;try {// worker實例的task不為空,或者通過getTask獲取的不為空while (task != null || (task = getTask()) != null) {// 獲取鎖w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt/*** 獲取線程池的控制狀態,至少要大于STOP狀態* 如果狀態不對,檢查當前線程是否中斷并清除中斷狀態,并且再次檢查線程池狀態是否大于STOP* 如果上述滿足,檢查該對象是否處于中斷狀態,不清除中斷標記*/if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 中斷改對象wt.interrupt();try {// 執行前的方法,由子類具體實現beforeExecute(wt, task);Throwable thrown = null;try {// 執行任務task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 執行完后調用的方法,也是由子類具體實現afterExecute(task, thrown);}} finally { // 執行完后// task設置為nulltask = null;// 已完成任務數+1w.completedTasks++;// 釋放鎖w.unlock();}}completedAbruptly = false;} finally {// 處理并退出當前workerprocessWorkerExit(w, completedAbruptly);}}

runWorker方法 執行流程

1,首先在方法一進來,就執行了w.unlock(),這是為了將AQS的狀態改為0,因為只有getState() >= 0的時候,線程才可以被中斷;

2,判斷firstTask是否為空,為空則通過getTask()獲取任務,不為空接著往下執行

3,判斷是否符合中斷狀態,符合的話設置中斷標記

4,執行beforeExecute(),task.run(),afterExecute()方法

5,任何一個出異常都會導致任務執行的終止;進入processWorkerExit來退出任務

6,正常執行的話會接著回到步驟2

getTask方法

在上面的runWorker方法當中我們可以看出,當firstTask為空的時候,會通過該方法來接著獲取任務去執行,那我們就看看獲取任務這個方法到底是怎么樣的?

private Runnable getTask() {// 標志是否獲取任務超時boolean timedOut = false; // Did the last poll() time out?for (;;) {// 獲取線程池的控制狀態int c = ctl.get();// 獲取線程池的runStateint rs = runStateOf(c);// Check if queue empty only if necessary./*** 判斷線程池的狀態,出現以下兩種情況* 1、runState大于等于SHUTDOWN狀態* 2、runState大于等于STOP或者阻塞隊列為空* 將會通過CAS操作,進行workerCount-1并返回null*/if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}// 獲取線程池的workerCountint wc = workerCountOf(c);// Are workers subject to culling?/*** allowCoreThreadTimeOut:是否允許core Thread超時,默認false* workerCount是否大于核心核心線程池*/boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/*** 1、wc大于maximumPoolSize或者已超時* 2、隊列不為空時保證至少有一個任務*/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {/*** 通過CAS操作,workerCount-1* 能進行-1操作,證明wc大于maximumPoolSize或者已經超時*/if (compareAndDecrementWorkerCount(c))// -1操作成功,返回nullreturn null;// -1操作失敗,繼續循環continue;}try {/*** wc大于核心線程池* 執行poll方法* 小于核心線程池* 執行take方法*/Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 判斷任務不為空返回任務if (r != null)return r;// 獲取一段時間沒有獲取到,獲取超時timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

processWorkerExit方法

明顯的,在執行任務當中,會去獲取任務進行執行,那既然是執行任務,肯定就會有執行完或者出現異常中斷執行的時候,那這時候肯定也會有相對應的操作,至于具體操作是怎么樣的,我們還是直接去看源碼最實際。

private void processWorkerExit(Worker w, boolean completedAbruptly) {/*** completedAbruptly:在runWorker出現,代表是否突然完成的意思* 也就是在執行任務過程當中出現異常,就會突然完成,傳true** 如果是突然完成,需要通過CAS操作,workerCount-1* 不是突然完成,則不需要-1,因為getTask方法當中已經-1** 下面的代碼注釋貌似與代碼意思相反了*/if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();// 生成重入鎖final ReentrantLock mainLock = this.mainLock;// 獲取鎖mainLock.lock();try {// 線程池統計的完成任務數completedTaskCount加上worker當中完成的任務數completedTaskCount += w.completedTasks;// 從HashSet<Worker>中移除workers.remove(w);} finally {// 釋放鎖mainLock.unlock();}// 因為上述操作是釋放任務或線程,所以會判斷線程池狀態,嘗試終止線程池tryTerminate();// 獲取線程池的控制狀態int c = ctl.get();// 判斷runState是否小于STOP,即是RUNNING或者SHUTDOWN// 如果是RUNNING或者SHUTDOWN,代表沒有成功終止線程池if (runStateLessThan(c, STOP)) {/** 是否突然完成* 如若不是,代表已經沒有任務可獲取完成,因為getTask當中是while循環*/if (!completedAbruptly) {/*** allowCoreThreadTimeOut:是否允許core thread超時,默認false* min-默認是corePoolSize*/int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 允許core thread超時并且隊列不為空// min為0,即允許core thread超時,這樣就不需要維護核心核心線程池了// 如果workQueue不為空,則至少保持一個線程存活if (min == 0 && ! workQueue.isEmpty())min = 1;// 如果workerCount大于min,則表示滿足所需,可以直接返回if (workerCountOf(c) >= min)return; // replacement not needed}// 如果是突然完成,添加一個空任務的worker線程addWorker(null, false);}}

總結

在文章的最后,我們先對 ThreadPoolExecutor 的關鍵信息做一些總結:

線程池解決兩個問題:

  • 通過減少任務間的調度開銷 (主要是通過線程池中的線程被重復使用的方式),來提高大量任務時的執行性能
  • 提供了一種方式來管理線程和消費,維護基本數據統計等工作,比如統計已完成的任務數;

線程池容量相關參數:

  • coreSize:當新任務提交時,發現運行的線程數小于 coreSize,一個新的線程將被創建,即使這時候其它工作線程是空閑的,可以通過 getCorePoolSize 方法獲得 coreSize
  • maxSize: 當任務提交時,coreSize < 運行線程數 <= maxSize,但隊列沒有滿時,任務提交到隊列中,如果隊列滿了,在 maxSize 允許的范圍內新建線程;
  • 一般來說,coreSize 和 maxSize 在線程池初始化時就已經設定了,但我們也可以通過 setCorePoolSize、setMaximumPoolSize 方法動態的修改這兩個值;

Keep-alive times 參數:

  • 作用: 如果當前線程池中有超過 coreSize 的線程,并且線程空閑的時間超過 keepAliveTime,當前線程就會被回收,這樣可以避免線程沒有被使用時的資源浪費;
  • 通過 setKeepAliveTime 方法可以動態的設置 keepAliveTime 的值;
  • 如果設置 allowCoreThreadTimeOut 為 ture 的話,core thread 空閑時間超過 keepAliveTime 的話,也會被回收;

線程池新建時的隊列選擇有很多,比如:

  • ArrayBlockingQueue,有界隊列,可以防止資源被耗盡;
  • LinkedBlockingQueue,無界隊列,未消費的任務可以在隊列中等待
  • SynchronousQueue,為了避免任務被拒絕,要求線程池的 maxSize 無界,缺點是當任務提交的速度超過消費的速度時,可能出現無限制的線程增長

拒絕策略:在 Executor 已經關閉或對最大線程和最大隊列都使用飽和時,可以使用 RejectedExecutionHandler 類進行異常捕捉。有如下四種處理策略:

  • AbortPolicy(默認):拋出異常
  • CallerRunsPolicy:不使用線程池,主線程來執行
  • DiscardPolicy:直接丟棄任務
  • DiscardOldestPolicy:丟棄隊列中最老任務

ExecutorService 使用線程池中的線程執行提交的任務,線程池我們可以使用 Executors 進行配置.Executors 為常用的場景設定了可直接初始化線程池的方法,比如:

  • Executors#newCachedThreadPool 無界的線程池,并且可以自動回收
  • Executors#newFixedThreadPool 固定大小線程池
  • Executors#newSingleThreadExecutor 單個線程的線程池;

另外,線程池提供了很多可供擴展的鉤子函數,比如有:

  • 提供在每個任務執行之前 beforeExecute 和執行之后 afterExecute 的鉤子方法,主要用于操作執行環境,比如初始化 ThreadLocals、收集統計數據、添加日志條目等
  • 如果在執行器執行完成之后想干一些事情,可以實現 terminated 方法,如果鉤子方法執行時發生異常,工作線程可能會失敗并立即終止。

總結

以上是生活随笔為你收集整理的【多线程】ThreadPoolExecutor类万字源码解析(注解超级详细)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。