日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

ThreadPoolExecutor 八种拒绝策略,对的,不是4种

發(fā)布時(shí)間:2023/12/3 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ThreadPoolExecutor 八种拒绝策略,对的,不是4种 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

轉(zhuǎn)載自??ThreadPoolExecutor 八種拒絕策略,對(duì)的,不是4種

前言

談到 Java 的線程池最熟悉的莫過(guò)于 ExecutorService 接口了,jdk1.5 新增的 java.util.concurrent 包下的這個(gè) api,大大的簡(jiǎn)化了多線程代碼的開(kāi)發(fā)。而不論你用 FixedThreadPool 還是 CachedThreadPool 其背后實(shí)現(xiàn)都是ThreadPoolExecutor。ThreadPoolExecutor 是一個(gè)典型的緩存池化設(shè)計(jì)的產(chǎn)物,因?yàn)槌刈佑写笮?#xff0c;當(dāng)池子體積不夠承載時(shí),就涉及到拒絕策略。JDK 中已經(jīng)預(yù)設(shè)了 4 種線程池拒絕策略,下面結(jié)合場(chǎng)景詳細(xì)聊聊這些策略的使用場(chǎng)景,以及我們還能擴(kuò)展哪些拒絕策略。

池化設(shè)計(jì)思想

池話設(shè)計(jì)應(yīng)該不是一個(gè)新名詞。我們常見(jiàn)的如 Java 線程池、JDBC 連接池、Redis 連接池等就是這類設(shè)計(jì)的代表實(shí)現(xiàn)。這種設(shè)計(jì)會(huì)初始預(yù)設(shè)資源,解決的問(wèn)題就是抵消每次獲取資源的消耗,如創(chuàng)建線程的開(kāi)銷,獲取遠(yuǎn)程連接的開(kāi)銷等。就好比你去食堂打飯,打飯的大媽會(huì)先把飯盛好幾份放那里,你來(lái)了就直接拿著飯盒加菜即可,不用再臨時(shí)又盛飯又打菜,效率就高了。除了初始化資源,池化設(shè)計(jì)還包括如下這些特征:池子的初始值、池子的活躍值、池子的最大值等,這些特征可以直接映射到 Java 線程池和數(shù)據(jù)庫(kù)連接池的成員屬性中。

線程池觸發(fā)拒絕策略的時(shí)機(jī)

和數(shù)據(jù)源連接池不一樣,線程池除了初始大小和池子最大值,還多了一個(gè)阻塞隊(duì)列來(lái)緩沖。數(shù)據(jù)源連接池一般請(qǐng)求的連接數(shù)超過(guò)連接池的最大值的時(shí)候就會(huì)觸發(fā)拒絕策略,策略一般是阻塞等待設(shè)置的時(shí)間或者直接拋異常。而線程池的觸發(fā)時(shí)機(jī)如下圖:

如圖,想要了解線程池什么時(shí)候觸發(fā)拒絕粗略,需要明確上面三個(gè)參數(shù)的具體含義,是這三個(gè)參數(shù)總體協(xié)調(diào)的結(jié)果,而不是簡(jiǎn)單的超過(guò)最大線程數(shù)就會(huì)觸發(fā)線程拒絕粗略,當(dāng)提交的任務(wù)數(shù)大于 corePoolSize 時(shí),會(huì)優(yōu)先放到隊(duì)列緩沖區(qū),只有填滿了緩沖區(qū)后,才會(huì)判斷當(dāng)前運(yùn)行的任務(wù)是否大于 maxPoolSize,小于時(shí)會(huì)新建線程處理。大于時(shí)就觸發(fā)了拒絕策略,總結(jié)就是:當(dāng)前提交任務(wù)數(shù)大于(maxPoolSize + queueCapacity)時(shí)就會(huì)觸發(fā)線程池的拒絕策略了。

?

JDK內(nèi)置4種線程池拒絕策略

拒絕策略接口定義

在分析 JDK 自帶的線程池拒絕策略前,先看下 JDK 定義的 拒絕策略接口,如下:

