日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java excutorthread_JAVA 线程池ThreadPoolExcutor原理探究

發布時間:2024/8/23 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java excutorthread_JAVA 线程池ThreadPoolExcutor原理探究 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

概論

線程池(英語:thread pool):一種線程使用模式。線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可并發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度。可用線程數量應該取決于可用的并發處理器、處理器內核、內存、網絡 sockets 等的數量。 例如,線程數一般取 cpu 數量 +2 比較合適,線程數過多會導致額外的線程切換開銷。

Java 中的線程池是用 ThreadPoolExecutor 類來實現的. 本文就對該類的源碼來分析一下這個類內部對于線程的創建, 管理以及后臺任務的調度等方面的執行原理。

先看一下線程池的類圖:

上圖的目的主要是為了讓大家知道線程池相關類之間的關系,至少賺個眼熟,以后看到不會有害怕的感覺。

Executor 框架接口

Executor 框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架,目的是提供一種將”任務提交”與”任務如何運行”分離開來的機制。

下面是ThreadPoolExeCutor類圖。Executors其實是一個工具類,里面提供了好多靜態方法,這些方法根據用戶選擇返回不同的線程實例。

從上圖也可以看出來,ThreadPoolExeCutor 是線程池的核心。

J.U.C 中有三個 Executor 接口:

Executor:一個運行新任務的簡單接口;ExecutorService:擴展了 Executor 接口。添加了一些用來管理執行器生命周期和任務生命周期的方法;ScheduledExecutorService:擴展了 ExecutorService。支持 Future 和定期執行任務。其實通過這些接口就可以看到一些設計思想,每個接口的名字和其任務是完全匹配的。不會因為 Executor 中只有一個方法,就將其放到其他接口中。這也是很重要的單一原則。

ThreadPoolExeCutor 分析

在去具體分析 ThreadPoolExeCutor 運行邏輯前,先看下面的流程圖:

該圖是 ThreadPoolExeCutor 整個運行過程的一個概括,整個源碼的核心邏輯總結起來就是:

創建線程:要知道如何去創建線程,控制線程數量,線程的存活與銷毀;添加任務:任務添加后如何處理,是立刻執行,還是先保存;執行任務:如何獲取任務,任務執行失敗后如何處理?下面將進入源碼分析,來深入理解 ThreadPoolExeCutor 的設計思想。

構造函數

先來看構造函數:

corePoolSize 核心線程數:表示核心線程池的大小。當提交一個任務時,如果當前核心線程池的線程個數沒有達到 corePoolSize,則會創建新的線程來執行所提交的任務,即使當前核心線程池有空閑的線程。如果當前核心線程池的線程個數已經達到了corePoolSize,則不再重新創建線程。如果調用了 prestartCoreThread() 或者 prestartAllCoreThreads(),線程池創建的時候所有的核心線程都會被創建并且啟動。若 corePoolSize == 0,則任務執行完之后,沒有任何請求進入時,銷毀線程池的線程。若 corePoolSize > 0,即使本地任務執行完畢,核心線程也不會被銷毀。corePoolSize 其實可以理解為可保留的空閑線程數。maximumPoolSize: 表示線程池能夠容納同時執行的最大線程數。如果當阻塞隊列已滿時,并且當前線程池線程個數沒有超過 maximumPoolSize 的話,就會創建新的線程來執行任務。注意 maximumPoolSize >= 1 必須大于等于 1。maximumPoolSize == corePoolSize ,即是固定大小線程池。實際上最大容量是由 CAPACITY 控制。keepAliveTime: 線程空閑時間。當空閑時間達到 keepAliveTime值時,線程會被銷毀,直到只剩下 corePoolSize 個線程為止,避免浪費內存和句柄資源。默認情況,當線程池的線程數 > corePoolSize 時,keepAliveTime 才會起作用。但當 ThreadPoolExecutor 的 allowCoreThreadTimeOut 變量設置為 true 時,核心線程超時后會被回收。unit:時間單位。為 keepAliveTime 指定時間單位。workQueue 緩存隊列。當請求的線程數 > maximumPoolSize時,線程進入 BlockingQueue 阻塞隊列。可以使用 ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue。threadFactory創建線程的工程類。可以通過指定線程工廠為每個創建出來的線程設置更有意義的名字,如果出現并發問題,也方便查找問題原因。handler 執行拒絕策略的對象。當線程池的阻塞隊列已滿和指定的線程都已經開啟,說明當前線程池已經處于飽和狀態了,那么就需要采用一種策略來處理這種情況。采用的策略有這幾種:AbortPolicy: 直接拒絕所提交的任務,并拋出 RejectedExecutionException異常;CallerRunsPolicy:只用調用者所在的線程來執行任務;DiscardPolicy:不處理直接丟棄掉任務;DiscardOldestPolicy:丟棄掉阻塞隊列中存放時間最久的任務,執行當前任務屬性定義

