日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java多线程系列:ThreadPoolExecutor源码分析

發布時間:2023/12/31 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java多线程系列:ThreadPoolExecutor源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

這篇主要講述ThreadPoolExecutor的源碼分析,貫穿類的創建、任務的添加到線程池的關閉整個流程,讓你知其然所以然。希望你可以通過本篇博文知道ThreadPoolExecutor是怎么添加任務、執行任務的,以及延伸的知識點。那么先來看看ThreadPoolExecutor的繼承關系吧。

繼承關系

Executor接口

public interface Executor {void execute(Runnable command); }

Executor接口只有一個方法execute,傳入線程任務參數

ExecutorService接口

public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }

ExecutorService接口繼承Executor接口,并增加了submit、shutdown、invokeAll等等一系列方法。

AbstractExecutorService抽象類

public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {...}public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {... }public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {...}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {...}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {...}}

AbstractExecutorService抽象類實現ExecutorService接口,并且提供了一些方法的默認實現,例如submit方法、invokeAny方法、invokeAll方法。

像execute方法、線程池的關閉方法(shutdown、shutdownNow等等)就沒有提供默認的實現。

ThreadPoolExecutor

先介紹下ThreadPoolExecutor線程池的狀態吧

線程池狀態

int 是4個字節,也就是32位(注:一個字節等于8位)

//記錄線程池狀態和線程數量(總共32位,前三位表示線程池狀態,后29位表示線程數量) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程數量統計位數29 Integer.SIZE=32 private static final int COUNT_BITS = Integer.SIZE - 3; //容量 000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1;//運行中 111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //關閉 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //停止 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //整理 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //終止 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS;//獲取運行狀態(獲取前3位) private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取線程個數(獲取后29位) private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
  • RUNNING:接受新任務并且處理阻塞隊列里的任務
  • SHUTDOWN:拒絕新任務但是處理阻塞隊列里的任務
  • STOP:拒絕新任務并且拋棄阻塞隊列里的任務同時會中斷正在處理的任務
  • TIDYING:所有任務都執行完(包含阻塞隊列里面任務)當前線程池活動線程為0,將要調用terminated方法
  • TERMINATED:終止狀態。terminated方法調用完成以后的狀態

線程池狀態轉換

RUNNING -> SHUTDOWN顯式調用shutdown()方法, 或者隱式調用了finalize()方法 (RUNNING or SHUTDOWN) -> STOP顯式調用shutdownNow()方法 SHUTDOWN -> TIDYING當線程池和任務隊列都為空的時候 STOP -> TIDYING當線程池為空的時候 TIDYING -> TERMINATED當 terminated() hook 方法執行完成時候

構造函數

有四個構造函數,其他三個都是調用下面代碼中的這個構造函數

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { }

參數介紹

參數類型含義
corePoolSizeint核心線程數
maximumPoolSizeint最大線程數
keepAliveTimelong存活時間
unitTimeUnit時間單位
workQueueBlockingQueue存放線程的隊列
threadFactoryThreadFactory創建線程的工廠
handlerRejectedExecutionHandler多余的的線程處理器(拒絕策略)

提交任務

submit

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask; }public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask; }public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask; }

