大数据处理系列之(一)Java线程池使用
前言:最近在做分布式海量數(shù)據(jù)處理項(xiàng)目,使用到了java的線程池,所以搜集了一些資料對(duì)它的使用做了一下總結(jié)和探究,
前面介紹的東西大多都是從網(wǎng)上搜集整理而來(lái)。文中最核心的東西在于后面兩節(jié)無(wú)界隊(duì)列線程池和有界隊(duì)列線程池的實(shí)例
使用以及線上問(wèn)題處理方案。????????????????????????????????
?
1.? 為什么要用線程池?
??????在Java中,如果每當(dāng)一個(gè)請(qǐng)求到達(dá)就創(chuàng)建一個(gè)新線程,開(kāi)銷是相當(dāng)大的。在實(shí)際使用中,每個(gè)請(qǐng)求創(chuàng)建新線程的服務(wù)器
在創(chuàng)建和銷毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源,甚至可能要比花在實(shí)際處理實(shí)際的用戶請(qǐng)求的時(shí)間和資源要多的多。除
了創(chuàng)建和銷毀線程的開(kāi)銷之外,活動(dòng)的線程也需要消耗系統(tǒng)資源。如果在一個(gè)JVM中創(chuàng)建太多的線程,可能會(huì)導(dǎo)致系統(tǒng)由于
過(guò)度消耗內(nèi)存或者“切換過(guò)度”而導(dǎo)致系統(tǒng)資源不足。為了防止資源不足,服務(wù)器應(yīng)用程序需要一些辦法來(lái)限制任何給定時(shí)刻
處理的請(qǐng)求數(shù)目,盡可能減少創(chuàng)建和銷毀線程的次數(shù),特別是一些資源耗費(fèi)比較大的線程的創(chuàng)建和銷毀,盡量利用已有對(duì)象
來(lái)進(jìn)行服務(wù),這就是“池化資源”技術(shù)產(chǎn)生的原因。
??? ?線程池主要用來(lái)解決線程生命周期開(kāi)銷問(wèn)題和資源不足問(wèn)題,通過(guò)對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開(kāi)銷被分?jǐn)偟蕉鄠€(gè)任
務(wù)上了,而且由于在請(qǐng)求到達(dá)時(shí)線程已經(jīng)存在,所以消除了創(chuàng)建所帶來(lái)的延遲。這樣,就可以立即請(qǐng)求服務(wù),使應(yīng)用程序響
應(yīng)更快。另外,通過(guò)適當(dāng)?shù)恼{(diào)整線程池中的線程數(shù)據(jù)可以防止出現(xiàn)資源不足的情況。
????? 網(wǎng)上找來(lái)的這段話,清晰的描述了為什么要使用線程池,使用線程池有哪些好處。工程項(xiàng)目中使用線程池的場(chǎng)景比比皆是。
本文關(guān)注的重點(diǎn)是如何在實(shí)戰(zhàn)中來(lái)使用好線程池這一技術(shù),來(lái)滿足海量數(shù)據(jù)大并發(fā)用戶請(qǐng)求的場(chǎng)景。
?
2. ThreadPoolExecutor類
?????? Java中的線程池技術(shù)主要用的是ThreadPoolExecutor 這個(gè)類。先來(lái)看這個(gè)類的構(gòu)造函數(shù),
ThreadPoolExecutor(int?corePoolSize, int?maximumPoolSize, long?keepAliveTime, TimeUnit?unit,
BlockingQueue<Runnable>?workQueue, ThreadFactory?threadFactory, RejectedExecutionHandler?handler)?
??? corePoolSize? ?????線程池維護(hù)線程的最少數(shù)量
??? maximumPoolSize ???線程池維護(hù)線程的最大數(shù)量?
??? keepAliveTime????? 線程池維護(hù)線程所允許的空閑時(shí)間??
??? workQueue? ????????任務(wù)隊(duì)列,用來(lái)存放我們所定義的任務(wù)處理線程
??? threadFactory ?????線程創(chuàng)建工廠
??? handler??????????? 線程池對(duì)拒絕任務(wù)的處理策略
??? ?ThreadPoolExecutor 將根據(jù) corePoolSize和 maximumPoolSize?設(shè)置的邊界自動(dòng)調(diào)整池大小。當(dāng)新任務(wù)在方法
execute(Runnable) 中提交時(shí), 如果運(yùn)行的線程少于 corePoolSize,則創(chuàng)建新線程來(lái)處理請(qǐng)求,即使其他輔助線程是
空閑的。如果運(yùn)行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊(duì)列滿時(shí)才創(chuàng)建新線程。 如果設(shè)置的
corePoolSize 和 maximumPoolSize 相同,則創(chuàng)建了固定大小的線程池。
???? ThreadPoolExecutor是Executors類的實(shí)現(xiàn),Executors類里面提供了一些靜態(tài)工廠,生成一些常用的線程池,主
要有以下幾個(gè):
???? newSingleThreadExecutor:創(chuàng)建一個(gè)單線程的線程池。這個(gè)線程池只有一個(gè)線程在工作,也就是相當(dāng)于單線程串行執(zhí)行
所有任務(wù)。如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束,那么會(huì)有一個(gè)新的線程來(lái)替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任
務(wù)的提交順序執(zhí)行。 ?
???? newFixedThreadPool:創(chuàng)建固定大小的線程池。每次提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程,直到線程達(dá)到線程池的最大大小。線
程池的大小一旦達(dá)到最大值就會(huì)保持不變,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程。
???? newCachedThreadPool:創(chuàng)建一個(gè)可緩存的線程池。如果線程池的大小超過(guò)了處理任務(wù)所需要的線程,那么就會(huì)回收部分
空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時(shí),此線程池又可以智能的添加新線程來(lái)處理任務(wù)。此線程池不會(huì)對(duì)線程池
大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說(shuō)JVM)能夠創(chuàng)建的最大線程大小。
????? 在實(shí)際的項(xiàng)目中,我們會(huì)使用得到比較多的是newFixedThreadPool,創(chuàng)建固定大小的線程池,但是這個(gè)方法在真實(shí)的線上
環(huán)境中還是會(huì)有很多問(wèn)題,這個(gè)將會(huì)在下面一節(jié)中詳細(xì)講到。
????? 當(dāng)任務(wù)源源不斷的過(guò)來(lái),而我們的系統(tǒng)又處理不過(guò)來(lái)的時(shí)候,我們要采取的策略是拒絕服務(wù)。RejectedExecutionHandler接
口提供了拒絕任務(wù)處理的自定義方法的機(jī)會(huì)。在ThreadPoolExecutor中已經(jīng)包含四種處理策略。
????? 1)CallerRunsPolicy:線程調(diào)用運(yùn)行該任務(wù)的 execute 本身。此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。
???????? ?public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
???????????? if (!e.isShutdown()) {
??????????????? ?r.run();
??????????? }
??????? }
這個(gè)策略顯然不想放棄執(zhí)行任務(wù)。但是由于池中已經(jīng)沒(méi)有任何資源了,那么就直接使用調(diào)用該execute的線程本身來(lái)執(zhí)行。
???? 2)AbortPolicy:處理程序遭到拒絕將拋出運(yùn)行時(shí) RejectedExecutionException
??????? ?public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
??????????? ? throw new RejectedExecutionException();
??????? }
?這種策略直接拋出異常,丟棄任務(wù)。
????? 3)DiscardPolicy:不能執(zhí)行的任務(wù)將被刪除
??????????public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
?? 這種策略和AbortPolicy幾乎一樣,也是丟棄任務(wù),只不過(guò)他不拋出異常。
???? 4)DiscardOldestPolicy:如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,
則重復(fù)此過(guò)程)
??????? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
??????????? if (!e.isShutdown()) {
??????????????? e.getQueue().poll();
??????????????? e.execute(r);
??????????? }
??????? }
????? 該策略就稍微復(fù)雜一些,在pool沒(méi)有關(guān)閉的前提下首先丟掉緩存在隊(duì)列中的最早的任務(wù),然后重新嘗試運(yùn)行該任務(wù)。這個(gè)策略
需要適當(dāng)小心。
?
3.? ThreadPoolExecutor無(wú)界隊(duì)列使用
?? public class ThreadPool {
??????? private final static String poolName = "mypool";
??????? static private ThreadPool threadFixedPool = new ThreadPool(2);
?????? private ExecutorService executor;
????? static public ThreadPool getFixedInstance() {
?????????? return threadFixedPool;
?????? }
??? private ThreadPool(int num) {
?????????? executor = Executors.newFixedThreadPool(num, new DaemonThreadFactory(poolName));
}
public void execute(Runnable r) {
?????????? executor.execute(r);
}
public static void main(String[] params) {
?????????? class MyRunnable implements Runnable {
??????????????????? public void run() {
???????????????????????????? System.out.println("OK!");
???????????????????????????? try {
?????????????????????????????????????? Thread.sleep(10);
???????????????????????????? } catch (InterruptedException e) {
?????????????????????????????????????? e.printStackTrace();
???????????????????????????? }
??????????????????? }
?????????? }
?????????? for (int i = 0; i < 10; i++) {
???????????? ThreadPool.getFixedInstance().execute(new MyRunnable());
?????????? }
?????????? try {
??????????????????? Thread.sleep(2000);
??????????????????? System.out.println("Process end.");
?????????? } catch (InterruptedException e) {
??????????????????? e.printStackTrace();
?????????? }
}
}
?????? 在這段代碼中,我們發(fā)現(xiàn)我們用到了Executors.newFixedThreadPool()函數(shù),這個(gè)函數(shù)的實(shí)現(xiàn)是這樣子的:
return?new?ThreadPoolExecutor(nThreads,?nThreads,?0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>());?
?????? 它實(shí)際上是創(chuàng)建了一個(gè)無(wú)界隊(duì)列的固定大小的線程池。執(zhí)行這段代碼,我們發(fā)現(xiàn)所有的任務(wù)都正常處理了。但是在真實(shí)的線上環(huán)
境中會(huì)存在這樣的一個(gè)問(wèn)題,前端的用戶請(qǐng)求源源不斷的過(guò)來(lái),后端的處理線程如果處理時(shí)間變長(zhǎng),無(wú)法快速的將用戶請(qǐng)求處理
完返回結(jié)果給前端,那么任務(wù)隊(duì)列中將堵塞大量的請(qǐng)求。這些請(qǐng)求在前端都是有超時(shí)時(shí)間設(shè)置的,假設(shè)請(qǐng)求是通過(guò)套接字過(guò)來(lái),
當(dāng)我們的后端處理進(jìn)程處理完一個(gè)請(qǐng)求后,從隊(duì)列中拿下一個(gè)任務(wù),發(fā)現(xiàn)這個(gè)任務(wù)的套接字已經(jīng)無(wú)效了,這是因?yàn)樵谟脩舳艘呀?jīng)
超時(shí),將套接字建立的連接關(guān)閉了。這樣一來(lái)我們這邊的處理程序再去讀取套接字時(shí),就會(huì)發(fā)生I/0 Exception. 惡性循環(huán),導(dǎo)致我
們所有的處理服務(wù)線程讀的都是超時(shí)的套接字,所有的請(qǐng)求過(guò)來(lái)都拋I/O異常,這樣等于我們整個(gè)系統(tǒng)都掛掉了,已經(jīng)無(wú)法對(duì)外提供
正常的服務(wù)了。
???? 對(duì)于海量數(shù)據(jù)的處理,現(xiàn)在業(yè)界都是采用集群系統(tǒng)來(lái)進(jìn)行處理,當(dāng)請(qǐng)求的數(shù)量不斷加大的時(shí)候,我們可以通過(guò)增加處理節(jié)點(diǎn),反正現(xiàn)
在硬件設(shè)備相對(duì)便宜。但是要保證系統(tǒng)的可靠性和穩(wěn)定性,在程序方面我們還是可以進(jìn)一步的優(yōu)化的,我們下一節(jié)要講述的就是針對(duì)
線上出現(xiàn)的這類問(wèn)題的一種處理策略。
?
4.???ThreadPoolExecutor有界隊(duì)列使用
public class ThreadPool {
???????? private final static String poolName = "mypool";
???????? static private ThreadPool threadFixedPool = null;
???????? public ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
???????? private ExecutorService executor;
?
???????? static public ThreadPool getFixedInstance() {
?????????????????? return threadFixedPool;
???????? }
???????? private ThreadPool(int num) {
?????????????????? executor = new ThreadPoolExecutor(2, 4,60,TimeUnit.SECONDS, queue,new DaemonThreadFactory
(poolName), new ThreadPoolExecutor.AbortPolicy());
???????? }
???????? public void execute(Runnable r) {
?????????????????? executor.execute(r);
???????? }
????????
???????? public static void main(String[] params) {
?????????????????? class MyRunnable implements Runnable {
??????????????????????????? public void run() {
???????????????????????????????????? System.out.println("OK!");
???????????????????????????????????? try {
?????????????????????????????????????????????? Thread.sleep(10);
???????????????????????????????????? } catch (InterruptedException e) {
?????????????????????????????????????????????? e.printStackTrace();
???????????????????????????????????? }
??????????????????????????? }
?????????????????? }
?????????????????? int count = 0;
?????????????????? for (int i = 0; i < 10; i++) {
??????????????????????????? try {
???????????????????????????????????? ThreadPool.getFixedInstance().execute(new MyRunnable());
??????????????????????????? } catch (RejectedExecutionException e) {
???????????????????????????????????? e.printStackTrace();
???????????????????????????????????? count++;
??????????????????????????? }
?????????????????? }
?????????????????? try {
??????????????????????????? log.info("queue size:" + ThreadPool.getFixedInstance().queue.size());
??????????????????????????? Thread.sleep(2000);
?????????????????? } catch (InterruptedException e) {
??????????????????????????? e.printStackTrace();
?????????????????? }
?????????????????? System.out.println("Reject task: " + count);
???????? }
}
?????? 首先我們來(lái)看下這段代碼幾個(gè)重要的參數(shù),corePoolSize 為2,maximumPoolSize為4,任務(wù)隊(duì)列大小為2,每個(gè)任務(wù)平
均處理時(shí)間為10ms,一共有10個(gè)并發(fā)任務(wù)。
????? 執(zhí)行這段代碼,我們會(huì)發(fā)現(xiàn),有4個(gè)任務(wù)失敗了。這里就驗(yàn)證了我們?cè)谏厦嫣岬接薪珀?duì)列時(shí)候線程池的執(zhí)行順序。當(dāng)新任務(wù)在
方法 execute(Runnable) 中提交時(shí), 如果運(yùn)行的線程少于 corePoolSize,則創(chuàng)建新線程來(lái)處理請(qǐng)求。 如果運(yùn)行的線程多于
corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊(duì)列滿時(shí)才創(chuàng)建新線程,如果此時(shí)線程數(shù)量達(dá)到maximumPoolSize,并且隊(duì)
列已經(jīng)滿,就會(huì)拒絕繼續(xù)進(jìn)來(lái)的請(qǐng)求。
??? 現(xiàn)在我們調(diào)整一下代碼中的幾個(gè)參數(shù),將并發(fā)任務(wù)數(shù)改為200,執(zhí)行結(jié)果Reject task: 182,說(shuō)明有18個(gè)任務(wù)成功了,線程處理
完一個(gè)請(qǐng)求后會(huì)接著去處理下一個(gè)過(guò)來(lái)的請(qǐng)求。在真實(shí)的線上環(huán)境中,會(huì)源源不斷的有新的請(qǐng)求過(guò)來(lái),當(dāng)前的被拒絕了,但只要線
程池線程把當(dāng)下的任務(wù)處理完之后還是可以處理下一個(gè)發(fā)送過(guò)來(lái)的請(qǐng)求。
???? 通過(guò)有界隊(duì)列可以實(shí)現(xiàn)系統(tǒng)的過(guò)載保護(hù),在高壓的情況下,我們的系統(tǒng)處理能力不會(huì)變?yōu)?,還能正常對(duì)外進(jìn)行服務(wù),雖然有些服
務(wù)可能會(huì)被拒絕,至于如何減少被拒絕的數(shù)量以及對(duì)拒絕的請(qǐng)求采取何種處理策略我將會(huì)在下一篇文章《系統(tǒng)的過(guò)載保護(hù)》中繼續(xù)
闡述。
?
參考文獻(xiàn):
轉(zhuǎn)載于:https://www.cnblogs.com/cstar/archive/2012/06/14/2549494.html
總結(jié)
以上是生活随笔為你收集整理的大数据处理系列之(一)Java线程池使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: DataGridView数据验证Cell
- 下一篇: PO接收产生分录