日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

原 荐 简单说说Kafka中的时间轮算法

發布時間:2025/3/20 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 原 荐 简单说说Kafka中的时间轮算法 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
零、時間輪定義簡單說說時間輪吧,它是一個高效的延時隊列,或者說定時器。實際上現在網上對于時間輪算法的解釋很多,定義也很全,這里引用一下 朱小廝博客 里出現的定義:參考下圖,Kafka中的時間輪(TimingWheel)是一個存儲定時任務的環形隊列,底層采用數組實現,數組中的每個元素可以存放一個定時任務列表(TimerTaskList)。TimerTaskList是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項(TimerTaskEntry),其中封裝了真正的定時任務TimerTask。

圖片描述(最多50字)

如果你理解了上面的定義,那么就不必往下看了。但如果你第一次看到和我一樣懵比,并且有不少疑問,那么這篇博文將帶你進一步了解時間輪,甚至理解時間輪算法。如果有興趣,可以去看看其他的定時器 你真的了解延時隊列嗎 。博主認為,時間輪定時器最大的優點:

是任務的添加與移除,都是O(1)級的復雜度;
不會占用大量的資源;
只需要有一個線程去推進時間輪就可以工作了。
我們將對時間輪做層層推進的解析:

一、為什么使用環形隊列假設我們現在有一個很大的數組,專門用于存放延時任務。它的精度達到了毫秒級!那么我們的延遲任務實際上需要將定時的那個時間簡單轉換為毫秒即可,然后將定時任務存入其中:比如說當前的時間是2018/10/24 19:43:45,那么就將任務存入Task[1540381425000],value則是定時任務的內容。

private Task[很長] tasks;
public List<Task> getTaskList(long timestamp) {
return task.get(timestamp)
}
// 假裝這里真的能一毫秒一個循環
public void run(){
while (true){
getTaskList(System.currentTimeMillis()).后臺執行()
Thread.sleep(1);
}
}
假如這個數組長度達到了億億級,我們確實可以這么干。 那如果將精度縮減到秒級呢?我們也需要一個百億級長度的數組。

先不說內存夠不夠,顯然你的定時器要這么大的內存顯然很浪費。當然如果我們自己寫一個map,并保證它不存在hash沖突問題,那也是完全可行的。(我不確定我的想法是否正確,如果錯誤,請指出)

/ 一個精度為秒級的延時任務管理類 /
private Map<Long, Task> taskMap;
public List<Task> getTaskList(long timestamp) {
return taskMap.get(timestamp - timestamp % 1000)
}
// 新增一個任務
public void addTask(long timestamp, Task task) {
List<Task> taskList = getTaskList(timestamp - timestamp % 1000);
if (taskList == null){
taskList = new ArrayList();
}
taskList.add(task);
}
// 假裝這里真的能一秒一個循環
public void run(){
while (true){
getTaskList(System.currentTimeMillis()).后臺執行()
Thread.sleep(1000);
}
}
其實時間輪就是一個不存在hash沖突的數據結構

拋開其他疑問,我們看看手腕上的手表(如果沒有去找個鐘表,或者想象一個),是不是無論當前是什么時間,總能用我們的表盤去表示它(忽略精度)

圖片描述(最多50字)

就拿秒表來說,它總是落在 0 - 59 秒,每走一圈,又會重新開始。用偽代碼模擬一下我們這個秒表:

private Bucket[60] buckets;// 表示60秒
public void addTask(long timestamp, Task task) {
Bucket bucket = buckets[timestamp / 1000 % 60];
bucket.add(task);
}
public Bucket getBucket(long timestamp) {
return buckets[timestamp / 1000 % 60];
}
// 假裝這里真的能一秒一個循環
public void run(){
while (true){
getBucket(System.currentTimeMillis()).后臺執行()
Thread.sleep(1000);
}
}
這樣,我們的時間總能落在0 - 59任意一個bucket上,就如同我們的秒鐘總是落在0 - 59刻度上一樣,這便是 時間輪的環形隊列 。

