Java 并发:Executor ExecutorService ThreadPoolExecutor
Executor
Executor僅僅是一個簡單的接口,其定義如下
public interface Executor {void execute(Runnable command); }作為一個簡單的線程池的話,實現這個接口就可以使用了。不過單單這樣的話,無法使用Future功能。
ExecutorService
public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }ExecutorService接口擴展了Executor接口,加入了線程池生命周期的管理,還加入了Future功能。除了提交runnable外還可以使用Callable用于返回結果的任務。這里要注意execute和submit的區別,execute是用于實現Executor接口的,submit則提供了任務的Future機制,submit的實現是基于線程池execute基本功能的。實際上Future機制的大部分代碼都在FutureTask這個類里,反倒和線程池關系不大。
AbstractExecutorService
AbstractExecutorService實現了部分invoke系列接口和submit系列接口,它們都依賴子類實現的execute方法。這也說明實現線程池關鍵是提供管理其生命周期和執行任務的接口,至于submit提供的Future機制可以基于這些很快的實現,比如:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}ThreadPoolExecutor
ThreadPoolExecutor是JDK中主要線程池的一個實現,提供了多種不同的構造函數可以依據參數得到不同特性的線程池,它對于的工廠方法類為Executors。
組成成分
ThreadPoolExecutor從其工廠函數就可以大致看出各個組成成分,具體如下
任務隊列
通過execute提交的任務(submit操作最后也通過execute進行任務執行),會有可能先進入任務隊列而不是立即被線程池執行。這依賴于當前的線程池狀態和設定的參數,如果當前創建的線程數尚未達到corePoolSize那么會立即創建一個線程,否則則會嘗試加入隊列之中。
線程集合
作為一個線程池,它肯定需要創建線程,并保存這些線程的狀態信息。因為線程池內的線程是專門用來運行提交的Runnable活著Callable任務的,他們除了維護狀態信息外基本不會為自己干點什么,一般這樣的線程叫做worker,或工作者線程。在內部使用HashSet保存這些線程對象。
線程工廠
可以按照需要定制thread對象,比如設置線程池內線程名稱,調整daemon屬性等。
拒絕策略
如果線程池處理數量達到上限(隊列已滿且已有線程數達到maximumPoolSize)則開始拒絕任務,相當于提供了一個鉤子函數
池內線程
線程池內的線程除了運行用戶提交的任務外,還需要維護自己的一些狀態信息。這JDK的實現中工作者線程運行邏輯用一個實現了Runnable接口并繼承了AbstractQueuedSynchronizer的一個類Worker來表示。它沒有繼承Thread類,而只是實現了Runnable接口,具體創建線程的過程交給了用戶可以自己定制的ThreadFactory線程工廠。
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}線程生成
線程的生成均由addWorker這個函數進行,從線程池創建后,線程生成主要發生在幾種情況下:
- 提交了一個任務并且當前線程數量小于corePoolSize(不管是否有空閑著的線程)
- 提交了一個任務并且(a.沒有線程空閑 & b.任務隊列無法添加任務 & c.任務數小于設定的最大值)
- 調用了prestartAllCoreThreads,由于在創建后生成所有corePoolSize數量的線程
- 調用了prestartCoreThread,用于在創建后生成一個線程,如果數量已經達到corePoolSize則忽略
后面兩個方法可以方便的來預熱線程池。如上述給出的Worker構造函數可知它又一個參數叫做firstTask,這是因為一般情況下線程的創建都是因為有任務提交引起的(也就是說一個線程池創建后并不會馬上產生指定池大小數量的線程),firstTask是該Worker線程第一個運行的任務。當Worker線程運行完第一個任務后,它獲取新任務的方式就發生了改變,它會阻塞在任務隊列上,等待新任務的到來,firstTask基本就不再使用了。
為什么要采取這樣的方式?如果線程池是一個固定大小的,一創建后立即生成所有工作者線程的這樣的一種實現,就完全可以把任務放到隊列中,所有的Worker線程都從隊列里獲取要執行的任務。但JDK里實現支持動態的添加工作者線程,新創建的線程總是運行剛剛使得它被創建的那個任務提交。如果放到隊列里的話還要進行等待其他的任務先被執行。不過這么說也有些牽強,也未必后到得任務就更重要,反而讓前面排隊的任務等著。
線程運行
Worker對象實現了runnable接口由ThreadFactory給出的Thread對象負責正真的執行(Thread.start),然后再Worker的run方法中會去執行它接收到得任務,
public void run() {runWorker(this);}關鍵過程如runWorker函數:
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}- task = getTask()就是從任務隊列取任務的過程,如果沒有任務的話會阻塞著,線程是處于WAITING狀態,也即空閑狀態
- task.run()即運行提交的任務
- w.completedTasks++每個worker都會統計自己運行的任務數,通過線程池可以獲得一個大概的總數,之所以是大概因為執行的任務時刻會完成,也沒必要用鎖去保證這個數字。
beforeExecute和afterExecute是線程池的鉤子函數可以被子類覆蓋用于實現一些統計功能,但要注意這些是會被線程池內不同線程執行,所以一般要用到threadlocal機制。
線程狀態
為什么Worker要繼承AbstractQueuedSynchronizer,因為它要維護自己的一個狀態變更過程,而且是要支持等待的,其實用一般的lock也可以,不過可能doug lea覺得沒必要再隔一層吧(lock也是用AQS實現的)。狀態值有下面幾個:- -1 Worker對象被創建但還沒有對應的Thread開始運行它,初始化時設置
- 0 已經有對應的Thread運行Worker的run方法,但沒有在運行Worker接收的任務內容,worker.unlock(),表示當前線程空閑
- 1 正在運行任務,worker.lock()
實際上用到worker.lock/tryLock的地方并不多,一個是在runWorker內部,一個就在interruptIdleWorkers這里。這里不得不提下shutdown方法它負責把線程池關掉,但是并不是很暴力,只是讓隊列停止接收任務,而讓已經執行的任務繼續直到所有已提交任務被完成。所以在這里要有選擇的interrupt線程,即選擇那些處于idle狀態的線程。而idle狀態的線程按照前面的設定就是狀態值為0/-1,即可以獲得lock的那些線程。
主要過程
任務提交
任務提交會導致新德工作者線程生成
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}線程退出
工作線程的退出有一下幾個原因:
線程池設定了allowCoreThreadTimeOut = true,并且獲取任務等待時長超過keepAliveTime
任務異常
當工作線程執行的任務拋出異常時,工作者線程會退出。當時在完全退出前會執行processWorkerExit(w, completedAbruptly);,有異常拋出時completedAbruptly為true,所以在該函數中如果發現當前工作者線程是因為異常而退出的會嘗試著再次執行一個addWorker調用來補上這個要退出的線程。可以發現如果此時線程池要關閉或者線程數量已經超過當前條件的最小值則不進行線程補充。這個最小值的產生很微妙。
線程池收縮
通過設定合適的keepAliveTime可以讓線程池多余corePoolSize的線程在一定時間后主動退出,實現線程池的動態收縮,如果設定了allowCoreThreadTimeOut = true的話連core線程也可以自動退出,直到一個線程都沒有,從getTask觀察得到:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}boolean timed; // Are workers subject to culling?for (;;) {int wc = workerCountOf(c);timed = allowCoreThreadTimeOut || wc > corePoolSize;if (wc <= maximumPoolSize && ! (timedOut && timed))break;if (compareAndDecrementWorkerCount(c))return null;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}可以發現當需要自動收縮時通過帶有超時參數的poll函數去取得任務隊列(workQueue)內的任務,而一般情況下則使用take調用無限阻塞。
通過返回一個null值可以使得runWorker中的循環退出轉而執行processWorkerExit,注意在Worker線程完全退出前已經通過compareAndDecrementWorkerCount將當前Worker線程的數量給減少了,因為直到收到null后的工作線程循環肯定會馬上退出不再處理后續任務了,這也是為什么在前面processWorkerExit函數內要選擇性的進行計數減的原因。
shutdown
如前面提到的shutdown是一種溫和的關閉線程池的方式,它不會去interrupt已在運行任務的線程。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}它使用了interruptIdleWorkers去interrupt那些處于idle狀態的工作者線程。一旦線程中的任務響應了interrupt請求或主動退出或拋出InterruptedException都會使得工作者線程退出執行processWorkerExit方法并進而調用tryTerminate,使其在平時調用processWorkerExit時也會執行tryTerminate不過不必慌張,因為處于運行狀態的線程池后綴不做什么立即返回。
final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}在(workerCountOf(c) != 0)時即還有任務運行時都會嘗試著去interrupt其中的空閑工作者線程(而這些線程退出又會執行tryTerminate方法,形成一個鏈式的傳遞)。而那些正在執行任務的工作者線程,雖然現在不能去中斷他們,但在在完成任務后它們會發現線程池已經處于要關閉的狀態也會主動退出。當所有的工作者線程都退出時執行termination.signalAll();喚醒在termination條件隊列上等的線程。一般是通過調用awaitTermination方法等待線程池完全退出。
轉載于:https://www.cnblogs.com/lailailai/p/4651930.html
總結
以上是生活随笔為你收集整理的Java 并发:Executor ExecutorService ThreadPoolExecutor的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 白蚁药
- 下一篇: java美元兑换,(Java实现) 美元