流程步驟如下

  • 調用submit方法,傳入Runnable或者Callable對象
  • 判斷傳入的對象是否為null,為null則拋出異常,不為null繼續流程
  • 將傳入的對象轉換為RunnableFuture對象
  • 執行execute方法,傳入RunnableFuture對象
  • 返回RunnableFuture對象
  • 流程圖如下

    execute

    public void execute(Runnable command) {//傳進來的線程為null,則拋出空指針異常if (command == null)throw new NullPointerException();//獲取當前線程池的狀態+線程個數變量int c = ctl.get();/*** 3個步驟*///1.判斷當前線程池線程個數是否小于corePoolSize,小于則調用addWorker方法創建新線程運行,且傳進來的Runnable當做第一個任務執行。//如果調用addWorker方法返回false,則直接返回if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//2.如果線程池處于RUNNING狀態,則添加任務到阻塞隊列if (isRunning(c) && workQueue.offer(command)) {//二次檢查int recheck = ctl.get();//如果當前線程池狀態不是RUNNING則從隊列刪除任務,并執行拒絕策略if (! isRunning(recheck) && remove(command))reject(command);//否者如果當前線程池線程空,則添加一個線程else if (workerCountOf(recheck) == 0)addWorker(null, false);}//3.新增線程,新增失敗則執行拒絕策略else if (!addWorker(command, false))reject(command); }

    其實從上面代碼注釋中可以看出就三個判斷,

  • 核心線程數是否已滿
  • 隊列是否已滿
  • 線程池是否已滿
  • 然后根據這三個條件進行不同的操作,下圖是Java并發編程的藝術書中的線程池的主要處理流程,或許會比較容易理解些

    下面是整個流程的詳細步驟

  • 調用execute方法,傳入Runable對象
  • 判斷傳入的對象是否為null,為null則拋出異常,不為null繼續流程
  • 獲取當前線程池的狀態和線程個數變量
  • 判斷當前線程數是否小于核心線程數,是走流程5,否則走流程6
  • 添加線程數,添加成功則結束,失敗則重新獲取當前線程池的狀態和線程個數變量,
  • 判斷線程池是否處于RUNNING狀態,是則添加任務到阻塞隊列,否則走流程10,添加任務成功則繼續流程7
  • 重新獲取當前線程池的狀態和線程個數變量
  • 重新檢查線程池狀態,不是運行狀態則移除之前添加的任務,有一個false走流程9,都為true則走流程11
  • 檢查線程池線程數量是否為0,否則結束流程,是調用addWorker(null, false),然后結束
  • 調用!addWorker(command, false),為true走流程11,false則結束
  • 調用拒絕策略reject(command),結束
  • 可能看上面會有點繞,不清楚的可以看下面的流程圖

     addWorker
    private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 檢查當前線程池狀態是否是SHUTDOWN、STOP、TIDYING或者TERMINATED// 且!(當前狀態為SHUTDOWN、且傳入的任務為null,且隊列不為null)// 條件都成立則返回falseif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//循環for (;;) {int wc = workerCountOf(c);//如果當前的線程數量超過最大容量或者大于(根據傳入的core決定是核心線程數還是最大線程數)核心線程數 || 最大線程數,則返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//CAS增加c,成功則跳出retryif (compareAndIncrementWorkerCount(c))break retry;//CAS失敗執行下面方法,查看當前線程數是否變化,變化則繼續retry循環,沒變化則繼續內部循環c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;}}//CAS成功boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建一個線程w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//加鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//重新檢查線程池狀態//避免ThreadFactory退出故障或者在鎖獲取前線程池被關閉int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 先檢查線程是否是可啟動的throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//判斷worker是否添加成功,成功則啟動線程,然后將workerStarted設置為trueif (workerAdded) {t.start();workerStarted = true;}}} finally {//判斷線程有沒有啟動成功,沒有則調用addWorkerFailed方法if (! workerStarted)addWorkerFailed(w);}return workerStarted; }

    這里可以將addWorker分為兩部分,第一部分增加線程池個數,第二部分是將任務添加到workder里面并執行。

    第一部分主要是兩個循環,外層循環主要是判斷線程池狀態,下面描述來自Java中線程池ThreadPoolExecutor原理探究

    rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())

    展開!運算后等價于

    s >= SHUTDOWN &&(rs != SHUTDOWN ||firstTask != null ||workQueue.isEmpty())

    也就是說下面幾種情況下會返回false:

    • 當前線程池狀態為STOP,TIDYING,TERMINATED
    • 當前線程池狀態為SHUTDOWN并且已經有了第一個任務
    • 當前線程池狀態為SHUTDOWN并且任務隊列為空

    內層循環作用是使用cas增加線程個數,如果線程個數超限則返回false,否者進行cas,cas成功則退出雙循環,否者cas失敗了,要看當前線程池的狀態是否變化了,如果變了,則重新進入外層循環重新獲取線程池狀態,否者進入內層循環繼續進行cas嘗試。

    到了第二部分說明CAS成功了,也就是說線程個數加一了,但是現在任務還沒開始執行,這里使用全局的獨占鎖來控制workers里面添加任務,其實也可以使用并發安全的set,但是性能沒有獨占鎖好(這個從注釋中知道的)。這里需要注意的是要在獲取鎖后重新檢查線程池的狀態,這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行。

    所以這里也將流程圖分為兩部分來描述

    第一部分流程圖

    第二部分流程圖

    Worker對象

    Worker是定義在ThreadPoolExecutor中的finnal類,其中繼承了AbstractQueuedSynchronizer類和實現Runnable接口,其中的run方法如下

    public void run() {runWorker(this); }

    線程啟動時調用了runWorker方法,關于類的其他方面這里就不在敘述。

    runWorker

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {//循環獲取任務while (task != null || (task = getTask()) != null) {w.lock();// 當線程池是處于STOP狀態或者TIDYING、TERMINATED狀態時,設置當前線程處于中斷狀態// 如果不是,當前線程就處于RUNNING或者SHUTDOWN狀態,確保當前線程不處于中斷狀態// 重新檢查當前線程池的狀態是否大于等于STOP狀態if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//提供給繼承類使用做一些統計之類的事情,在線程運行前調用beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//提供給繼承類使用做一些統計之類的事情,在線程運行之后調用afterExecute(task, thrown);}} finally {task = null;//統計當前worker完成了多少個任務w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//整個線程結束時調用,線程退出操作。統計整個線程池完成的任務個數之類的工作processWorkerExit(w, completedAbruptly);} }

    getTask

    getTask方法的主要作用其實從方法名就可以看出來了,就是獲取任務

    private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?//循環for (;;) {int c = ctl.get();int rs = runStateOf(c);//線程線程池狀態和隊列是否為空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//線程數量int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//(當前線程數是否大于最大線程數或者)//且(線程數大于1或者任務隊列為空)//這里有個問題(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//獲取任務Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }

    關閉線程池

    shutdown

    當調用shutdown方法時,線程池將不會再接收新的任務,然后將先前放在隊列中的任務執行完成。

    下面是shutdown方法的源碼

    public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate(); }

    shutdownNow

    立即停止所有的執行任務,并將隊列中的任務返回

    public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks; }

    shutdown和shutdownNow區別

    shutdown和shutdownNow這兩個方法的作用都是關閉線程池,流程大致相同,只有幾個步驟不同,如下

  • 加鎖
  • 檢查關閉權限
  • CAS改變線程池狀態
  • 設置中斷標志(線程池不在接收任務,隊列任務會完成)/中斷當前執行的線程
  • 調用onShutdown方法(給子類提供的方法)/獲取隊列中的任務
  • 解鎖
  • 嘗試將線程池狀態變成終止狀態TERMINATED
  • 結束/返回隊列中的任務
  • 總結

    線程池可以給我們多線程編碼上提供極大便利,就好像數據庫連接池一樣,減少了線程的開銷,提供了線程的復用。而且ThreadPoolExecutor也提供了一些未實現的方法,供我們來使用,像beforeExecute、afterExecute等方法,我們可以通過這些方法來對線程進行進一步的管理和統計。

    在使用線程池上好需要注意,提交的線程任務可以分為CPU 密集型任務和IO 密集型任務,然后根據任務的不同進行分配不同的線程數量。

    • CPU密集型任務:
      • 應當分配較少的線程,比如 CPU個數相當的大小
    • IO 密集型任務:
      • 由于線程并不是一直在運行,所以可以盡可能的多配置線程,比如 CPU 個數 * 2
    • 混合型任務:
      • 可以將其拆分為 CPU 密集型任務以及 IO 密集型任務,這樣來分別配置。

    好了,這篇博文到這里就結束了,文中可能會有些紕漏,歡迎留言指正。

    如果本文對你有所幫助,給個star唄,謝謝。本文GitHub地址:點這里點這里

    參考資料

  • 并發編程網-Java中線程池ThreadPoolExecutor原理探究
  • Java并發編程的藝術

  • 作者: 云梟zd
    Github: Github地址
    出處: https://www.cnblogs.com/fixzd/
    版權聲明:本文歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則視為侵權。

    總結

    以上是生活随笔為你收集整理的java多线程系列:ThreadPoolExecutor源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。