日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java的Executor框架和线程池实现原理

發布時間:2025/3/20 java 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java的Executor框架和线程池实现原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一,Java的Executor框架


1,Executor接口

public interface Executor {void execute(Runnable command);} Executor接口是Executor框架中最基礎的部分,定義了一個用于執行Runnable的execute方法,它沒有實現類只有另一個重要的子接口ExecutorService

2,ExecutorService接口

//繼承自Executor接口public interface ExecutorService extends Executor {/*** 關閉方法,調用后執行之前提交的任務,不再接受新的任務*/void shutdown();/*** 從語義上可以看出是立即停止的意思,將暫停所有等待處理的任務并返回這些任務的列表*/List<Runnable> shutdownNow();/*** 判斷執行器是否已經關閉*/boolean isShutdown();/*** 關閉后所有任務是否都已完成*/boolean isTerminated();/*** 中斷*/boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;/*** 提交一個Callable任務*/<T> Future<T> submit(Callable<T> task);/*** 提交一個Runable任務,result要返回的結果*/<T> Future<T> submit(Runnable task, T result);/*** 提交一個任務*/Future<?> submit(Runnable task);/*** 執行所有給定的任務,當所有任務完成,返回保持任務狀態和結果的Future列表*/<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;/*** 執行給定的任務,當所有任務完成或超時期滿時(無論哪個首先發生),返回保持任務狀態和結果的 Future 列表。*/<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;/*** 執行給定的任務,如果某個任務已成功完成(也就是未拋出異常),則返回其結果。*/<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;/*** 執行給定的任務,如果在給定的超時期滿前某個任務已成功完成(也就是未拋出異常),則返回其結果。*/<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;} ExecutorService接口繼承自Executor接口,定義了終止、提交,執行任務、跟蹤任務返回結果等方法

1,execute(Runnable command):履行Ruannable類型的任務,

2,submit(task):可用來提交Callable或Runnable任務,并返回代表此任務的Future對象
3,shutdown():在完成已提交的任務后封閉辦事,不再接管新任務,

4,shutdownNow():停止所有正在履行的任務并封閉辦事。
5,isTerminated():測試是否所有任務都履行完畢了。,

6,isShutdown():測試是否該ExecutorService已被關閉

3,Executors的靜態方法:負責生成各種類型的ExecutorService線程池實例

+newFixedThreadPool(numberOfThreads:int):(固定線程池)ExecutorService 創建一個固定線程數量的線程池,并行執行的線程數量不變,線程當前任務完成后,可以被重用執行另一個任務
+newCachedThreadPool():(可緩存線程池)ExecutorService 創建一個線程池,按需創建新線程,就是有任務時才創建,空閑線程保存60s,當前面創建的線程可用時,則重用它們

+new SingleThreadExecutor();(單線程執行器線程池中只有一個線程,依次執行任務


+new ScheduledThreadPool():線程池按時間計劃來執行任務,允許用戶設定執行任務的時間

+new SingleThreadScheduledExcutor();線程池中只有一個線程,它按規定時間來執行任務

4,Runnable、Callable、Future接口

Runnable接口:

// 實現Runnable接口的類將被Thread執行,表示一個基本的任務public interface Runnable {// run方法就是它所有的內容,就是實際執行的任務public abstract void run();} Callable接口:與Runnable接口的區別在于它接收泛型,同時它執行任務后帶有返回內容

// Callable同樣是任務,與Runnable接口的區別在于它接收泛型,同時它執行任務后帶有返回內容public interface Callable<V> {// 相對于run方法的帶有返回值的call方法V call() throws Exception; }

Runnable接口和Callable接口的實現類,都可以被ThreadPoolExecutor和ScheduledThreadPoolExecutor執行,他們之間的區別是Runnable不會返回結果,而Callable可以返回結果。

Executors可以把一個Runnable對象轉換成Callable對象:

public static Callable<Object> callable(Runnbale task);Executors把一個Runnable和一個待返回的結果包裝成一個Callable的API:
public static<T> Callable<T> callable(Runnbale task,T result); 當把一個Callable對象(Callable1,Callable2)提交給ThreadPoolExecutor和 Scheduled ThreadPoolExecutor執行時,submit(...)會向我們返回一個FutureTask對象。我們執行FutureTask.get()來等待任務執行完成,當任務完成后,FutureTask.get()將返回任務的結果。


Future接口:

// Future代表異步任務的執行結果public interface Future<V> {/*** 嘗試取消一個任務,如果這個任務不能被取消(通常是因為已經執行完了),返回false,否則返回true。*/boolean cancel(boolean mayInterruptIfRunning);/*** 返回代表的任務是否在完成之前被取消了*/boolean isCancelled();/*** 如果任務已經完成,返回true*/boolean isDone();/*** 獲取異步任務的執行結果(如果任務沒執行完將等待)*/V get() throws InterruptedException, ExecutionException;/*** 獲取異步任務的執行結果(有最常等待時間的限制)** timeout表示等待的時間,unit是它時間單位*/V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;} Future就是對于具體的Runnable或者Callable任務的執行結果進行取消查詢是否完成獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果


在Future接口中聲明了5個方法,下面依次解釋每個方法的作用:
+cancel方法用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。參數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設置true,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayInterruptIfRunning為true還是false,此方法肯定返回false,即如果取消已經完成的任務會返回false;如果任務正在執行,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false,則返回false;如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,肯定返回true。
+isCancelled方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true。
+isDone方法表示任務是否已經完成,若任務完成,則返回true;
+get()方法用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回;
+get(long timeout, TimeUnit unit)用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null。

也就是說Future提供了三種功能:
  1)判斷任務是否完成;
  2)能夠中斷任務;
  3)能夠獲取任務執行結果。


