日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

揪出XXL-JOB中的细节

發布時間:2024/2/28 编程问答 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 揪出XXL-JOB中的细节 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

廢話少說,直接進入正題。

相信大家對XXL-JOB都很了解,故本文對源碼不進行過多介紹,側重的是看源碼過程中想到的幾個知識點,不一定都對,請大神們批評指正。

XXL-JOB簡介

  • XXL-JOB是一個輕量級分布式任務調度平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼并接入多家公司線上產品線,開箱即用。
  • XXL-JOB分為調度中心、執行器、數據中心,調度中心負責任務管理及調度、執行器管理、日志管理等,執行器負責任務執行及執行結果回調。

任務調度 - “類時間輪”的實現

時間輪

時間輪出自Netty中的HashedWheelTimer,是一個環形結構,可以用時鐘來類比,鐘面上有很多bucket,每一個bucket上可以存放多個任務,使用一個List保存該時刻到期的所有任務,同時一個指針隨著時間流逝一格一格轉動,并執行對應bucket上所有到期的任務。任務通過取模決定應該放入哪個bucket。和HashMap的原理類似,newTask對應put,使用List來解決 Hash 沖突。

?

?

?

以上圖為例,假設一個bucket是1秒,則指針轉動一輪表示的時間段為8s,假設當前指針指向 0,此時需要調度一個3s后執行的任務,顯然應該加入到(0+3=3)的方格中,指針再走3s次就可以執行了;如果任務要在10s后執行,應該等指針走完一輪零2格再執行,因此應放入2,同時將round(1)保存到任務中。檢查到期任務時只執行round為0的,bucket上其他任務的round減1。

當然,還有優化的“分層時間輪”的實現,請參考https://cnkirito.moe/timer/。

XXL-JOB中的“時間輪”

  • XXL-JOB中的調度方式從Quartz變成了自研調度的方式,很像時間輪,可以理解為有60個bucket且每個bucket為1秒,但是沒有了round的概念。

  • 具體可以看下圖。

?

?

?

  • XXL-JOB中負責任務調度的有兩個線程,分別為ringThread和scheduleThread,其作用如下。

1、scheduleThread:對任務信息進行讀取,預讀未來5s即將觸發的任務,放入時間輪。 2、ringThread:對當前bucket和前一個bucket中的任務取出并執行。

  • 下面結合源代碼看下,為什么說是“類時間輪”,關鍵代碼附上了注解,請大家留意觀看。
// 環狀結構 private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();// 任務下次啟動時間(單位為秒) % 60 int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 任務放進時間輪 private void pushTimeRing(int ringSecond, int jobId){// push async ringList<Integer> ringItemData = ringData.get(ringSecond);if (ringItemData == null) {ringItemData = new ArrayList<Integer>();ringData.put(ringSecond, ringItemData);}ringItemData.add(jobId);} 復制代碼 // 同時取兩個時間刻度的任務 List<Integer> ringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免處理耗時太長,跨過刻度,向前校驗一個刻度; for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);} } // 運行 for (int jobId: ringItemData) {JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null); } 復制代碼

一致性Hash路由中的Hash算法

  • 大家也知道,XXL-JOB在執行任務時,任務具體在哪個執行器上運行是根據路由策略來決定的,其中有一個策略是一致性Hash策略(源碼在ExecutorRouteConsistentHash.java),自然而然想到了一致性Hash算法
  • 一致性Hash算法是為了解決分布式系統中負載均衡的問題時候可以使用Hash算法讓固定的一部分請求落到同一臺服務器上,這樣每臺服務器固定處理一部分請求(并維護這些請求的信息),起到負載均衡的作用。
  • 普通的余數hash(hash(比如用戶id)%服務器機器數)算法伸縮性很差,當新增或者下線服務器機器時候,用戶id與服務器的映射關系會大量失效。一致性hash則利用hash環對其進行了改進。
  • 一致性Hash算法在實踐中,當服務器節點比較少的時候會出現上節所說的一致性hash傾斜的問題,一個解決方法是多加機器,但是加機器是有成本的,那么就加虛擬節點
  • 具體原理請參考https://www.jianshu.com/p/e968c081f563。
  • 下圖為帶有虛擬節點的Hash環,其中ip1-1是ip1的虛擬節點,ip2-1是ip2的虛擬節點,ip3-1是ip3的虛擬節點。

?

?

?

可見,一致性Hash算法的關鍵在于Hash算法,保證虛擬節點Hash結果的均勻性, 而均勻性可以理解為減少Hash沖突,Hash沖突的知識點請參考從HashMap,Redis 字典看【Hash】。。。。

  • XXL-JOB中的一致性Hash的Hash函數如下。
