
Storm - Source Code Analysis - Stats (backtype.storm.stats)

Published: 2025/5/22 · 豆豆

You will notice that Storm currently has two metrics systems: the metrics framework and the stats framework.

Both are registered side by side everywhere. It looks as if metrics is meant to replace stats eventually, but the UI in the current version still uses stats.


How is the data collected by this module used?

1. In the worker, do-executor-heartbeats is called periodically to sync heartbeats (hb) to ZooKeeper (zk).
As the code shows, stats are synced to zk as part of the heartbeat:

```clojure
(defnk do-executor-heartbeats [worker :executors nil]
  ;; stats is how we know what executors are assigned to this worker
  (let [stats (if-not executors
                (into {} (map (fn [e] {e nil}) (:executors worker)))
                (->> executors
                     (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
                     (apply merge)))
        zk-hb {:storm-id (:storm-id worker)
               :executor-stats stats
               :uptime ((:uptime worker))
               :time-secs (current-time-secs)}]
    ;; do the zookeeper heartbeat
    (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)))
```
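The branching in the `stats` binding can be tried out in isolation. This is a minimal sketch, not Storm code: executors are modeled as plain maps with hypothetical `:id` and `:stats` keys standing in for `executor/get-executor-id` and `executor/render-stats`, executor ids are assumed to be `[start-task end-task]` vectors, and `heartbeat-stats` / `mk-zk-hb` are illustrative names.

```clojure
;; Sketch of the `stats` binding in do-executor-heartbeats (hypothetical names).
;; assigned-ids: executor ids assigned to this worker, e.g. [[1 1] [2 3]]
;; running:      nil, or a seq of maps with hypothetical :id and :stats keys
(defn heartbeat-stats [assigned-ids running]
  (if-not running
    ;; Nothing running yet: map every assigned id to nil, which still tells
    ;; Nimbus which executors belong to this worker.
    (into {} (map (fn [e] {e nil}) assigned-ids))
    ;; Otherwise map each executor id to its rendered stats.
    (->> running
         (map (fn [e] {(:id e) (:stats e)}))
         (apply merge))))

;; Assemble a heartbeat map with the same keys as zk-hb above.
(defn mk-zk-hb [storm-id stats uptime-secs now-secs]
  {:storm-id       storm-id
   :executor-stats stats
   :uptime         uptime-secs
   :time-secs      now-secs})

(heartbeat-stats [[1 1] [2 3]] nil)
;; => {[1 1] nil, [2 3] nil}
(heartbeat-stats [[1 1]] [{:id [1 1] :stats {:emitted {"default" 20}}}])
;; => {[1 1] {:emitted {"default" 20}}}
```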

2. Anyone can then retrieve this information through Nimbus's Thrift interface:

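```clojure
;; Excerpt from Nimbus's Thrift handler (surrounding code omitted)
(^TopologyInfo getTopologyInfo [this ^String storm-id]
  ;; ...
  beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
  ;; ...
  stats (:stats heartbeat)
  ;; ...
  )
```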

3. The most direct consumer is the Storm UI: when it prepares the topology page, it calls getTopologyInfo to fetch the data:

```clojure
(defn topology-page [id window include-sys?]
  (with-nimbus nimbus
    (let [summ (.getTopologyInfo ^Nimbus$Client nimbus id)]
      ;; ... the rest of the page is built from summ
      )))
```


Stats

This module is used by spouts and bolts to collect sampled statistics. The specific metrics tracked are:

```clojure
(def COMMON-FIELDS [:emitted :transferred])
(defrecord CommonStats [emitted transferred rate])

(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;;acked and failed count individual tuples
(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])

(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;;acked and failed count tuple completion
(defrecord SpoutExecutorStats [common acked failed complete-latencies])
```


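The sampling rate is configured in storm-conf via TOPOLOGY_STATS_SAMPLE_RATE.

Why does each update add rate instead of 1?

Because the statistics are sampled: if the sampling rate is 10%, each sampled event stands for 1/(10%) = 10 events, so 10 is added. sampling-rate computes this weight as the integer part of 1/rate: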

```clojure
(defn sampling-rate [conf]
  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
       (/ 1)
       int))
```
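The weighting argument can be checked with a small standalone sketch. It assumes the config key string is `"topology.stats.sample.rate"` (the value behind `TOPOLOGY-STATS-SAMPLE-RATE`) and models sampling as an independent coin flip per event with probability equal to the sample rate; Storm's own sampler may choose events differently, and `estimate-count` is a hypothetical helper.

```clojure
;; Assumed config key; in Storm this is the value of TOPOLOGY-STATS-SAMPLE-RATE.
(def sample-rate-key "topology.stats.sample.rate")

;; Same logic as sampling-rate above: the weight is (int (/ 1 rate)).
(defn sampling-rate [conf]
  (->> (get conf sample-rate-key)
       (/ 1)
       int))

;; Hypothetical simulation: each event is sampled with probability `rate`,
;; and every sampled event adds the weight instead of 1.
(defn estimate-count [conf n-events seed]
  (let [rnd    (java.util.Random. (long seed))
        rate   (double (get conf sample-rate-key))
        weight (sampling-rate conf)]
    (reduce (fn [acc _]
              (if (< (.nextDouble rnd) rate) (+ acc weight) acc))
            0
            (range n-events))))

(sampling-rate {sample-rate-key 0.1})            ;; => 10
(estimate-count {sample-rate-key 0.1} 100000 42) ;; close to 100000
```

Because `int` truncates, a rate such as 0.3 yields a weight of 3 rather than 3.33, so counts are slightly underestimated whenever 1/rate is not a whole number.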


The statistics are kept over time windows. Below are the default bucket count and time-window definitions:

```clojure
(def NUM-STAT-BUCKETS 20) ;; number of buckets
;; 10 minutes, 3 hours, 1 day: the three time windows
(def STAT-BUCKETS [30 540 4320]) ;; bucket sizes of 30, 540 and 4320 seconds
```
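Each window spans the number of buckets times the bucket size, which is where the 10-minute, 3-hour and 1-day labels come from. A quick check (`window-spans` is a hypothetical helper, not part of Storm):

```clojure
(def NUM-STAT-BUCKETS 20)
(def STAT-BUCKETS [30 540 4320])

;; Total span of each rolling window, in seconds: buckets x bucket size.
(defn window-spans [num-buckets bucket-sizes]
  (mapv #(* num-buckets %) bucket-sizes))

(window-spans NUM-STAT-BUCKETS STAT-BUCKETS)
;; => [600 10800 86400], i.e. 10 minutes, 3 hours and 1 day
```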


The core data structure is RollingWindowSet, which contains:
the functions used to compute the statistics, updater and extractor (presumably the set needs them too because it also maintains all-time);
a group of rolling windows, three by default: 10 minutes, 3 hours and 1 day;
all-time, the statistics over the entire time range.

```clojure
(defrecord RollingWindowSet [updater extractor windows all-time])

(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  (RollingWindowSet. updater extractor
                     (dofor [s bucket-sizes]
                       (rolling-window updater merger extractor s num-buckets))
                     nil))
```


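Next, the definition of a rolling window:
the core data, buckets: a hash map from each time bucket to that bucket's data (itself a map of {stream-id value}), initialized to {};
the functions used to compute the data: updater, merger and extractor;
the time-window parameters: the bucket size and the number of buckets.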

```clojure
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])

(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))
```
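A self-contained re-creation of the two constructors shows what a freshly built set looks like: one empty window per bucket size, with all-time still nil. It is a sketch: Storm's `dofor` is assumed to behave like `(doall (for ...))`, and the updater, merger and extractor passed in are hypothetical stand-ins for `incr-val`, `(partial merge-with +)` and `counter-extract`.

```clojure
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
(defrecord RollingWindowSet [updater extractor windows all-time])

(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  ;; buckets starts empty; it will map time bucket -> {stream-id value}
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))

(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  ;; One window per bucket size; (doall (for ...)) stands in for Storm's dofor.
  (RollingWindowSet. updater extractor
                     (doall (for [s bucket-sizes]
                              (rolling-window updater merger extractor s num-buckets)))
                     nil))

;; Hypothetical stand-ins for incr-val, the merger and counter-extract.
(def rws (rolling-window-set (fn [m k amt] (update m k (fnil + 0) amt))
                             (partial merge-with +)
                             (fn [v] (if v v {}))
                             20 30 540 4320))

(map :bucket-size-secs (:windows rws)) ;; => (30 540 4320)
(:all-time rws)                        ;; => nil
```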


1. mk-stats

Stats are created when mk-executor-data builds the executor data; here `<>` refers to the executor-data map under construction:

```clojure
(mk-executor-stats <> (sampling-rate storm-conf))
```


```clojure
;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate]
  (stats/mk-spout-stats rate))

(defmethod mk-executor-stats :bolt [_ rate]
  (stats/mk-bolt-stats rate))
```

The method bodies ignore the first argument (it only serves to choose between the :spout and :bolt methods); they simply call stats/mk-spout-stats or stats/mk-bolt-stats. In other words, a rolling-window-set is created for every metric that needs to be tracked:

```clojure
(defn- mk-common-stats [rate]
  (CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ; emitted
                (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ; transferred
                rate))

(defn mk-bolt-stats [rate]
  (BoltExecutorStats. (mk-common-stats rate)
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ; acked
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ; failed
                      (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))     ; process-latencies
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ; executed
                      (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))   ; execute-latencies

(defn mk-spout-stats [rate]
  (SpoutExecutorStats. (mk-common-stats rate)
                       (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ; acked
                       (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ; failed
                       (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)))) ; complete-latencies
```
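The dispatch and the one-window-set-per-metric layout can be sketched as follows. The dispatch function (reading a hypothetical `:type` key from the executor data) is an assumption, each rolling-window-set is replaced by an atom holding an empty map, and plain maps stand in for the `CommonStats` / `SpoutExecutorStats` / `BoltExecutorStats` records to keep the example self-contained.

```clojure
(def COMMON-FIELDS [:emitted :transferred])
(def SPOUT-FIELDS [:acked :failed :complete-latencies])
(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])

;; Stand-in for (atom (apply keyed-...-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)):
;; every field gets its own, independent atom.
(defn fresh-atoms [fields]
  (into {} (for [f fields] [f (atom {})])))

(defn mk-common-stats [rate]
  (assoc (fresh-atoms COMMON-FIELDS) :rate rate))

;; Assumed dispatch: the executor data carries :type :spout or :bolt.
;; The methods ignore their first argument; it only selects the method.
(defmulti mk-executor-stats (fn [executor-data _rate] (:type executor-data)))

(defmethod mk-executor-stats :spout [_ rate]
  (assoc (fresh-atoms SPOUT-FIELDS) :common (mk-common-stats rate)))

(defmethod mk-executor-stats :bolt [_ rate]
  (assoc (fresh-atoms BOLT-FIELDS) :common (mk-common-stats rate)))

(def spout-stats (mk-executor-stats {:type :spout} 20))
(sort (keys spout-stats)) ;; => (:acked :common :complete-latencies :failed)
```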


2. Updating the data

```clojure
(defmacro update-executor-stat! [stats path & args]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))

(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms]
  (update-executor-stat! stats :acked stream (stats-rate stats))
  (update-executor-stat! stats :complete-latencies stream latency-ms))
```
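The macro can be exercised with a simplified `collectify` and a stand-in `update-rolling-window-set` that just adds the amount to a per-stream counter; both are hypothetical simplifications, not Storm's implementations. The point is the path handling: a single keyword such as `:acked` becomes `(-> stats :acked)`, while a vector path such as `[:common :emitted]` would reach into the nested common stats.

```clojure
;; Hypothetical stand-in for Storm's collectify: wrap a single value in a vector.
(defn collectify [x]
  (if (sequential? x) x [x]))

;; Simplified stand-in for update-rolling-window-set: a plain per-stream counter.
(defn update-rolling-window-set [m stream amt]
  (update m stream (fnil + 0) amt))

(defmacro update-executor-stat! [stats path & args]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))

;; :acked sits directly on the stats map; :emitted is nested under :common.
(def stats {:acked (atom {}) :common {:emitted (atom {})} :rate 20})

;; Expands to roughly (swap! (-> stats :acked) update-rolling-window-set "default" (:rate stats))
(update-executor-stat! stats :acked "default" (:rate stats))
(update-executor-stat! stats [:common :emitted] "default" (:rate stats))

@(:acked stats) ;; => {"default" 20}
```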

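Let's take update-executor-stat! stats :acked stream (stats-rate stats) as an example and see how it works.

It takes from SpoutExecutorStats the atom holding the rolling-window-set that records spout acks,
then swaps that atom with update-rolling-window-set; the call expands to (swap! (-> stats :acked) update-rolling-window-set stream (stats-rate stats)).

How is the rolling-window-set that records acked defined?

It is built by keyed-counter-rolling-window-set, which predefines updater, merger and extractor:
updater: incr-val [amap key amt], adds amt to the value stored under key in amap
merger: (partial merge-with +), merges maps with +, so values under the same key are summed
extractor: counter-extract, (if v v {}), returns the value if present, otherwise {}
windows: a list of rolling-windows
all-time: initialized to nil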

```clojure
(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes]
  (apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
```
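The three plug-in functions can be exercised directly. This `incr-val` is a sketch written from the semantics described above (add `amt` to the value under `key`, treating a missing value as 0), so Storm's exact body may differ; `counter-merger`, `bucket-a` and `bucket-b` are illustrative names.

```clojure
;; Add amt to the value under key; a nil map or missing key starts from 0.
(defn incr-val [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

;; The merger: sum values that share a key.
(def counter-merger (partial merge-with +))

;; The extractor: return the value, or {} if there is none.
(defn counter-extract [v]
  (if v v {}))

;; Two buckets of the same window, keyed by stream id.
(def bucket-a (-> nil (incr-val "default" 20) (incr-val "errors" 20)))
(def bucket-b (incr-val {} "default" 20))

(counter-merger bucket-a bucket-b) ;; => {"default" 40, "errors" 20}
(counter-extract nil)              ;; => {}
```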


Now let's look at how :acked gets updated when spout-acked-tuple! is called.

First, every rolling-window is updated, and the updated windows are stored back in the set under :windows.
Then :all-time is updated with (apply (:updater rws) (:all-time rws) args), where:
updater is incr-val [amap key amt]
args are the stream-id and rate
all-time records the statistics of each stream over the entire time range

```clojure
(defn update-rolling-window-set
  ([^RollingWindowSet rws & args]
   (let [now (current-time-secs)
         new-windows (dofor [w (:windows rws)]
                       (apply update-rolling-window w now args))]
     (assoc rws
            :windows new-windows
            :all-time (apply (:updater rws) (:all-time rws) args)))))
```
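The set-level update can be traced with a self-contained sketch. Windows are plain maps rather than `RollingWindow` records, `now` is passed in explicitly instead of being read from `current-time-secs` so the result is deterministic, and `curr-time-bucket` is assumed to round the time down to a multiple of the bucket size. Two acks at t=100 and t=700 land in different buckets of the 30 s and 540 s windows, in the same bucket of the 4320 s window, and both count toward all-time.

```clojure
(defn incr-val [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

;; Assumption: the bucket key is the bucket's start time.
(defn curr-time-bucket [time-secs bucket-size-secs]
  (* bucket-size-secs (quot time-secs bucket-size-secs)))

(defn update-rolling-window [rw time-secs & args]
  (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
        buckets     (:buckets rw)]
    (assoc rw :buckets
           (assoc buckets time-bucket (apply (:updater rw) (get buckets time-bucket) args)))))

;; Sketch of update-rolling-window-set with `now` as an explicit argument.
(defn update-rolling-window-set [rws now & args]
  (assoc rws
         ;; every window applies the update to its current bucket...
         :windows  (doall (for [w (:windows rws)] (apply update-rolling-window w now args)))
         ;; ...and all-time accumulates with the same updater, without buckets.
         :all-time (apply (:updater rws) (:all-time rws) args)))

(def rws {:updater  incr-val
          :windows  (for [s [30 540 4320]] {:updater incr-val :bucket-size-secs s :buckets {}})
          :all-time nil})

(def rws2 (-> rws
              (update-rolling-window-set 100 "default" 20)
              (update-rolling-window-set 700 "default" 20)))

(:all-time rws2) ;; => {"default" 40}
(map :buckets (:windows rws2))
;; => ({90 {"default" 20}, 690 {"default" 20}}
;;     {0 {"default" 20}, 540 {"default" 20}}
;;     {0 {"default" 40}})
```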

Here is how a single rolling-window is updated:
compute from now which bucket the current time belongs to (time-bucket);
take buckets and apply :updater to that bucket; once again, this adds rate to the value for the stream-id.

```clojure
(defn update-rolling-window
  ([^RollingWindow rw time-secs & args]
   ;; this is 2.5x faster than using update-in...
   (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
         buckets (:buckets rw)
         curr (get buckets time-bucket)
         curr (apply (:updater rw) curr args)]
     (assoc rw :buckets (assoc buckets time-bucket curr)))))
```
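Feeding a single window a few timestamps makes the bucketing visible. As a working assumption, `curr-time-bucket` rounds down to a multiple of `bucket-size-secs`, and the window is a plain map holding only the keys `update-rolling-window` reads. The last line combines the buckets with `(partial merge-with +)`, the merger described above, to show how per-bucket counts add up to a window total.

```clojure
;; Assumption: the bucket key is time-secs rounded down to a multiple of the bucket size.
(defn curr-time-bucket [time-secs bucket-size-secs]
  (* bucket-size-secs (quot time-secs bucket-size-secs)))

(defn incr-val [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

;; Same steps as update-rolling-window above: find the bucket, apply :updater to it.
(defn update-rolling-window [rw time-secs & args]
  (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
        buckets     (:buckets rw)
        curr        (apply (:updater rw) (get buckets time-bucket) args)]
    (assoc rw :buckets (assoc buckets time-bucket curr))))

;; A 30-second window fed three acks: t=61 and t=89 share bucket 60, t=90 opens bucket 90.
(def rw (reduce (fn [w t] (update-rolling-window w t "default" 20))
                {:updater incr-val :bucket-size-secs 30 :buckets {}}
                [61 89 90]))

(:buckets rw)                                  ;; => {60 {"default" 40}, 90 {"default" 20}}
(apply merge-with + (vals (:buckets rw)))      ;; => {"default" 60}
```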

Reposted from: https://www.cnblogs.com/fxjwind/p/3223110.html