看完構造函數之后,再來看下該類里面的變量,有助于進一步理解整個代碼運行邏輯,下面是一些比較重要的變量:

這里需要對一些操作做些解釋。

Integer.SIZE:對于不同平臺,其位數不一樣,目前常見的是 32 位;(1 << COUNT_BITS) -1:首先是將 1 左移 COUNT_BITS 位,也就是第 COUNT_BITS + 1 位是1,其余都是 0;-1 操作則是將后面前的 COUNT_BITS 位都變成 1。-1 << COUNT_BITS:-1 的原碼是 10000000 00000000 00000000 00000001 ,反碼是 111111111 11111111 11111111 11111110 ,補碼 +1,然后左移 29 位是 11100000 00000000 00000000 00000000;這里轉為十進制是負數。~CAPACITY :取反,最高三位是1;總結:這里巧妙利用 bit 操作來將線程數量和運行狀態聯系在一起,減少了變量的存在和內存的占用。其中五種狀態的十進制排序:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

線程池狀態

線程池狀態含義:

RUNNING:接受新任務并且處理阻塞隊列里的任務;SHUTDOWN:拒絕新任務但是處理阻塞隊列里的任務;STOP:拒絕新任務并且拋棄阻塞隊列里的任務同時會中斷正在處理的任務;TIDYING:所有任務都執行完(包含阻塞隊列里面任務)當前線程池活動線程為 0,將要調用 terminated 方法TERMINATED:終止狀態。terminated 方法調用完成以后的狀態;線程池狀態轉換:

RUNNING -> SHUTDOWN:顯式調用 shutdown() 方法,或者隱式調用了 finalize(),它里面調用了shutdown()方法。RUNNING or SHUTDOWN)-> STOP:顯式 shutdownNow() 方法;SHUTDOWN -> TIDYING:當線程池和任務隊列都為空的時候;STOP -> TIDYING:當線程池為空的時候;TIDYING -> TERMINATED:當 terminated() hook 方法執行完成時候;原碼,反碼,補碼知識小劇場

1. 原碼:原碼就是符號位加上真值的絕對值, 即用第一位表示符號,其余位表示值. 比如如果是 8 位二進制:

[+1]原 = 0000 0001

[-1]原 = 1000 0001

負數原碼第一位是符號位.

2. 反碼:反碼的表示方法是,正數的反碼是其本身,負數的反碼是在其原碼的基礎上, 符號位不變,其余各個位取反.

[+1] = [0000 0001]原 = [0000 0001]反

[-1] = [1000 0001]原 = [1111 1110]反

3. 補碼:補碼的表示方法是,正數的補碼就是其本身,負數的補碼是在其原碼的基礎上, 符號位不變, 其余各位取反, 最后 +1. (即在反碼的基礎上 +1)

[+1] = [0000 0001]原 = [0000 0001]反 = [0000 0001]補

[-1] = [1000 0001]原 = [1111 1110]反 = [1111 1111]補

4. 總結在知道一個數原碼的情況下:正數:反碼,補碼 就是本身自己負數:反碼是高位符號位不變,其余位取反。補碼:反碼+1

5. 左移:當數值左、右移時,先將數值轉化為其補碼形式,移完后,再轉換成對應的原碼

左移:高位丟棄,低位補零

[+1] = [00000001]補

[0000 0001]補 << 1 = [0000 0010]補 = [0000 0010]原 = [+2]

[-1] = [1000 0001]原 = [1111 1111]補

[1111 1111]補 << 1 = [1111 1110]補 = [1000 0010]原 = [-2]

