日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

写的很好!细数 Java 线程池的原理

發(fā)布時(shí)間:2025/3/21 java 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 写的很好!细数 Java 线程池的原理 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

今天我們就來詳細(xì)講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然后再講述它的實(shí)現(xiàn)原理,接著給出了它的使用示例,最后討論了一下如何合理配置線程池的大小。

Java 中的 ThreadPoolExecutor 類

java.uitl.concurrent.ThreadPoolExecutor?類是線程池中最核心的一個(gè)類,因此如果要透徹地了解Java 中的線程池,必須先了解這個(gè)類。下面我們來看一下 ThreadPoolExecutor 類的具體實(shí)現(xiàn)源碼。

在 ThreadPoolExecutor 類中提供了四個(gè)構(gòu)造方法:

public?class?ThreadPoolExecutor?extends?AbstractExecutorService?{.....public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,BlockingQueue<Runnable>?workQueue);public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,BlockingQueue<Runnable>?workQueue,ThreadFactory?threadFactory);public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,BlockingQueue<Runnable>?workQueue,RejectedExecutionHandler?handler);public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,BlockingQueue<Runnable>?workQueue,ThreadFactory?threadFactory,RejectedExecutionHandler?handler);... }

從上面的代碼可以得知,ThreadPoolExecutor 繼承了?AbstractExecutorService?類,并提供了四個(gè)構(gòu)造器,事實(shí)上,通過觀察每個(gè)構(gòu)造器的源碼具體實(shí)現(xiàn),發(fā)現(xiàn)前面三個(gè)構(gòu)造器都是調(diào)用的第四個(gè)構(gòu)造器進(jìn)行的初始化工作。

下面解釋下一下構(gòu)造器中各個(gè)參數(shù)的含義:

  • corePoolSize:核心池的大小,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()?或者?prestartCoreThread()方法,從這 2 個(gè)方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建 corePoolSize 個(gè)線程或者一個(gè)線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到 corePoolSize 后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;

  • maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個(gè)線程;

  • keepAliveTime:表示線程沒有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于 corePoolSize 時(shí),keepAliveTime 才會(huì)起作用,直到線程池中的線程數(shù)不大于 corePoolSize,即當(dāng)線程池中的線程數(shù)大于 corePoolSize 時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到 keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過 corePoolSize。但是如果調(diào)用了?allowCoreThreadTimeOut(boolean)?方法,在線程池中的線程數(shù)不大于 corePoolSize 時(shí),keepAliveTime 參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0;

  • unit:參數(shù) keepAliveTime 的時(shí)間單位,有 7 種取值,在?TimeUnit?類中有 7 種靜態(tài)屬性:

TimeUnit.DAYS;???????????????//天 TimeUnit.HOURS;?????????????//小時(shí) TimeUnit.MINUTES;???????????//分鐘 TimeUnit.SECONDS;???????????//秒 TimeUnit.MILLISECONDS;??????//毫秒 TimeUnit.MICROSECONDS;??????//微妙 TimeUnit.NANOSECONDS;???????//納秒
  • workQueue:一個(gè)阻塞隊(duì)列,用來存儲(chǔ)等待執(zhí)行的任務(wù),這個(gè)參數(shù)的選擇也很重要,會(huì)對(duì)線程池的運(yùn)行過程產(chǎn)生重大影響,一般來說,這里的阻塞隊(duì)列有以下幾種選擇:

ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;

ArrayBlockingQueue 和 PriorityBlockingQueue 使用較少,一般使用 LinkedBlockingQueue 和 Synchronous。線程池的排隊(duì)策略與 BlockingQueue 有關(guān)。

  • threadFactory:線程工廠,主要用來創(chuàng)建線程;

  • handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略,有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。? ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。? ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)?

具體參數(shù)的配置與線程池的關(guān)系將在下一節(jié)講述。

從上面給出的?ThreadPoolExecutor?類的代碼可以知道,ThreadPoolExecutor 繼承了AbstractExecutorService,我們來看一下 AbstractExecutorService 的實(shí)現(xiàn):

public?abstract?class?AbstractExecutorService?implements?ExecutorService?{protected?<T>?RunnableFuture<T>?newTaskFor(Runnable?runnable,?T?value)?{?};protected?<T>?RunnableFuture<T>?newTaskFor(Callable<T>?callable)?{?};public?Future<?>?submit(Runnable?task)?{};public?<T>?Future<T>?submit(Runnable?task,?T?result)?{?};public?<T>?Future<T>?submit(Callable<T>?task)?{?};private?<T>?T?doInvokeAny(Collection<??extends?Callable<T>>?tasks,boolean?timed,?long?nanos)throws?InterruptedException,?ExecutionException,?TimeoutException?{};public?<T>?T?invokeAny(Collection<??extends?Callable<T>>?tasks)throws?InterruptedException,?ExecutionException?{};public?<T>?T?invokeAny(Collection<??extends?Callable<T>>?tasks,long?timeout,?TimeUnit?unit)throws?InterruptedException,?ExecutionException,?TimeoutException?{};public?<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks)throws?InterruptedException?{};public?<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks,long?timeout,?TimeUnit?unit)throws?InterruptedException?{}; }?