public?interface?RejectedExecutionHandler?{void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?executor); }

接口定義很明確,當(dāng)觸發(fā)拒絕策略時(shí),線程池會(huì)調(diào)用你設(shè)置的具體的策略,將當(dāng)前提交的任務(wù)以及線程池實(shí)例本身傳遞給你處理,具體作何處理,不同場(chǎng)景會(huì)有不同的考慮,下面看 JDK 為我們內(nèi)置了哪些實(shí)現(xiàn):

CallerRunsPolicy(調(diào)用者運(yùn)行策略)

????public?static?class?CallerRunsPolicy?implements?RejectedExecutionHandler?{public?CallerRunsPolicy()?{?}public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?e)?{if?(!e.isShutdown())?{r.run();}}}

功能:當(dāng)觸發(fā)拒絕策略時(shí),只要線程池沒(méi)有關(guān)閉,就由提交任務(wù)的當(dāng)前線程處理。

使用場(chǎng)景:一般在不允許失敗的、對(duì)性能要求不高、并發(fā)量較小的場(chǎng)景下使用,因?yàn)榫€程池一般情況下不會(huì)關(guān)閉,也就是提交的任務(wù)一定會(huì)被運(yùn)行,但是由于是調(diào)用者線程自己執(zhí)行的,當(dāng)多次提交任務(wù)時(shí),就會(huì)阻塞后續(xù)任務(wù)執(zhí)行,性能和效率自然就慢了。

AbortPolicy(中止策略)

????public?static?class?AbortPolicy?implements?RejectedExecutionHandler?{public?AbortPolicy()?{?}public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?e)?{throw?new?RejectedExecutionException("Task?"?+?r.toString()?+"?rejected?from?"?+e.toString());}}

功能:當(dāng)觸發(fā)拒絕策略時(shí),直接拋出拒絕執(zhí)行的異常,中止策略的意思也就是打斷當(dāng)前執(zhí)行流程

使用場(chǎng)景:這個(gè)就沒(méi)有特殊的場(chǎng)景了,但是一點(diǎn)要正確處理拋出的異常。ThreadPoolExecutor 中默認(rèn)的策略就是AbortPolicy,ExecutorService 接口的系列 ThreadPoolExecutor 因?yàn)槎紱](méi)有顯示的設(shè)置拒絕策略,所以默認(rèn)的都是這個(gè)。但是請(qǐng)注意,ExecutorService 中的線程池實(shí)例隊(duì)列都是無(wú)界的,也就是說(shuō)把內(nèi)存撐爆了都不會(huì)觸發(fā)拒絕策略。當(dāng)自己自定義線程池實(shí)例時(shí),使用這個(gè)策略一定要處理好觸發(fā)策略時(shí)拋的異常,因?yàn)樗麜?huì)打斷當(dāng)前的執(zhí)行流程。

DiscardPolicy(丟棄策略)

????public?static?class?DiscardPolicy?implements?RejectedExecutionHandler?{public?DiscardPolicy()?{?}public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?e)?{}}

功能:直接靜悄悄的丟棄這個(gè)任務(wù),不觸發(fā)任何動(dòng)作

使用場(chǎng)景:如果你提交的任務(wù)無(wú)關(guān)緊要,你就可以使用它 。因?yàn)樗褪莻€(gè)空實(shí)現(xiàn),會(huì)悄無(wú)聲息的吞噬你的的任務(wù)。所以這個(gè)策略基本上不用了

DiscardOldestPolicy(棄老策略)

????public?static?class?DiscardOldestPolicy?implements?RejectedExecutionHandler?{public?DiscardOldestPolicy()?{?}public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?e)?{if?(!e.isShutdown())?{e.getQueue().poll();e.execute(r);}}}

功能:如果線程池未關(guān)閉,就彈出隊(duì)列頭部的元素,然后嘗試執(zhí)行

