HashedWheelTimer时间轮定时任务原理分析
一、示例代碼
?
HashedWheelTimer時間輪是一個高性能,低消耗的數據結構,它適合用非準實時,延遲的短平快任務,例如心跳檢測。
時間輪是一種非常驚艷的數據結構。其在Linux內核中使用廣泛,是Linux內核定時器的實現方法和基礎之一。Netty內部基于時間輪實現了一個HashedWheelTimer來優化I/O超時的檢測,
由于Netty動輒管理100w+的連接,每一個連接都會有很多超時任務。比如發送超時、心跳檢測間隔等,如果每一個定時任務都啟動一個Timer,不僅低效,而且會消耗大量的資源。
在Netty中的一個典型應用場景是判斷某個連接是否idle,如果idle(如客戶端由于網絡原因導致到服務器的心跳無法送達),則服務器會主動斷開連接,釋放資源。得益于Netty NIO的優異性能,基于Netty開發的服務器可以維持大量的長連接,單臺8核16G的云主機可以同時維持幾十萬長連接,及時掐掉不活躍的連接就顯得尤其重要。
HashedWheelTimer本質是一種類似延遲任務隊列的實現,那么它的特點就是上述所說的,適用于對時效性不高的,可快速執行的,大量這樣的“小”任務,能夠做到高性能,低消耗。
例如:
- 心跳檢測
- session、請求是否timeout
業務場景則有: - 用戶下單后發短信
- 下單之后15分鐘,如果用戶不付款就自動取消訂單
二、創建時間輪定時器對象
1.類成員變量分析
tick,時刻即輪上的指針,指向某一個格子
ticksPerWheel,輪盤一圈包含的格子數,也就是輪盤總刻度數
tickDuration,時刻間距,也就是指針走完一個格子的時長,值越小,精度越高。
roundDuration,計時周期,輪盤指針走完一圈耗時,roundDuration=ticksPerWheel?tickDuration。當任務的延期時長delay超出計時周期時,任務放入對應桶中的同時保存剩余圈數:roundsRemaining=delay / roundDuration
bucket,相鄰刻度之間為桶,桶中以鏈表或其他形式存放延時任務。當指針走過該桶時,桶中超時的延時任務開始啟動
?
2.添加任務分析, 這里就是把task加入到Queue<HashedWheelTimeout> timeouts這個待執行的延遲任務隊列中。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();start();long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}?三、定時執行任務。
? ? 1.由workThread中的worker這個對象來負責執行。
2.時間輪指針跳動
private long waitForNextTick() {//計算下一個tick的時間,long deadline = tickDuration * (tick + 1);for (;;) {final long currentTime = System.nanoTime() - startTime;//根據當前計算需要sleep的時間。這里加了999999是因為向上取整了1毫秒,假如距離下一個tick的時間為2000010納秒,那如果sleep 2毫秒是不夠的,所以需要多sleep 1毫秒。long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//sleepTimeMs <=0 說明下一個tick的時間到了,說明上一個tick執行的時間“太久”了,所以直接返回就好了,不需要sleepif (sleepTimeMs <= 0) {//currentTime == Long.MIN_VALUE 這個判斷不是很理解if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}//直接sleep等待try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {//Worker被中斷,如果是關閉了則返回負數,表示不會執行下一個tickif (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}3.轉移任務到時間輪中
在調用時間輪的方法加入任務的時候并沒有直接加入到時間輪中,而是緩存到了timeouts隊列中,所以在運行的時候需要將timeouts隊列中的任務轉移到時間輪數據的鏈表中
private void transferTimeoutsToBuckets() {//最多取隊列的100000的元素出來for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}//如果timeout被取消了則不做處理if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {// Was cancelled in the meantime.continue;}//計算位于實踐論的層數long calculated = timeout.deadline / tickDuration;timeout.remainingRounds = (calculated - tick) / wheel.length;//就是timeout已經到期了,也不能放到之前的tick中final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.//計算所在bucket下標,并放進去int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];//又是類似鏈表插入節點的操作bucket.addTimeout(timeout);}}在這個轉移方法中,寫死了一個循環,每次都只轉移10萬個任務。
然后根據HashedWheelTimeout的deadline延遲時間計算出時間輪需要運行多少次才能運行當前的任務,如果當前的任務延遲時間大于時間輪跑一圈所需要的時間,那么就計算需要跑幾圈才能到這個任務運行。
最后計算出該任務在時間輪中的槽位,添加到時間輪的鏈表中。
4.運行時間輪中的任務
當指針跳到時間輪的槽位的時間,會將槽位的HashedWheelBucket取出來,然后遍歷鏈表,運行其中到期的任務。
public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;//把bucket的所有timeout取出來執行while (timeout != null) {HashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) {next = remove(timeout);if (timeout.deadline <= deadline) {//timeout的真正執行timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}//該timeout被取消了則移除掉 } else if (timeout.isCancelled()) {next = remove(timeout);//否則層數減一,等待下一輪的到來} else {timeout.remainingRounds --;}timeout = next;}}HashedWheelBucket是一個鏈表,所以我們需要從head節點往下進行遍歷。如果鏈表沒有遍歷到鏈表尾部那么就繼續往下遍歷。
獲取的timeout節點節點,如果剩余輪數remainingRounds大于0,那么就說明要到下一圈才能運行,所以將剩余輪數減一;
如果當前剩余輪數小于等于零了,那么就將當前節點從bucket鏈表中移除,并判斷一下當前的時間是否大于timeout的延遲時間,如果是則調用timeout的expire執行任務。
總結
以上是生活随笔為你收集整理的HashedWheelTimer时间轮定时任务原理分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ScheduledThreadPoolE
- 下一篇: redission收发命令流程分析