可动态调节参数的线程池实现
背景
線程池是一種基于池化思想管理線程的工具,使用線程池可以減少創建銷毀線程的開銷,避免線程過多導致系統資源耗盡。在高并發的任務處理場景,線程池的使用是必不可少的。在雙11主圖價格表達項目中為了提升處理性能,很多地方使用到了線程池。隨著線程池的使用,逐漸發現一個問題,線程池的參數如何設置?
線程池參數中有三個比較關鍵的參數,分別是corePoolSize(核心線程數)、maximumPoolSize(最大線程數)、workQueueSzie(工作隊列大小)。根據任務的類型可以區分為IO密集型和CPU密集型,對于CPU密集型,一般經驗是設置corePoolSize=CPU核數+1,對于IO密集型需要根據具體的RT和流量來設置,沒有普適的經驗值。然而,我們一般遇到的情況多數是處理IO密集型任務,如果線程池參數不可動態調節,就沒辦法根據實際情況實時調整處理速度,只能通過發布代碼調整參數。
如果線程池參數不合理會導致什么問題呢?下面列舉幾種可能出現的場景:
最大線程數設置偏小,工作隊列大小設置偏小,導致服務接口大量拋出RejectedExecutionException。
最大線程數設置偏小,工作隊列大小設置過大,任務堆積過度,接口響應時長變長。
最大線程數設置過大,線程調度開銷增大,處理速度反而下降。
核心線程數設置過小,流量突增時需要先創建線程,導致響應時長過大。
核心線程數設置過大,空閑線程太多,占用系統資源。
線程池任務調度機制
要明白線程池參數對運行時的影響,就必須理解其中的原理,所以下面先簡單總結了線程池的核心原理。
Java中的線程池核心實現類是ThreadPoolExecutor,ThreadPoolExecutor一方面維護自身的生命周期,另一方面同時管理線程和任務,使兩者良好的結合從而執行并行任務。用戶無需關注如何創建線程,如何調度線程來執行任務,用戶只需提供Runnable對象,將任務的運行邏輯提交到執行器(Executor)中,由Executor框架完成線程的調配和任務的執行部分。
ThreadPoolExecutor是如何運行,如何同時維護線程和執行任務的呢?其運行機制如下圖所示:
所有任務的調度都是由execute方法完成的,這部分完成的工作是:檢查現在線程池的運行狀態、運行線程數、運行策略,決定接下來執行的流程,是直接申請線程執行,或是緩沖到隊列中執行,亦或是直接拒絕該任務。其執行過程如下:
首先檢測線程池運行狀態,如果不是RUNNING,則直接拒絕,線程池要保證在RUNNING的狀態下執行任務。
如果workerCount < corePoolSize,則創建并啟動一個線程來執行新提交的任務。
如果workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中。
如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則創建并啟動一個線程來執行新提交的任務。
如果workerCount >= maximumPoolSize,并且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。
其執行流程如下圖所示:
動態調節線程池參數實現
線程池相關的重要參數有三個,分別是核心線程數、最大線程數和工作隊列大小,接下來將闡述如何實現動態調節線程池參數。
調節核心和最大線程數的原理
ThreadPoolExecutor已經提供了兩個方法在運行時設置核心線程數和最大線程數,分別是ThreadPoolExecutor.setCorePoolSize()和ThreadPoolExecutor.setMaximumPoolSize()。
setCorePoolSize方法的執行流程是:首先會覆蓋之前構造函數設置的corePoolSize,然后,如果新的值比原始值要小,當多余的工作線程下次變成空閑狀態的時候會被中斷并銷毀,如果新的值比原來的值要大且工作隊列不為空,則會創建新的工作線程。流程圖如下:
setMaximumPoolSize方法執行流程是:首先會覆蓋之前構造函數設置的maximumPoolSize,然后,如果新的值比原來的值要小,當多余的工作線程下次變成空閑狀態的時候會被中斷并銷毀。
調節工作隊列大小的原理
線程池中是以生產者消費者模式,通過一個阻塞隊列來緩存任務,工作線程從阻塞隊列中獲取任務。工作隊列的接口是阻塞隊列(BlockingQueue),在隊列為空時,獲取元素的線程會等待隊列變為非空,當隊列滿時,存儲元素的線程會等待隊列可用。
目前JDK提供了以下阻塞隊列的實現:
但是很不幸,這些阻塞隊列的實現都不支持動態調整大小,那么為什么不自己實現一個可動態調整大小的阻塞隊列呢。重復造輪子是不可取的,所以我選擇改造輪子。LinkedBlockingQueue是比較常用的一個阻塞隊列,它無法修改大小的原因是capacity字段設置成了final?private final int capacity;。如果我把final去掉,并提供修改capacity的方法,是不是就滿足我們的需求呢?事實證明是可行的,文章末尾上傳了ResizeLinkedBlockingQueue的實現。
結合Diamond進行實現
Diamond可以管理我們的配置,如果可以通過Diamond實現線程池參數管理那就再好不過了。接下來就開始上代碼了,首先實現一個Diamond配置管理類DispatchConfig,然后,實現一個線程池管理的工廠方法StreamExecutorFactory。
DispatchConfig類是一個靜態類,在初始化的時候獲取了對應Diamond的內容并設置了監聽,使用的時候只需要DispatchConfig.getConfig().getCorePoolSize()。
/**
-
@author moda
*/
@Slf4j
@Data
public class DispatchConfig {
public static final String DATA_ID = “com.alibaba.mkt.turbo.DispatchConfig”;
public static final String GROUP_ID = “mkt-turbo”;
private static DispatchConfig config;static {
try {
String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000);
config = JSON.parseObject(content, DispatchConfig.class);
Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() {
@Override
public void receiveConfigInfo(String content) {
try {
config = JSON.parseObject(content, DispatchConfig.class);
} catch (Throwable t) {
log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t);
}
}
});
} catch (Exception e) {
log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s ", DATA_ID, GROUP_ID), e);
}
}public static DispatchConfig getConfig() {
return config;
}private int corePoolSize = 10;
private int maximumPoolSize = 30;
private int workQueueSize = 1024;
/**
- 商品分批處理每批大小
*/
private int itemBatchProcessPageSize = 200;
}
StreamExecutorFactory是一個靜態類,維護了一個靜態屬性executor,并通過initExecutor()進行初始化。在初始化的時候,工作隊列使用了可調節大小的阻塞隊列ResizeLinkedBlockingQueue,并設置了監聽Diamond變更。Diamond發生變更的時候通過在callback中對比值是否發生改變,如果發生改變則調整workQueueSize、corePoolSize、maximumPoolSize。使用的時候只需要StreamExecutorFactory.getExecutor(),修改Diamond配置就能動態修改線程池參數。
- 商品分批處理每批大小
/**
-
@author moda
*/
@Slf4j
public class StreamExecutorFactory {
private static final String THREAD_NAME = “mkt-turbo_stream_dispatch”;private static ThreadPoolExecutor executor = initExecutor();
private static ThreadPoolExecutor initExecutor() {
Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() {@Overridepublic void receiveConfigInfo(String content) {try {DispatchConfig config = JSON.parseObject(content, DispatchConfig.class);if (workQueue.getCapacity() != config.getWorkQueueSize()) {workQueue.setCapacity(config.getWorkQueueSize());}if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) {threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());}if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) {threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize());}} catch (Throwable t) {log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t);}}});return threadPoolExecutor;
ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build();
ResizeLinkedBlockingQueue workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize());
//拒絕策略,調用者線程處理
RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> {
String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)",
THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
log.warn(msg);
if (!e.isShutdown()) {
r.run();
}
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
DispatchConfig.getConfig().getCorePoolSize(),
DispatchConfig.getConfig().getMaximumPoolSize(),
10,
TimeUnit.SECONDS,
workQueue,
nameThreadFactory,
rejectedExecutionHandler
);}
public static Executor getExecutor() {
return executor;
}
}
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的可动态调节参数的线程池实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 实时计算 Flink 版 最佳实践
- 下一篇: 消息队列RocketMQ性能测试案例