使用場(chǎng)景:這個(gè)策略還是會(huì)丟棄任務(wù),丟棄時(shí)也是毫無(wú)聲息,但是特點(diǎn)是丟棄的是老的未執(zhí)行的任務(wù),而且是待執(zhí)行優(yōu)先級(jí)較高的任務(wù)。基于這個(gè)特性,我能想到的場(chǎng)景就是,發(fā)布消息,和修改消息,當(dāng)消息發(fā)布出去后,還未執(zhí)行,此時(shí)更新的消息又來(lái)了,這個(gè)時(shí)候未執(zhí)行的消息的版本比現(xiàn)在提交的消息版本要低就可以被丟棄了。因?yàn)殛?duì)列中還有可能存在消息版本更低的消息會(huì)排隊(duì)執(zhí)行,所以在真正處理消息的時(shí)候一定要做好消息的版本比較

第三方實(shí)現(xiàn)的拒絕策略

Dubbo 中的線程拒絕策略

public?class?AbortPolicyWithReport?extends?ThreadPoolExecutor.AbortPolicy?{protected?static?final?Logger?logger?=?LoggerFactory.getLogger(AbortPolicyWithReport.class);private?final?String?threadName;private?final?URL?url;private?static?volatile?long?lastPrintTime?=?0;private?static?Semaphore?guard?=?new?Semaphore(1);public?AbortPolicyWithReport(String?threadName,?URL?url)?{this.threadName?=?threadName;this.url?=?url;}@Overridepublic?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?e)?{String?msg?=?String.format("Thread?pool?is?EXHAUSTED!"?+"?Thread?Name:?%s,?Pool?Size:?%d?(active:?%d,?core:?%d,?max:?%d,?largest:?%d),?Task:?%d?(completed:?%d),"?+"?Executor?status:(isShutdown:%s,?isTerminated:%s,?isTerminating:%s),?in?%s://%s:%d!",threadName,?e.getPoolSize(),?e.getActiveCount(),?e.getCorePoolSize(),?e.getMaximumPoolSize(),?e.getLargestPoolSize(),e.getTaskCount(),?e.getCompletedTaskCount(),?e.isShutdown(),?e.isTerminated(),?e.isTerminating(),url.getProtocol(),?url.getIp(),?url.getPort());logger.warn(msg);dumpJStack();throw?new?RejectedExecutionException(msg);}private?void?dumpJStack()?{//省略實(shí)現(xiàn)} }

可以看到,當(dāng)dubbo的工作線程觸發(fā)了線程拒絕后,主要做了三個(gè)事情,原則就是盡量讓使用者清楚觸發(fā)線程拒絕策略的真實(shí)原因

  • 輸出了一條警告級(jí)別的日志,日志內(nèi)容為線程池的詳細(xì)設(shè)置參數(shù),以及線程池當(dāng)前的狀態(tài),還有當(dāng)前拒絕任務(wù)的一些詳細(xì)信息。可以說(shuō),這條日志,使用dubbo的有過(guò)生產(chǎn)運(yùn)維經(jīng)驗(yàn)的或多或少是見(jiàn)過(guò)的,這個(gè)日志簡(jiǎn)直就是日志打印的典范,其他的日志打印的典范還有spring。得益于這么詳細(xì)的日志,可以很容易定位到問(wèn)題所在

  • 輸出當(dāng)前線程堆棧詳情,這個(gè)太有用了,當(dāng)你通過(guò)上面的日志信息還不能定位問(wèn)題時(shí),案發(fā)現(xiàn)場(chǎng)的dump線程上下文信息就是你發(fā)現(xiàn)問(wèn)題的救命稻草,這個(gè)可以參考《dubbo線程池耗盡事件-"CyclicBarrier惹的禍"》

  • 繼續(xù)拋出拒絕執(zhí)行異常,使本次任務(wù)失敗,這個(gè)繼承了JDK默認(rèn)拒絕策略的特性

Netty 中的線程池拒絕策略

????private?static?final?class?NewThreadRunsPolicy?implements?RejectedExecutionHandler?{NewThreadRunsPolicy()?{super();}public?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?executor)?{try?{final?Thread?t?=?new?Thread(r,?"Temporary?task?executor");t.start();}?catch?(Throwable?e)?{throw?new?RejectedExecutionException("Failed?to?start?a?new?thread",?e);}}}