其中,再次提醒,負數的補碼是反碼+1;負數的反碼是補碼-1;

6. 右移:高位保持不變,低位丟棄

[+127] = [0111 1111]原 = [0111 1111]補

[0111 1111]補 >> 1 = [0011 1111]補 = [0011 1111]原 = [+63]

[-127] = [1111 1111]原 = [1000 0001]補

[1000 0001]補 >> 1 = [1100 0000]補 = [1100 0000]原 = [-64]

execute 方法分析

通過 ThreadPoolExecutor 創建線程池后,提交任務后執行過程是怎樣的,下面來通過源碼來看一看。execute 方法源碼如下:

execute 方法執行邏輯有這樣幾種情況:

如果當前運行的線程少于 corePoolSize,則會創建新的線程來執行新的任務;如果運行的線程個數等于或者大于 corePoolSize,則會將提交的任務存放到阻塞隊列 workQueue 中;如果當前 workQueue 隊列已滿的話,則會創建新的線程來執行任務;如果線程個數已經超過了 maximumPoolSize,則會使用飽和策略 RejectedExecutionHandler 來進行處理。這里要注意一下 addWorker(null, false) 也就是創建一個線程,但并沒有傳入任務,因為任務已經被添加到 workQueue 中了,所以 worker 在執行的時候,會直接從 workQueue 中獲取任務。所以,在 workerCountOf(recheck) == 0 時執行 addWorker(null, false) 也是為了保證線程池在 RUNNING 狀態下必須要有一個線程來執行任務。

需要注意的是,線程池的設計思想就是使用了核心線程池 corePoolSize,阻塞隊列 workQueue 和線程池 maximumPoolSize,這樣的緩存策略來處理任務,實際上這樣的設計思想在需要框架中都會使用。

需要注意線程和任務之間的區別,任務是保存在 workQueue 中的,線程是從線程池里面取的,由 CAPACITY 控制容量。

addWorker 方法分析

addWorker 方法的主要工作是在線程池中創建一個新的線程并執行,firstTask 參數用于指定新增的線程執行的第一個任務,core 參數為 true 表示在新增線程時會判斷當前活動線程數是否少于 corePoolSize,false 表示新增線程前需要判斷當前活動線程數是否少于 maximumPoolSize,代碼如下:

這里需要注意有以下幾點:

在獲取鎖后重新檢查線程池的狀態,這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行。t.start()會調用 Worker 類中的 run 方法,Worker 本身實現了 Runnable 接口。原因在創建線程得時候,將 Worker 實例傳入了 t 當中,可參見 Worker 類的構造函數。wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) 每次調用 addWorker 來添加線程會先判斷當前線程數是否超過了CAPACITY,然后再去判斷是否超 corePoolSize 或 maximumPoolSize,說明線程數實際上是由 CAPACITY 來控制的。內部類 Worker 分析

上面分析過程中,提到了一個 Worker 類,對于某些對源碼不是很熟悉得同學可能有點不清楚,下面就來看看 Worker 的源碼:

首先看到的是 Worker 繼承了(AbstractQueuedSynchronizer) AQS,并實現了 Runnable 接口,說明 Worker 本身也是線程。然后看其構造函數可以發現,內部有兩個屬性變量分別是 Runnable 和 Thread 實例,該類其實就是對傳進來得屬性做了一個封裝,并加入了獲取鎖的邏輯(繼承了 AQS )。具體可參考文章:透過 ReentrantLock 分析 AQS 的實現原理

Worker 繼承了 AQS,使用 AQS 來實現獨占鎖的功能。為什么不使用 ReentrantLock 來實現呢?可以看到 tryAcquire 方法,它是不允許重入的,而 ReentrantLock 是允許重入的:

lock 方法一旦獲取了獨占鎖,表示當前線程正在執行任務中;如果正在執行任務,則不應該中斷線程;如果該線程現在不是獨占鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務,這時可以對該線程進行中斷;線程池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來中斷空閑的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程是否是空閑狀態;之所以設置為不可重入,是因為我們不希望任務在調用像 setCorePoolSize 這樣的線程池控制方法時重新獲取鎖。如果使用 ReentrantLock,它是可重入的,這樣如果在任務中調用了如 setCorePoolSize 這類線程池控制的方法,會中斷正在運行的線程,因為 size 小了,需要中斷一些線程 。所以,Worker 繼承自 AQS,用于判斷線程是否空閑以及是否可以被中斷。