AbstractExecutorService 是一個(gè)抽象類,它實(shí)現(xiàn)了 ExecutorService 接口。

我們接著看 ExecutorService 接口的實(shí)現(xiàn):

public?interface?ExecutorService?extends?Executor?{void?shutdown();boolean?isShutdown();boolean?isTerminated();boolean?awaitTermination(long?timeout,?TimeUnit?unit)throws?InterruptedException;<T>?Future<T>?submit(Callable<T>?task);<T>?Future<T>?submit(Runnable?task,?T?result);Future<?>?submit(Runnable?task);<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks)throws?InterruptedException;<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks,long?timeout,?TimeUnit?unit)throws?InterruptedException;<T>?T?invokeAny(Collection<??extends?Callable<T>>?tasks)throws?InterruptedException,?ExecutionException;<T>?T?invokeAny(Collection<??extends?Callable<T>>?tasks,long?timeout,?TimeUnit?unit)throws?InterruptedException,?ExecutionException,?TimeoutException; }

而 ExecutorService 又是繼承了 Executor 接口,我們看一下 Executor 接口的實(shí)現(xiàn):

public?interface?Executor?{void?execute(Runnable?command); }

到這里,大家應(yīng)該明白了?ThreadPoolExecutor、AbstractExecutorService、ExecutorService 和 Executor幾個(gè)之間的關(guān)系了。

Executor 是一個(gè)頂層接口,在它里面只聲明了一個(gè)方法?execute(Runnable),返回值為 void,參數(shù)為Runnable 類型,從字面意思可以理解,就是用來執(zhí)行傳進(jìn)去的任務(wù)的;

然后 ExecutorService 接口繼承了 Executor 接口,并聲明了一些方法:submit、invokeAll、invokeAny 以及shutDown 等;

抽象類AbstractExecutorService實(shí)現(xiàn)了 ExecutorService 接口,基本實(shí)現(xiàn)了 ExecutorService 中聲明的所有方法;

然后ThreadPoolExecutor?繼承了類 AbstractExecutorService。

在 ThreadPoolExecutor 類中有幾個(gè)非常重要的方法:

execute() submit() shutdown() shutdownNow()

execute()?方法實(shí)際上是 Executor 中聲明的方法,在 ThreadPoolExecutor 進(jìn)行了具體的實(shí)現(xiàn),這個(gè)方法是ThreadPoolExecutor 的核心方法,通過這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行。

submit()方法是在 ExecutorService 中聲明的方法,在 AbstractExecutorService 就已經(jīng)有了具體的實(shí)現(xiàn),在ThreadPoolExecutor 中并沒有對(duì)其進(jìn)行重寫,這個(gè)方法也是用來向線程池提交任務(wù)的,但是它和 execute() 方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看 submit() 方法的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)它實(shí)際上還是調(diào)用的 execute() 方法,只不過它利用了?Future?來獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在下一篇講述)。

shutdown()和shutdownNow()是用來關(guān)閉線程池的。

還有很多其他的方法:

比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount() 等獲取與線程池相關(guān)屬性的方法,有興趣的朋友可以自行查閱 API。

深入剖析線程池實(shí)現(xiàn)原理

  在上一節(jié)我們從宏觀上介紹了 ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實(shí)現(xiàn)原理,將從下面幾個(gè)方面講解:

  1.線程池狀態(tài)

  2.任務(wù)的執(zhí)行

  3.線程池中的線程初始化

  4.任務(wù)緩存隊(duì)列及排隊(duì)策略

  5.任務(wù)拒絕策略

  6.線程池的關(guān)閉

  7.線程池容量的動(dòng)態(tài)調(diào)整

線程池狀態(tài)

在 ThreadPoolExecutor 中定義了一個(gè) volatile 變量,另外定義了幾個(gè) static final 變量表示線程池的各個(gè)狀態(tài):

volatile?int?runState; static?final?int?RUNNING????=?0; static?final?int?SHUTDOWN???=?1; static?final?int?STOP???????=?2; static?final?int?TERMINATED?=?3;

runState 表示當(dāng)前線程池的狀態(tài),它是一個(gè) volatile 變量用來保證線程之間的可見性;

下面的幾個(gè) static final 變量表示 runState 可能的幾個(gè)取值。

當(dāng)創(chuàng)建線程池后,初始時(shí),線程池處于?RUNNING?狀態(tài);

如果調(diào)用了 shutdown() 方法,則線程池處于?SHUTDOWN?狀態(tài),此時(shí)線程池不能夠接受新的任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完畢;

如果調(diào)用了shutdownNow()方法,則線程池處于STOP狀態(tài),此時(shí)線程池不能接受新的任務(wù),并且會(huì)去嘗試終止正在執(zhí)行的任務(wù);