Netty 中的實(shí)現(xiàn)很像 JDK 中的 CallerRunsPolicy,舍不得丟棄任務(wù)。不同的是,CallerRunsPolicy 是直接在調(diào)用者線程執(zhí)行的任務(wù)。而 Netty是新建了一個(gè)線程來(lái)處理的。所以,Netty的實(shí)現(xiàn)相較于調(diào)用者執(zhí)行策略的使用面就可以擴(kuò)展到支持高效率高性能的場(chǎng)景了。但是也要注意一點(diǎn),Netty的實(shí)現(xiàn)里,在創(chuàng)建線程時(shí)未做任何的判斷約束,也就是說(shuō)只要系統(tǒng)還有資源就會(huì)創(chuàng)建新的線程來(lái)處理,直到new不出新的線程了,才會(huì)拋創(chuàng)建線程失敗的異常

ActiveMQ 中的線程池拒絕策略

?new?RejectedExecutionHandler()?{@Overridepublic?void?rejectedExecution(final?Runnable?r,?final?ThreadPoolExecutor?executor)?{try?{executor.getQueue().offer(r,?60,?TimeUnit.SECONDS);}?catch?(InterruptedException?e)?{throw?new?RejectedExecutionException("Interrupted?waiting?for?BrokerService.worker");}throw?new?RejectedExecutionException("Timed?Out?while?attempting?to?enqueue?Task.");}});

activeMq中的策略屬于最大努力執(zhí)行任務(wù)型,當(dāng)觸發(fā)拒絕策略時(shí),在嘗試一分鐘的時(shí)間重新將任務(wù)塞進(jìn)任務(wù)隊(duì)列,當(dāng)一分鐘超時(shí)還沒(méi)成功時(shí),就拋出異常

PinPoint 中的線程池拒絕策略

public?class?RejectedExecutionHandlerChain?implements?RejectedExecutionHandler?{private?final?RejectedExecutionHandler[]?handlerChain;public?static?RejectedExecutionHandler?build(List<RejectedExecutionHandler>?chain)?{Objects.requireNonNull(chain,?"handlerChain?must?not?be?null");RejectedExecutionHandler[]?handlerChain?=?chain.toArray(new?RejectedExecutionHandler[0]);return?new?RejectedExecutionHandlerChain(handlerChain);}private?RejectedExecutionHandlerChain(RejectedExecutionHandler[]?handlerChain)?{this.handlerChain?=?Objects.requireNonNull(handlerChain,?"handlerChain?must?not?be?null");}@Overridepublic?void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?executor)?{for?(RejectedExecutionHandler?rejectedExecutionHandler?:?handlerChain)?{rejectedExecutionHandler.rejectedExecution(r,?executor);}} }

pinpoint的拒絕策略實(shí)現(xiàn)很有特點(diǎn),和其他的實(shí)現(xiàn)都不同。他定義了一個(gè)拒絕策略鏈,包裝了一個(gè)拒絕策略列表,當(dāng)觸發(fā)拒絕策略時(shí),會(huì)將策略鏈中的rejectedExecution依次執(zhí)行一遍。

結(jié)語(yǔ)

前文從線程池設(shè)計(jì)思想,以及線程池觸發(fā)拒絕策略的時(shí)機(jī)引出java線程池拒絕策略接口的定義。并輔以JDK內(nèi)置4種以及四個(gè)第三方開(kāi)源軟件的拒絕策略定義描述了線程池拒絕策略實(shí)現(xiàn)的各種思路和使用場(chǎng)景。希望閱讀此文后能讓你對(duì)java線程池拒絕策略有更加深刻的認(rèn)識(shí),能夠根據(jù)不同的使用場(chǎng)景更加靈活的應(yīng)用。

總結(jié)

以上是生活随笔為你收集整理的ThreadPoolExecutor 八种拒绝策略,对的,不是4种的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。