日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java多线程(十一)之线程池深入分析(上)

發布時間:2024/1/17 java 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java多线程(十一)之线程池深入分析(上) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

線程池是并發包里面很重要的一部分,在實際情況中也是使用很多的一個重要組件。

下圖描述的是線程池API的一部分。廣義上的完整線程池可能還包括Thread/Runnable、Timer/TimerTask等部分。這里只介紹主要的和高級的API以及架構和原理。

大多數并發應用程序是圍繞執行任務(Task)進行管理的。所謂任務就是抽象、離散的工作單元(unit of work)。把一個應用程序的工作(work)分離到任務中,可以簡化程序的管理;這種分離還在不同事物間劃分了自然的分界線,可以方便程序在出現錯誤時進行恢復;同時這種分離還可以為并行工作提供一個自然的結構,有利于提高程序的并發性。下面通過任務的執行策略來引入Executor相關的介紹。

?

一、任務的執行策略

?

任務的執行策略包括4W3H部分:

  • 任務在什么(What)線程中執行
  • 任務以什么(What)順序執行(FIFO/LIFO/優先級等)
  • 同時有多少個(How Many)任務并發執行
  • 允許有多少個(How Many)個任務進入執行隊列
  • 系統過載時選擇放棄哪一個(Which)任務,如何(How)通知應用程序這個動作
  • 任務執行的開始、結束應該做什么(What)處理

