JDK并发包2
JDK中提供的并發(fā)工具,在并發(fā)包當(dāng)中,有關(guān)線程池相關(guān)的一些工具類,在介紹線程池之前呢,首先看一下為什么要使用線程池,使用線程池的原因是因?yàn)?線程創(chuàng)建和銷毀的代價(jià)呢,是比較高的,我們?cè)谝粋€(gè)多線程的程序當(dāng)中,我們每一次的提交都要去創(chuàng)建一個(gè)線程,然后每一次任務(wù)完成之后呢,都要銷毀線程,對(duì)于我們業(yè)務(wù)來(lái)說(shuō),這個(gè)線程的創(chuàng)建和線程的銷毀,跟業(yè)務(wù)也是沒(méi)有關(guān)系的,只關(guān)心線程所執(zhí)行的任務(wù),因此我們希望把盡可能多的CPU,用到任務(wù)的執(zhí)行上面,而不是用在創(chuàng)建和銷毀上面,所以就有了線程池的一個(gè)東西,線程池的作用就是把線程呢,比如你要執(zhí)行100個(gè)任務(wù),前前后后你要執(zhí)行100個(gè)任務(wù),對(duì)于這100個(gè)任務(wù)來(lái)說(shuō),如果你分10次批量執(zhí)行,如果我每一次都去創(chuàng)建和銷毀線程,前前后后需要?jiǎng)?chuàng)建和銷毀100個(gè)線程,如果說(shuō)我有線程池,我線程池大小可以是10個(gè)線程,線程池里面就放10個(gè)線程在里面工作,這10個(gè)線程它是不退出的,就常駐線程,如果沒(méi)有任務(wù)的時(shí)候呢,他可能出于一個(gè)等待的狀態(tài),但是有任務(wù)開(kāi)始的時(shí)候呢,他就會(huì)執(zhí)行你的這個(gè)任務(wù),在這種情況之下呢,因?yàn)闆](méi)有銷毀,所以可以馬上執(zhí)行下一個(gè)任務(wù),因此所有的資源只在于說(shuō),任務(wù)本身的執(zhí)行,線程的創(chuàng)建從前到后,他只會(huì)創(chuàng)建10個(gè)線程,這樣就可以節(jié)省CPU的時(shí)間,下面我們簡(jiǎn)單來(lái)看看CPU的實(shí)現(xiàn),線程池實(shí)現(xiàn)的核心呢,你需要把所有的活動(dòng)線程保存起來(lái),所以我們會(huì)有一個(gè)列表,列表當(dāng)中會(huì)保留所有的worker,就是我們的工作線程,Worker是繼承線程的,他必定要有一些特定的實(shí)現(xiàn),正常一個(gè)線程完成工作之后,會(huì)退出,我們會(huì)讓他做一些額外的事情,當(dāng)你需要從線程池當(dāng)中,新建一個(gè)線程的時(shí)候呢,當(dāng)你有一個(gè)任務(wù)需要執(zhí)行的時(shí)候呢,我們當(dāng)然就從線程池當(dāng)中,拿一個(gè)可以用的線程,并且去設(shè)置他的target,把這個(gè)runnable設(shè)進(jìn)去,然后進(jìn)行執(zhí)行了,如果有空閑線程,并不需要去新建線程,如果沒(méi)有空閑線程呢,那我們就創(chuàng)建線程,去執(zhí)行,另外來(lái)看看worker,如果當(dāng)前線程沒(méi)有被關(guān)閉呢,就會(huì)運(yùn)行這個(gè)任務(wù),線程池是不能被關(guān)閉的,如果關(guān)閉了線程池就沒(méi)有意義了,我會(huì)把線還給線程池,線程池的概念和思想解析清楚,離實(shí)用還差很遠(yuǎn),然后我們來(lái)看JDK中的線程池,JDK當(dāng)中有一個(gè)執(zhí)行者Executor,可以執(zhí)行一個(gè)Runnable的一個(gè)接口,較為頂層的一個(gè)接口,一個(gè)線程池他的本質(zhì),實(shí)際上就是一個(gè)執(zhí)行者,執(zhí)行Runnable的一個(gè)任務(wù),Runnable任務(wù)在線程池當(dāng)中,被調(diào)度,由具體的類去實(shí)現(xiàn),另外一個(gè)ExecutorService,提供了一些方法,有關(guān)閉,有submit,提交一個(gè)callable的接口,Callable和Runnable的區(qū)別呢,Runnable沒(méi)有返回值,完成就完成了,但是Callbale的區(qū)別在于,那么線程池最重要的一個(gè)實(shí)現(xiàn)呢,就是ThreadPoolExecutor這個(gè)類,它是一個(gè)執(zhí)行者,此外我們還有一個(gè)工廠方法,Executors是什么呢,它是一個(gè)工廠類,里面提供了一些工廠方法,用來(lái)你方便構(gòu)造一個(gè)線程池
線程池主要有這么一些,固定大小的線程池,線程池當(dāng)中,數(shù)量是固定的,不會(huì)因?yàn)槟闳蝿?wù)的工作量的增加而增加,SingleThread就是單一線程的線程池,只有一個(gè)線程,你來(lái)一個(gè)任務(wù)做一個(gè),就是這樣的工作,緩存的線程池,當(dāng)你提交一個(gè)任務(wù)的時(shí)候,一下子好多個(gè)任務(wù),那就可以開(kāi)好多線程去做,但是如果你沒(méi)有任務(wù)的時(shí)候呢,希望每隔5分鐘執(zhí)行一個(gè)任務(wù),后面Schedule有點(diǎn)像定時(shí)任務(wù)的功能,使用Executors的工廠方法去獲得
/*** Creates a thread pool that reuses a fixed number of threads* operating off a shared unbounded queue. At any point, at most* {@code nThreads} threads will be active processing tasks.* If additional tasks are submitted when all threads are active,* they will wait in the queue until a thread is available.* If any thread terminates due to a failure during execution* prior to shutdown, a new one will take its place if needed to* execute subsequent tasks. The threads in the pool will exist* until it is explicitly {@link ExecutorService#shutdown shutdown}.** @param nThreads the number of threads in the pool* @return the newly created thread pool* @throws IllegalArgumentException if {@code nThreads <= 0}*/
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}ThreadPoolExecutor線程池的執(zhí)行者,/*** Creates an Executor that uses a single worker thread operating* off an unbounded queue. (Note however that if this single* thread terminates due to a failure during execution prior to* shutdown, a new one will take its place if needed to execute* subsequent tasks.) Tasks are guaranteed to execute* sequentially, and no more than one task will be active at any* given time. Unlike the otherwise equivalent* {@code newFixedThreadPool(1)} the returned executor is* guaranteed not to be reconfigurable to use additional threads.** @return the newly created single-threaded Executor*/
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}他也是new了一個(gè)ThreadPoolExecutor,/*** Creates a thread pool that creates new threads as needed, but* will reuse previously constructed threads when they are* available. These pools will typically improve the performance* of programs that execute many short-lived asynchronous tasks.* Calls to {@code execute} will reuse previously constructed* threads if available. If no existing thread is available, a new* thread will be created and added to the pool. Threads that have* not been used for sixty seconds are terminated and removed from* the cache. Thus, a pool that remains idle for long enough will* not consume any resources. Note that pools with similar* properties but different details (for example, timeout parameters)* may be created using {@link ThreadPoolExecutor} constructors.** @return the newly created thread pool*/
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}也是ThreadPoolExecutor,看起來(lái)在不同的方法當(dāng)中,這些不同類型的線程池,看起來(lái)似乎是不一樣的,但是這三個(gè)實(shí)現(xiàn)的根本非常的接近,他們都是ThreadPoolExecutor,只是傳入了不同的參數(shù),但是這個(gè)參數(shù)比較復(fù)雜,根據(jù)不同的參數(shù)呢,你可以達(dá)到一個(gè)不同的線程池的效果,因此我們有必要去研究一下,這些參數(shù)到底是什么意思,/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters and default thread factory and rejected execution handler.* It may be more convenient to use one of the {@link Executors} factory* methods instead of this general purpose constructor.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue} is null*/
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}corePoolSize核心線程池的大小,線程池核心線程有多少個(gè),maximumPoolSize最大在這個(gè)線程池當(dāng)中最多能夠容納多少個(gè)線程,線程池當(dāng)中線程的數(shù)量,標(biāo)準(zhǔn)的數(shù)量,我會(huì)往上擴(kuò)展,擴(kuò)展不能夠超過(guò)多少,keepAliveTime可以存活多少時(shí)間,當(dāng)你線程的數(shù)量大于coreSize的時(shí)候呢,超過(guò)你指定時(shí)間的線程,將會(huì)終止掉,線程池當(dāng)中呢,TimeUnit是keepAliveTime的單位,有一個(gè)BlockingQueue,一個(gè)阻塞隊(duì)列,放的是Runnable任務(wù),不停的往線程池里面塞,我沒(méi)有足夠多的線程來(lái)執(zhí)行你的時(shí)候,那我就把它保存起來(lái),我不能把這個(gè)任務(wù)丟掉,我不能簡(jiǎn)單的把它丟掉,我要做最大限度的執(zhí)行,所以這個(gè)任務(wù)會(huì)在workQueue中做排隊(duì),用來(lái)保存這些任務(wù),public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}你希望線程池里面有幾個(gè)線程,我這個(gè)最大數(shù)量和核心數(shù)量呢,應(yīng)該是一樣的,我10個(gè)就是10個(gè),因?yàn)槲覜](méi)有任何擴(kuò)展的空間,所以這個(gè)參數(shù)沒(méi)有意義的,所以我設(shè)置為0,如果我10給你吃排滿了,那我就塞到LinkedBlockingQueue當(dāng)中去,LinkedBlockingQueue他的容量是Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE.這么多的,它是一個(gè)無(wú)限的容量,如果你不停的把任務(wù)塞進(jìn)來(lái),我沒(méi)有做完,而你不停的把任務(wù)塞進(jìn)來(lái),后果就是我內(nèi)存的數(shù)據(jù)越來(lái)越多,因?yàn)檫@個(gè)隊(duì)列會(huì)不停的放滿,不停的增長(zhǎng),因?yàn)樗抨?duì)執(zhí)行,這個(gè)就是固定大小的特點(diǎn),Single沒(méi)有好說(shuō)的,因?yàn)樗烷_(kāi)一個(gè)線程池,public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}他其實(shí)就是一個(gè)特殊的fixed線程池,看一下Cachepublic static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}他可以根據(jù)任務(wù)的密集度,自動(dòng)的擴(kuò)展其數(shù)量的,默認(rèn)情況下是0個(gè)線程,這個(gè)比較特殊,核心線程數(shù)量是0,意味著一個(gè)什么事情呢,他在空閑的時(shí)候他是一個(gè)線程都沒(méi)有的,當(dāng)有任務(wù)進(jìn)來(lái)的時(shí)候呢,他就會(huì)開(kāi)線程,如果你沒(méi)有足夠多的線程,執(zhí)行你的任務(wù)的時(shí)候呢,對(duì)于這個(gè)線程池來(lái)講,并不會(huì)說(shuō),開(kāi)一個(gè)線程做事情了,它會(huì)把這個(gè)任務(wù)加到這個(gè)隊(duì)列當(dāng)中去,然后在隊(duì)列當(dāng)中一個(gè)個(gè)排列執(zhí)行,這里傳了一個(gè)SynchronousQueue,同步隊(duì)列的一個(gè)對(duì)象,這個(gè)隊(duì)列他比較奇怪,這個(gè)隊(duì)列的容量是0,我不會(huì)說(shuō)他的容量是1,但是1是可以有一個(gè)數(shù)據(jù)塞進(jìn)去,我是說(shuō)他的容量是0,看起來(lái)是一個(gè)很不可思議的容量,當(dāng)你試圖往里面塞數(shù)據(jù)的時(shí)候,他的容量是0,你塞進(jìn)去你的前提是什么呢,當(dāng)我這個(gè)隊(duì)列里加一個(gè)數(shù)據(jù)的時(shí)候,我加成功的前提呢,有一個(gè)正要往這個(gè)隊(duì)列里面拿數(shù)據(jù),正好有一個(gè)往隊(duì)列里面塞數(shù)據(jù),因此它只是起到一個(gè)數(shù)據(jù)交換的作用,數(shù)據(jù)傳輸?shù)淖饔?他不會(huì)真正保存你真正塞進(jìn)去的數(shù)據(jù),在這種情況之下呢,從0開(kāi)始增長(zhǎng)線程的數(shù)量,任何的一臺(tái)計(jì)算機(jī)都不可能跑出MAX_VALUE這么一個(gè)數(shù)量,然后他還指定了說(shuō),超時(shí)時(shí)間,60秒,如果我這個(gè)線程在60秒沒(méi)有使用,他就會(huì)被終止掉,從這個(gè)cache當(dāng)中被移除,這個(gè)線程池會(huì)保持idle的狀態(tài),不會(huì)占用太多的資源,如果你沒(méi)有任務(wù)上來(lái)的話,60秒的時(shí)候會(huì)被終結(jié),這就是線程池的特點(diǎn),不同參數(shù)下的一個(gè)實(shí)現(xiàn),其實(shí)線程池的使用是很簡(jiǎn)單的https://blog.csdn.net/liujiahan629629/article/details/84454908
package com.learn.thread;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class FixThreadPoolDemo {public static class MyTask implements Runnable{public void run() {System.out.println(System.currentTimeMillis() + "Thread Name:" + Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {MyTask myTask = new MyTask();int size =5;//下篇說(shuō)下阿里技術(shù)規(guī)范插件對(duì)這個(gè)的提示問(wèn)題
// ExecutorService executorService = new ThreadPoolExecutor(size,size,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
// ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();
// ExecutorService executorService2 = new ThreadPoolExecutor(size,size,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),namedThreadFactory);ExecutorService es = Executors.newFixedThreadPool(size);for (int i = 0; i < 10 ; i++) {es.submit(myTask);}}}
package com.learn.thread;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduledThreadPoolDemo {public static void main(String[] args) {ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);/*** 1,executorService.scheduleAtFixedRate:創(chuàng)建一個(gè)周期性任務(wù),從上個(gè)任務(wù)開(kāi)始,過(guò)period周期執(zhí)行下一個(gè)(如果執(zhí)行時(shí)間>period,則以執(zhí)行時(shí)間為周期)* 2,executorService.scheduleWithFixedDelay:創(chuàng)建一個(gè)周期上午,從上個(gè)任務(wù)結(jié)束,過(guò)period周期執(zhí)行下一個(gè)。*///如果前邊任務(wù)沒(méi)有完成則調(diào)度也不會(huì)啟動(dòng)executorService.scheduleAtFixedRate(new Runnable() {public void run() {try {Thread.sleep(1000);System.out.println("當(dāng)前時(shí)間:" + System.currentTimeMillis()/1000);} catch (InterruptedException e) {e.printStackTrace();}}},0,2, TimeUnit.SECONDS);}
}
線程池其實(shí)提供了一些回調(diào)的API,來(lái)給我們做一些擴(kuò)展的操作,每一個(gè)任務(wù)開(kāi)始執(zhí)行之前我們是可以知道的,你們可以記錄時(shí)間的信息,你把它給打印出來(lái),執(zhí)行完了之后也可以拿到這些信息,每個(gè)提交任務(wù)在什么時(shí)間執(zhí)行完了,線程池終止的時(shí)候也能捕獲一些信息,給我們一些擴(kuò)展,這個(gè)類似于一個(gè)固定大小的線程池
package com.learn.thread;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ExtThreadPoolDemo {public static class MyTask implements Runnable{public void run() {System.out.println(System.currentTimeMillis() + "Thread Name:" + Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException {FixThreadPoolDemo.MyTask myTask = new FixThreadPoolDemo.MyTask();ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10)){@Overrideprotected void beforeExecute(Thread t, Runnable r) {System.out.println("準(zhǔn)備執(zhí)行線程:" + r.toString() +"===" + t.getName());}@Overrideprotected void afterExecute(Runnable r, Throwable t) {System.out.println("執(zhí)行完成線程:" + r.toString());}@Overrideprotected void terminated() {System.out.println("線程池退出" );}};for (int i = 0; i < 10 ; i++) {es.submit(myTask);}Thread.sleep(3000);es.shutdown();}
}
如果這個(gè)系統(tǒng)負(fù)載不是很大,就很正常的執(zhí)行,你提交任務(wù)我?guī)湍銏?zhí)行就完事了,但是有些時(shí)候我們的任務(wù)非常的繁重,導(dǎo)致我這個(gè)系統(tǒng)根本就執(zhí)行不過(guò)來(lái),比如我10個(gè)大小的線程池,我們不會(huì)準(zhǔn)備一個(gè)無(wú)限大小的緩存隊(duì)列,因?yàn)橐粋€(gè)無(wú)限大小的對(duì)你是沒(méi)有好處的,實(shí)際上有大片的隊(duì)列上來(lái),你把這個(gè)任務(wù)堆在隊(duì)列當(dāng)中,導(dǎo)致這個(gè)內(nèi)存遞增,如果內(nèi)存一旦遞增而且沒(méi)有釋放呢,你可能就會(huì)出現(xiàn)內(nèi)存的異常,這個(gè)其實(shí)是沒(méi)有好處的,實(shí)際情況中我們也是有遇到過(guò),因?yàn)槌绦蜻\(yùn)行的環(huán)境呢,有時(shí)候可能會(huì)出乎我們的意料,因此呢當(dāng)我們發(fā)現(xiàn)我們的系統(tǒng)負(fù)載到一定程度呢,也學(xué)我們應(yīng)該選擇丟棄一些任務(wù),而不是把它放到內(nèi)存當(dāng)中,看著內(nèi)存就這么爆掉,并不是我丟了就不管了,我丟了幾個(gè)任務(wù),這個(gè)時(shí)候我們就要關(guān)注一下拒絕策略,我這個(gè)任務(wù)不執(zhí)行了我要怎么回絕掉,它是有這么一個(gè)構(gòu)造函數(shù)/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}拒絕策略是由RejectedExecutionHandler來(lái)指定,如果這個(gè)任務(wù)不能執(zhí)行了,那我應(yīng)該怎么做,每個(gè)拒絕策略有不同的操作,/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/
public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}
}如果不能執(zhí)行就拋異常,拋出一個(gè)拒絕異常,異常信息被打印出來(lái),/*** A handler for rejected tasks that silently discards the* rejected task.*/
public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}這個(gè)策略什么都沒(méi)做,如果說(shuō)我執(zhí)行不了了,我就把你丟掉,他什么都不做,只簡(jiǎn)單的把任務(wù)丟棄掉,/*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
}你提交給我任務(wù),我是做不了了,但是呢你可以做,你提交給我一個(gè)任務(wù)呢,我讓提交者來(lái)做,直接在調(diào)用里去做這個(gè)任務(wù)了,除非這個(gè)線程池被關(guān)掉,只要當(dāng)前這個(gè)線程池還活著,就讓調(diào)用者來(lái)做這個(gè)事情,/*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
}我要丟棄這個(gè)任務(wù),但是我丟什么呢,我要丟一個(gè)最老的沒(méi)有處理的請(qǐng)求,從隊(duì)列當(dāng)中拿到workder queue/*** Returns the task queue used by this executor. Access to the* task queue is intended primarily for debugging and monitoring.* This queue may be in active use. Retrieving the task queue* does not prevent queued tasks from executing.** @return the task queue*/
public BlockingQueue<Runnable> getQueue() {return workQueue;
}拿出一個(gè)最老的給他丟掉
自定義的線程池工廠,/*** Returns the thread factory used to create new threads.** @return the current thread factory* @see #setThreadFactory(ThreadFactory)*/
public ThreadFactory getThreadFactory() {return threadFactory;
}/*** The default thread factory*/
static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}
}/*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of* {@code RejectedExecutionHandler}, if the task* cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}/*** The main pool control state, ctl, is an atomic integer packing* two conceptual fields* workerCount, indicating the effective number of threads* runState, indicating whether running, shutting down etc** In order to pack them into one int, we limit workerCount to* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2* billion) otherwise representable. If this is ever an issue in* the future, the variable can be changed to be an AtomicLong,* and the shift/mask constants below adjusted. But until the need* arises, this code is a bit faster and simpler using an int.** The workerCount is the number of workers that have been* permitted to start and not permitted to stop. The value may be* transiently different from the actual number of live threads,* for example when a ThreadFactory fails to create a thread when* asked, and when exiting threads are still performing* bookkeeping before terminating. The user-visible pool size is* reported as the current size of the workers set.** The runState provides the main lifecycle control, taking on values:** RUNNING: Accept new tasks and process queued tasks* SHUTDOWN: Don't accept new tasks, but process queued tasks* STOP: Don't accept new tasks, don't process queued tasks,* and interrupt in-progress tasks* TIDYING: All tasks have terminated, workerCount is zero,* the thread transitioning to state TIDYING* will run the terminated() hook method* TERMINATED: terminated() has completed** The numerical order among these values matters, to allow* ordered comparisons. The runState monotonically increases over* time, but need not hit each state. The transitions are:** RUNNING -> SHUTDOWN* On invocation of shutdown(), perhaps implicitly in finalize()* (RUNNING or SHUTDOWN) -> STOP* On invocation of shutdownNow()* SHUTDOWN -> TIDYING* When both queue and pool are empty* STOP -> TIDYING* When pool is empty* TIDYING -> TERMINATED* When the terminated() hook method has completed** Threads waiting in awaitTermination() will return when the* state reaches TERMINATED.** Detecting the transition from SHUTDOWN to TIDYING is less* straightforward than you'd like because the queue may become* empty after non-empty and vice versa during SHUTDOWN state, but* we can only terminate if, after seeing that it is empty, we see* that workerCount is 0 (which sometimes entails a recheck -- see* below).*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));下面我們來(lái)看一下Forkjoin
他也是一個(gè)新的線程池,他基本思想是這樣的,如果你有一個(gè)很大的任務(wù),我們就可以把這個(gè)任務(wù)分解,我會(huì)把它分解成若干小的任務(wù),每個(gè)小任務(wù)都進(jìn)行計(jì)算,計(jì)算完成之后呢,我們把結(jié)果進(jìn)行整合和收集,我把大任務(wù)進(jìn)行小任務(wù)分解,大任務(wù)變成小任務(wù),join是指我等待小任務(wù),執(zhí)行結(jié)束,join也會(huì)做這個(gè)等待,等待你這個(gè)子任務(wù)結(jié)束,結(jié)束之后把結(jié)果返回出來(lái),做進(jìn)一步的整合,從而得到最終的結(jié)果,因而它是一個(gè)分而治之的思想,這里的提交任務(wù)并不是說(shuō),他一定回去新建一個(gè)線程,而是把這個(gè)任務(wù)推向線程池當(dāng)中,workerQueue當(dāng)中去,他主要提供兩個(gè)接口
這里的區(qū)別就在于有沒(méi)有返回值的一個(gè)問(wèn)題,Action是沒(méi)有返回值的,只是說(shuō)我要做這個(gè)動(dòng)作,我要把它分解成多個(gè)小的行為,去執(zhí)行,所有的這些行為是沒(méi)有返回值的,做完就結(jié)束了,Task它是有返回值的,你可以給他指定一個(gè)返回值,他有返回值之后呢,得到最終的一個(gè)結(jié)果
package com.learn.thread;import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;public class ForkJoinThreadPoolDemo extends RecursiveTask<Long> {private static final int THRESHOLD = 10000;private long start;private long end;public ForkJoinThreadPoolDemo(long start, long end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {long sum = 0;boolean canCompute = (end - start) < THRESHOLD;if (canCompute) {for (long i = start; i < end; i++) {sum += i;}} else {//分成100份進(jìn)行處理long step = (start + end) / 50;ArrayList<ForkJoinThreadPoolDemo> subTasks = new ArrayList<ForkJoinThreadPoolDemo>();long pos = start;for (int i = 0; i < 50; i++) {long lastOne = pos + step;if (lastOne > end) {lastOne = end;}ForkJoinThreadPoolDemo subTask = new ForkJoinThreadPoolDemo(pos, lastOne);pos += step;subTasks.add(subTask);subTask.fork();}for (ForkJoinThreadPoolDemo t : subTasks) {sum += t.join();}}return sum;}//結(jié)果199990000public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinThreadPoolDemo task = new ForkJoinThreadPoolDemo(0, 20000);ForkJoinTask<Long> result = forkJoinPool.submit(task);try {long res = result.get();System.out.println("結(jié)果" + res);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
/*** Scans for and tries to steal a top-level task. Scans start at a* random location, randomly moving on apparent contention,* otherwise continuing linearly until reaching two consecutive* empty passes over all queues with the same checksum (summing* each base index of each queue, that moves on each steal), at* which point the worker tries to inactivate and then re-scans,* attempting to re-activate (itself or some other worker) if* finding a task; otherwise returning null to await work. Scans* otherwise touch as little memory as possible, to reduce* disruption on other scanning threads.** @param w the worker (via its WorkQueue)* @param r a random seed* @return a task, or null if none found*/
private ForkJoinTask<?> scan(WorkQueue w, int r) {WorkQueue[] ws; int m;if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {int ss = w.scanState; // initially non-negativefor (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;int b, n; long c;if ((q = ws[k]) != null) {if ((n = (b = q.base) - q.top) < 0 &&(a = q.array) != null) { // non-emptylong i = (((a.length - 1) & b) << ASHIFT) + ABASE;if ((t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))) != null &&q.base == b) {if (ss >= 0) {if (U.compareAndSwapObject(a, i, t, null)) {q.base = b + 1;if (n < -1) // signal otherssignalWork(ws, q);return t;}}else if (oldSum == 0 && // try to activatew.scanState < 0)tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);}if (ss < 0) // refreshss = w.scanState;r ^= r << 1; r ^= r >>> 3; r ^= r << 10;origin = k = r & m; // move and rescanoldSum = checkSum = 0;continue;}checkSum += b;}if ((k = (k + 1) & m) == origin) { // continue until stableif ((ss >= 0 || (ss == (ss = w.scanState))) &&oldSum == (oldSum = checkSum)) {if (ss < 0 || w.qlock < 0) // already inactivebreak;int ns = ss | INACTIVE; // try to inactivatelong nc = ((SP_MASK & ns) |(UC_MASK & ((c = ctl) - AC_UNIT)));w.stackPred = (int)c; // hold prev stack topU.putInt(w, QSCANSTATE, ns);if (U.compareAndSwapLong(this, CTL, c, nc))ss = ns;elsew.scanState = ss; // back out}checkSum = 0;}}}return null;
}/*** Top-level runloop for workers, called by ForkJoinWorkerThread.run.*/
final void runWorker(WorkQueue w) {w.growArray(); // allocate queueint seed = w.hint; // initially holds randomization hintint r = (seed == 0) ? 1 : seed; // avoid 0 for xorShiftfor (ForkJoinTask<?> t;;) {if ((t = scan(w, r)) != null)w.runTask(t);else if (!awaitWork(w, r))break;r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}
}
?
總結(jié)