Netty时间轮调度原理分析,再不了解你就out啦
一、時(shí)間輪介紹
之前公司內(nèi)部搭建的延遲隊(duì)列服務(wù)有用到時(shí)間輪,但是一直沒有了解過它的實(shí)現(xiàn)原理。
最近有個(gè)和支付寶對(duì)接的項(xiàng)目,支付寶接口有流量控制,一定的時(shí)間內(nèi)只允許 N 次接口調(diào)用,針對(duì)一些業(yè)務(wù)我們需要頻繁調(diào)用支付寶開放平臺(tái)接口,如果不對(duì)請(qǐng)求做限制,很容易觸發(fā)流控告警。
為了避免這個(gè)問題,我們按照一定延遲規(guī)則將任務(wù)加載進(jìn)時(shí)間輪內(nèi),通過時(shí)間輪的調(diào)度來實(shí)現(xiàn)接口異步調(diào)用。
很多開源框架都實(shí)現(xiàn)了時(shí)間輪算法,這里以 Netty 為例,看下 Netty 中時(shí)間輪是怎么實(shí)現(xiàn)的。
1.1 快速入門
下面是一個(gè) API 使用例子。
public class WheelTimerSamples {private static final HashedWheelTimerInstance INSTANCE = HashedWheelTimerInstance.INSTANCE;public static void main(String[] args) throws IOException {INSTANCE.getWheelTimer().newTimeout(new PrintTimerTask(), 3, TimeUnit.SECONDS);System.in.read();}static class PrintTimerTask implements TimerTask {@Overridepublic void run(Timeout timeout) {System.out.println("Hello world");}}enum HashedWheelTimerInstance {INSTANCE;private final HashedWheelTimer wheelTimer;HashedWheelTimerInstance() {wheelTimer = new HashedWheelTimer(r -> {Thread t = new Thread(r);t.setUncaughtExceptionHandler((t1, e) -> System.out.println(t1.getName() + e.getMessage()));t.setName("-HashedTimerWheelInstance-");return t;}, 100, TimeUnit.MILLISECONDS, 64);}public HashedWheelTimer getWheelTimer() {return wheelTimer;}} }上面的例子中我們自定義了一個(gè) HashedWheelTimer,然后自定義了一個(gè) TimerTask,將一個(gè)任務(wù)加載進(jìn)時(shí)間輪,3s 后執(zhí)行這個(gè)任務(wù),怎么樣是不是很簡(jiǎn)單。
在定義時(shí)間輪時(shí)建議按照業(yè)務(wù)類型進(jìn)行區(qū)分,將時(shí)間輪定義為多個(gè)單例對(duì)象。
PS:因?yàn)闀r(shí)間輪是異步執(zhí)行的,在任務(wù)執(zhí)行之前 JVM 不能退出,所以 System.in.read(); 這一行代碼不能刪除。
1.2 原理圖解
二、原理分析
2.1 時(shí)間輪狀態(tài)
時(shí)間輪有以下三種狀態(tài):
- WORKER_STATE_INIT:初始化狀態(tài),此時(shí)時(shí)間輪內(nèi)的工作線程還沒有開啟
- WORKER_STATE_STARTED:運(yùn)行狀態(tài),時(shí)間輪內(nèi)的工作線程已經(jīng)開啟
- WORKER_STATE_SHUTDOWN:終止?fàn)顟B(tài),時(shí)間輪停止工作
狀態(tài)轉(zhuǎn)換如下,轉(zhuǎn)換原理會(huì)在下面講到:
2.2 構(gòu)造函數(shù)
public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}if (unit == null) {throw new NullPointerException("unit");}if (tickDuration <= 0) {throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);}if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}// 初始化時(shí)間輪數(shù)組,時(shí)間輪大小為大于等于 ticksPerWheel 的第一個(gè) 2 的冪,和 HashMap 類似wheel = createWheel(ticksPerWheel);// 取模用,用來定位數(shù)組中的槽mask = wheel.length - 1;// 為了保證精度,時(shí)間輪內(nèi)的時(shí)間單位為納秒long duration = unit.toNanos(tickDuration);// 時(shí)間輪內(nèi)的時(shí)鐘撥動(dòng)頻率不宜太大也不宜太小if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}// 創(chuàng)建工作線程workerThread = threadFactory.newThread(worker);// 非守護(hù)線程且 leakDetection 為 true 時(shí)檢測(cè)內(nèi)存是否泄漏leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;// 初始化最大等待任務(wù)數(shù)this.maxPendingTimeouts = maxPendingTimeouts;// 如果創(chuàng)建的時(shí)間輪實(shí)例大于 64,打印日志,并且這個(gè)日志只會(huì)打印一次if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}構(gòu)造函數(shù)中的參數(shù)相當(dāng)重要,當(dāng)自定義時(shí)間輪時(shí),我們應(yīng)該根據(jù)業(yè)務(wù)的范圍設(shè)置合理的參數(shù):
- threadFactory:創(chuàng)建時(shí)間輪任務(wù)線程的工廠,通過這個(gè)工廠可以給我們的線程自定義一些屬性(線程名、異常處理等)
- tickDuration:時(shí)鐘多長(zhǎng)時(shí)間撥動(dòng)一次,值越小,時(shí)間輪精度越高
- unit:tickDuration 的單位
- ticksPerWheel:時(shí)間輪數(shù)組大小
- leakDetection:是否檢測(cè)內(nèi)存泄漏
- maxPendingTimeouts:時(shí)間輪內(nèi)最大等待的任務(wù)數(shù)
時(shí)間輪的時(shí)鐘撥動(dòng)時(shí)長(zhǎng)應(yīng)該根據(jù)業(yè)務(wù)設(shè)置恰當(dāng)?shù)闹?#xff0c;如果設(shè)置的過大,可能導(dǎo)致任務(wù)觸發(fā)時(shí)間不準(zhǔn)確。如果設(shè)置的過小,時(shí)間輪轉(zhuǎn)動(dòng)頻繁,任務(wù)少的情況下加載不到任務(wù),屬于一直空轉(zhuǎn)的狀態(tài),會(huì)占用 CPU 線程資源。
為了防止時(shí)間輪占用過多的 CPU 資源,當(dāng)創(chuàng)建的時(shí)間輪對(duì)象大于 64 時(shí)會(huì)以日志的方式提示。
構(gòu)造函數(shù)中只是初始化了輪線程,并沒有開啟,當(dāng)?shù)谝淮瓮鶗r(shí)間輪內(nèi)添加任務(wù)時(shí),線程才會(huì)開啟。
2.3 往時(shí)間輪內(nèi)添加任務(wù)
@Overridepublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}// 等待的任務(wù)數(shù) +1long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();// 如果時(shí)間輪內(nèi)等待的任務(wù)數(shù)大于最大值,任務(wù)會(huì)被拋棄if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}// 開啟時(shí)間輪內(nèi)的線程start();// 計(jì)算當(dāng)前添加任務(wù)的執(zhí)行時(shí)間long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}// 將任務(wù)加入隊(duì)列HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}任務(wù)會(huì)先保存在隊(duì)列中,當(dāng)時(shí)間輪的時(shí)鐘撥動(dòng)時(shí)才會(huì)判斷是否將隊(duì)列中的任務(wù)加載進(jìn)時(shí)間輪。
public void start() {switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:// 這里存在并發(fā),通過 CAS 操作保證最終只有一個(gè)線程能開啟時(shí)間輪的工作線程if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}while (startTime == 0) {try {// startTimeInitialized 是一個(gè) CountDownLatch,目的是為了保證工作線程的 startTime 屬性初始化startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}}這里通過 CAS 加鎖的方式保證線程安全,避免多次開啟。
工作線程開啟后,start() 方法會(huì)被阻塞,等工作線程的 startTime 屬性初始化完成后才被喚醒。為什么只有等 startTime 初始化后才能繼續(xù)執(zhí)行呢?因?yàn)樯厦娴?newTimeout 方法在線程開啟后,需要計(jì)算當(dāng)前添加進(jìn)來任務(wù)的執(zhí)行時(shí)間,而這個(gè)執(zhí)行時(shí)間是根據(jù) startTime 計(jì)算的。
2.4 時(shí)間輪調(diào)度
@Overridepublic void run() {// 初始化 startTime.startTime = System.nanoTime();if (startTime == 0) {startTime = 1;}// 用來喚醒被阻塞的 HashedWheelTimer#start() 方法,保證 startTime 初始化startTimeInitialized.countDown();do {// 時(shí)鐘撥動(dòng)final long deadline = waitForNextTick();if (deadline > 0) {int idx = (int) (tick & mask);// 處理過期的任務(wù)processCancelledTasks();HashedWheelBucket bucket =wheel[idx];// 將任務(wù)加載進(jìn)時(shí)間輪transferTimeoutsToBuckets();// 執(zhí)行當(dāng)前時(shí)間輪槽內(nèi)的任務(wù)bucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// 時(shí)間輪關(guān)閉,將還未執(zhí)行的任務(wù)以列表的形式保存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去// 還未執(zhí)行的任務(wù)可能會(huì)在兩個(gè)地方,一:時(shí)間輪數(shù)組內(nèi),二:隊(duì)列中for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}// 處理過期的任務(wù)processCancelledTasks();}時(shí)間輪每撥動(dòng)一次 tick 就會(huì) +1,根據(jù)這個(gè)值與(時(shí)間輪數(shù)組長(zhǎng)度 - 1)進(jìn)行 & 運(yùn)算,可以定位時(shí)間輪數(shù)組內(nèi)的槽。因?yàn)?tick 值一直在增加,所以時(shí)間輪數(shù)組看起來就像一個(gè)不斷循環(huán)的圓。
- 先初始化 startTime 值,因?yàn)楹竺嫒蝿?wù)執(zhí)行的時(shí)間是根據(jù) startTime 計(jì)算的
- 時(shí)鐘撥動(dòng),如果時(shí)間未到,則 sleep 一會(huì)兒
- 處理過期的任務(wù)
- 將任務(wù)加載進(jìn)時(shí)間輪
- 執(zhí)行當(dāng)前時(shí)鐘對(duì)應(yīng)時(shí)間輪內(nèi)的任務(wù)
- 時(shí)間輪關(guān)閉,將所有未執(zhí)行的任務(wù)封裝到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
- 處理過期的任務(wù)
上面簡(jiǎn)單羅列了下 run 方法的大概執(zhí)行步驟,下面是具體方法的分析。
2.5 時(shí)鐘撥動(dòng)
如果時(shí)間輪設(shè)置的 tickDuration 為 100ms 撥動(dòng)一次,當(dāng)時(shí)鐘撥動(dòng)一次后,應(yīng)該計(jì)算下一次時(shí)鐘撥動(dòng)的時(shí)間,如果還沒到就 sleep 一會(huì)兒,等到撥動(dòng)時(shí)間再醒來。
private long waitForNextTick() {// 計(jì)算時(shí)鐘下次撥動(dòng)的相對(duì)時(shí)間long deadline = tickDuration * (tick + 1);for (;;) {// 獲取當(dāng)前時(shí)間的相對(duì)時(shí)間final long currentTime = System.nanoTime() - startTime;// 計(jì)算距離時(shí)鐘下次撥動(dòng)的時(shí)間// 這里之所以加 999999 后再除 10000000, 是為了保證足夠的 sleep 時(shí)間// 例如:當(dāng) deadline - currentTime = 2000002 的時(shí)候,如果不加 999999,則只睡了 2ms// 而 2ms 其實(shí)是未到達(dá) deadline 時(shí)間點(diǎn)的,所以為了使上述情況能 sleep 足夠的時(shí)間,加上 999999 后,會(huì)多睡 1mslong sleepTimeMs = (deadline - currentTime + 999999) / 1000000;// <=0 說明可以撥動(dòng)時(shí)鐘了if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}// 這里是為了兼容 Windows 平臺(tái),因?yàn)?Windows 平臺(tái)的調(diào)度最小單位為 10ms,如果不是 10ms 的倍數(shù),可能會(huì)引起 sleep 時(shí)間不準(zhǔn)確// See https://github.com/Netty/Netty/issues/356if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}try {// sleep 到下次時(shí)鐘撥動(dòng)Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}如果時(shí)間不到就 sleep 等待一會(huì)兒,為了使任務(wù)時(shí)鐘準(zhǔn)確,可以從上面的代碼中看出 Netty 做了一些優(yōu)化,比如 sleepTimeMs 的計(jì)算,Windows 平臺(tái)的處理等。
2.6 將任務(wù)從隊(duì)列加載進(jìn)時(shí)間輪
private void transferTimeoutsToBuckets() {// 一次最多只處理隊(duì)列中的 100000 個(gè)任務(wù)for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}// 過濾已經(jīng)取消的任務(wù)if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {continue;}// 計(jì)算當(dāng)前任務(wù)到執(zhí)行還需要經(jīng)過幾次時(shí)鐘撥動(dòng)// 假設(shè)時(shí)間輪數(shù)組大小是 10,calculated 為 12,需要時(shí)間輪轉(zhuǎn)動(dòng)一圈加兩次時(shí)鐘撥動(dòng)后后才能執(zhí)行這個(gè)任務(wù),因此還需要計(jì)算一下圈數(shù)long calculated = timeout.deadline / tickDuration;// 計(jì)算當(dāng)前任務(wù)到執(zhí)行還需要經(jīng)過幾圈時(shí)鐘撥動(dòng)timeout.remainingRounds = (calculated - tick) / wheel.length;// 有的任務(wù)可能在隊(duì)列里很長(zhǎng)時(shí)間,時(shí)間過期了也沒有被調(diào)度,將這種情況的任務(wù)放在當(dāng)前輪次內(nèi)執(zhí)行final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.// 計(jì)算任務(wù)在時(shí)間輪數(shù)組中的槽int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];// 將任務(wù)放到時(shí)間輪的數(shù)組中,多個(gè)任務(wù)可能定位時(shí)間輪的同一個(gè)槽,這些任務(wù)通過以鏈表的形式鏈接bucket.addTimeout(timeout);}}void addTimeout(HashedWheelTimeout timeout) {assert timeout.bucket == null;// 任務(wù)構(gòu)成雙向鏈表timeout.bucket = this;if (head == null) {head = tail = timeout;} else {tail.next = timeout;timeout.prev = tail;tail = timeout;}}在上面也提到過,任務(wù)剛加進(jìn)來不會(huì)立即到時(shí)間輪中去,而是暫時(shí)保存到一個(gè)隊(duì)列中,當(dāng)時(shí)間輪時(shí)鐘撥動(dòng)時(shí),會(huì)將任務(wù)從隊(duì)列中加載進(jìn)時(shí)間輪內(nèi)。
時(shí)間輪每次最大處理 100000 個(gè)任務(wù),因?yàn)槿蝿?wù)的執(zhí)行時(shí)間是用戶自定義的,所以需要計(jì)算任務(wù)到執(zhí)行需要經(jīng)過多少次時(shí)鐘撥動(dòng),并計(jì)算時(shí)間輪撥動(dòng)的圈數(shù)。接著將任務(wù)加載進(jìn)時(shí)間輪對(duì)應(yīng)的槽內(nèi),可能有多個(gè)任務(wù)經(jīng)過 hash 計(jì)算后定位到同一個(gè)槽,這些任務(wù)會(huì)以雙向鏈表的結(jié)構(gòu)保存,有點(diǎn)類似 HashMap 處理碰撞的情況。
2.7 執(zhí)行任務(wù)
public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;while (timeout != null) {HashedWheelTimeout next = timeout.next;// 任務(wù)執(zhí)行的圈數(shù) > 0,表示任務(wù)還需要經(jīng)過 remainingRounds 圈時(shí)鐘循環(huán)才能執(zhí)行if (timeout.remainingRounds <= 0) {// 從鏈表中移除當(dāng)前任務(wù),并返回鏈表中下一個(gè)任務(wù)next = remove(timeout);if (timeout.deadline <= deadline) {// 執(zhí)行任務(wù)timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {// 過濾取消的任務(wù)next = remove(timeout);} else {// 圈數(shù) -1timeout.remainingRounds --;}timeout = next;}}public void expire() {// 任務(wù)狀態(tài)校驗(yàn)if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {task.run(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);}}}時(shí)間輪槽內(nèi)的任務(wù)以鏈表形式存儲(chǔ),這些任務(wù)執(zhí)行的時(shí)間可能會(huì)不一樣,有的在當(dāng)前時(shí)鐘執(zhí)行,有的在下一圈或者下兩圈對(duì)應(yīng)的時(shí)鐘執(zhí)行。當(dāng)任務(wù)在當(dāng)前時(shí)鐘執(zhí)行時(shí),需要將這個(gè)任務(wù)從鏈表中刪除,重新維護(hù)鏈表關(guān)系。
2.8 終止時(shí)間輪
@Overridepublic Set<Timeout> stop() {// 終止時(shí)間輪的線程不能是時(shí)間輪的工作線程if (Thread.currentThread() == workerThread) {throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() +".stop() cannot be called from " +TimerTask.class.getSimpleName());}// 將時(shí)間輪的狀態(tài)修改為 WORKER_STATE_SHUTDOWN,這里有兩種情況// 一:時(shí)間輪是 WORKER_STATE_INIT 狀態(tài),表明時(shí)間輪從創(chuàng)建到終止一直沒有任務(wù)進(jìn)來// 二:時(shí)間輪是 WORKER_STATE_STARTED 狀態(tài),多個(gè)線程嘗試終止時(shí)間輪,只有一個(gè)操作成功if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {// 代碼走到這里,時(shí)間輪只能是兩種狀態(tài)中的一個(gè),WORKER_STATE_INIT 和 WORKER_STATE_SHUTDOWN// 為 WORKER_STATE_INIT 表示時(shí)間輪沒有任務(wù),因此不用返回未處理的任務(wù),但是需要將時(shí)間輪實(shí)例 -1// 為 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失敗,什么都不用做,因?yàn)?CAS 成功的線程會(huì)處理if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {// 時(shí)間輪實(shí)例對(duì)象 -1INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}// CAS 操作失敗,或者時(shí)間輪沒有處理過任務(wù),返回空的任務(wù)列表return Collections.emptySet();}try {boolean interrupted = false;while (workerThread.isAlive()) {// 中斷時(shí)間輪工作線程workerThread.interrupt();try {// 終止時(shí)間輪的線程等待時(shí)間輪工作線程 100ms,這個(gè)過程主要是為了時(shí)間輪工作線程處理未執(zhí)行的任務(wù)workerThread.join(100);} catch (InterruptedException ignored) {interrupted = true;}}if (interrupted) {Thread.currentThread().interrupt();}} finally {INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}// 返回未處理的任務(wù)return worker.unprocessedTimeouts();}當(dāng)終止時(shí)間輪時(shí),時(shí)間輪狀態(tài)有兩種情況:
- WORKER_STATE_INIT:時(shí)間輪初始化,前面我們說過,當(dāng)初始化時(shí)間輪對(duì)象時(shí)并不會(huì)立即開啟時(shí)間輪工作線程,而是第一次添加任務(wù)時(shí)才開啟,為 WORKER_STATE_INIT 表示時(shí)間輪沒有處理過任務(wù)
- WORKER_STATE_STARTED:時(shí)間輪在工作,這里也有兩種情況,存在并發(fā)與不存在并發(fā),如果多個(gè)線程都嘗試終止時(shí)間輪,肯定只能有一個(gè)成功
時(shí)間輪停止運(yùn)行后會(huì)將未執(zhí)行的任務(wù)返回出去,至于怎么處理這些任務(wù),由業(yè)務(wù)方自己定義,這個(gè)流程和線程池的 shutdownNow 方法是類似的。
如果時(shí)間輪在運(yùn)行,怎么才能獲取到未執(zhí)行的任務(wù)呢,答案就在上面的 run() 方法中,如果時(shí)間輪處于非運(yùn)行狀態(tài),會(huì)把時(shí)間輪數(shù)組與隊(duì)列中未執(zhí)行且未取消的任務(wù)保存到 unprocessedTimeouts 集合中。而終止時(shí)間輪成功的線程只需要等待一會(huì)兒即可,這個(gè)等待是通過 workerThread.join(100); 實(shí)現(xiàn)的。
取消時(shí)間輪內(nèi)的任務(wù)相對(duì)比較簡(jiǎn)單,這里就不概述了,想要了解的自行查看即可。
上面就是時(shí)間輪運(yùn)行的基本原理了。
三、總結(jié)
這里以問答的形式進(jìn)行總結(jié),大家也可以看下這些問題,自己能不能很好的回答出來?
3.1 時(shí)間輪是不是在初始化完成后就啟動(dòng)了?
不是,初始化完成時(shí)間輪的狀態(tài)是 WORKER_STATE_INIT,此時(shí)時(shí)間輪內(nèi)的工作線程還沒有運(yùn)行,只有第一次往時(shí)間輪內(nèi)添加任務(wù)時(shí),才會(huì)開啟時(shí)間輪內(nèi)的工作線程。時(shí)間輪線程開啟后會(huì)初始化 startTime,任務(wù)的執(zhí)行時(shí)間會(huì)根據(jù)這個(gè)字段計(jì)算,而且時(shí)間輪中時(shí)間的概念是相對(duì)的。
3.2 如果時(shí)間輪內(nèi)還有任務(wù)未執(zhí)行,服務(wù)重啟了怎么辦?
時(shí)間輪內(nèi)的任務(wù)都在內(nèi)存中,服務(wù)重啟數(shù)據(jù)肯定都丟了,所以當(dāng)服務(wù)重啟時(shí)需要業(yè)務(wù)方自己做兼容處理。
3.3 如何自定義合適的時(shí)間輪參數(shù)?
自定義時(shí)間輪時(shí)有兩個(gè)比較重要的參數(shù)需要我們注意:
- tickDuration:時(shí)鐘撥動(dòng)頻率,假設(shè)一個(gè)任務(wù)在 10s 后執(zhí)行,tickDuration 設(shè)置為 3min 那肯定是不行的,tickDuration 值越小,任務(wù)觸發(fā)的精度越高,但是沒有任務(wù)時(shí),工作線程會(huì)一直自旋嘗試從隊(duì)列中拿任務(wù),比較消耗 CPU 資源
- ticksPerWheel:時(shí)間輪數(shù)組大小,假設(shè)當(dāng)時(shí)間輪時(shí)鐘撥動(dòng)時(shí),有 10000 個(gè)任務(wù)處理,但是我們定義時(shí)間輪數(shù)組的大小為 8,這時(shí)平均一個(gè)時(shí)間輪槽內(nèi)有 1250 個(gè)任務(wù),如果這 1250 個(gè)任務(wù)都在當(dāng)前時(shí)鐘執(zhí)行,任務(wù)執(zhí)行是同步的,由于每個(gè)任務(wù)執(zhí)行都會(huì)消耗時(shí)間,可能會(huì)導(dǎo)致后面的任務(wù)觸發(fā)時(shí)間不準(zhǔn)確。反之如果數(shù)組長(zhǎng)度設(shè)置的過大,任務(wù)比較少的情況下,時(shí)間輪數(shù)組很多槽都是空的
所以當(dāng)使用自定義時(shí)間輪時(shí),一定要評(píng)估自己的業(yè)務(wù)后再設(shè)置參數(shù)。
3.4 Netty 的時(shí)間輪有什么缺陷?
Netty 中的時(shí)間輪是通過單線程實(shí)現(xiàn)的,如果在執(zhí)行任務(wù)的過程中出現(xiàn)阻塞,會(huì)影響后面任務(wù)執(zhí)行。除此之外,Netty 中的時(shí)間輪并不適合創(chuàng)建延遲時(shí)間跨度很大的任務(wù),比如往時(shí)間輪內(nèi)丟成百上千個(gè)任務(wù)并設(shè)置 10 天后執(zhí)行,這樣可能會(huì)導(dǎo)致鏈表過長(zhǎng) round 值很大,而且這些任務(wù)在執(zhí)行之前會(huì)一直占用內(nèi)存。
3.5 時(shí)間輪要設(shè)置成單例的嗎?
強(qiáng)烈建議按照業(yè)務(wù)模塊區(qū)分,每個(gè)模塊都創(chuàng)建一個(gè)單例的時(shí)間輪對(duì)象。在上面的代碼中我們看到了,當(dāng)時(shí)間輪對(duì)象大于 64 時(shí)會(huì)以日志的形式提示。如果時(shí)間輪是非單例對(duì)象,那時(shí)間輪算法完全就失去了作用。
3.6 時(shí)間輪與 ScheduledExecutorService 的區(qū)別?
ScheduledExecutorService 中的任務(wù)維護(hù)了一個(gè)堆,當(dāng)有大量任務(wù)時(shí),需要調(diào)整堆結(jié)構(gòu)導(dǎo)致性能下降,而時(shí)間輪通過時(shí)鐘調(diào)度,可以不受任務(wù)量的限制。
當(dāng)任務(wù)量比較少時(shí)時(shí)間輪會(huì)一直自旋空轉(zhuǎn)撥動(dòng)時(shí)鐘,相比 ScheduledExecutorService 會(huì)占用一定 CPU 資源。
參考
netty源碼解讀之時(shí)間輪算法實(shí)現(xiàn)-HashedWheelTimer
HashedWheelTimer 使用及源碼分析創(chuàng)建
定時(shí)器的幾種實(shí)現(xiàn)方式
總結(jié)
以上是生活随笔為你收集整理的Netty时间轮调度原理分析,再不了解你就out啦的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 营业外收支净额怎么计算
- 下一篇: 信用卡设置有效期的原因