日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java Review - 并发编程_ScheduledThreadPoolExecutor原理源码剖析

發(fā)布時間:2025/3/21 java 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java Review - 并发编程_ScheduledThreadPoolExecutor原理源码剖析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 概述
  • 類結構
  • 核心方法&源碼解析
    • schedule(Runnable command, long delay,TimeUnit unit)
    • scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
    • scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
  • 小結


概述

Java Review - 并發(fā)編程_ThreadPoolExecutor原理&源碼剖析 我們復習了Java中線程池ThreadPoolExecutor的原理,ThreadPoolExecutor只是Executors工具類的一部分功能。

下面來介紹另外一部分功能,也就是ScheduledThreadPoolExecutor的實現,這是一個可以在指定一定延遲時間后或者定時進行任務調度執(zhí)行的線程池。


類結構

  • Executors其實是個工具類,它提供了好多靜態(tài)方法,可根據用戶的選擇返回不同的線程池實例。

  • ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實現了ScheduledExecutorService接口。

  • 線程池隊列是DelayedWorkQueue,其和DelayedQueue類似,是一個延遲隊列

  • ScheduledFutureTask是具有返回值的任務,繼承自FutureTask。FutureTask的內部有一個變量state用來表示任務的狀態(tài),一開始狀態(tài)為NEW,所有狀態(tài)為

private static final int NEW = 0; // 初始狀態(tài)private static final int COMPLETING = 1; // 執(zhí)行中private static final int NORMAL = 2; // 正常運行結束狀態(tài)private static final int EXCEPTIONAL = 3; // 運行中異常private static final int CANCELLED = 4; // 任務被取消private static final int INTERRUPTING = 5; // 任務正在被中斷private static final int INTERRUPTED = 6; // 任務已經被中斷

可能的任務狀態(tài)轉換路徑為

NEN-> COMPLETING-> NORMAL//初始狀態(tài)->執(zhí)行中ー>正常結東 NEN-> COMPILETING-> EXCEPTIONAL//初始狀態(tài)->執(zhí)行中ー>執(zhí)行異常 NEN-> CANCELLED//初始狀態(tài)一>任務取消 NEN-> INTERRUPTING-> INTERRUPTED//初始狀態(tài)->被中斷中->被中斷
  • ScheduledFutureTask內部還有一個變量period用來表示任務的類型,任務類型如下

    period=0,說明當前任務是一次性的,執(zhí)行完畢后就退出了。

    period為負數,說明當前任務為fixed-delay任務,是固定延遲的定時可重復執(zhí)行任務。

    period為正數,說明當前任務為fixed-rate任務,是固定頻率的定時可重復執(zhí)行任務

  • ScheduledThreadPoolExecutor的一個構造函數如下,由該構造函數可知線程池隊列是DelayedWorkQueue。

/*** Creates a new {@code ScheduledThreadPoolExecutor} with the* given core pool size.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @throws IllegalArgumentException if {@code corePoolSize < 0}*/// 使用改造后的DelayQueuepublic ScheduledThreadPoolExecutor(int corePoolSize) {// 調用父類ThreadPoolExecutor的構造函數super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());} public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}


核心方法&源碼解析

schedule(Runnable command, long delay,TimeUnit unit)

該方法的作用是提交一個延遲執(zhí)行的任務,任務從提交時間算起延遲單位為unit的delay時間后開始執(zhí)行。提交的任務不是周期性任務,任務只會執(zhí)行一次.