FutureTask:

通常使用FutureTask來處理我們的任務。FutureTask類同時又實現了Runnable接口,所以可以直接提交給Executor執行

FutureTask提供了2個構造器:public FutureTask(Callable<V> callable) { } public FutureTask(Runnable runnable, V result) { }//事實上,FutureTask是Future接口的一個唯一實現類。

使用FutureTask實現超時執行的代碼如下:

xecutorService executor = Executors.newSingleThreadExecutor(); FutureTask<String> future = new FutureTask<String>(new Callable<String>() {//使用Callable接口作為構造參數 public String call() { //真正的任務在這里執行,這里的返回值類型為String,可以為任意類型 }}); executor.execute(future); //在這里可以做別的任何事情 try { result = future.get(5000, TimeUnit.MILLISECONDS); //取得結果,同時設置超時執行時間為5秒。同樣可以用future.get(),不設置執行超時時間取得結果 } catch (InterruptedException e) { futureTask.cancel(true); } catch (ExecutionException e) { futureTask.cancel(true); } catch (TimeoutException e) { futureTask.cancel(true); } finally { executor.shutdown(); }


不直接構造Future對象,也可以使用ExecutorService.submit方法來獲得Future對象,submit方法即支持以 Callable接口類型,也支持Runnable接口作為參數,具有很大的靈活性。使用示例如下:

ExecutorService executor = Executors.newSingleThreadExecutor(); FutureTask<String> future = executor.submit( new Callable<String>() {//使用Callable接口作為構造參數 public String call() { //真正的任務在這里執行,這里的返回值類型為String,可以為任意類型 }}); //在這里可以做別的任何事情 //同上面取得結果的代碼


線程池實現原理詳解:

ThreadPoolExecutor是線程池的實現類:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) (1)corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程會創建一個線程來執行任務,即使其他空閑的基本線程能創建線程也會創建線程,等到到需要執行的任務數大于線程池基本大小corePoolSize時就不再創建

(2)maximumPoolSize(線程池最大大小):線程池允許最大線程數。如果阻塞隊列滿了,并且已經創建的線程數小于最大線程數,則線程池會再創建新的線程執行。因為線程池執行任務時是線程池基本大小滿了,后續任務進入阻塞隊列,阻塞隊列滿了,在創建線程。

(3)keepAliveTime(線程活動保持時間):空閑線程的保持存活時間。
(4)TimeUnit(線程活動保持時間的單位):

? ? ? ? TimeUnit.DAYS; //天
? ? ? ? TimeUnit.HOURS; //小時
? ? ? ? TimeUnit.MINUTES; //分鐘
? ? ? ? TimeUnit.SECONDS; //秒
? ? ? ? TimeUnit.MILLISECONDS; //毫秒
? ? ? ? TimeUnit.MICROSECONDS; //微妙
? ? ? ? TimeUnit.NANOSECONDS; //納秒

(5)workQueue(任務隊列):用于保存等待執行的任務的阻塞隊列。一個阻塞隊列,用來存儲等待執行的任務:數組,鏈表,不存元素的阻塞隊列

? ? ? 5.1)ArrayBlockingQueue;數組結構的有界阻塞隊列,先進先出FIFO
? ? ? 5.2)LinkedBlockingQueue;鏈表結構的無界阻塞隊列。先進先出FIFO排序元素,靜態方法Executors.newFixedThreadPool使用這個方法

? ? ? 5.3)SynchronousQueue;不存儲元素的阻塞隊列,就是每次插入操作必須等到另一個線程調用移除操作,靜態方法Executors.newCachedThreadPool使用這個方法

?(6)threadFactory:用于設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字

?(7)handler(飽和策略):表示當拒絕處理任務時的策略。當隊列和線程池都滿了,說明線程池處于飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。

? ? ?ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。

? ? ?ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常

? ? ?ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)

? ? ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

?

我們盡量優先使用Executors提供的靜態方法來創建線程池,如果Executors提供的方法無法滿足要求,再自己通過ThreadPoolExecutor類來創建線程池? ?

Executors.newFixedThreadPool(int); //創建固定容量大小的緩沖池 Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUEExecutors.newSingleThreadExecutor(); //創建容量為1的緩沖池

下面是這三個靜態方法的具體實現;

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }
從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的(n,n),它使用的LinkedBlockingQueue


newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1(1,1),也使用的LinkedBlockingQueue


newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。

實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。


1)newFixedThreadPool:(固定線程池)

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } 線程池corePoolSize和maximumPoolSize值是相等的(n,n),把keepAliveTime設置0L,意味著多余的空閑線程會被立即終止。