此外,在構造方法中執行了 setState(-1);,把 state 變量設置為 -1,為什么這么做呢?是因為 AQS 中默認的 state 是 0,如果剛創建了一個 Worker 對象,還沒有執行任務時,這時就不應該被中斷,看一下 tryAquire 方法:

正因為如此,在 runWorker 方法中會先調用 Worker 對象的 unlock 方法將 state 設置為 0。tryAcquire 方法是根據 state 是否是 0 來判斷的,所以,setState(-1);

將 state 設置為 -1 是為了禁止在執行任務前對線程進行中斷。

runWorker 方法分析

前面提到了內部類 Worker 的 run 方法調用了外部類 runWorker,下面來看下 runWork 的具體邏輯。

總結一下 runWorker 方法的執行過程:

while 循環不斷地通過 getTask() 方法從阻塞隊列中取任務;如果線程池正在停止,那么要保證當前線程是中斷狀態,否則要保證當前線程不是中斷狀態;調用 task.run()執行任務;如果 task 為 null 則跳出循環,執行 processWorkerExit 方法;runWorker 方法執行完畢,也代表著 Worker 中的 run 方法執行完畢,銷毀線程。這里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 類中是空的,留給子類來實現。

completedAbruptly 變量來表示在執行任務過程中是否出現了異常,在 processWorkerExit 方法中會對該變量的值進行判斷。

getTask 方法分析

getTask 方法是從阻塞隊列里面獲取任務,具體代碼邏輯如下:

其實到這里后,你會發現在 ThreadPoolExcute 內部有幾個重要的檢驗:

判斷當前的運行狀態,根據運行狀態來做處理,如果當前都停止運行了,那很多操作也就沒必要了;判斷當前線程池的數量,然后將該數據和 corePoolSize 以及 maximumPoolSize 進行比較,然后再去決定下一步該做啥;首先是第一個 if 判斷,當運行狀態處于非 RUNNING 狀態,此外 rs >= STOP(線程池是否正在 stop)或阻塞隊列是否為空。則將 workerCount 減 1 并返回 null。為什么要減 1 呢,因為此處其實是去獲取一個 task,但是發現處于停止狀態了,也就是沒必要再去獲取運行任務了,那這個線程就沒有存在的意義了。后續也會在 processWorkerExit 將該線程移除。

第二個 if 條件目的是控制線程池的有效線程數量。由上文中的分析可以知道,在執行 execute 方法時,如果當前線程池的線程數量超過了 corePoolSize 且小于 maximumPoolSize,并且 workQueue 已滿時,則可以增加工作線程,但這時如果超時沒有獲取到任務,也就是 timedOut 為 true 的情況,說明 workQueue 已經為空了,也就說明了當前線程池中不需要那么多線程來執行任務了,可以把多于 corePoolSize 數量的線程銷毀掉,保持線程數量在 corePoolSize 即可。

什么時候會銷毀?當然是 runWorker 方法執行完之后,也就是 Worker 中的 run 方法執行完,由 JVM 自動回收。

getTask 方法返回 null 時,在 runWorker 方法中會跳出 while 循環,然后會執行 processWorkerExit 方法。

processWorkerExit 方法

下面在看 processWorkerExit 方法的具體邏輯:

至此,processWorkerExit 執行完之后,工作線程被銷毀,以上就是整個工作線程的生命周期。但是這有兩點需要注意:

大家想想什么時候才會調用這個方法,任務干完了才會調用。那么沒事做了,就需要看下是否有必要結束線程池,這時候就會調用 tryTerminate。如果此時線程處于 STOP 狀態以下,那么就會判斷核心線程數是否達到了規定的數量,沒有的話,就會繼續創建一個線程。tryTerminate方法

tryTerminate 方法根據線程池狀態進行判斷是否結束線程池,代碼如下:

interruptIdleWorkers(boolean onlyOne) 如果 ONLY_ONE = true 那么就的最多讓一個空閑線程發生中斷,ONLY_ONE = false 時是所有空閑線程都會發生中斷。那線程什么時候會處于空閑狀態呢?