當(dāng)線程池處于 SHUTDOWN 或 STOP 狀態(tài),并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。

任務(wù)的執(zhí)行

在了解將任務(wù)提交給線程池到任務(wù)執(zhí)行完畢整個(gè)過程之前,我們先來看一下 ThreadPoolExecutor 類中其他的一些比較重要成員變量:

private?final?BlockingQueue<Runnable>?workQueue;??????????????//任務(wù)緩存隊(duì)列,用來存放等待執(zhí)行的任務(wù) private?final?ReentrantLock?mainLock?=?new?ReentrantLock();???//線程池的主要狀態(tài)鎖,對(duì)線程池狀態(tài)(比如線程池大小//、runState等)的改變都要使用這個(gè)鎖 private?final?HashSet<Worker>?workers?=?new?HashSet<Worker>();??//用來存放工作集private?volatile?long??keepAliveTime;????//線程存貨時(shí)間??? private?volatile?boolean?allowCoreThreadTimeOut;???//是否允許為核心線程設(shè)置存活時(shí)間 private?volatile?int???corePoolSize;?????//核心池的大小(即線程池中的線程數(shù)目大于這個(gè)參數(shù)時(shí),提交的任務(wù)會(huì)被放進(jìn)任務(wù)緩存隊(duì)列) private?volatile?int???maximumPoolSize;???//線程池最大能容忍的線程數(shù)private?volatile?int???poolSize;???????//線程池中當(dāng)前的線程數(shù)private?volatile?RejectedExecutionHandler?handler;?//任務(wù)拒絕策略private?volatile?ThreadFactory?threadFactory;???//線程工廠,用來創(chuàng)建線程private?int?largestPoolSize;???//用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù)private?long?completedTaskCount;???//用來記錄已經(jīng)執(zhí)行完畢的任務(wù)個(gè)數(shù)

每個(gè)變量的作用都已經(jīng)標(biāo)明出來了,這里要重點(diǎn)解釋一下 corePoolSize、maximumPoolSize、largestPoolSize 三個(gè)變量。

corePoolSize 在很多地方被翻譯成核心池大小,其實(shí)我的理解這個(gè)就是線程池的大小。舉個(gè)簡單的例子:

假如有一個(gè)工廠,工廠里面有 10 個(gè)工人,每個(gè)工人同時(shí)只能做一件任務(wù)。

因此只要當(dāng) 10 個(gè)工人中有工人是空閑的,來了任務(wù)就分配給空閑的工人做;

當(dāng) 10 個(gè)工人都有任務(wù)在做時(shí),如果還來了任務(wù),就把任務(wù)進(jìn)行排隊(duì)等待;

如果說新任務(wù)數(shù)目增長的速度遠(yuǎn)遠(yuǎn)大于工人做任務(wù)的速度,那么此時(shí)工廠主管可能會(huì)想補(bǔ)救措施,比如重新招4個(gè)臨時(shí)工人進(jìn)來;

然后就將任務(wù)也分配給這 4 個(gè)臨時(shí)工人做;

如果說著 14 個(gè)工人做任務(wù)的速度還是不夠,此時(shí)工廠主管可能就要考慮不再接收新的任務(wù)或者拋棄前面的一些任務(wù)了。

當(dāng)這 14 個(gè)工人當(dāng)中有人空閑時(shí),而新任務(wù)增長的速度又比較緩慢,工廠主管可能就考慮辭掉 4 個(gè)臨時(shí)工了,只保持原來的10個(gè)工人,畢竟請(qǐng)額外的工人是要花錢的。

這個(gè)例子中的 corePoolSize 就是 10,而 maximumPoolSize 就是14(10+4)。

也就是說 corePoolSize 就是線程池大小,maximumPoolSize 在我看來是線程池的一種補(bǔ)救措施,即任務(wù)量突然過大時(shí)的一種補(bǔ)救措施。

不過為了方便理解,在本文后面還是將 corePoolSize 翻譯成核心池大小。

largestPoolSize 只是一個(gè)用來起記錄作用的變量,用來記錄線程池中曾經(jīng)有過的最大線程數(shù)目,跟線程池的容量沒有任何關(guān)系。

下面我們進(jìn)入正題,看一下任務(wù)從提交到最終執(zhí)行完畢經(jīng)歷了哪些過程

在 ThreadPoolExecutor 類中,最核心的任務(wù)提交方法是 execute() 方法,雖然通過 submit 也可以提交任務(wù),但是實(shí)際上 submit 方法里面最終調(diào)用的還是 execute() 方法,所以我們只需要研究 execute() 方法的實(shí)現(xiàn)原理即可:

