concurrent: ThreadPoolExecutor 用法
?? thread pool一般被用來解決兩個問題:當(dāng)處理大量的同步task的時候,它能夠避免thread不斷創(chuàng)建銷毀的開銷;而另外一個也許更重要的含義是,它其實表示了一個boundary,通過使用thread pool可以限制這些任務(wù)所消耗的資源,比如最大線程數(shù),比如最大的消息緩沖池。
????? 需要指出的是,ThreadPoolExecutor不僅僅是簡單的多個thread的集合,它還帶有一個消息隊列。
在Java中,如果只是需要一個簡單的thread pool,ExecuteService可能更為合適,這是一個Interface。可以通過調(diào)用Executor的靜態(tài)方法來獲得一些簡單的threadpool,如:
????? ExecuteService pool = Executors.newFixedThreadPool(poolSize);?
????? 但如果要用定制的thread pool,則要使用ThreadPoolExecutor類
?
ThreadPoolExecutor的工作機(jī)制:?
?
?
構(gòu)造方法為:
?
?????? ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
????????????? long keepAliveTime, TimeUnit unit,
????????????? BlockingQueue<Runnable> workQueue,
????????????? RejectedExecutionHandler handler)
參數(shù)說明:
?corePoolSize : 線程池維護(hù)線程的最少數(shù)量
?maximumPoolSize :線程池維護(hù)線程的最大數(shù)量
?keepAliveTime : 線程池維護(hù)線程所允許的空閑時間
?unit : 線程池維護(hù)線程所允許的空閑時間的單位
?workQueue : 線程池所使用的緩沖隊列
?handler : 線程池對拒絕任務(wù)的處理策略
方法說明
?
?execute(Runnable):一個任務(wù)通過 execute(Runnable) 方法被添加到線程池,任務(wù)就是一個 Runnable 類型的對象,任務(wù)的執(zhí)行方法就是 Runnable 類型對象的run() 方法。當(dāng)一個任務(wù)通過execute(Runnable) 方法欲添加到線程池時,會按照下面的步驟執(zhí)行:
? 如果此時線程池中的數(shù)量小于corePoolSize ,即使線程池中的線程都處于空閑狀態(tài),也要創(chuàng)建新的線程來處理被添加的任務(wù)。
? 如果此時線程池中的數(shù)量等于corePoolSize ,但是緩沖隊列 workQueue 未滿,那么任務(wù)被放入緩沖隊列。
? 如果此時線程池中的數(shù)量大于corePoolSize ,緩沖隊列workQueue滿,并且線程池中的數(shù)量小于maximumPoolSize ,建新的線程來處理被添加的任務(wù)。
? 如果此時線程池中的數(shù)量大于corePoolSize ,緩沖隊列workQueue滿,并且線程池中的數(shù)量等于maximumPoolSize ,那么通過 handler 所指定的策略來處理此任務(wù)。
? 也就是:處理任務(wù)的優(yōu)先級為:核心線程corePoolSize 、任務(wù)隊列workQueue 、最大線程maximumPoolSize ,如果三者都滿了,使用handler 處理被拒絕的任務(wù)。
? 當(dāng)線程池中的線程數(shù)量大于 corePoolSize 時,如果某線程空閑時間超過keepAliveTime ,線程將被終止。這樣,線程池可以動態(tài)的調(diào)整池中的線程數(shù)。
?
?unit 可選的參數(shù)為java.util.concurrent.TimeUnit 中的幾個靜態(tài)屬性: NANOSECONDS 、 MICROSECONDS 、 MILLISECONDS 、 SECONDS 。
?
?workQueue 常用的是:java.util.concurrent.ArrayBlockingQueue
?
?handler 有四個選擇:
??ThreadPoolExecutor.AbortPolicy():直接拋異常,(java.util.concurrent.RejectedExecutionException )
??ThreadPoolExecutor.CallerRunsPolicy() :重試添加當(dāng)前的任務(wù),他會自動重復(fù)調(diào)用execute() 方法?
??ThreadPoolExecutor.DiscardOldestPolicy():拋棄舊的任務(wù)
??ThreadPoolExecutor.DiscardPolicy() :拋棄當(dāng)前的任務(wù)
?
網(wǎng)站摘取使用提示:轉(zhuǎn)載自:http://www.iteye.com/topic/1118660
?所以建議: block size >= corePoolSize ,不然線程池就沒任何意義
?? *? 據(jù)dbcp pool為例,會有minIdle , maxActive配置。minIdle代表是常駐內(nèi)存中的threads數(shù)量,maxActive代表是工作的最大線程數(shù)。
?? *? 這里的corePoolSize就是連接池的maxActive的概念,它沒有minIdle的概念(每個線程可以設(shè)置keepAliveTime,超過多少時間多有任務(wù)后銷毀線程,但不會固定保持一定數(shù)量的threads)。
?? * 這里的maximumPoolSize,是一種救急措施的第一層。當(dāng)threadPoolExecutor的工作threads存在滿負(fù)荷,并且block queue隊列也滿了,這時代表接近崩潰邊緣。這時允許臨時起一批threads,用來處理runnable,處理完后立馬退出。
?所以建議:? maximumPoolSize >= corePoolSize =期望的最大線程數(shù)。 (我曾經(jīng)配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊列,最后就成了單線程工作的pool。典型的配置錯誤)
?我們經(jīng)常會在線上使用一些線程池做異步處理,比如我前面做的(業(yè)務(wù)層)異步并行加載技術(shù)分析和設(shè)計, 將原本串行的請求都變?yōu)榱瞬⑿胁僮?#xff0c;但過多的并行會增加系統(tǒng)的負(fù)載(比如軟中斷,上下文切換)。所以肯定需要對線程池做一個size限制。但是為了引入異步操作后,避免因在block queue的等待時間過長,所以需要在隊列滿的時,執(zhí)行一個callsRun的策略,并行的操作又轉(zhuǎn)為一個串行處理,這樣就可以保證盡量少的延遲影響。
?所以建議:? RejectExecutionHandler = CallsRun ,? blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個考慮就是瞬間高峰處理,允許一個thread等待一個runnable任務(wù))
一個簡單的例子
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class TestThreadPool {private static int produceTaskSleepTime = 2;private static int produceTaskMaxNumber = 10;public static void main(String[] args) {// 構(gòu)造一個線程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardOldestPolicy());for (int i = 1; i <= produceTaskMaxNumber; i++) {try {String task = "task@ " + i;System.out.println("創(chuàng)建任務(wù)并提交到線程池中:" + task);threadPool.execute(new ThreadPoolTask(task));Thread.sleep(produceTaskSleepTime);} catch (Exception e) {e.printStackTrace();}}} }?
創(chuàng)建task類:
?
import java.io.Serializable;public class ThreadPoolTask implements Runnable, Serializable {private Object attachData;ThreadPoolTask(Object tasks) {this.attachData = tasks;}public void run() {System.out.println("開始執(zhí)行任務(wù):" + attachData);attachData = null;}public Object getTask() {return this.attachData;} }?
執(zhí)行結(jié)果:
????????????? ?創(chuàng)建任務(wù)并提交到線程池中:task@ 1
開始執(zhí)行任務(wù):task@ 1
創(chuàng)建任務(wù)并提交到線程池中:task@ 2
開始執(zhí)行任務(wù):task@ 2
創(chuàng)建任務(wù)并提交到線程池中:task@ 3
創(chuàng)建任務(wù)并提交到線程池中:task@ 4
開始執(zhí)行任務(wù):task@ 3
創(chuàng)建任務(wù)并提交到線程池中:task@ 5
開始執(zhí)行任務(wù):task@ 4
創(chuàng)建任務(wù)并提交到線程池中:task@ 6
創(chuàng)建任務(wù)并提交到線程池中:task@ 7
創(chuàng)建任務(wù)并提交到線程池中:task@ 8
開始執(zhí)行任務(wù):task@ 5
開始執(zhí)行任務(wù):task@ 6
創(chuàng)建任務(wù)并提交到線程池中:task@ 9
開始執(zhí)行任務(wù):task@ 7
創(chuàng)建任務(wù)并提交到線程池中:task@ 10
開始執(zhí)行任務(wù):task@ 8
開始執(zhí)行任務(wù):task@ 9
開始執(zhí)行任務(wù):task@ 10
?
總結(jié)
以上是生活随笔為你收集整理的concurrent: ThreadPoolExecutor 用法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: concurrent: Callable
- 下一篇: JDK5--Annotation学习:基