在后面的章節中會詳細分寫這些策略是如何實現的。我們先來簡單回答些如何滿足上面的條件。

  • 首先明確一定是在Java里面可以供使用者調用的啟動線程類是Thread。因此Runnable或者Timer/TimerTask等都是要依賴Thread來啟動的,因此在ThreadPool里面同樣也是靠Thread來啟動多線程的。
  • 默認情況下Runnable接口執行完畢后是不能拿到執行結果的,因此在ThreadPool里就定義了一個Callable接口來處理執行結果。
  • 為了異步阻塞的獲取結果,Future可以幫助調用線程獲取執行結果。
  • Executor解決了向線程池提交任務的入口問題,同時ScheduledExecutorService解決了如何進行重復調用任務的問題。
  • CompletionService解決了如何按照執行完畢的順序獲取結果的問題,這在某些情況下可以提高任務執行的并發,調用線程不必在長時間任務上等待過多時間。
  • 顯然線程的數量是有限的,而且也不宜過多,因此合適的任務隊列是必不可少的,BlockingQueue的容量正好可以解決此問題。
  • 固定任務容量就意味著在容量滿了以后需要一定的策略來處理過多的任務(新任務),RejectedExecutionHandler正好解決此問題。
  • 一定時間內阻塞就意味著有超時,因此TimeoutException就是為了描述這種現象。TimeUnit是為了描述超時時間方便的一個時間單元枚舉類。
  • 有上述問題就意味了配置一個合適的線程池是很復雜的,因此Executors默認的一些線程池配置可以減少這個操作。
  • ?

    二、線程池Executor的類體系結構與常用線程池

    ?

    ?

    Java里面線程池的頂級接口是Executor,但是嚴格意義上講Executor并不是一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService

    下面這張圖完整描述了線程池的類體系結構。

    ?

    首先Executor的execute方法只是執行一個Runnable的任務,當然了從某種角度上將最后的實現類也是在線程中啟動此任務的。根據線程池的執行策略最后這個任務可能在新的線程中執行,或者線程池中的某個線程,甚至是調用者線程中執行(相當于直接運行Runnable的run方法)。這點在后面會詳細說明。

    ExecutorService在Executor的基礎上增加了一些方法,其中有兩個核心的方法:

    • Future<?> submit(Runnable task)
    • <T> Future<T> submit(Callable<T> task)

    這兩個方法都是向線程池中提交任務,它們的區別在于Runnable在執行完畢后沒有結果,Callable執行完畢后有一個結果。這在多個線程中傳遞狀態和結果是非常有用的。另外他們的相同點在于都返回一個Future對象。Future對象可以阻塞線程直到運行完畢(獲取結果,如果有的話),也可以取消任務執行,當然也能夠檢測任務是否被取消或者是否執行完畢。

    ?

    ?

    要配置一個線程池是比較復雜的,尤其是對于線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,因此在

    Executors類里面提供了一些靜態工廠,生成一些常用的線程池

    • newSingleThreadExecutor:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。
    • newFixedThreadPool:創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
    • newCachedThreadPool:創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(或者說JVM)能夠創建的最大線程大小。
    • newScheduledThreadPool:創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。
    • newSingleThreadScheduledExecutor:創建一個單線程的線程池。此線程池支持定時以及周期性執行任務的需求。

    ?

    三、線程池Executor的數據結構

    ?

    ?

    由于已經看到了ThreadPoolExecutor的源碼,因此很容易就看到了ThreadPoolExecutor線程池的數據結構。下圖3描述了這種數據結構。

    圖3 ThreadPoolExecutor 數據結構

    其實,即使沒有上述圖形描述ThreadPoolExecutor的數據結構,我們根據線程池的要求也很能夠猜測出其數據結構出來。

    • 線程池需要支持多個線程并發執行,因此有一個線程集合Collection<Thread>來執行線程任務;
    • 涉及任務的異步執行,因此需要有一個集合來緩存任務隊列Collection<Runnable>;
    • 很顯然在多個線程之間協調多個任務,那么就需要一個線程安全的任務集合,同時還需要支持阻塞、超時操作,那么BlockingQueue是必不可少的;
    • 既然是線程池,出發點就是提高系統性能同時降低資源消耗,那么線程池的大小就有限制,因此需要有一個核心線程池大小(線程個數)和一個最大線程池大小(線程個數),有一個計數用來描述當前線程池大小;
    • 如果是有限的線程池大小,那么長時間不使用的線程資源就應該銷毀掉,這樣就需要一個線程空閑時間的計數來描述線程何時被銷毀;
    • 前面描述過線程池也是有生命周期的,因此需要有一個狀態來描述線程池當前的運行狀態;
    • 線程池的任務隊列如果有邊界,那么就需要有一個任務拒絕策略來處理過多的任務,同時在線程池的銷毀階段也需要有一個任務拒絕策略來處理新加入的任務;
    • 上面種的線程池大小、線程空閑實際那、線程池運行狀態等等狀態改變都不是線程安全的,因此需要有一個全局的鎖(mainLock)來協調這些競爭資源;
    • 除了以上數據結構以外,ThreadPoolExecutor還有一些狀態用來描述線程池的運行計數,例如線程池運行的任務數、曾經達到的最大線程數,主要用于調試和性能分析。

    ?

    四、線程池Executor生命周期

    ?

    ?

    線程池Executor是異步的執行任務,因此任何時刻不能夠直接獲取提交的任務的狀態。這些任務有可能已經完成,也有可能正在執行或者還在排隊等待執行。因此關閉線程池可能出現一下幾種情況:

    • 平緩關閉:已經啟動的任務全部執行完畢,同時不再接受新的任務
    • 立即關閉:取消所有正在執行和未執行的任務

    另外關閉線程池后對于任務的狀態應該有相應的反饋信息。

    ?

    圖4 描述了線程池的4種狀態。

    • 線程池在構造前(new操作)是初始狀態,一旦構造完成線程池就進入了執行狀態RUNNING。嚴格意義上講線程池構造完成后并沒有線程被立即啟動,只有進行“預啟動”或者接收到任務的時候才會啟動線程。這個會后面線程池的原理會詳細分析。但是線程池是出于運行狀態,隨時準備接受任務來執行。
    • 線程池運行中可以通過shutdown()和shutdownNow()來改變運行狀態。shutdown()是一個平緩的關閉過程,線程池停止接受新的任務,同時等待已經提交的任務執行完畢,包括那些進入隊列還沒有開始的任務,這時候線程池處于SHUTDOWN狀態;shutdownNow()是一個立即關閉過程,線程池停止接受新的任務,同時線程池取消所有執行的任務和已經進入隊列但是還沒有執行的任務,這時候線程池處于STOP狀態。
    • 一旦shutdown()或者shutdownNow()執行完畢,線程池就進入TERMINATED狀態,此時線程池就結束了。
    • isTerminating()描述的是SHUTDOWN和STOP兩種狀態。
    • isShutdown()描述的是非RUNNING狀態,也就是SHUTDOWN/STOP/TERMINATED三種狀態。

    ?

    圖4

    線程池的API如下:

    圖5

    其中shutdownNow()會返回那些已經進入了隊列但是還沒有執行的任務列表。awaitTermination描述的是等待線程池關閉的時間,如果等待時間線程池還沒有關閉將會拋出一個超時異常。

    對于關閉線程池期間發生的任務提交情況就會觸發一個拒絕執行的操作。這是java.util.concurrent.RejectedExecutionHandler描述的任務操作。下一個小結中將描述這些任務被拒絕后的操作。

    ?

    總結下這個小節

  • 線程池有運行、關閉、停止、結束四種狀態,結束后就會釋放所有資源

  • 平緩關閉線程池使用shutdown()

  • 立即關閉線程池使用shutdownNow(),同時得到未執行的任務列表

  • 檢測線程池是否正處于關閉中,使用isShutdown()

  • 檢測線程池是否已經關閉使用isTerminated()

  • 定時或者永久等待線程池關閉結束使用awaitTermination()操作

  • ?

    五、線程池Executor任務拒絕策略

    ?

    緊接上面,對于關閉線程池期間發生的任務提交情況就會觸發一個拒絕執行的操作。這是java.util.concurrent.RejectedExecutionHandler描述的任務操作。

    先來分析下為什么有任務拒絕的情況發生。

    這里先假設一個前提:線程池有一個任務隊列,用于緩存所有待處理的任務,正在處理的任務將從任務隊列中移除。因此在任務隊列長度有限的情況下就會出現新任務的拒絕處理問題,需要有一種策略來處理應該加入任務隊列卻因為隊列已滿無法加入的情況。另外在線程池關閉的時候也需要對任務加入隊列操作進行額外的協調處理。

    ?

    RejectedExecutionHandler提供了四種方式來處理任務拒絕策略。

    這四種策略是獨立無關的,是對任務拒絕處理的四種表現形式。

    最簡單的方式就是直接丟棄任務。但是卻有兩種方式,到底是該丟棄哪一個任務,比如可以丟棄當前將要加入隊列的任務本身(DiscardPolicy)或者丟棄任務隊列中最舊任務(DiscardOldestPolicy)。丟棄最舊任務也不是簡單的丟棄最舊的任務,而是有一些額外的處理。除了丟棄任務還可以直接拋出一個異常(RejectedExecutionException),這是比較簡單的方式。拋出異常的方式(AbortPolicy)盡管實現方式比較簡單,但是由于拋出一個RuntimeException,因此會中斷調用者的處理過程。除了拋出異常以外還可以不進入線程池執行,在這種方式(CallerRunsPolicy)中任務將有調用者線程去執行。

    ?

    上面是一些理論知識,下面結合一些例子進行分析討論。

    ?

  • package xylz.study.concurrency;

  • ?
  • import java.lang.reflect.Field;

  • import java.util.concurrent.ArrayBlockingQueue;

  • import java.util.concurrent.ThreadPoolExecutor;

  • import java.util.concurrent.TimeUnit;

  • import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;

  • import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;

  • ?
  • public class ExecutorServiceDemo {

  • ?
  • static void log(String msg) {

  • System.out.println(System.currentTimeMillis() + " -> " + msg);

  • }

  • ?
  • static int getThreadPoolRunState(ThreadPoolExecutor pool) throws Exception {

  • Field f = ThreadPoolExecutor.class.getDeclaredField("runState");

  • f.setAccessible(true);

  • int v = f.getInt(pool);

  • return v;

  • }

  • ?
  • public static void main(String[] args) throws Exception {

  • ?
  • ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,

  • new ArrayBlockingQueue<Runnable>(1));

  • pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

  • for (int i = 0; i < 10; i++) {

  • final int index = i;

  • pool.submit(new Runnable() {

  • ?
  • public void run() {

  • log("run task:" + index + " -> " + Thread.currentThread().getName());

  • try {

  • Thread.sleep(1000L);

  • } catch (Exception e) {

  • e.printStackTrace();

  • }

  • log("run over:" + index + " -> " + Thread.currentThread().getName());

  • }

  • });

  • }

  • log("before sleep");

  • Thread.sleep(4000L);

  • log("before shutdown()");

  • pool.shutdown();

  • log("after shutdown(),pool.isTerminated=" + pool.isTerminated());

  • pool.awaitTermination(1000L, TimeUnit.SECONDS);

  • log("now,pool.isTerminated=" + pool.isTerminated() + ", state="

  • + getThreadPoolRunState(pool));

  • }

  • ?
  • }

  • ?

    ?

    ?

    第一種方式直接丟棄(DiscardPolicy)的輸出結果是:

    ?

  • 1294494050696 -> run task:0

  • 1294494050696 -> before sleep

  • 1294494051697 -> run over:0 -> pool-1-thread-1

  • 1294494051697 -> run task:1

  • 1294494052697 -> run over:1 -> pool-1-thread-1

  • 1294494054697 -> before shutdown()

  • 1294494054697 -> after shutdown(),pool.isTerminated=false

  • 1294494054698 -> now,pool.isTerminated=true, state=3

  • ?

    ?

    對于上面的結果需要補充幾點。

  • 線程池設定線程大小為1,因此輸出的線程就只有一個”pool-1-thread-1”,至于為什么是這個名稱,以后會分析。
  • 任務隊列的大小為1,因此可以輸出一個任務執行結果。但是由于線程本身可以帶有一個任務,因此實際上一共執行了兩個任務(task0和task1)。
  • shutdown()一個線程并不能理解是線程運行狀態位terminated,可能需要稍微等待一點時間。盡管這里等待時間參數是1000秒,但是實際上從輸出時間來看僅僅等了約1ms。
  • 直接丟棄任務是丟棄將要進入線程池本身的任務,所以當運行task0是,task1進入任務隊列,task2~task9都被直接丟棄了,沒有運行。
  • 如果把策略換成丟棄最舊任務(DiscardOldestPolicy),結果會稍有不同。

    ?

  • 1294494484622 -> run task:0

  • 1294494484622 -> before sleep

  • 1294494485622 -> run over:0 -> pool-1-thread-1

  • 1294494485622 -> run task:9

  • 1294494486622 -> run over:9 -> pool-1-thread-1

  • 1294494488622 -> before shutdown()

  • 1294494488622 -> after shutdown(),pool.isTerminated=false

  • 1294494488623 -> now,pool.isTerminated=true, state=3

  • ?

    ?

    這里依然只是執行兩個任務,但是換成了任務task0和task9。實際上task1~task8還是進入了任務隊列,只不過被task9擠出去了。

    對于異常策略(AbortPolicy)就比較簡單,這回調用線程的任務執行。

    對于調用線程執行方式(CallerRunsPolicy),輸出的結果就有意思了。

    ?

  • 1294496076266 -> run task:2 -> main

  • 1294496076266 -> run task:0 -> pool-1-thread-1

  • 1294496077266 -> run over:0 -> pool-1-thread-1

  • 1294496077266 -> run task:1 -> pool-1-thread-1

  • 1294496077266 -> run over:2 -> main

  • 1294496077266 -> run task:4 -> main

  • 1294496078267 -> run over:4 -> main

  • 1294496078267 -> run task:5 -> main

  • 1294496078267 -> run over:1 -> pool-1-thread-1

  • 1294496078267 -> run task:3 -> pool-1-thread-1

  • 1294496079267 -> run over:3 -> pool-1-thread-1

  • 1294496079267 -> run over:5 -> main

  • 1294496079267 -> run task:7 -> main

  • 1294496079267 -> run task:6 -> pool-1-thread-1

  • 1294496080267 -> run over:7 -> main

  • 1294496080267 -> run task:9 -> main

  • 1294496080267 -> run over:6 -> pool-1-thread-1

  • 1294496080267 -> run task:8 -> pool-1-thread-1

  • 1294496081268 -> run over:9 -> main

  • 1294496081268 -> before sleep

  • 1294496081268 -> run over:8 -> pool-1-thread-1

  • 1294496085268 -> before shutdown()

  • 1294496085268 -> after shutdown(),pool.isTerminated=false

  • 1294496085269 -> now,pool.isTerminated=true, state=3

  • ?

    ?

    參考內容:

    深入淺出 Java Concurrency (28): 線程池 part 1 簡介
    http://www.blogjava.net/xylz/archive/2010/12/19/341098.html
    深入淺出 Java Concurrency (29): 線程池 part 2 Executor 以及Executors
    http://www.blogjava.net/xylz/archive/2010/12/21/341281.html
    深入淺出 Java Concurrency (30): 線程池 part 3 Executor 生命周期
    http://www.blogjava.net/xylz/archive/2011/01/04/342316.html
    深入淺出 Java Concurrency (31): 線程池 part 4 線程池任務拒絕策略
    http://www.blogjava.net/xylz/archive/2011/01/08/342609.html
    java的concurrent用法詳解
    http://blog.csdn.net/a511596982/article/details/8063742

    總結

    以上是生活随笔為你收集整理的Java多线程(十一)之线程池深入分析(上)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。