二、表示的時間有限但是細心的小伙伴也會發現這么一個問題:如果只能表示60秒內的定時任務應該怎么存儲與取出,那是不是太有局限性了? 如果想要加入一小時后的延遲任務,該怎么辦?其實還是可以看一看鐘表,對于只有三個指針的表(一般的表)來說,最大能表示12個小時,超過了12小時這個范圍,時間就會產生歧義。如果我們加多幾個指針呢?比如說我們有秒針,分針,時針,上下午針,天針,月針,年針...... 那不就能表示很長很長的一段時間了?而且,它并不需要占用很大的內存。比如說秒針我們可以用一個長度為60的數組來表示,分針也同樣可以用一個長度為60的數組來表示,時針可以用一個長度為24的數組來表示。那么表示一天內的所有時間,只需要三個數組即可。動手來做吧,我們將這個數據結構稱作時間輪,tickMs表示一個刻度,比如說上面說的一秒。wheelSize表示一圈有多少個刻度,即上面說的60。interval表示一圈能表示多少時間,即 tickMs * wheelSize = 60秒。overflowWheel表示上一層的時間輪,比如說,對于秒鐘來說,overflowWheel就表示分鐘,以此類推。

public class TimeWheel {
/ 一個時間槽的時間 */
private long tickMs;
/* 時間輪大小 /
private int wheelSize;
/
時間跨度 */
private long interval;
/ 槽 */
private Bucket[] buckets;
/* 時間輪指針 /
private long currentTimestamp;
/
上層時間輪 /
private volatile TimeWheel overflowWheel;
public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
this.currentTimestamp = currentTimestamp;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs wheelSize;
this.buckets = new Bucket[wheelSize];
this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);
for (int i = 0; i < wheelSize; i++) {
buckets[i] = new Bucket();
}
}
}
將任務添加到時間輪中十分簡單,對于每個時間輪來說,比如說秒級時間輪,和分級時間輪,都有它自己的過期槽。也就是delayMs < tickMs的時候。

添加延時任務的時候一共就這幾種情況:####一、時間到期

1)比如說有一個任務要在 16:29:07 執行,從秒級時間輪中來看,當我們的當前時間走到16:29:06的時候,則表示這個任務已經過期了。因為它的delayMs = 1000ms,小于了我們的秒級時間輪的tickMs(1000ms)。
比如說有一個任務要在 16:41:25 執行,從分級時間輪中來看,當我們的當前時間走到 16:41的時候( 分級時間輪沒有秒針!它的最小精度是分鐘(一定要理解這一點) ),則表示這個任務已經到期,因為它的delayMs = 25000ms,小于了我們的分級時間輪的tickMs(60000ms)。
二、時間未到期,且delayMs小于interval。

對于秒級時間輪來說,就是延遲時間小于60s,那么肯定能找到一個秒鐘槽扔進去。三、時間未到期,且delayMs大于interval。對于妙級時間輪來說,就是延遲時間大于等于60s,這時候就需要借助上層時間輪的力量了,很簡單的代碼實現,就是拿到上層時間輪,然后類似遞歸一樣,把它扔進去。比如說一個有一個延時為一年后的定時任務,就會在這個遞歸中不斷創建更上層的時間輪,直到找到滿足delayMs小于interval的那個時間輪。這里為了不把代碼寫的那么復雜,我們每一層時間輪的刻度都一樣,也就是秒級時間輪表示60秒,上面則表示60分鐘,再上面則表示60小時,再上層則表示60個60小時,再上層則表示60個60個60小時 = 216000小時。也就是如果將最底層時間輪的tickMs(精度)設置為1000ms。wheelSize設置為60。 那么只需要5層時間輪,可表示的時間跨度已經長達24年(216000小時) 。

/**

  • 添加任務到某個時間輪
    */
    public boolean addTask(TimedTask timedTask) {
    long expireTimestamp = timedTask.getExpireTimestamp();
    long delayMs = expireTimestamp - currentTimestamp;
    if (delayMs < tickMs) {// 到期了
    return false;
    } else {
    // 扔進當前時間輪的某個槽中,只有時間【大于某個槽】,才會放進去
    if (delayMs < interval) {
    int bucketIndex = (int) (((delayMs + currentTimestamp) / tickMs) % wheelSize);
    Bucket bucket = buckets[bucketIndex];
    bucket.addTask(timedTask);
    } else {
    // 當maybeInThisBucket大于等于wheelSize時,需要將它扔到上一層的時間輪
    TimeWheel timeWheel = getOverflowWheel();
    timeWheel.addTask(timedTask);
    }
    }
    return true;
    }
    /**
  • 獲取或創建一個上層時間輪
    */
    private TimeWheel getOverflowWheel() {
    if (overflowWheel == null) {
    synchronized (this) {
    if (overflowWheel == null) {
    overflowWheel = new TimeWheel(interval, wheelSize, currentTimestamp, delayQueue);
    }
    }
    }
    return overflowWheel;
    }
    當然我們的時間輪還需要一個指針的推進機制,總不能讓時間永遠停留在當前吧?推進的時候,同時類似遞歸,去推進一下上一層的時間輪。