public?void?execute(Runnable?command)?{if?(command?==?null)throw?new?NullPointerException();if?(poolSize?>=?corePoolSize?||?!addIfUnderCorePoolSize(command))?{if?(runState?==?RUNNING?&&?workQueue.offer(command))?{if?(runState?!=?RUNNING?||?poolSize?==?0)ensureQueuedTaskHandled(command);}else?if?(!addIfUnderMaximumPoolSize(command))reject(command);?//?is?shutdown?or?saturated} }

上面的代碼可能看起來不是那么容易理解,下面我們一句一句解釋:

首先,判斷提交的任務(wù) command 是否為 null,若是 null,則拋出空指針異常;

接著是這句,這句要好好理解一下:

if?(poolSize?>=?corePoolSize?||?!addIfUnderCorePoolSize(command))

由于是或條件運(yùn)算符,所以先計(jì)算前半部分的值,如果線程池中當(dāng)前線程數(shù)不小于核心池大小,那么就會(huì)直接進(jìn)入下面的if語句塊了。

如果線程池中當(dāng)前線程數(shù)小于核心池大小,則接著執(zhí)行后半部分,也就是執(zhí)行

addIfUnderCorePoolSize(command)

如果執(zhí)行完 addIfUnderCorePoolSize 這個(gè)方法返回 false,則繼續(xù)執(zhí)行下面的 if 語句塊,否則整個(gè)方法就直接執(zhí)行完畢了。

如果執(zhí)行完 addIfUnderCorePoolSize 這個(gè)方法返回 false,然后接著判斷:

if?(runState?==?RUNNING?&&?workQueue.offer(command))

如果當(dāng)前線程池處于 RUNNING 狀態(tài),則將任務(wù)放入任務(wù)緩存隊(duì)列;如果當(dāng)前線程池不處于 RUNNING 狀態(tài)或者任務(wù)放入緩存隊(duì)列失敗,則執(zhí)行:

addIfUnderMaximumPoolSize(command)

如果執(zhí)行 addIfUnderMaximumPoolSize 方法失敗,則執(zhí)行 reject() 方法進(jìn)行任務(wù)拒絕處理。

回到前面:

if?(runState?==?RUNNING?&&?workQueue.offer(command))

這句的執(zhí)行,如果說當(dāng)前線程池處于RUNNING狀態(tài)且將任務(wù)放入任務(wù)緩存隊(duì)列成功,則繼續(xù)進(jìn)行判斷:

if?(runState?!=?RUNNING?||?poolSize?==?0)

這句判斷是為了防止在將此任務(wù)添加進(jìn)任務(wù)緩存隊(duì)列的同時(shí)其他線程突然調(diào)用 shutdown 或者 shutdownNow 方法關(guān)閉了線程池的一種應(yīng)急措施。如果是這樣就執(zhí)行:

ensureQueuedTaskHandled(command)

進(jìn)行應(yīng)急處理,從名字可以看出是保證添加到任務(wù)緩存隊(duì)列中的任務(wù)得到處理。

我們接著看 2 個(gè)關(guān)鍵方法的實(shí)現(xiàn):addIfUnderCorePoolSize 和 addIfUnderMaximumPoolSize:

private?boolean?addIfUnderCorePoolSize(Runnable?firstTask)?{Thread?t?=?null;final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();try?{if?(poolSize?<?corePoolSize?&&?runState?==?RUNNING)t?=?addThread(firstTask);????????//創(chuàng)建線程去執(zhí)行firstTask任務(wù)???}?finally?{mainLock.unlock();}if?(t?==?null)return?false;t.start();return?true; }

這個(gè)是?addIfUnderCorePoolSize?方法的具體實(shí)現(xiàn),從名字可以看出它的意圖就是當(dāng)?shù)陀诤诵某源笮r(shí)執(zhí)行的方法。

下面看其具體實(shí)現(xiàn),首先獲取到鎖,因?yàn)檫@地方涉及到線程池狀態(tài)的變化,先通過 if 語句判斷當(dāng)前線程池中的線程數(shù)目是否小于核心池大小,有朋友也許會(huì)有疑問:前面在 execute() 方法中不是已經(jīng)判斷過了嗎,只有線程池當(dāng)前線程數(shù)目小于核心池大小才會(huì)執(zhí)行 addIfUnderCorePoolSize 方法的,為何這地方還要繼續(xù)判斷?原因很簡單,前面的判斷過程中并沒有加鎖,因此可能在execute方法判斷的時(shí)候 poolSize 小于 corePoolSize,而判斷完之后,在其他線程中又向線程池提交了任務(wù),就可能導(dǎo)致 poolSize 不小于 corePoolSize 了,所以需要在這個(gè)地方繼續(xù)判斷。然后接著判斷線程池的狀態(tài)是否為 RUNNING,原因也很簡單,因?yàn)橛锌赡茉谄渌€程中調(diào)用了shutdown 或者 shutdownNow 方法。然后就是執(zhí)行

t?=?addThread(firstTask);

這個(gè)方法也非常關(guān)鍵,傳進(jìn)去的參數(shù)為提交的任務(wù),返回值為Thread類型。然后接著在下面判斷 t 是否為空,為空則表明創(chuàng)建線程失敗(即 poolSize >= corePoolSize 或者 runState 不等于 RUNNING),否則調(diào)用 t.start() 方法啟動(dòng)線程。

我們來看一下addThread方法的實(shí)現(xiàn):

private?Thread?addThread(Runnable?firstTask)?{Worker?w?=?new?Worker(firstTask);Thread?t?=?threadFactory.newThread(w);??//創(chuàng)建一個(gè)線程,執(zhí)行任務(wù)???if?(t?!=?null)?{w.thread?=?t;????????????//將創(chuàng)建的線程的引用賦值為w的成員變量???????workers.add(w);int?nt?=?++poolSize;?????//當(dāng)前線程數(shù)加1???????if?(nt?>?largestPoolSize)largestPoolSize?=?nt;}return?t; }

在 addThread 方法中,首先用提交的任務(wù)創(chuàng)建了一個(gè) Worker 對(duì)象,然后調(diào)用線程工廠 threadFactory 創(chuàng)建了一個(gè)新的線程 t,然后將線程t的引用賦值給了 Worker 對(duì)象的成員變量 thread,接著通過 workers.add(w) 將 Worker對(duì)象添加到工作集當(dāng)中。

下面我們看一下 Worker 類的實(shí)現(xiàn):

private?final?class?Worker?implements?Runnable?{private?final?ReentrantLock?runLock?=?new?ReentrantLock();private?Runnable?firstTask;volatile?long?completedTasks;Thread?thread;Worker(Runnable?firstTask)?{this.firstTask?=?firstTask;}boolean?isActive()?{return?runLock.isLocked();}void?interruptIfIdle()?{final?ReentrantLock?runLock?=?this.runLock;if?(runLock.tryLock())?{try?{if?(thread?!=?Thread.currentThread())thread.interrupt();}?finally?{runLock.unlock();}}}void?interruptNow()?{thread.interrupt();}private?void?runTask(Runnable?task)?{final?ReentrantLock?runLock?=?this.runLock;runLock.lock();try?{if?(runState?<?STOP?&&Thread.interrupted()?&&runState?>=?STOP)boolean?ran?=?false;beforeExecute(thread,?task);???//beforeExecute方法是ThreadPoolExecutor類的一個(gè)方法,沒有具體實(shí)現(xiàn),用戶可以根據(jù)//自己需要重載這個(gè)方法和后面的afterExecute方法來進(jìn)行一些統(tǒng)計(jì)信息,比如某個(gè)任務(wù)的執(zhí)行時(shí)間等???????????try?{task.run();ran?=?true;afterExecute(task,?null);++completedTasks;}?catch?(RuntimeException?ex)?{if?(!ran)afterExecute(task,?ex);throw?ex;}}?finally?{runLock.unlock();}}public?void?run()?{try?{Runnable?task?=?firstTask;firstTask?=?null;while?(task?!=?null?||?(task?=?getTask())?!=?null)?{runTask(task);task?=?null;}}?finally?{workerDone(this);???//當(dāng)任務(wù)隊(duì)列中沒有任務(wù)時(shí),進(jìn)行清理工作???????}} }

它實(shí)際上實(shí)現(xiàn)了 Runnable 接口,因此上面的 Thread t = threadFactory.newThread(w) 效果跟下面這句的效果基本一樣:

Thread?t?=?new?Thread(w);

相當(dāng)于傳進(jìn)去了一個(gè)Runnable 任務(wù),在線程t中執(zhí)行這個(gè) Runnable。

既然 Worker 實(shí)現(xiàn)了 Runnable 接口,那么自然最核心的方法便是 run() 方法了:

public?void?run()?{try?{Runnable?task?=?firstTask;firstTask?=?null;while?(task?!=?null?||?(task?=?getTask())?!=?null)?{runTask(task);task?=?null;}}?finally?{workerDone(this);} }

從 run 方法的實(shí)現(xiàn)可以看出,它首先執(zhí)行的是通過構(gòu)造器傳進(jìn)來的任務(wù) firstTask,在調(diào)用 runTask() 執(zhí)行完firstTask 之后,在 while 循環(huán)里面不斷通過 getTask() 去取新的任務(wù)來執(zhí)行,那么去哪里取呢?自然是從任務(wù)緩存隊(duì)列里面去取,getTask 是 ThreadPoolExecutor 類中的方法,并不是 Worker 類中的方法,下面是 getTask 方法的實(shí)現(xiàn):

Runnable?getTask()?{for?(;;)?{try?{int?state?=?runState;if?(state?>?SHUTDOWN)return?null;Runnable?r;if?(state?==?SHUTDOWN)??//?Help?drain?queuer?=?workQueue.poll();else?if?(poolSize?>?corePoolSize?||?allowCoreThreadTimeOut)?//如果線程數(shù)大于核心池大小或者允許為核心池線程設(shè)置空閑時(shí)間,//則通過poll取任務(wù),若等待一定的時(shí)間取不到任務(wù),則返回nullr?=?workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS);elser?=?workQueue.take();if?(r?!=?null)return?r;if?(workerCanExit())?{????//如果沒取到任務(wù),即r為null,則判斷當(dāng)前的worker是否可以退出if?(runState?>=?SHUTDOWN)?//?Wake?up?othersinterruptIdleWorkers();???//中斷處于空閑狀態(tài)的workerreturn?null;}//?Else?retry}?catch?(InterruptedException?ie)?{//?On?interruption,?re-check?runState}} }

在 getTask 中,先判斷當(dāng)前線程池狀態(tài),如果 runState 大于 SHUTDOWN(即為 STOP 或者 TERMINATED),則直接返回 null。

如果 runState 為 SHUTDOWN 或者 RUNNING,則從任務(wù)緩存隊(duì)列取任務(wù)。

如果當(dāng)前線程池的線程數(shù)大于核心池大小 corePoolSize 或者允許為核心池中的線程設(shè)置空閑存活時(shí)間,則調(diào)用poll(time,timeUnit)來取任務(wù),這個(gè)方法會(huì)等待一定的時(shí)間,如果取不到任務(wù)就返回 null。

然后判斷取到的任務(wù) r 是否為 null,為 null 則通過調(diào)用?workerCanExit()?方法來判斷當(dāng)前 worker 是否可以退出,我們看一下 workerCanExit() 的實(shí)現(xiàn):

private?boolean?workerCanExit()?{final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();boolean?canExit;//如果runState大于等于STOP,或者任務(wù)緩存隊(duì)列為空了//或者??允許為核心池線程設(shè)置空閑存活時(shí)間并且線程池中的線程數(shù)目大于1try?{canExit?=?runState?>=?STOP?||workQueue.isEmpty()?||(allowCoreThreadTimeOut?&&poolSize?>?Math.max(1,?corePoolSize));}?finally?{mainLock.unlock();}return?canExit; }

也就是說如果線程池處于 STOP 狀態(tài)、或者任務(wù)隊(duì)列已為空或者允許為核心池線程設(shè)置空閑存活時(shí)間并且線程數(shù)大于 1 時(shí),允許 worker 退出。如果允許 worker 退出,則調(diào)用interruptIdleWorkers()中斷處于空閑狀態(tài)的 worker,我們看一下 interruptIdleWorkers() 的實(shí)現(xiàn):

void?interruptIdleWorkers()?{final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();try?{for?(Worker?w?:?workers)??//實(shí)際上調(diào)用的是worker的interruptIfIdle()方法w.interruptIfIdle();}?finally?{mainLock.unlock();} }

從實(shí)現(xiàn)可以看出,它實(shí)際上調(diào)用的是 worker 的?interruptIfIdle()方法,在 worker 的 interruptIfIdle() 方法中:

void?interruptIfIdle()?{final?ReentrantLock?runLock?=?this.runLock;if?(runLock.tryLock())?{????//注意這里,是調(diào)用tryLock()來獲取鎖的,因?yàn)槿绻?dāng)前worker正在執(zhí)行任務(wù),鎖已經(jīng)被獲取了,是無法獲取到鎖的//如果成功獲取了鎖,說明當(dāng)前worker處于空閑狀態(tài)try?{if?(thread?!=?Thread.currentThread())??thread.interrupt();}?finally?{runLock.unlock();}} }

這里有一個(gè)非常巧妙的設(shè)計(jì)方式,假如我們來設(shè)計(jì)線程池,可能會(huì)有一個(gè)任務(wù)分派線程,當(dāng)發(fā)現(xiàn)有線程空閑時(shí),就從任務(wù)緩存隊(duì)列中取一個(gè)任務(wù)交給空閑線程執(zhí)行。但是在這里,并沒有采用這樣的方式,因?yàn)檫@樣會(huì)要額外地對(duì)任務(wù)分派線程進(jìn)行管理,無形地會(huì)增加難度和復(fù)雜度,這里直接讓執(zhí)行完任務(wù)的線程去任務(wù)緩存隊(duì)列里面取任務(wù)來執(zhí)行。

我們?cè)倏?addIfUnderMaximumPoolSize 方法的實(shí)現(xiàn),這個(gè)方法的實(shí)現(xiàn)思想和 addIfUnderCorePoolSize 方法的實(shí)現(xiàn)思想非常相似,唯一的區(qū)別在于 addIfUnderMaximumPoolSize 方法是在線程池中的線程數(shù)達(dá)到了核心池大小并且往任務(wù)隊(duì)列中添加任務(wù)失敗的情況下執(zhí)行的:

private?boolean?addIfUnderMaximumPoolSize(Runnable?firstTask)?{Thread?t?=?null;final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();try?{if?(poolSize?<?maximumPoolSize?&&?runState?==?RUNNING)t?=?addThread(firstTask);}?finally?{mainLock.unlock();}if?(t?==?null)return?false;t.start();return?true; }

看到?jīng)]有,其實(shí)它和 addIfUnderCorePoolSize 方法的實(shí)現(xiàn)基本一模一樣,只是 if 語句判斷條件中的 poolSize < maximumPoolSize 不同而已。

到這里,大部分朋友應(yīng)該對(duì)任務(wù)提交給線程池之后到被執(zhí)行的整個(gè)過程有了一個(gè)基本的了解,下面總結(jié)一下

1)首先,要清楚 corePoolSize 和 maximumPoolSize 的含義;

2)其次,要知道 Worker 是用來起到什么作用的;

3)要知道任務(wù)提交給線程池之后的處理策略,這里總結(jié)一下主要有 4 點(diǎn):

  • 如果當(dāng)前線程池中的線程數(shù)目小于 corePoolSize,則每來一個(gè)任務(wù),就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行這個(gè)任務(wù);

  • 如果當(dāng)前線程池中的線程數(shù)目 >= corePoolSize,則每來一個(gè)任務(wù),會(huì)嘗試將其添加到任務(wù)緩存隊(duì)列當(dāng)中,若添加成功,則該任務(wù)會(huì)等待空閑線程將其取出去執(zhí)行;若添加失敗(一般來說是任務(wù)緩存隊(duì)列已滿),則會(huì)嘗試創(chuàng)建新的線程去執(zhí)行這個(gè)任務(wù);

  • 如果當(dāng)前線程池中的線程數(shù)目達(dá)到 maximumPoolSize,則會(huì)采取任務(wù)拒絕策略進(jìn)行處理;

  • 如果線程池中的線程數(shù)量大于 corePoolSize 時(shí),如果某線程空閑時(shí)間超過 keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于 corePoolSize ;如果允許為核心池中的線程設(shè)置存活時(shí)間,那么核心池中的線程空閑時(shí)間超過 keepAliveTime ,線程也會(huì)被終止。

線程池中的線程初始化

默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。

在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過以下兩個(gè)方法辦到:

  • prestartCoreThread():初始化一個(gè)核心線程;

  • prestartAllCoreThreads():初始化所有核心線程

下面是這2個(gè)方法的實(shí)現(xiàn):

public?boolean?prestartCoreThread()?{return?addIfUnderCorePoolSize(null);?//注意傳進(jìn)去的參數(shù)是null }public?int?prestartAllCoreThreads()?{int?n?=?0;while?(addIfUnderCorePoolSize(null))//注意傳進(jìn)去的參數(shù)是null++n;return?n; }

注意上面?zhèn)鬟M(jìn)去的參數(shù)是 null,根據(jù)第 2 小節(jié)的分析可知如果傳進(jìn)去的參數(shù)為 null,則最后執(zhí)行線程會(huì)阻塞在getTask方法中的

r?=?workQueue.take();

即等待任務(wù)隊(duì)列中有任務(wù)。

任務(wù)緩存隊(duì)列及排隊(duì)策略

在前面我們多次提到了任務(wù)緩存隊(duì)列,即 workQueue,它用來存放等待執(zhí)行的任務(wù)。

workQueue 的類型為?BlockingQueue<Runnable>,通常可以取下面三種類型:

1)ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建時(shí)必須指定大小;

2)LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時(shí)沒有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE;

3)synchronousQueue:這個(gè)隊(duì)列比較特殊,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線程來執(zhí)行新來的任務(wù)。

任務(wù)拒絕策略

當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到 maximumPoolSize,如果還有任務(wù)到來就會(huì)采取任務(wù)拒絕策略,通常有以下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)

線程池的關(guān)閉

ThreadPoolExecutor 提供了兩個(gè)方法,用于線程池的關(guān)閉,分別是 shutdown() 和 shutdownNow(),其中:

  • shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)

  • shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)