// jobId轉換為md5 // 不直接用hashCode() 是因為擴大hash取值范圍,減少沖突 byte[] digest = md5.digest();// 32位hashCode long hashCode = ((long) (digest[3] & 0xFF) << 24)| ((long) (digest[2] & 0xFF) << 16)| ((long) (digest[1] & 0xFF) << 8)| (digest[0] & 0xFF);long truncateHashCode = hashCode & 0xffffffffL; 復制代碼
  • 看到上圖的Hash函數,讓我想到了HashMap的Hash函數
f(key) = hash(key) & (table.length - 1) // 使用>>> 16的原因,hashCode()的高位和低位都對f(key)有了一定影響力,使得分布更加均勻,散列沖突的幾率就小了。 hash(key) = (h = key.hashCode()) ^ (h >>> 16) 復制代碼
  • 同理,將jobId的md5編碼的高低位都對Hash結果有影響,使得Hash沖突的概率減小。

分片任務的實現 - 維護線程上下文

  • XXL-JOB的分片任務實現了任務的分布式執行,其實是筆者調研的重點,日常開發中很多定時任務都是單機執行,對于后續數據量大的任務最好有一個分布式的解決方案。

  • 分片任務的路由策略,源代碼作者提出了分片廣播的概念,剛開始還有點摸不清頭腦,看了源碼逐漸清晰了起來。

  • 想必看過源碼的也遇到過這么一個小插曲,路由策略咋沒實現?如下圖所示。

public enum ExecutorRouteStrategyEnum {FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),// 說好的實現呢???竟然是nullSHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null); 復制代碼
  • 再繼續追查得到了結論,待我慢慢道來,首先分片任務執行參數傳遞的是什么?看XxlJobTrigger.trigger函數中的一段代碼。
... // 如果是分片路由,走的是這段邏輯 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList() != null && !group.getRegistryList().isEmpty()&& shardingParam == null) {for (int i = 0; i < group.getRegistryList().size(); i++) {// 最后兩個參數,i是當前機器在執行器集群當中的index,group.getRegistryList().size()為執行器總數processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} ... 復制代碼
  • 參數經過自研RPC傳遞到執行器,在執行器中具體負責任務執行的JobThread.run中,看到了如下代碼。
// 分片廣播的參數比set進了ShardingUtil ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); ... // 將執行參數傳遞給jobHandler執行 handler.execute(triggerParamTmp.getExecutorParams()) 復制代碼
  • 接著看ShardingUtil,才發現了其中的奧秘,請看代碼。
public class ShardingUtil {// 線程上下文private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();// 分片參數對象public static class ShardingVO {private int index; // sharding indexprivate int total; // sharding total// 次數省略 get/set}// 參數對象注入上下文public static void setShardingVo(ShardingVO shardingVo){contextHolder.set(shardingVo);}// 從上下文中取出參數對象public static ShardingVO getShardingVo(){return contextHolder.get();}} 復制代碼
  • 顯而易見,在負責分片任務的ShardingJobHandler里取出了線程上下文中的分片參數,這里也給個代碼把~
@JobHandler(value="shardingJobHandler") @Service public class ShardingJobHandler extends IJobHandler {@Overridepublic ReturnT<String> execute(String param) throws Exception {// 分片參數ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();XxlJobLogger.log("分片參數:當前分片序號 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal());// 業務邏輯for (int i = 0; i < shardingVO.getTotal(); i++) {if (i == shardingVO.getIndex()) {XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);} else {XxlJobLogger.log("第 {} 片, 忽略", i);}}return SUCCESS;}} 復制代碼
  • 由此得出,分布式實現是根據分片參數index及total來做的,簡單來講,就是給出了當前執行器的標識,根據這個標識將任務的數據或者邏輯進行區分,即可實現分布式運行。
  • 題外話:至于為什么用外部注入分片參數的方式,不直接execute傳遞?

1、可能是因為只有分片任務才用到這兩個參數 2、IJobHandler只有String類型參數

看完源碼后的思考

  • 1、經過此次看源代碼,XXL-JOB的設計目標確實符合開發迅速、學習簡單、輕量級、易擴展
  • 2、至于自研RPC還沒有具體考量,具體接入應該會考慮公司的RPC框架。
  • 3、作者給出的Quartz調度的不足,筆者得繼續深入了解。
  • 4、框架中很多對宕機、故障、超時等異常狀況的兼容值得學習。
  • 5、Rolling日志以及日志系統實現需要繼續了解。

參考文獻

  • www.xuxueli.com/xxl-job/#/?…
  • cnkirito.moe/timer/
  • www.jianshu.com/p/e968c081f…

總結

以上是生活随笔為你收集整理的揪出XXL-JOB中的细节的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。