日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

concurrent: ThreadPoolExecutor 用法

發(fā)布時間:2024/4/14 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 concurrent: ThreadPoolExecutor 用法 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?? thread pool一般被用來解決兩個問題:當(dāng)處理大量的同步task的時候,它能夠避免thread不斷創(chuàng)建銷毀的開銷;而另外一個也許更重要的含義是,它其實表示了一個boundary,通過使用thread pool可以限制這些任務(wù)所消耗的資源,比如最大線程數(shù),比如最大的消息緩沖池。
????? 需要指出的是,ThreadPoolExecutor不僅僅是簡單的多個thread的集合,它還帶有一個消息隊列。
在Java中,如果只是需要一個簡單的thread pool,ExecuteService可能更為合適,這是一個Interface。可以通過調(diào)用Executor的靜態(tài)方法來獲得一些簡單的threadpool,如:
????? ExecuteService pool = Executors.newFixedThreadPool(poolSize);?
????? 但如果要用定制的thread pool,則要使用ThreadPoolExecutor類

?

ThreadPoolExecutor的工作機(jī)制:?

?

?

構(gòu)造方法為:

?
?????? ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
????????????? long keepAliveTime, TimeUnit unit,
????????????? BlockingQueue<Runnable> workQueue,
????????????? RejectedExecutionHandler handler)


參數(shù)說明:

?corePoolSize : 線程池維護(hù)線程的最少數(shù)量
?maximumPoolSize :線程池維護(hù)線程的最大數(shù)量
?keepAliveTime : 線程池維護(hù)線程所允許的空閑時間
?unit : 線程池維護(hù)線程所允許的空閑時間的單位
?workQueue : 線程池所使用的緩沖隊列
?handler : 線程池對拒絕任務(wù)的處理策略


方法說明

?

?execute(Runnable):一個任務(wù)通過 execute(Runnable) 方法被添加到線程池,任務(wù)就是一個 Runnable 類型的對象,任務(wù)的執(zhí)行方法就是 Runnable 類型對象的run() 方法。當(dāng)一個任務(wù)通過execute(Runnable) 方法欲添加到線程池時,會按照下面的步驟執(zhí)行:

? 如果此時線程池中的數(shù)量小于corePoolSize ,即使線程池中的線程都處于空閑狀態(tài),也要創(chuàng)建新的線程來處理被添加的任務(wù)。
? 如果此時線程池中的數(shù)量等于corePoolSize ,但是緩沖隊列 workQueue 未滿,那么任務(wù)被放入緩沖隊列。
? 如果此時線程池中的數(shù)量大于corePoolSize ,緩沖隊列workQueue滿,并且線程池中的數(shù)量小于maximumPoolSize ,建新的線程來處理被添加的任務(wù)。
? 如果此時線程池中的數(shù)量大于corePoolSize ,緩沖隊列workQueue滿,并且線程池中的數(shù)量等于maximumPoolSize ,那么通過 handler 所指定的策略來處理此任務(wù)。
? 也就是:處理任務(wù)的優(yōu)先級為:核心線程corePoolSize 、任務(wù)隊列workQueue 、最大線程maximumPoolSize ,如果三者都滿了,使用handler 處理被拒絕的任務(wù)。
? 當(dāng)線程池中的線程數(shù)量大于 corePoolSize 時,如果某線程空閑時間超過keepAliveTime ,線程將被終止。這樣,線程池可以動態(tài)的調(diào)整池中的線程數(shù)。

?

?unit 可選的參數(shù)為java.util.concurrent.TimeUnit 中的幾個靜態(tài)屬性: NANOSECONDS 、 MICROSECONDS 、 MILLISECONDS 、 SECONDS 。

?

?workQueue 常用的是:java.util.concurrent.ArrayBlockingQueue

?

?handler 有四個選擇:

??ThreadPoolExecutor.AbortPolicy():直接拋異常,(java.util.concurrent.RejectedExecutionException )
??ThreadPoolExecutor.CallerRunsPolicy() :重試添加當(dāng)前的任務(wù),他會自動重復(fù)調(diào)用execute() 方法?
??ThreadPoolExecutor.DiscardOldestPolicy():拋棄舊的任務(wù)
??ThreadPoolExecutor.DiscardPolicy() :拋棄當(dāng)前的任務(wù)

?