線程池容量的動(dòng)態(tài)調(diào)整

ThreadPoolExecutor 提供了動(dòng)態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize() 和 setMaximumPoolSize(),

  • setCorePoolSize:設(shè)置核心池大小

  • setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小

當(dāng)上述參數(shù)從小變大時(shí),ThreadPoolExecutor 進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務(wù)。

使用示例

前面我們討論了關(guān)于線程池的實(shí)現(xiàn)原理,這一節(jié)我們來看一下它的具體使用:

public?class?Test?{public?static?void?main(String[]?args)?{???ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(5,?10,?200,?TimeUnit.MILLISECONDS,new?ArrayBlockingQueue<Runnable>(5));for(int?i=0;i<15;i++){MyTask?myTask?=?new?MyTask(i);executor.execute(myTask);System.out.println("線程池中線程數(shù)目:"+executor.getPoolSize()+",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:"+executor.getQueue().size()+",已執(zhí)行玩別的任務(wù)數(shù)目:"+executor.getCompletedTaskCount());}executor.shutdown();} }class?MyTask?implements?Runnable?{private?int?taskNum;public?MyTask(int?num)?{this.taskNum?=?num;}@Overridepublic?void?run()?{System.out.println("正在執(zhí)行task?"+taskNum);try?{Thread.currentThread().sleep(4000);}?catch?(InterruptedException?e)?{e.printStackTrace();}System.out.println("task?"+taskNum+"執(zhí)行完畢");} }