newFixedThreadPool的execute方法執行過程:

1,如果當前運行線程數少于corePoolSize,則創建新線程來執行任務(優先滿足核心池

2,當前運行線程數等于corePoolSize時,將任務加入LinkedBlockingQueue鏈式阻塞隊列(核心池滿了在進入隊列

3,當線程池的任務完成之后,循環反復從LinkedBlockingQueue隊列中獲取任務來執行


2)newSingleThreadExecutor:(單線程執行器)

newSingleThreadExecutor是使用單個worker線程的Executors.

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } newSingleThreadExecutor的execute方法執行過程如下:

? 1,當前運行的線程數少于corePoolSize(即當前線程池中午運行的線程),則創建一個新的線程來執行任務

? 2,當線程池中有一個運行的線程時,將任務加入阻塞隊列

? 3,當線程完成任務時,會無限反復從鏈式阻塞隊列中獲取任務來執行


3,)newCachedThreadPool:可緩存線程池

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); } newCachedThreadPool是一個根據需要創建線程的線程池。

newCachedThreadPool的corePoolSize設置0,即核心池是空,maxmumPoolSize設置為Integer.MAX_VALUE,即maxmumPool是無界的。keepAliveTime設置60L,當空閑線程等待新任務最長時間是60s,超過60s就終止

三個線程池的特點:

1、newFixedThreadPool創建一個指定工作線程數量的線程池。每當提交一個任務就創建一個工作線程,如果工作線程數量達到線程池初始的最大數corePoolSize,則將提交的任務存入到池隊列中

2、newCachedThreadPool創建一個可緩存的線程池。這種類型的線程池特點是:
1).工作線程的創建數量幾乎沒有限制(其實也有限制的,數目為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。
2).如果長時間沒有往線程池中提交任務,即如果工作線程空閑了指定的時間(默認為1分鐘),則該工作線程將自動終止。終止后,如果你又提交了新的任務,則線程池重新創建一個工作線程。

3、newSingleThreadExecutor創建一個單線程化的Executor,即只創建唯一的工作者線程來執行任務,如果這個線程異常結束,會有另一個取代它,保證順序執行(我覺得這點是它的特色)。單工作線程最大的特點是可保證順序地執行各個任務,并且在任意給定的時間不會有多個線程是活動的?


線程池的處理流程:

線程池執行示意圖:


1,首先線程池判斷基本線程池是否已滿(< corePoolSize ?)?沒滿,創建一個工作線程來執行任務。滿了,則進入下個流程。

2,其次線程池判斷工作隊列是否已滿?沒滿,則將新提交的任務存儲在工作隊列里。滿了,則進入下個流程。

3,最后線程池判斷整個線程池是否已滿(< maximumPoolSize ?)?沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略來處理這個任務。

總結:線程池優先要創建出基本線程池大小(corePoolSize)的線程數量,沒有達到這個數量時,每次提交新任務都會直接創建一個新線程,當達到了基本線程數量后,又有新任務到達,優先放入等待隊列,如果隊列滿了,才去創建新的線程(不能超過線程池的最大數maxmumPoolSize)


向線程池提交任務的兩種方式:

1)通過execute()方法

ExecutorService threadpool= Executors.newFixedThreadPool(10); threadpool.execute(new Runnable(){...});這種方式提交 沒有返回值,也就不能判斷任務是否被線程池執行成功

2)通過submit()方法

Future<?> future = threadpool.submit(new Runnable(){...}); try { Object res = future.get();//獲取任務執行結果 } catch (InterruptedException e) { // 處理中斷異常 e.printStackTrace(); } catch (ExecutionException e) { // 處理無法執行任務異常 e.printStackTrace(); }finally{ // 關閉線程池 executor.shutdown(); } 使用submit 方法來提交任務,它會返回一個Future對象,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間后立即返回,這時有可能任務沒有執行完。


線程池的關閉:

? shutdown():不會立即終止線程池,而是再也不會接受新的任務要等所有任務緩存隊列中的任務都執行完后才終止
? shutdownNow():立即終止線程池,再也不會接受新的任務,并嘗試打斷正在執行的任務,并且清空任務緩存隊列,返回尚未執行的任務

線程池本身的狀態

volatile int runState; static final int RUNNING = 0; //運行狀態 static final int SHUTDOWN = 1; //關閉狀態 static final int STOP = 2; //停止 static final int TERMINATED = 3; //終止,終結
1,當創建線程池后,初始時,線程池處于RUNNING狀態
2,如果調用了shutdown()方法,則線程池處于SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢,最后終止;
3,如果調用了shutdownNow()方法,則線程池處于STOP狀態,此時線程池不能接受新的任務,并且會去嘗試終止正在執行的任務,返回沒有執行的任務列表;
4,當線程池處于SHUTDOWN或STOP狀態,并且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態

參考:http://blog.csdn.net/shakespeare001/article/details/51330745

http://singleant.iteye.com/blog/1423931

http://blog.csdn.net/it_man/article/details/7193727


總結

以上是生活随笔為你收集整理的Java的Executor框架和线程池实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。