一是線程數量很多,任務都完成了;二是線程在 getTask 方法中執行 workQueue.take() 時,如果不執行中斷會一直阻塞。

所以每次在工作線程結束時調用 tryTerminate 方法來嘗試中斷一個空閑工作線程,避免在隊列為空時取任務一直阻塞的情況。

shutdown方法

shutdown 方法要將線程池切換到 SHUTDOWN 狀態,并調用 interruptIdleWorkers 方法請求中斷所有空閑的 worker,最后調用 tryTerminate 嘗試結束線程池。

這里思考一個問題:在 runWorker 方法中,執行任務時對 Worker 對象 w 進行了 lock 操作,為什么要在執行任務的時候對每個工作線程都加鎖呢?

下面仔細分析一下:

在 getTask 方法中,如果這時線程池的狀態是 SHUTDOWN 并且 workQueue 為空,那么就應該返回 null 來結束這個工作線程,而使線程池進入 SHUTDOWN 狀態需要調用shutdown 方法;shutdown 方法會調用 interruptIdleWorkers 來中斷空閑的線程,interruptIdleWorkers 持有 mainLock,會遍歷 workers 來逐個判斷工作線程是否空閑。但 getTask 方法中沒有mainLock;在 getTask 中,如果判斷當前線程池狀態是 RUNNING,并且阻塞隊列為空,那么會調用 workQueue.take() 進行阻塞;如果在判斷當前線程池狀態是 RUNNING 后,這時調用了 shutdown 方法把狀態改為了 SHUTDOWN,這時如果不進行中斷,那么當前的工作線程在調用了 workQueue.take() 后會一直阻塞而不會被銷毀,因為在 SHUTDOWN 狀態下不允許再有新的任務添加到 workQueue 中,這樣一來線程池永遠都關閉不了了;由上可知,shutdown 方法與 getTask 方法(從隊列中獲取任務時)存在競態條件;解決這一問題就需要用到線程的中斷,也就是為什么要用 interruptIdleWorkers 方法。在調用 workQueue.take() 時,如果發現當前線程在執行之前或者執行期間是中斷狀態,則會拋出 InterruptedException,解除阻塞的狀態;但是要中斷工作線程,還要判斷工作線程是否是空閑的,如果工作線程正在處理任務,就不應該發生中斷;所以 Worker 繼承自 AQS,在工作線程處理任務時會進行 lock,interruptIdleWorkers 在進行中斷時會使用 tryLock 來判斷該工作線程是否正在處理任務,如果 tryLock 返回 true,說明該工作線程當前未執行任務,這時才可以被中斷。下面就來分析一下 interruptIdleWorkers 方法。

interruptIdleWorkers方法

interruptIdleWorkers 遍歷 workers 中所有的工作線程,若線程沒有被中斷 tryLock 成功,就中斷該線程。

為什么需要持有 mainLock ?因為 workers 是 HashSet 類型的,不能保證線程安全。

shutdownNow方法

shutdownNow 方法與 shutdown 方法類似,不同的地方在于:

設置狀態為 STOP;中斷所有工作線程,無論是否是空閑的;取出阻塞隊列中沒有被執行的任務并返回。shutdownNow 方法執行完之后調用 tryTerminate 方法,該方法在上文已經分析過了,目的就是使線程池的狀態設置為 TERMINATED。

線程池的監控

通過線程池提供的參數進行監控。線程池里有一些屬性在監控線程池的時候可以使用

getTaskCount:線程池已經執行的和未執行的任務總數;getCompletedTaskCount:線程池已完成的任務數量,該值小于等于 taskCount;getLargestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過,也就是達到了maximumPoolSize;getPoolSize:線程池當前的線程數量;getActiveCount:當前線程池中正在執行任務的線程數量。通過這些方法,可以對線程池進行監控,在 ThreadPoolExecutor 類中提供了幾個空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,可以擴展這些方法在執行前或執行后增加一些新的操作,例如統計線程池的執行任務的時間等,可以繼承自 ThreadPoolExecutor 來進行擴展。

到此,關于 ThreadPoolExecutor 的內容就講完了。

總結

以上是生活随笔為你收集整理的java excutorthread_JAVA 线程池ThreadPoolExcutor原理探究的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。