執(zhí)行結(jié)果:

正在執(zhí)行task?0 線程池中線程數(shù)目:1,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:2,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task?1 線程池中線程數(shù)目:3,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task?2 線程池中線程數(shù)目:4,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task?3 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task?4 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:1,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:2,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:3,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:4,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:6,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task?10 線程池中線程數(shù)目:7,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task?11 線程池中線程數(shù)目:8,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task?12 線程池中線程數(shù)目:9,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task?13 線程池中線程數(shù)目:10,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task?14 task?3執(zhí)行完畢 task?0執(zhí)行完畢 task?2執(zhí)行完畢 task?1執(zhí)行完畢 正在執(zhí)行task?8 正在執(zhí)行task?7 正在執(zhí)行task?6 正在執(zhí)行task?5 task?4執(zhí)行完畢 task?10執(zhí)行完畢 task?11執(zhí)行完畢 task?13執(zhí)行完畢 task?12執(zhí)行完畢 正在執(zhí)行task?9 task?14執(zhí)行完畢 task?8執(zhí)行完畢 task?5執(zhí)行完畢 task?7執(zhí)行完畢 task?6執(zhí)行完畢 task?9執(zhí)行完畢

從執(zhí)行結(jié)果可以看出,當(dāng)線程池中線程的數(shù)目大于 5 時(shí),便將任務(wù)放入任務(wù)緩存隊(duì)列里面,當(dāng)任務(wù)緩存隊(duì)列滿了之后,便創(chuàng)建新的線程。如果上面程序中,將 for 循環(huán)中改成執(zhí)行 20 個(gè)任務(wù),就會(huì)拋出任務(wù)拒絕異常了。