    注意:要強調一點的是,我們這個時間輪更像是電子表,它不存在時間的中間狀態,也就是精度這個概念一定要理解好。比如說,對于秒級時間輪來說,它的精度只能保證到1秒,小于1秒的,都會當成是已到期

    對于分級時間輪來說,它的精度只能保證到1分,小于1分的,都會當成是已到期

/**

  • 嘗試推進一下指針
    */
    public void advanceClock(long timestamp) {
    if (timestamp >= currentTimestamp + tickMs) {
    currentTimestamp = timestamp - (timestamp % tickMs);
    if (overflowWheel != null) {
    this.getOverflowWheel()
    .advanceClock(timestamp);
    }
    }
    }
    三、對于高層時間輪來說,精度越來越不準,會不會有影響?

    上面說到,分級時間輪,精度只有分鐘級,總不能延遲1秒的定時任務和延遲59秒的定時任務同時執行吧?

    有這個疑問的同學很好!實際上很好解決,只需再入時間輪即可。比如說,對于分鐘級時間輪來說,delayMs為1秒和delayMs為59秒的都已經過期,我們將其取出,再扔進底層的時間輪不就可以了?

    1秒的會被扔到秒級時間輪的下一個執行槽中,而59秒的會被扔到秒級時間輪的后59個時間槽中。

    細心的同學會發現,我們的添加任務方法,返回的是一個bool

public boolean addTask(TimedTask timedTask)
再倒回去好好看看,添加到最底層時間輪失敗的(我們只能直接操作最底層的時間輪,不能直接操作上層的時間輪),是不是會直接返回flase? 對于再入失敗的任務,我們直接執行即可。

/**

  • 將任務添加到時間輪
    */
    public void addOrSubmitTask(TimedTask timedTask) {
    if (!timeWheel.addTask(timedTask)) {
    taskExecutor.submit(timedTask.getTask());
    }
    }
    四、如何知道一個任務已經過期?

    記得我們將任務存儲在槽中嘛?比如說秒級時間輪中,有60個槽,那么一共有60個槽。如果時間輪共有兩層,也僅僅只有120個槽。我們只需將槽扔進一個delayedQueue之中即可。

    我們輪詢地從delayedQueue取出已經過期的槽即可。(前面的所有代碼,為了簡單說明,并沒有引入這個DelayQueue的概念,所以不用去上面翻了,并沒有。博主覺得... 已經看到這里了,應該很明白這個DelayQueue的意義了。 )

    其實簡單來說,實際上定時任務單單使用DelayQueue來實現,也是可以的,但是一旦任務的數量多了起來,達到了百萬級,千萬級,針對這個delayQueue的增刪,將非常的慢。

    一、面向槽的delayQueue

    而對于時間輪來說,它只需要往delayQueue里面扔各種槽即可,比如我們的定時任務長短不一,最長的跨度到了24年,這個delayQueue也僅僅只有300個元素。

    二、處理過期的槽

    而這個槽到期后,也就是被我們從delayQueue中poll出來后,我們只需要將槽中的所有任務循環一次,重新加到新的槽中(添加失敗則直接執行)即可。

/**

  • 推進一下時間輪的指針,并且將delayQueue中的任務取出來再重新扔進去
    */
    public void advanceClock(long timeout) {
    try {
    Bucket bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
    if (bucket != null) {
    timeWheel.advanceClock(bucket.getExpire());
    bucket.flush(this::addTask);
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

轉載于:https://blog.51cto.com/14028890/2309569

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的原 荐 简单说说Kafka中的时间轮算法的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。