/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {// 1 參數校驗 if (command == null || unit == null)throw new NullPointerException();// 2 任務轉換 RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));// 3 添加任務到延遲隊列 delayedExecute(t);return t;}
  • 代碼(1)進行參數校驗,如果command或者unit為null,則拋出NPE異常。

  • 代碼(2)裝飾任務,把提交的command任務轉換為ScheduledFutureTask。

    ScheduledFutureTask是具體放入延遲隊列里面的東西。由于是延遲任務,所以ScheduledFutureTask實現了long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法。triggerTime方法將延遲時間轉換為絕對時間,也就是把當前時間的納秒數加上延遲的納秒數后的long型值。

    ScheduledFutureTask的構造函數如下。

    /*** Creates a one-shot action with given nanoTime-based trigger time.*/ScheduledFutureTask(Runnable r, V result, long ns) {// 調用父類FutureTask的構造函數super(r, result);this.time = ns;this.period = 0; // 0 說明是一次性任務this.sequenceNumber = sequencer.getAndIncrement();}

    在構造函數內部首先調用了父類FutureTask的構造函數,父類FutureTask的構造函數代碼如下。

    public FutureTask(Runnable runnable, V result) {// 通過適配器把runnable轉換為callablethis.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable 設置狀態(tài)為NEW}

    FutureTask中的任務被轉換為Callable類型后,被保存到了變量this.callable里面,并設置FutureTask的任務狀態(tài)為NEW。

    然后在ScheduledFutureTask構造函數內部設置time為上面說的絕對時間。需要注意,這里period的值為0,這說明當前任務為一次性的任務,不是定時反復執(zhí)行任務。其中l(wèi)ong getDelay(TimeUnit unit)方法的代碼如下(該方法用來計算當前任務還有多少時間就過期了)。

    public long getDelay(TimeUnit unit) {// 裝飾后的時間 - 當前時間 = 即將過期剩余時間return unit.convert(time - now(), NANOSECONDS);}public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}

    compareTo的作用是加入元素到延遲隊列后,在內部建立或者調整堆時會使用該元素的compareTo方法與隊列里面其他元素進行比較,讓最快要過期的元素放到隊首。所以無論什么時候向隊列里面添加元素,隊首的元素都是最快要過期的元素。

  • 代碼(3)將任務添加到延遲隊列,delayedExecute的代碼如下。

    /*** Main execution method for delayed or periodic tasks. If pool* is shut down, rejects the task. Otherwise adds task to queue* and starts a thread, if necessary, to run it. (We cannot* prestart the thread to run the task because the task (probably)* shouldn't be run yet.) If the pool is shut down while the task* is being added, cancel and remove it if required by state and* run-after-shutdown parameters.** @param task the task*/private void delayedExecute(RunnableScheduledFuture<?> task) {// 4 如果線程池拐臂了,則執(zhí)行線程執(zhí)行拒絕策略if (isShutdown())reject(task);else {// 5 添加任務到延遲隊列super.getQueue().add(task);// 6 再次校驗if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// 7 確保至少一個線程在處理任務ensurePrestart();}}
    • 代碼(4)首先判斷當前線程池是否已經關閉了,如果已經關閉則執(zhí)行線程池的拒絕策略 ,否則執(zhí)行代碼(5)將任務添加到延遲隊列
    • 添加完畢后還要重新檢查線程池是否被關閉了,如果已經關閉則從延遲隊列里面刪除剛才添加的任務,但是此時有可能線程池中的線程已經從任務隊列里面移除了該任務,也就是該任務已經在執(zhí)行了,所以還需要調用任務的cancle方法取消任務。
    • 如果代碼(6)判斷結果為false,則會執(zhí)行代碼(7)確保至少有一個線程在處理任務,即使核心線程數corePoolSize被設置為0
    /*** Same as prestartCoreThread except arranges that at least one* thread is started even if corePoolSize is 0.*/void ensurePrestart() {int wc = workerCountOf(ctl.get());// 增加核心線程數if (wc < corePoolSize)addWorker(null, true);// 如果初始化corePoolSize=0,則也添加一個線程 else if (wc == 0)addWorker(null, false);}

    如上代碼首先獲取線程池中的線程個數,如果線程個數小于核心線程數則新增一個線程,否則如果當前線程數為0則新增一個線程。

上面我們分析了如何向延遲隊列添加任務,下面我們來看線程池里面的線程如何獲取并執(zhí)行任務。

前面說ThreadPoolExecutor時我們說過,具體執(zhí)行任務的線程是Worker線程,Worker線程調用具體任務的run方法來執(zhí)行。由于這里的任務是ScheduledFutureTask,所以我們下面看看ScheduledFutureTask的run方法

/*** Overrides FutureTask version so as to reset/requeue if periodic.*/public void run() {// 8 是否只執(zhí)行一次 boolean periodic = isPeriodic();// 9 取消任務if (!canRunInCurrentRunState(periodic))cancel(false);// 10 只執(zhí)行一次,調用schedule方法else if (!periodic)ScheduledFutureTask.super.run();// 11 定時執(zhí)行 else if (ScheduledFutureTask.super.runAndReset()) {// 11.1 設置time=time+periodsetNextRunTime();// 11.2 重新加入該任務到delay隊列reExecutePeriodic(outerTask);}}
  • 代碼(8)中的isPeriodic的作用是判斷當前任務是一次性任務還是可重復執(zhí)行的任務

    public boolean isPeriodic() {return period != 0;}

    可以看到,其內部是通過period的值來判斷的,由于轉換任務在創(chuàng)建ScheduledFutureTask時傳遞的period的值為0 ,所以這里isPeriodic返回false。

  • 代碼(9)判斷當前任務是否應該被取消,canRunInCurrentRunState的代碼如下

    boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);}

    傳遞的periodic的值為false,所以isRunningOrShutdown的參數為executeExisti ngDelayedTasksAfterShutdown。executeExistingDelayedTasksAfterShutdown默認為true,表示當其他線程調用了shutdown命令關閉了線程池后,當前任務還是要執(zhí)行,否則如果為false,則當前任務要被取消。

  • 由于periodic的值為false,所以執(zhí)行代碼(10)調用父類FutureTask的run方法具體執(zhí)行任務。FutureTask的run方法的代碼如下

    public void run() {// 12 if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;// 13 try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;// 13.1setException(ex);}// 13.2if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
  • 代碼(12)判斷如果任務狀態(tài)不是NEW則直接返回,或者如果當前任務狀態(tài)為NEW但是使用CAS設置當前任務的持有者為當前線程失敗則直接返回

  • 代碼(13)具體調用callable的call方法執(zhí)行任務。這里在調用前又判斷了任務的狀態(tài)是否為NEW,是為了避免在執(zhí)行代碼(12)后其他線程修改了任務的狀態(tài)(比如取消了該任務)。\

  • 如果任務執(zhí)行成功則執(zhí)行代碼(13.2)修改任務狀態(tài),set方法的代碼如下。

    protected void set(V v) {// 如果為NEW,設置為COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;// 設置當前任務的狀態(tài)為NORMAL,也就是任務正常結束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}
    • 如上代碼首先使用CAS將當前任務的狀態(tài)從NEW轉換到COMPLETING。這里當有多個線程調用時只有一個線程會成功。成功的線程再通過UNSAFE.putOrderedInt設置任務的狀態(tài)為正常結束狀態(tài),這里沒有使用CAS是因為對于同一個任務只可能有一個線程運行到這里。
    • 在這里使用putOrderedInt比使用CAS或者putLongvolatile效率要高,并且這里的場景不要求其他線程馬上對設置的狀態(tài)值可見。

在什么時候多個線程會同時執(zhí)行CAS將當前任務的狀態(tài)從NEW轉換到COMPLETING?其實當同一個command被多次提交到線程池時就會存在這樣的情況,因為同一個任務共享一個狀態(tài)值state。

如果任務執(zhí)行失敗,則執(zhí)行代碼(13.1)。setException的代碼如下,可見與set函數類似。

protected void setException(Throwable t) {// 如果當前任務的狀態(tài)為NEW,則設置為COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;// 設置當前任務的狀態(tài)為EXCEPTIONAL,也就是任務非正常結束UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}

到這里代碼(10)的邏輯執(zhí)行完畢,一次性任務也就執(zhí)行完畢了


scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

該方法的作用是,當任務執(zhí)行完畢后,讓其延遲固定時間后再次運行(fixed-delay任務)

  • initialDelay表示提交任務后延遲多少時間開始執(zhí)行任務command
  • delay表示當任務執(zhí)行完畢后延長多少時間后再次運行command任務
  • unit是initialDelay和delay的時間單位

任務會一直重復運行直到任務運行中拋出了異常,被取消了,或者關閉了線程池。

/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}* @throws IllegalArgumentException {@inheritDoc}*/public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {// 14 參數校驗 if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();// 15 任務轉換 ,注意這里的 poeriod = -dealy < 0 【 unit.toNanos(-delay)】ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;// 16 添加任務到隊列delayedExecute(t);return t;}
  • 代碼(14)進行參數校驗,校驗失敗則拋出異常
  • 代碼(15)將command任務轉換為ScheduledFutureTask。這里需要注意的是,傳遞給ScheduledFutureTask的period變量的值為-delay,period<0說明該任務為可重復執(zhí)行的任務。
  • 然后代碼(16)添加任務到延遲隊列后返回。

將任務添加到延遲隊列后線程池線程會從隊列里面獲取任務,然后調用ScheduledFutureTask的run方法執(zhí)行。由于這里period<0,所以isPeriodic返回true,所以執(zhí)行代碼(11)。runAndReset的代碼如下。

/*** Executes the computation without setting its result, and then* resets this future to initial state, failing to do so if the* computation encounters an exception or is cancelled. This is* designed for use with tasks that intrinsically execute more* than once.** @return {@code true} if successfully run and reset*/protected boolean runAndReset() {// 17 if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;// 18 boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptss = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;}

該代碼和FutureTask的run方法類似,只是任務正常執(zhí)行完畢后不會設置任務的狀態(tài),這樣做是為了讓任務成為可重復執(zhí)行的任務。

這里多了代碼(19),這段代碼判斷如果當前任務正常執(zhí)行完畢并且任務狀態(tài)為NEW則返回true,否則返回false。 如果返回了true則執(zhí)行代碼(11.1)的setNextRunTime方法設置該任務下一次的執(zhí)行時間。

/*** Sets the next time to run for a periodic task.*/private void setNextRunTime() {long p = period;if (p > 0) // ffixed-rate類型任務time += p;else // fixed-delay 類型任務 time = triggerTime(-p);}

這里p<0說明當前任務為fixed-delay類型任務。然后設置time為當前時間加上-p的時間,也就是延遲-p時間后再次執(zhí)行。

fixed-delay類型的任務的執(zhí)行原理為: 當添加一個任務到延遲隊列后,等待initialDelay時間,任務就會過期,過期的任務就會被從隊列移除,并執(zhí)行。執(zhí)行完畢后,會重新設置任務的延遲時間,然后再把任務放入延遲隊列,循環(huán)往復。需要注意的是,如果一個任務在執(zhí)行中拋出了異常,那么這個任務就結束了,但是不影響其他任務的執(zhí)行。


scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

該方法相對起始時間點以固定頻率調用指定的任務(fixed-rate任務)。當把任務提交到線程池并延遲initialDelay時間(時間單位為unit)后開始執(zhí)行任務command 。然后從initialDelay+period時間點再次執(zhí)行,而后在 initialDelay + 2 * period時間點再次執(zhí)行,循環(huán)往復,直到拋出異常或者調用了任務的cancel方法取消了任務,或者關閉了線程池。

scheduleAtFixedRate的原理與scheduleWithFixedDelay類似,下面我們看下它們之間的不同點。

首先調用scheduleAtFixedRate的代碼如下

/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}* @throws IllegalArgumentException {@inheritDoc}*/public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();// 裝飾任務,注意這里的period=period>0 不是負的ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}

在如上代碼中,在將fixed-rate類型的任務command轉換為ScheduledFutureTask時設置period=period,不再是-period。

所以當前任務執(zhí)行完畢后,調用setNextRunTime設置任務下次執(zhí)行的時間時執(zhí)行的是time += p而不再是time = triggerTime(-p)。

總結:相對于fixed-delay任務來說,fixed-rate方式執(zhí)行規(guī)則為,時間為initdelday +n*period時啟動任務,但是如果當前任務還沒有執(zhí)行完,下一次要執(zhí)行任務的時間到了,則不會并發(fā)執(zhí)行,下次要執(zhí)行的任務會延遲執(zhí)行,要等到當前任務執(zhí)行完畢后再執(zhí)行。

小結

ScheduledThreadPoolExecutor的實現原理,其內部使用DelayQueue來存放具體任務。任務分為三種,其中一次性執(zhí)行任務執(zhí)行完畢就結束了,fixed-delay任務保證同一個任務在多次執(zhí)行之間間隔固定時間,fixed-rate任務保證按照固定的頻率執(zhí)行。任務類型使用period的值來區(qū)分。

《新程序員》:云原生和全面數字化實踐50位技術專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的Java Review - 并发编程_ScheduledThreadPoolExecutor原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。