不過在 java doc中,并不提倡我們直接使用 ThreadPoolExecutor,而是使用 Executors 類中提供的幾個(gè)靜態(tài)方法來創(chuàng)建線程池:

Executors.newCachedThreadPool();????????//創(chuàng)建一個(gè)緩沖池,緩沖池容量大小為Integer.MAX_VALUE Executors.newSingleThreadExecutor();???//創(chuàng)建容量為1的緩沖池 Executors.newFixedThreadPool(int);????//創(chuàng)建固定容量大小的緩沖池

下面是這三個(gè)靜態(tài)方法的具體實(shí)現(xiàn);

public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{return?new?ThreadPoolExecutor(nThreads,?nThreads,0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>()); } public?static?ExecutorService?newSingleThreadExecutor()?{return?new?FinalizableDelegatedExecutorService(new?ThreadPoolExecutor(1,?1,0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>())); } public?static?ExecutorService?newCachedThreadPool()?{return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,60L,?TimeUnit.SECONDS,new?SynchronousQueue<Runnable>()); }

從它們的具體實(shí)現(xiàn)來看,它們實(shí)際上也是調(diào)用了 ThreadPoolExecutor,只不過參數(shù)都已配置好了。

newFixedThreadPoo l創(chuàng)建的線程池 corePoolSize 和 maximumPoolSize 值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor 將 corePoolSize 和 maximumPoolSize 都設(shè)置為1,也使用的 LinkedBlockingQueue;

