Storm Source Code Analysis - Stats (backtype.storm.stats)
You may notice that Storm currently has two metrics systems: the metrics framework and the stats framework.
Both are registered side by side everywhere. It looks as if metrics is meant to replace stats eventually, but in the current version the UI still uses stats.
How is the data collected by this module used?
1. In the worker, do-executor-heartbeats is called periodically to sync heartbeats (hb) to ZooKeeper (zk).
Stats are synced to zk as part of the heartbeat.
2. Anyone can then retrieve this information through Nimbus's Thrift interface.
3. The most direct consumer is the Storm UI: when it builds the topology page, it calls getTopologyInfo to fetch the data:
(defn topology-page [id window include-sys?]
  (with-nimbus nimbus
    (let [summ (.getTopologyInfo ^Nimbus$Client nimbus id)]
      ;; ... (rest of the function omitted)
      )))
Stats
Spouts and bolts use this module to collect sampled statistics. The specific metrics tracked are:
(def COMMON-FIELDS [:emitted :transferred])
(defrecord CommonStats [emitted transferred rate])

(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;;acked and failed count individual tuples
(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])

(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;;acked and failed count tuple completion
(defrecord SpoutExecutorStats [common acked failed complete-latencies])
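The sampling rate is configured in storm-conf via TOPOLOGY_STATS_SAMPLE_RATE.
Why does each update add rate instead of 1?
Because the statistics are sampled. If the sampling rate is 10%, each sampled event stands for 1/(10%) = 10 events, so the counter is incremented by 10. sampling-rate converts the configured fraction into that multiplier: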
(defn sampling-rate [conf]
  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
       (/ 1)
       int))
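A minimal sketch of why adding rate gives an unbiased count, assuming a deterministic "every rate-th event" sampler. That sampler and the helper estimate-count are hypothetical stand-ins; Storm's actual sampler is not shown in this article. The string "topology.stats.sample.rate" is the config key that TOPOLOGY-STATS-SAMPLE-RATE names.

```clojure
;; Sketch only: a deterministic "every rate-th event" sampler stands in for Storm's
;; real sampler, which is not shown in this article.
(def topology-stats-sample-rate "topology.stats.sample.rate")

(defn sampling-rate [conf]
  ;; (->> x (/ 1) int) expands to (int (/ 1 x)), e.g. 0.05 -> 20
  (->> (conf topology-stats-sample-rate)
       (/ 1)
       int))

(defn estimate-count
  "Hypothetical helper: walks num-events events, samples every rate-th one,
  and adds rate (not 1) for each sampled event."
  [conf num-events]
  (let [rate (sampling-rate conf)]
    (reduce (fn [total i]
              (if (zero? (mod i rate))
                (+ total rate)
                total))
            0
            (range num-events))))

(def conf {topology-stats-sample-rate 0.05})

(println (sampling-rate conf))       ;; prints 20
(println (estimate-count conf 1000)) ;; prints 1000
```

With 1,000 events and a 5% sample, only 50 events are actually recorded, but adding 20 for each keeps the estimated total at 1,000.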
Statistics are kept per time window. These are the default bucket and window definitions:
(def NUM-STAT-BUCKETS 20) ;; number of buckets per window

;; 10 minutes, 3 hours, 1 day: the three time windows
(def STAT-BUCKETS [30 540 4320]) ;; bucket sizes of 30, 540 and 4320 seconds
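As a quick check of those comments: each window spans NUM-STAT-BUCKETS times its bucket size. The helper window-lengths-secs is hypothetical and exists only for this illustration.

```clojure
;; Window length = number of buckets x bucket size (seconds).
(def NUM-STAT-BUCKETS 20)
(def STAT-BUCKETS [30 540 4320])

(defn window-lengths-secs
  "Hypothetical helper: total span in seconds of each rolling window."
  [num-buckets bucket-sizes]
  (mapv #(* num-buckets %) bucket-sizes))

(println (window-lengths-secs NUM-STAT-BUCKETS STAT-BUCKETS))
;; prints [600 10800 86400]: 10 minutes, 3 hours, 1 day
```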
The core data structure is RollingWindowSet, which contains:
updater and extractor, the functions needed to compute the statistics; the set needs them as well because it also maintains the all-time statistics
windows, a group of rolling windows, three by default: 10 minutes, 3 hours and 1 day
all-time, the statistics over the entire time range
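Next, the definition of a rolling window:
buckets, the core data: a hashmap from time bucket to that bucket's data (for the keyed stats, a {streamid -> value} map), initialized to {}
updater, merger and extractor, the functions needed to compute the data
the time window itself: the bucket size and the number of buckets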
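A minimal sketch of the two records as described above. The field names bucket-size-secs and num-buckets, and the constructor helpers rolling-window and rolling-window-set, are assumptions for illustration and may not match stats.clj exactly. The updater/merger/extractor passed in are placeholders; only the structure matters here.

```clojure
;; Sketch of the structures described in the text; names beyond the text are assumptions.
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
(defrecord RollingWindowSet [updater extractor windows all-time])

(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  ;; buckets starts empty: {time-bucket -> data}
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))

(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  (RollingWindowSet. updater
                     extractor
                     ;; one window per bucket size, e.g. 30 / 540 / 4320 seconds
                     (vec (for [s bucket-sizes]
                            (rolling-window updater merger extractor s num-buckets)))
                     ;; all-time starts out as nil
                     nil))

(def NUM-STAT-BUCKETS 20)
(def STAT-BUCKETS [30 540 4320])

;; placeholder functions (conj, merge-with +, identity)
(def rws (apply rolling-window-set conj (partial merge-with +) identity
                NUM-STAT-BUCKETS STAT-BUCKETS))

(prn (map :bucket-size-secs (:windows rws))) ;; prints (30 540 4320)
```

The call shape mirrors (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS) from the stats code: the bucket sizes are spread as trailing arguments, one window per size.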
1. mk-stats
Stats are created in mk-executor-data:
(mk-executor-stats <> (sampling-rate storm-conf))

;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate]
  (stats/mk-spout-stats rate))

(defmethod mk-executor-stats :bolt [_ rate]
  (stats/mk-bolt-stats rate))

Apart from dispatching on the executor type (:spout or :bolt), the first argument is ignored, so these methods simply call stats/mk-spout-stats or stats/mk-bolt-stats. Those functions create one rolling-window-set, held in an atom, for every metric that needs to be tracked:
(defn- mk-common-stats [rate]
  (CommonStats.
    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; emitted
    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; transferred
    rate))

(defn mk-bolt-stats [rate]
  (BoltExecutorStats.
    (mk-common-stats rate)
    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; acked
    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; failed
    (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))     ;; process-latencies
    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; executed
    (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))   ;; execute-latencies

(defn mk-spout-stats [rate]
  (SpoutExecutorStats.
    (mk-common-stats rate)
    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; acked
    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS)) ;; failed
    (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))   ;; complete-latencies
2. Updating the data
(defmacro update-executor-stat! [stats path & args]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))

(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms]
  (update-executor-stat! stats :acked stream (stats-rate stats))
  (update-executor-stat! stats :complete-latencies stream latency-ms))

Take update-executor-stat! stats :acked stream (stats-rate stats) as an example of how this works:
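First, (-> stats :acked) takes from SpoutExecutorStats the rolling-window-set that records spout acks; it is stored in an atom.
Then swap! applies update-rolling-window-set to that atom, passing stream and rate as the extra arguments.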
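To make the macro concrete, here is a minimal self-contained sketch of the same pattern. The records mirror the defrecords listed earlier; collectify is a simplified version of the helper the macro calls, and update-window-set is a hypothetical stand-in for update-rolling-window-set that only adds amt to a {stream -> count} map.

```clojure
;; Sketch of the update-executor-stat! pattern; update-window-set is a hypothetical
;; stand-in for update-rolling-window-set.
(defrecord CommonStats [emitted transferred rate])
(defrecord SpoutExecutorStats [common acked failed complete-latencies])

(defn collectify
  "Simplified: wrap a single key in a vector, leave a sequential path as-is."
  [x]
  (if (sequential? x) x [x]))

(defn update-window-set [m stream amt]
  (assoc m stream (+ (get m stream 0) amt)))

(defmacro update-executor-stat! [stats path & args]
  ;; runs at macro-expansion time: :acked -> [:acked]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-window-set ~@args)))

(defn stats-rate [stats]
  (-> stats :common :rate))

;; one atom per tracked metric, as in mk-spout-stats; rate = 20
(def stats
  (SpoutExecutorStats. (CommonStats. (atom {}) (atom {}) 20)
                       (atom {})
                       (atom {})
                       (atom {})))

;; (-> stats :acked) picks the atom, swap! applies update-window-set to it
(update-executor-stat! stats :acked "default" (stats-rate stats))
(update-executor-stat! stats :acked "default" (stats-rate stats))
;; a vector path reaches into the nested CommonStats record
(update-executor-stat! stats [:common :emitted] "default" (stats-rate stats))

(prn @(:acked stats))              ;; prints {"default" 40}
(prn @(-> stats :common :emitted)) ;; prints {"default" 20}
```

Because each field holds its own atom, updating :acked touches only that atom; the record holding the atoms is never replaced.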
Now let's look at how the rolling-window-set that records acks is defined.
keyed-counter-rolling-window-set predefines the updater, merger and extractor:
updater, incr-val [amap key amt], which adds amt to the value stored under key in amap
merger, (partial merge-with +), which merges maps with +, so values under the same key are summed
extractor, counter-extract, (if v v {}), which returns the value if there is one and {} otherwise
windows, a list of rolling-windows
all-time, initialized to nil
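The three predefined functions can be sketched as follows, reconstructed from the descriptions above; counter-merger is a hypothetical name, since the text only gives the expression (partial merge-with +). Note that incr-val tolerates a nil map, which matters because all-time starts out as nil. The merger is what would combine the per-bucket maps of a window into one total.

```clojure
;; Sketch reconstructed from the description; counter-merger is a hypothetical name.
(defn incr-val [amap key amt]
  ;; works on nil too: (get nil k 0) is 0 and (assoc nil k v) is {k v}
  (assoc amap key (+ (get amap key 0) amt)))

(def counter-merger (partial merge-with +))

(defn counter-extract [v]
  (if v v {}))

;; per-stream counts in two buckets, rate = 20
(def bucket-a (-> nil (incr-val "default" 20) (incr-val "default" 20)))
(def bucket-b (incr-val {} "other" 20))

(prn bucket-a)                                          ;; prints {"default" 40}
(prn (counter-merger bucket-a bucket-b {"default" 20})) ;; prints {"default" 60, "other" 20}
(prn (counter-extract nil))                             ;; prints {}
```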
Now let's see how :acked is updated when spout-acked-tuple! is called.
First, each rolling-window is updated, and the updated windows are stored back under :windows.
Then :all-time is updated with (apply (:updater rws) (:all-time rws) args), where
updater is incr-val [amap key amt]
args are streamid and rate
all-time therefore records each stream's statistics over the entire time range.
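A minimal sketch of update-rolling-window-set following those steps. As a hypothetical simplification, each window here is just a map {:updater f, :data m} with no time buckets, and update-window stands in for the real per-window update; time handling is left out.

```clojure
;; Sketch of update-rolling-window-set; windows are simplified to {:updater f, :data m}.
(defn incr-val [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

(defrecord RollingWindowSet [updater extractor windows all-time])

(defn update-window
  "Hypothetical stand-in for update-rolling-window: apply the window's updater."
  [window & args]
  (assoc window :data (apply (:updater window) (:data window) args)))

(defn update-rolling-window-set [rws & args]
  (let [new-windows (mapv #(apply update-window % args) (:windows rws))]
    (assoc rws
           ;; 1. every window updated, stored back under :windows
           :windows new-windows
           ;; 2. (apply (:updater rws) (:all-time rws) args); all-time starts as nil
           :all-time (apply (:updater rws) (:all-time rws) args))))

;; three windows, as with the 10 min / 3 h / 1 day defaults
(def rws (RollingWindowSet. incr-val identity
                            (vec (repeat 3 {:updater incr-val :data nil}))
                            nil))

;; args = streamid, rate
(def rws' (-> rws
              (update-rolling-window-set "default" 20)
              (update-rolling-window-set "default" 20)))

(prn (:all-time rws'))             ;; prints {"default" 40}
(prn (mapv :data (:windows rws'))) ;; prints [{"default" 40} {"default" 40} {"default" 40}]
```

The function returns a new record rather than mutating one, which is what lets it be used directly as the function argument to swap!.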
Next, how a single rolling-window is updated:
From now, compute which bucket the current time falls into (time-bucket).
Take buckets and apply :updater to that bucket; here again the operation adds rate to the value for streamid.
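A minimal sketch of the per-window update. The bucketing rule in curr-time-bucket (rounding the time down to a multiple of the bucket size) is an assumption, and the time is passed in explicitly instead of being read from the clock so that the example is deterministic.

```clojure
;; Sketch of updating one rolling window; curr-time-bucket's rule is an assumption.
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])

(defn incr-val [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

(defn curr-time-bucket [time-secs bucket-size-secs]
  ;; e.g. 100s with 30s buckets -> bucket 90
  (* bucket-size-secs (quot time-secs bucket-size-secs)))

(defn update-rolling-window [rw time-secs & args]
  (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
        buckets     (:buckets rw)
        ;; add rate to streamid's value inside this bucket
        curr        (apply (:updater rw) (get buckets time-bucket) args)]
    (assoc rw :buckets (assoc buckets time-bucket curr))))

;; a 10-minute window: 20 buckets of 30 seconds
(def rw (RollingWindow. incr-val (partial merge-with +) #(if % % {}) 30 20 {}))

(def rw' (-> rw
             (update-rolling-window 100 "default" 20)   ;; bucket 90
             (update-rolling-window 115 "default" 20)   ;; bucket 90
             (update-rolling-window 125 "default" 20))) ;; bucket 120

(prn (:buckets rw')) ;; prints {90 {"default" 40}, 120 {"default" 20}}
```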
Reposted from: https://www.cnblogs.com/fxjwind/p/3223110.html