網(wǎng)站摘取使用提示:轉(zhuǎn)載自:http://www.iteye.com/topic/1118660

  • pool threads啟動后,以后的任務(wù)獲取都會通過block queue中,獲取堆積的runnable task.
    ?所以建議: block size >= corePoolSize ,不然線程池就沒任何意義
  • corePoolSize 和 maximumPoolSize的區(qū)別, 和大家正常理解的數(shù)據(jù)庫連接池不太一樣。
    ?? *? 據(jù)dbcp pool為例,會有minIdle , maxActive配置。minIdle代表是常駐內(nèi)存中的threads數(shù)量,maxActive代表是工作的最大線程數(shù)。
    ?? *? 這里的corePoolSize就是連接池的maxActive的概念,它沒有minIdle的概念(每個線程可以設(shè)置keepAliveTime,超過多少時間多有任務(wù)后銷毀線程,但不會固定保持一定數(shù)量的threads)。
    ?? * 這里的maximumPoolSize,是一種救急措施的第一層。當(dāng)threadPoolExecutor的工作threads存在滿負(fù)荷,并且block queue隊列也滿了,這時代表接近崩潰邊緣。這時允許臨時起一批threads,用來處理runnable,處理完后立馬退出。
    ?所以建議:? maximumPoolSize >= corePoolSize =期望的最大線程數(shù)。 (我曾經(jīng)配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊列,最后就成了單線程工作的pool。典型的配置錯誤)
  • 善用blockqueue和reject組合. 這里要重點推薦下CallsRun的Rejected Handler,從字面意思就是讓調(diào)用者自己來運行。
    ?我們經(jīng)常會在線上使用一些線程池做異步處理,比如我前面做的(業(yè)務(wù)層)異步并行加載技術(shù)分析和設(shè)計, 將原本串行的請求都變?yōu)榱瞬⑿胁僮?#xff0c;但過多的并行會增加系統(tǒng)的負(fù)載(比如軟中斷,上下文切換)。所以肯定需要對線程池做一個size限制。但是為了引入異步操作后,避免因在block queue的等待時間過長,所以需要在隊列滿的時,執(zhí)行一個callsRun的策略,并行的操作又轉(zhuǎn)為一個串行處理,這樣就可以保證盡量少的延遲影響。
    ?所以建議:? RejectExecutionHandler = CallsRun ,? blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個考慮就是瞬間高峰處理,允許一個thread等待一個runnable任務(wù))
  • 一個簡單的例子

    import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class TestThreadPool {private static int produceTaskSleepTime = 2;private static int produceTaskMaxNumber = 10;public static void main(String[] args) {// 構(gòu)造一個線程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardOldestPolicy());for (int i = 1; i <= produceTaskMaxNumber; i++) {try {String task = "task@ " + i;System.out.println("創(chuàng)建任務(wù)并提交到線程池中:" + task);threadPool.execute(new ThreadPoolTask(task));Thread.sleep(produceTaskSleepTime);} catch (Exception e) {e.printStackTrace();}}} }

    ?

    創(chuàng)建task類:

    ?

    import java.io.Serializable;public class ThreadPoolTask implements Runnable, Serializable {private Object attachData;ThreadPoolTask(Object tasks) {this.attachData = tasks;}public void run() {System.out.println("開始執(zhí)行任務(wù):" + attachData);attachData = null;}public Object getTask() {return this.attachData;} }

    ?

    執(zhí)行結(jié)果:

    ????????????? ?創(chuàng)建任務(wù)并提交到線程池中:task@ 1

    開始執(zhí)行任務(wù):task@ 1

    創(chuàng)建任務(wù)并提交到線程池中:task@ 2

    開始執(zhí)行任務(wù):task@ 2

    創(chuàng)建任務(wù)并提交到線程池中:task@ 3

    創(chuàng)建任務(wù)并提交到線程池中:task@ 4

    開始執(zhí)行任務(wù):task@ 3

    創(chuàng)建任務(wù)并提交到線程池中:task@ 5

    開始執(zhí)行任務(wù):task@ 4

    創(chuàng)建任務(wù)并提交到線程池中:task@ 6

    創(chuàng)建任務(wù)并提交到線程池中:task@ 7

    創(chuàng)建任務(wù)并提交到線程池中:task@ 8

    開始執(zhí)行任務(wù):task@ 5

    開始執(zhí)行任務(wù):task@ 6

    創(chuàng)建任務(wù)并提交到線程池中:task@ 9

    開始執(zhí)行任務(wù):task@ 7

    創(chuàng)建任務(wù)并提交到線程池中:task@ 10

    開始執(zhí)行任務(wù):task@ 8

    開始執(zhí)行任務(wù):task@ 9

    開始執(zhí)行任務(wù):task@ 10

    ?

    總結(jié)

    以上是生活随笔為你收集整理的concurrent: ThreadPoolExecutor 用法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。