newCachedThreadPool 將 corePoolSize 設(shè)置為0,將 maximumPoolSize 設(shè)置為 Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務(wù)就創(chuàng)建線程運(yùn)行,當(dāng)線程空閑超過60秒,就銷毀線程。

實(shí)際中,如果 Executors 提供的三個(gè)靜態(tài)方法能滿足要求,就盡量使用它提供的三個(gè)方法,因?yàn)樽约喝ナ謩?dòng)配置ThreadPoolExecutor 的參數(shù)有點(diǎn)麻煩,要根據(jù)實(shí)際任務(wù)的類型和數(shù)量來進(jìn)行配置。

另外,如果 ThreadPoolExecutor 達(dá)不到要求,可以自己繼承 ThreadPoolExecutor 類進(jìn)行重寫。

如何合理配置線程池的大小

本節(jié)來討論一個(gè)比較重要的話題:如何合理配置線程池大小,僅供參考。

一般需要根據(jù)任務(wù)的類型來配置線程池大小:

如果是 CPU 密集型任務(wù),就需要盡量壓榨 CPU,參考值可以設(shè)為?NCPU+1

如果是 IO 密集型任務(wù),參考值可以設(shè)置為2*NCPU

當(dāng)然,這只是一個(gè)參考值,具體的設(shè)置還需要根據(jù)實(shí)際情況進(jìn)行調(diào)整,比如可以先將線程池大小設(shè)置為參考值,再觀察任務(wù)運(yùn)行情況和系統(tǒng)負(fù)載、資源利用率來進(jìn)行適當(dāng)調(diào)整。

總結(jié)

以上是生活随笔為你收集整理的写的很好!细数 Java 线程池的原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。