c++ socket线程池原理_ThreadPoolExecutor线程池实现原理+源码解析
推薦學習
- 被微服務轟炸?莫怕!耗時35天整出的「微服務學習教程」送你
- 死磕「并發編程」100天,全靠阿里大牛的這份最全「高并發套餐」
- 閉關28天,奉上[Java一線大廠高崗面試題解析合集],備戰金九銀十
前言
或許每個Java工程師都被問過這樣一個問題
Java中開啟一個新的線程有幾種方法?
繼承Thread類和實現Runnable接口。但是除了寫Demo,幾乎沒人會在生產環境上這樣用。具體原因如下:
- 線程頻繁的被創建、銷毀,非常消耗資源
- 這兩種方式開啟的線程都不便于統一的調度和管理
- HotSpot虛擬機采用1:1的模型來實現Java線程的,也就是說一個Java線程直接通過一個操作系統線程來實現,如果可以無限制的開啟線程,很容易導致操作系統資源耗盡。
線程池
繼承Thread和實現Runnable的諸多缺點,所以生產環境必須使用線程池來實現多線程。
線程池(thread pool):一種線程使用模式。線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可并發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度。可用線程數量應該取決于可用的并發處理器、處理器內核、內存、網絡sockets等的數量。 ——維基百科
簡單來說,“池”在計算機領域是指集合,線程池就是指線程集合。線程池可以對一系列線程的生命周期進行統一的調度和管理,包括線程的創建、消亡、生存時間、數量控制等。
Java中的線程池從JDK1.5開始,有一個標準的實現java.util.concurrent.ThreadPoolExecutor,對于這個類,首先看下它的體系結構圖
- Executor:只定義了一個方法execute,用于執行提交的任務
- ExecutorService:定義了一些線程池管理、任務提交、線程池檢測的方法
- AbstractExecutorService:提供了ExecutorService接口執行方法的默認實現,用于統一處理Callable任務和Runnable任務
內部結構
這里主要關注類的定義和一些重要的常量、成員變量
public class ThreadPoolExecutor extends AbstractExecutorService {// 高3位表示線程池狀態,低29位表示worker數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 29 = 32 - 3 private static final int COUNT_BITS = Integer.SIZE - 3; // 線程池允許的最大線程數。為 2^29 - 1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 線程池有5種狀態,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl // 獲取線程池狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取線程池worker數量 private static int workerCountOf(int c) { return c & CAPACITY; } // 根據線程池狀態和worker數量生成ctl值 private static int ctlOf(int rs, int wc) { return rs | wc; } // 緩沖隊列(阻塞隊列) private final BlockingQueue workQueue; // 互斥鎖 private final ReentrantLock mainLock = new ReentrantLock(); // 包含線程池工作的所以線程,僅在持有mainLock的時候能訪問 private final HashSet workers = new HashSet(); private final Condition termination = mainLock.newCondition(); // 跟蹤線程池最大的大小(實際的最大值),僅在持有mainLock的時候能訪問 private int largestPoolSize; // 記錄已經完成的任務數,僅在工作線程終止時更新,僅在持有mainLock的時候能訪問 private long completedTaskCount; // 線程工廠 private volatile ThreadFactory threadFactory; // 線程池飽和或者關閉時的執行器 private volatile RejectedExecutionHandler handler; // 空閑線程等待工作的超時時間 private volatile long keepAliveTime; // 如果為false(默認值),核心線程永遠不回收 // 如果為true,核心線程也通過keepAliveTime參數超時回收 private volatile boolean allowCoreThreadTimeOut; // 核心線程數 private volatile int corePoolSize; // 最大線程數(程序設置的最大線程數,區別于largestPoolSize) private volatile int maximumPoolSize; // 默認的拒絕策略處理器,拋出RejectedExecutionException異常 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();}涉及到的成員變量、常量比較多,也不太容易理解,不過看完整篇后再來回顧這里,就很容易理解了。
生命周期
ThreadPoolExecutor類提供了線程池的五個狀態描述
// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;這幾種狀態之間的轉換過程如下
- RUNNING:運行狀態,可以執行任務,也可以接受阻塞隊列里的任務調度
- SHUTDOWN:調用了shutdown()方法,該狀態可以繼續執行阻塞隊列中的任務,但是不會再接受新任務
- STOP:調用了shutdownNow()方法,該狀態會嘗試中斷正在執行的所有任務,不能繼續執行阻塞隊列中的任務,也不會再接受新任務
- TIDYING:所有任務都執行完畢,至于阻塞隊列中的任務是否執行完成,取決于調用了shutdown()還是shutdownNow()方法
- TERMINATED:terminated()方法執行完成后進入該狀態,terminated()方法默認沒有任何操作
構造方法
ThreadPoolExecutor提供了四個構造方法,忽略它提供的語法糖,我們直接看最吊的那個構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) // corePoolSize、maximumPoolSize、keepAliveTime都不能小于0 // 且maximumPoolSize必須大于等于corePoolSize throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) // workQueue、threadFactory、handler均不能為null throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}這個構造方法有七個參數,如果能明白各個參數的作用,那么線程池的工作原理也就基本清晰了。
- int corePoolSize:核心線程數,當有新的任務提交到線程池時,會進行如下判斷:線程池中線程數量小于corePoolSize時,會創建新線程處理任務,即使還有其他空閑的核心線程線程池中線程數量等于corePoolSize時,任務會加入到workQueue緩存隊列,直到緩存隊列滿了,才會新建非核心線程去處理任務線程池中的線程數量等于maximumPoolSize且緩存隊列已滿時,會根據RejectedExecutionHandler參數指定的拒絕策略來處理提交的任務如果corePoolSize和maximumPoolSize相等,則創建的線程池大小是固定的,緩存隊列滿了就執行決絕策略
- int maximumPoolSize:最大線程數
- long keepAliveTime:非核心線程的最長空閑時間,超過了會被回收(allowCoreThreadTimeOut參數設置成true,也會回收核心線程)
- TimeUnit unit:keepAliveTime參數的單位
- BlockingQueue workQueue:阻塞隊列,用于緩存,保存正在等待執行的任務。一般有以下幾種配置直接切換:常用的隊列是SynchronousQueue無界隊列:常用的隊列是LinkedBlockingQueue,隊列基于鏈表實現,最大長度是Integer.MAX_VALUE,雖然是有界的,但是值太大,所以認為是無界隊列。使用無界隊列可能會導致最大線程數maximumPoolSize失效,這點結合下文的線程池執行過程會很容易理解有界隊列:常用的隊列是ArrayBlockingQueue,基于數組實現,能把最大線程數控制為maximumPoolSize。也能避免阻塞隊列中堆積的任務過多。
- ThreadFactory threadFactory:線程Factory,用來創建線程。使用默認的ThreadFactory創建的線程是具有相同優先級的非守護線程。一般需要自定義ThreadFactory,因為要給每個線程設置有意義的名稱。
- RejectedExecutionHandler handler: 當線程數達到了最大線程數,且沒有線程空閑,且緩沖隊列也滿了(也就是線程池飽和了),指定拒絕策略,ThreadPoolExecutor自身提供了四種拒絕策略:AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常CallerRunsPolicy:利用調用者所在的線程執行任務,哪個線程提交這個任務,就由哪個線程執行DiscardOldestPolicy:丟棄緩存隊列中頭部的任務,重試提交的任務DiscardPolicy:直接丟棄顯然默認的四種拒絕策略都不能很好的使用在生產環境,所以一般也需要自定義拒絕策略來處理飽和的任務。將暫時無法處理的任務存入中間件、數據庫以及日志記錄。
線程池中線程的數量并不是越多越好,因為服務器的性能總是有限的。線程數過多會增加線程切換的開銷,并且空閑線程的頻繁回收也需要消耗資源。線程池的七個參數相輔相成,相互影響,設置的時候需要根據實際情況酌情考慮。
看文字描述多少有些不清晰,如果能有張圖的話就再好不過了。你就說巧不巧吧,剛好我畫了一張圖。
對照這張圖和上面的描述,相信大家對ThreadPoolExecutor的七個參數有個深刻的認識。也很容易理解為什么使用無界隊列LinkedBlockingQueue會使maximumPoolSize失效了,因為緩存隊列可能永遠不會滿。
核心方法
毫無疑問,線程池最核心的方法除了構造方法,就是執行task的方法了。在看ThreadPoolExecutor的核心方法之前,先看一個非常非常重要的內部類Worker,它是線程池中運行任務的最小單元。
// 繼承了AbstractQueuedSynchronizer,是一把鎖// 實現了Runnable接口,是一個線程執行的taskprivate final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; /** 運行任務的線程 */ final Thread thread; /** 要運行的初始任務,可能為null */ Runnable firstTask; /** 每個線程的任務計數器 */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 把自己作為一個任務傳遞給ThreadFactory創建的線程 this.thread = getThreadFactory().newThread(this); } /** runWorker是一個非常重要的方法,后文詳細介紹 */ public void run() { runWorker(this); } // 值為0代表解鎖狀態 // 值為1表示鎖定狀態 protected boolean isHeldExclusively() { return getState() != 0; }// CAS的方式嘗試加鎖 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }// 嘗試釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }}Worker類實現了Runnable接口,所以本身就是一個可執行的任務,并且在構造方法中將自己傳遞給ThreadFactory創建的線程去執行
Worker類繼承了AbstractQueuedSynchronizer類,所以它本身也是一把鎖,執行任務的時候鎖住自己,任務執行完成后解鎖。
了解了Worker類,再來看核心方法。
execute
execute方法用于在將來的某個時間執行指定的任務,execute方法源碼比較復雜,應該先理清楚整體邏輯,在逐步深入細節。
public void execute(Runnable command) { if (command == null) // 提交空任務,直接拋異常 throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // worker數量小于核心線程數,創建核心線程執行任務(第二個參數為true,表示創建核心線程) // addWorker方法會檢查線程池的狀態 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // worker數量超過核心線程數,進入緩沖隊列 // 再次獲取ctl值,因為從上次獲取到這里,有可能ctl的值已經被改變,double-check int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 線程池不是RUNNING狀態,說明已經調用過shutdown方法,需要對新提交的任務執行拒絕策略 reject(command); else if (workerCountOf(recheck) == 0) // 因為構造方法中corePoolSize可能為0或者核心線程也都被回收了,所以此處需要判斷 addWorker(null, false); } else if (!addWorker(command, false)) // 線程池不是RUNNING狀態,或者任務加入緩沖隊列失敗,創建非核心線程執行任務(第二個參數為false) // 任務執行失敗,需要執行拒絕策略 reject(command);}整體邏輯就是前文所示的流程圖。相信有了流程圖的對比,execute方法的理解就容易多了。
addWorker
addWorker方法用于往線程池添加新的worker。其實現如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 這種寫法叫做label語法,一般用于多重性循環中跳轉到指定位置 for (;;) { // 外層自旋 int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 線程池狀態 >= SHUTDOWN if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 內層自旋 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 工作中的線程數大于線程池的容量,或者已經大于等于核心線程數,或者大于等于最大線程數 // core為true,表示要創建核心線程,false表示要創建非核心線程 // 為什么大于等核心線程數的時候要返回false,因為要添加到緩沖隊列,或者創建非核心線程來執行,不能創建核心線程了 return false; if (compareAndIncrementWorkerCount(c)) // 以CAS的方式嘗試把線程數加1 // 注意這里只是把線程池中的線程數加1,并沒有在線程池中真正的創建線程 // 成功后跳出內層自旋 break retry; // CAS失敗,再次獲取ctl,檢查線程池狀態 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 線程池狀態被改變了,從外層自旋開始再次執行之前的邏輯 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 可以看到兩層自旋 + CAS,僅僅是為了把線程池中的線程數加1,還沒有新建線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 把task包裝成Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 獲取鎖之后,再次檢查線程池的狀態 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable // 檢查線程狀態 throw new IllegalThreadStateException(); // 添加到worders workers.add(w); int s = workers.size(); if (s > largestPoolSize) // 維護largestPoolSize變量 largestPoolSize = s; workerAdded = true; } } finally { // 解鎖 mainLock.unlock(); } if (workerAdded) { // 添加成功 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 執行worker的線程啟動失敗 addWorkerFailed(w); } return workerStarted;}可以看到addWorker方法前一部分,用了外層自旋判斷線程池的狀態,內層自旋 + CAS給線程池中的線程數加1。后半部分用了ReentrantLock保證創建Worker對象,以及啟動線程的線程安全。一個方法中三次獲取了線程池的狀態(不包含該方法調用的其他方法),因為每兩次之間,線程池的狀態都有可能被改變。
runWorker
前文在介紹Worker內部類時說過,Worker會把自己傳遞給ThreadFactory創建的線程執行,最終執行Worker的run方法,而Worker類的run方法只有一行代碼:
runWorker(this);所以接下來看看runWorker方法是如何實現了
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 允許外部中斷 w.unlock(); // allow interrupts // 記錄worker是不是異常退出的 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 自旋,如果task不為空,或者能從緩沖隊列(阻塞隊列)中獲取任務就繼續執行,不能就一直阻塞 // 加鎖 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 如果線程池正在停止,并且當前線程沒有被中斷,就中斷當前線程 wt.interrupt(); try { // 鉤子函數,處理task執行前的邏輯 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 鉤子函數,處理task執行后的邏輯 afterExecute(task, thrown); } } finally { task = null; // 完成的任務數加1 w.completedTasks++; // 解鎖 w.unlock(); } } // 運行到這里,說明worker沒有異常退出 completedAbruptly = false; } finally { // 自旋操作被打斷了,說明線程需要被回收 processWorkerExit(w, completedAbruptly); }}第10行代碼中,task為null時,會通過getTask()方法從緩沖隊列中取任務,因為緩沖隊列是阻塞隊列,所以如果獲取不到任務會一直被阻塞,接下來看看getTask方法的內部實現
getTask
getTask用于阻塞式的從緩沖隊列中獲取任務。
private Runnable getTask() {// 線程是否超時 boolean timedOut = false; // Did the last poll() time out? for (;;) { // 自旋 // 獲取線程池狀態 int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 線程池終止了,或者線程池停止了,且緩沖隊列中沒有任務了 // 自旋 + CAS方式減少線程計數 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 根據allowCoreThreadTimeOut參數來判斷,要不要給核心線程設置等待超時時間 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 當前線程數大于了maximumPoolSize(因為maximumPoolSize可以動態修改)或者當前線程設置了超時時間且已經超時了 // 且線程數大于1或者緩沖隊列為空 // 這個條件的意思就是:當前線程需要被回收 if (compareAndDecrementWorkerCount(c)) // 返回null后,上層runWorker方法中斷循環,執行processWorkerExit方法回收線程 return null; continue; } try { // 從阻塞隊列中獲取任務 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 成功獲取任務 return r; // 沒有獲取到任務,超時 timedOut = true; } catch (InterruptedException retry) { // 線程被中斷,重試 timedOut = false; } }}理解該方法的前提,是要理解阻塞隊列提供的阻塞式API。
這個方法重點關注兩點:
- 從緩沖隊列取任務時,poll非阻塞,take阻塞,調用哪個由當前線程需不需要被回收來決定
- 該方法返回null之后,上層方法會回收當前線程
除了這幾個核心方法之外,往線程池提交任務還有一個方法叫submit
public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); execute(ftask); return ftask;}public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask;}public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask;}submit方法可以接收線程池返回的結果,也就是Futrue對象,可以接收Runnable對象和Callable對象。
至于Future、FutureTask、Runnable、Callable之間的關系,博主在前一篇博客 如何獲取子線程的執行結果 已經詳細介紹過,此處不再贅述。
至此ThreadPoolExecutor的核心方法的源碼以及執行邏輯已經講解完畢,再來看一些非核心方法,了解一下即可
- public void shutdown():關閉線程池,已經提交過的任務還會執行(線程池中未運行完畢的,緩沖隊列中的)
- public List shutdownNow():停止線程池,試圖停止正在執行的任務,暫停緩沖隊列中的任務,并且返回
- public void allowCoreThreadTimeOut(boolean value):設置核心線程是否允許回收
- protected void beforeExecute(Thread t, Runnable r):鉤子函數,處理線程執行任務前的邏輯,這里是空實現
- protected void afterExecute(Runnable r, Throwable t):鉤子函數,處理線程執行任務后的邏輯,這里是空實現
- public int getActiveCount():返回正在執行任務的線程的大致數量
- public long getCompletedTaskCount():返回執行完成的任務的大致數量
除此之外還需要了解的是,構造方法中的七個參數,除了BlockingQueue是不能動態設置外,其余六個參數都可以動態設置,分別調用對于的setXxx方法即可,當然也可以通過對于的getXxx方法獲取對應的信息。
鑒于此,我們再來看一個常見的問題
Java有幾種線程池?
JDK(準確的說是java.util.concurrent.Executors工具類)提供了四種線程池:
- CachedThreadPool:緩沖線程池、
- FixedThreadPool:固定線程數的線程池
- SingleThreadExecutor:單線程的線程池
- ScheduledThreadPool:可定時調度的線程池
仔細看下這四種線程池,最終都調用了ThreadPoolExecutor的構造方法,只是傳遞的參數有所不同。
- CachedThreadPool和ScheculedThreadPool設置的最大線程數都是Integer.MAX_VALUE,可能線程數過多而產生OOM
- SingleThreadExecutor和FixedThreadPool使用的都是無界隊列,最大元素個數為Integer.MAX_VALUE,可能緩沖隊列中堆積的任務過多,而產生OOM
這兩點正是阿里巴巴代碼規范里禁止使用這四種線程池的原因。
想要使用線程池,必須通過ThreadPoolExecutor的方法來創建線程池。
總結
使用線程池需要注意的幾點如下:
- 合理設置七個參數
- 自定義ThreadFactory,給每個線程設置有意義的名稱
- 自定義RejectedExecutionHandler,處理線程池飽和時的邏輯
使用線程池之前一定要十分明確每個參數的意義以及對其他參數的影響,才能更加合理的使用線程池。
作者:Sicimike
原文鏈接:https://blog.csdn.net/Baisitao_/article/details/100415358
總結
以上是生活随笔為你收集整理的c++ socket线程池原理_ThreadPoolExecutor线程池实现原理+源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 内存泄漏的原因及解决办法_编程基础 |
- 下一篇: 世界大学城空间代码_C++中命名空间的五