Java — 慎用Executors类中newFixedThreadPool()和newCachedThreadPool()
文章目錄
- Executors.newCachedThreadPool()
- 源碼
- 分析
- Executors.newFixedThreadPool()
- 源碼
- 分析
- 避坑指南
- 自定義線(xiàn)程池
在一些要求嚴(yán)格的公司,一般都明令禁止是使用Excutor提供的newFixedThreadPool()和newCachedThreadPool()直接創(chuàng)建線(xiàn)程池來(lái)操作線(xiàn)程,既然被禁止,那么就會(huì)有被禁止的道理,我們先來(lái)看一下之所以會(huì)被禁止的原因。
Executors.newCachedThreadPool()
源碼
/*** Creates a thread pool that creates new threads as needed, but* will reuse previously constructed threads when they are* available. These pools will typically improve the performance* of programs that execute many short-lived asynchronous tasks.* Calls to {@code execute} will reuse previously constructed* threads if available. If no existing thread is available, a new* thread will be created and added to the pool. Threads that have* not been used for sixty seconds are terminated and removed from* the cache. Thus, a pool that remains idle for long enough will* not consume any resources. Note that pools with similar* properties but different details (for example, timeout parameters)* may be created using {@link ThreadPoolExecutor} constructors.** @return the newly created thread pool*/public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}大致意思就是,通過(guò)該方法會(huì)創(chuàng)建一個(gè)線(xiàn)程池,當(dāng)你執(zhí)行一個(gè)任務(wù),并且線(xiàn)程池中不存在可用的已構(gòu)造好的線(xiàn)程時(shí),它就會(huì)創(chuàng)建一個(gè)新線(xiàn)程,否則它會(huì)優(yōu)先復(fù)用已有的線(xiàn)程,當(dāng)線(xiàn)程未被使用時(shí),默認(rèn) 6 秒后被移除。
里面有一句話(huà):
These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
這些線(xiàn)程池可以很明顯的提升那些短期存活的異步任務(wù)的執(zhí)行效率
很明顯,官方標(biāo)注他適合處理業(yè)務(wù)簡(jiǎn)單、耗時(shí)短的任務(wù),這是為什么呢?
我們接著看 ThreadPoolExecutor 構(gòu)造方法的描述:
分析
結(jié)合ThreadPoolExecutor 構(gòu)造方法的描述,我們可以知道,當(dāng)我們調(diào)用newCachedThreadPool()方法的時(shí)候,它會(huì)創(chuàng)建一個(gè)核心線(xiàn)程數(shù)為 0 ,最大線(xiàn)程數(shù)為Integer上限,無(wú)用線(xiàn)程存活時(shí)間為 6 秒的線(xiàn)程池。
這意味著當(dāng)我們需要在多線(xiàn)程中執(zhí)行復(fù)雜業(yè)務(wù)時(shí),它會(huì)瘋狂的創(chuàng)建線(xiàn)程,因?yàn)槠渌€(xiàn)程中的業(yè)務(wù)并未執(zhí)行完。
例如下列代碼:
模擬瞬間創(chuàng)建100000000十萬(wàn)個(gè)任務(wù),且每個(gè)任務(wù)需要等待一秒鐘,會(huì)發(fā)現(xiàn)電腦內(nèi)存使用率迅速增加并一直持續(xù)到 OOM。
Executors.newFixedThreadPool()
我們?cè)賮?lái)看一下源碼
源碼
/*** Creates a thread pool that reuses a fixed number of threads* operating off a shared unbounded queue. At any point, at most* {@code nThreads} threads will be active processing tasks.* If additional tasks are submitted when all threads are active,* they will wait in the queue until a thread is available.* If any thread terminates due to a failure during execution* prior to shutdown, a new one will take its place if needed to* execute subsequent tasks. The threads in the pool will exist* until it is explicitly {@link ExecutorService#shutdown shutdown}.** @param nThreads the number of threads in the pool* @return the newly created thread pool* @throws IllegalArgumentException if {@code nThreads <= 0}*/ public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }大致描述是:創(chuàng)建一個(gè)由固定數(shù)量線(xiàn)程并在共享無(wú)界隊(duì)列上運(yùn)行的線(xiàn)程池,在任何時(shí)候都最多只有nThreads個(gè)線(xiàn)程存在并執(zhí)行任務(wù)。
如果在任務(wù)提交時(shí),所有線(xiàn)程都在工作中,則會(huì)將該任務(wù)放入到隊(duì)列中等待,直到有可用的線(xiàn)程。如果某個(gè)線(xiàn)程在執(zhí)行過(guò)程中出現(xiàn)異常,那么這個(gè)線(xiàn)程會(huì)終止,并且會(huì)有一個(gè)新的線(xiàn)程代替它進(jìn)行后續(xù)的工作,線(xiàn)程池中的線(xiàn)程會(huì)一直存在直到線(xiàn)程池被明確的停止掉。
分析
通過(guò)源碼我們可以看出,該方法創(chuàng)建一個(gè)固定核心線(xiàn)程數(shù)和線(xiàn)程池大小的線(xiàn)程池,并且核心數(shù)等于最大線(xiàn)程數(shù)。看起來(lái)好像沒(méi)有類(lèi)似newCachedThreadPool無(wú)限創(chuàng)建線(xiàn)程的情況,但是在他的描述中有一點(diǎn)很引人注意,
operating off a shared unbounded queue
操作一個(gè)共享無(wú)界的隊(duì)列
通過(guò)查看newFixedThreadPool()在創(chuàng)建線(xiàn)程池時(shí)傳入的隊(duì)列 new LinkedBlockingQueue()
會(huì)發(fā)現(xiàn),這個(gè)隊(duì)列的最大長(zhǎng)度時(shí)Integer.MAX_VALUE,這就意味著,未能及時(shí)執(zhí)行的任務(wù)都將添加到這個(gè)隊(duì)列里面
隨著任務(wù)的增加,這個(gè)隊(duì)列所占用的內(nèi)存將越來(lái)越多。最終導(dǎo)致OOM也是遲早的事情。
避坑指南
對(duì)于線(xiàn)程池這種東西,其實(shí)讓我們自己去控制是最好的,我們可以通過(guò)實(shí)現(xiàn)自定義的線(xiàn)程池提供線(xiàn)程,不僅可以定制化的獲取線(xiàn)程執(zhí)行過(guò)程中的狀態(tài)等信息,還能根據(jù)不同的任務(wù)使用不同的線(xiàn)程池。
例如,一條簡(jiǎn)單的 查詢(xún)操作 和 文件讀取操作 就應(yīng)該放在不同的線(xiàn)程池里面
因?yàn)槿绻麅煞N任務(wù)在同一個(gè)線(xiàn)程池里面,文件操作本身就是耗時(shí)的,它占用了線(xiàn)程之后會(huì)導(dǎo)致查詢(xún)操作等待或者直接被丟棄(取決于自定義線(xiàn)程池任務(wù)添加時(shí)的規(guī)則),這樣嚴(yán)重影響了查詢(xún)性能。
自定義線(xiàn)程池
//隊(duì)列長(zhǎng)度為100BlockingQueue<Runnable> blockqueue = new LinkedBlockingQueue<Runnable>(100) {/*** 這里重寫(xiě)offer方法* 在接收到新的任務(wù)時(shí),會(huì)先加入到隊(duì)列中,當(dāng)隊(duì)列滿(mǎn)了之后,才會(huì)創(chuàng)建新的線(xiàn)程 直到達(dá)到線(xiàn)程池的最大線(xiàn)程數(shù)* 我們現(xiàn)在需要接收到新任務(wù)時(shí),優(yōu)先將線(xiàn)程數(shù)擴(kuò)容到最大數(shù),后續(xù)任務(wù)再放入到隊(duì)列中* 加入隊(duì)列會(huì)調(diào)用 offer方法 ,我們直接返回false,制造隊(duì)列已滿(mǎn)的假象*/@Overridepublic boolean offer(Runnable e) {return false;}};ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10,10, TimeUnit.SECONDS,blockqueue , new ThreadFactoryBuilder().setNameFormat("mypool-%d").get(), (r, executor) -> {/*** 這里拒絕策略,被拒絕的任務(wù)會(huì)走該方法 及沒(méi)添加到隊(duì)列中,且沒(méi)有獲取到線(xiàn)程的任務(wù)* 因?yàn)槲覀冊(cè)O(shè)置的隊(duì)列中 offer方法固定返回false*/try {//如果允許該任務(wù)執(zhí)行但是不阻塞,及如果進(jìn)不了隊(duì)列就放棄,我們可以調(diào)用 offer 的另一個(gè)多參的方法if (!executor.getQueue().offer(r, 0, TimeUnit.SECONDS)) {throw new RejectedExecutionException("ThreadPool queue full, failed to offer " + r.toString());}//如果我們需要讓任務(wù)一定要執(zhí)行,及足協(xié)而等待進(jìn)入隊(duì)列,可以使用putexecutor.getQueue().put(r)} catch (InterruptedException e) {Thread.currentThread().interrupt();}});總結(jié)
以上是生活随笔為你收集整理的Java — 慎用Executors类中newFixedThreadPool()和newCachedThreadPool()的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 一个安全的邮件习惯如何练成的
- 下一篇: 第八届蓝桥杯Java A组决赛第一题