日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm中并行度原来是这样计算的(1.0.1版本)

發布時間:2025/6/17 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm中并行度原来是这样计算的(1.0.1版本) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

==思考問題1==

向集群提交一個拓撲的時候,Storm是如何計算Task數以及Executor數的?

具體有多少個worker,多少個executor,每個executor負責多少個task?

?

==思考問題2:==

構建拓撲的時候,有3個地方會影響并行度,這3個地方之間有什么關系?

builder.setSpout("spout", new RandomSentenceSpout(), 5); //parallelism-hint builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTask(1); builder.setSpout("spout", new RandomSentenceSpout(), 5).setMaxTaskParallelism(1);

?

==3個參數的信息==

1、parallelism-hint:

構建拓撲時,可以通過setSpout或setBolt的函數參數中指定。為初始executor數

如:builder.setSpout("spout", new RandomSentenceSpout(), 5);

?

2、?TOPOLOGY-TASKS:

構建拓撲時,通過Spout/Bolt的setNumTasks()方法來指定。為component的task數(Spout或Bolt)。

如:builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTask(1);

?

3、TOPOLOGY-MAX-TASK-PARALLELISM:

構建拓撲時,通過Spout/Bolt的setMaxTaskParallelism()方法來指定。為component的最大并行度通常用于測試,在本地模式時使用。

如:builder.setSpout("spout", new RandomSentenceSpout(), 5).setMaxTaskParallelism(1);

?

==結論1:Executor數是多少?==

對應topology代碼中, 為每個component指定的parallelism-hint數(通過setBolt和setSpout的參數)

?

==結論2:Task數是多少?==

版本號:apache-storm-1.0.1

代碼路徑:org/apache/storm/daemon/nimbus.clj

?

?

這里有一個函數非常重要,看了之后上面的3個關系多少會清晰很多。

該函數返回計算之后的真實的Task數

(defn- component-parallelism [storm-conf component](let [storm-conf (merge storm-conf (component-conf component))num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)](if max-parallelism(min max-parallelism num-tasks)num-tasks)))

?

這個代碼是用clojure語言編寫的,沒有用過的人估計會非常蛋疼,

為了方便理解,用偽代碼(方便理解)翻譯之后,大概思路是這個樣子的:

num-tasks = (TOPOLOGY-TASKS != null ? TOPOLOGY-TASKS : parallelism-hint); max-parallelism = TOPOLOGY-MAX-TASK-PARALLELISM;if (max-parallelism != null) {//取兩者較小return min(num-tasks, max-parallelism); } else { return num-tasks; }

?

如果將3個參數進行排列組合之后,獲得結果如下:

?

簡單理解來說:

1、暫時不考慮TOPOLOGY-MAX-TASK-PARALLELIS。(測試用的玩意兒,弄出來影響思路)

2、TOPOLOGY-TASKS優先于parallelism-hint。

?

==Executor與Task是如何匹配的?==

下面的代碼是分配的代碼

(defn- compute-executors [nimbus storm-id](let [conf (:conf nimbus)blob-store (:blob-store nimbus)storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)component->executors (:component->executors storm-base)storm-conf (read-storm-conf-as-nimbus storm-id blob-store)topology (read-storm-topology-as-nimbus storm-id blob-store)task->component (storm-task-info topology storm-conf)](->> (storm-task-info topology storm-conf)reverse-map(map-val sort)(join-maps component->executors)(map-val (partial apply partition-fixed))(mapcat second)(map to-executor-id))))

?

理解這個代碼之前,我們首先把注意力放在storm-task-info這個函數上,看看它都干了些什么。

代碼位置:org/apache/storm/daemon/common.clj

(defn storm-task-info"Returns map from task -> component id"[^StormTopology user-topology storm-conf](->> (system-topology! storm-conf user-topology)all-components(map-val (comp #(get % TOPOLOGY-TASKS) component-conf))(sort-by first) (mapcat (fn [[c num-tasks]] (repeat num-tasks c))) (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1))) (into {}) ))

?

來看看廣大網友的解讀版。參考博客:https://www.cnblogs.com/ierbar0604/p/4386480.html

這個函數, 首先讀出所有components ,對每個component, 讀出TOPOLOGY-TASKS(已經過標準化之后的TASK數,具體參照前面的內容),

最后用遞增序列產生taskid, 并最終生成component和task的對應關系。

(如果不設置TOPOLOGY-TASKS,task數等于executor數,后面分配就很容易,否則就涉及task分配問題)

?

storm-task-info函數的輸出,是這個樣子的:

{1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}

?

然后,我們把注意力返回到compute-executors函數(調用storm-task-info函數的調用處)。

還是用上面博客中,網友解讀的版本來幫助我們理解。(注意:需要對照源碼,確認當前版本代碼是否有變化)

?

==我的筆記==

?

最后,從程序與StormUI界面對比來看看并行度的分配結果。

(拓撲程序)

?

?(UI界面)

?

==簡單總結==

1、有3個地方可以影響Task數,根據3個參數的結果決定Task數。

2、executor數 = 所有組件的parallelism-hint總數。

3、task數在生命周期內不變,executor數可能改變。

?

==rebalance命令==

storm運行過程中,而已使用rebalance命令動態調整拓撲的worker數及并發度。

命令模板:storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*? (*表示可以設置多個)

## 重新配置拓撲 "mytopology",使得該拓撲擁有 5 個 worker processes, ## 另外,配置名為 "blue-spout" 的 spout 使用 3 個 executor, ## 配置名為 "yellow-bolt" 的 bolt 使用 10 個 executor。$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=1

-w:標記覆蓋Storm在禁用與關閉期間等待的時間長度。

?

==其他疑問==

1、網上總是能看到,“不推薦使用setNumTasks”的方式來提高并發度。至于原因確實是一直沒有搞明白。

答:如果只單純的使用setNumTasks,不調整parallelism-hint,會造成多個Task運行在1個executor的結果。并不一定能夠提高性能。

?

2、如果task數比executor數多,是否會有閑置executor?(需要用代碼驗證)

答:不會有閑置executor。

?

-------------

參考博客:

https://www.cnblogs.com/ierbar0604/p/4386480.html

http://lib.csdn.net/article/60/42875

轉載于:https://www.cnblogs.com/quchunhui/p/8271349.html

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的Storm中并行度原来是这样计算的(1.0.1版本)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。