技术实践 | 如何基于 Flink 实现通用的聚合指标计算框架
導(dǎo)讀:網(wǎng)易云信作為一個(gè) PaaS 服務(wù),需要對(duì)線上業(yè)務(wù)進(jìn)行實(shí)時(shí)監(jiān)控,實(shí)時(shí)感知服務(wù)的“心跳”、“脈搏”、“血壓”等健康狀況。通過(guò)采集服務(wù)拿到 SDK、服務(wù)器等端的心跳埋點(diǎn)日志,是一個(gè)非常龐大且雜亂無(wú)序的數(shù)據(jù)集,而如何才能有效利用這些數(shù)據(jù)?服務(wù)監(jiān)控平臺(tái)要做的事情就是對(duì)海量數(shù)據(jù)進(jìn)行實(shí)時(shí)分析,聚合出表征服務(wù)的“心跳”、“脈搏”、“血壓”的核心指標(biāo),并將其直觀的展示給相關(guān)同學(xué)。這其中核心的能力便是 :實(shí)時(shí)分析和實(shí)時(shí)聚合。
文|圣少友 網(wǎng)易云信數(shù)據(jù)平臺(tái)資深開(kāi)發(fā)工程師
在之前的《網(wǎng)易云信服務(wù)監(jiān)控平臺(tái)實(shí)踐》一文中,我們圍繞數(shù)據(jù)采集、數(shù)據(jù)處理、監(jiān)控告警、數(shù)據(jù)應(yīng)用 4 個(gè)環(huán)節(jié),介紹了網(wǎng)易云信服務(wù)監(jiān)控平臺(tái)的整體框架。本文是對(duì)網(wǎng)易云信在聚合指標(biāo)計(jì)算邏輯上的進(jìn)一步詳述。
基于明細(xì)數(shù)據(jù)集進(jìn)行實(shí)時(shí)聚合,生產(chǎn)一個(gè)聚合指標(biāo),業(yè)界常用的實(shí)現(xiàn)方式是 Spark Streaming、Flink SQL / Stream API。不論是何種方式,我們都需要通過(guò)寫(xiě)代碼來(lái)指定數(shù)據(jù)來(lái)源、數(shù)據(jù)清洗邏輯、聚合維度、聚合窗口大小、聚合算子等。如此繁雜的邏輯和代碼,無(wú)論是開(kāi)發(fā)、測(cè)試,還是后續(xù)任務(wù)的維護(hù),都需要投入大量的人力/物力成本。而我們程序員要做的便是化繁為簡(jiǎn)、實(shí)現(xiàn)大巧不工。
本文將闡述網(wǎng)易云信是如何基于 Flink 的 Stream API,實(shí)現(xiàn)一套通用的聚合指標(biāo)計(jì)算框架。
整體架構(gòu)
如上圖所示,是我們基于 Flink 自研的聚合指標(biāo)完整加工鏈路,其中涉及到的模塊包括:
-
source:定期加載聚合規(guī)則,并根據(jù)聚合規(guī)則按需創(chuàng)建 Kafka 的 Consumer,并持續(xù)消費(fèi)數(shù)據(jù)。
-
process:包括分組邏輯、窗口邏輯、聚合邏輯、環(huán)比計(jì)算邏輯等。從圖中可以看到,我們?cè)诰酆想A段分成了兩個(gè),這樣做的目的是什么?其中的好處是什么呢?做過(guò)分布式和并發(fā)計(jì)算的,都會(huì)遇到一個(gè)共同的敵人:數(shù)據(jù)傾斜。在我們 PaaS 服務(wù)中頭部客戶會(huì)更加明顯,所以傾斜非常嚴(yán)重,分成兩個(gè)階段進(jìn)行聚合的奧妙下文中會(huì)詳細(xì)說(shuō)明。
-
sink:是數(shù)據(jù)輸出層,目前默認(rèn)輸出到 Kafka 和 InfluxDB,前者用于驅(qū)動(dòng)后續(xù)計(jì)算(如告警通知等),后者用于數(shù)據(jù)展示以及查詢服務(wù)等。
-
reporter:全鏈路統(tǒng)計(jì)各個(gè)環(huán)節(jié)的運(yùn)行狀況,如輸入/輸出 QPS、計(jì)算耗時(shí)、消費(fèi)堆積、遲到數(shù)據(jù)量等。
下文將詳細(xì)介紹這幾個(gè)模塊的設(shè)計(jì)和實(shí)現(xiàn)思路。
source
配置規(guī)則?
為了便于聚合指標(biāo)的生產(chǎn)和維護(hù),我們將指標(biāo)計(jì)算過(guò)程中涉及到的關(guān)鍵參數(shù)進(jìn)行了抽象提煉,提供了可視化配置頁(yè)面,如下圖所示。下文會(huì)結(jié)合具體場(chǎng)景介紹各個(gè)參數(shù)的用途。
規(guī)則加載?
在聚合任務(wù)運(yùn)行過(guò)程中,我們會(huì)定期加載配置。如果檢測(cè)到有新增的 Topic,我們會(huì)創(chuàng)建 kafka-consumer 線程,接收上游實(shí)時(shí)數(shù)據(jù)流。同理,對(duì)于已經(jīng)失效的配置,我們會(huì)關(guān)閉消費(fèi)線程,并清理相關(guān)的 reporter。
數(shù)據(jù)消費(fèi)?
對(duì)于數(shù)據(jù)源相同的聚合指標(biāo),我們共用一個(gè) kafka-consumer,拉取到記錄并解析后,對(duì)每個(gè)聚合指標(biāo)分別調(diào)用 collect() 進(jìn)行數(shù)據(jù)分發(fā)。如果指標(biāo)的數(shù)據(jù)篩選規(guī)則(配置項(xiàng)⑤)非空,在數(shù)據(jù)分發(fā)前需要進(jìn)行數(shù)據(jù)過(guò)濾,不滿足條件的數(shù)據(jù)直接丟棄。
process
整體計(jì)算流程?
基于 Flink 的 Stream API 實(shí)現(xiàn)聚合計(jì)算的核心代碼如下所示:
SingleOutputStreamOperator<MetricContext> aggResult = src.assignTimestampsAndWatermarks(new MetricWatermark()).keyBy(new MetricKeyBy()).window(new MetricTimeWindow()).aggregate(new MetricAggFuction());-
MetricWatermark():根據(jù)指定的時(shí)間字段(配置項(xiàng)⑧)獲取輸入數(shù)據(jù)的 timestamp,并驅(qū)動(dòng)計(jì)算流的 watermark 往前推進(jìn)。
-
MetricKeyBy():指定聚合維度,類似于 MySQL 中 groupby,根據(jù)分組字段(配置項(xiàng)⑥),從數(shù)據(jù)中獲取聚合維度的取值,拼接成分組 key。
-
MetricTimeWindow():配置項(xiàng)⑧中指定了聚合計(jì)算的窗口大小。如果配置了定時(shí)輸出,我們就創(chuàng)建滑動(dòng)窗口,否則就創(chuàng)建滾動(dòng)窗口。
-
MetricAggFuction():實(shí)現(xiàn)配置項(xiàng)②指定的各種算子的計(jì)算,下文將詳細(xì)介紹各個(gè)算子的實(shí)現(xiàn)原理。
二次聚合?
對(duì)于大數(shù)據(jù)量的聚合計(jì)算,數(shù)據(jù)傾斜是不得不考慮的問(wèn)題,數(shù)據(jù)傾斜意味著規(guī)則中配置的分組字段(配置項(xiàng)⑥)指定的聚合 key 存在熱點(diǎn)。我們的計(jì)算框架在設(shè)計(jì)之初就考慮了如何解決數(shù)據(jù)傾斜問(wèn)題,就是將聚合過(guò)程拆分成2階段:
-
第1階段:將數(shù)據(jù)隨機(jī)打散,進(jìn)行預(yù)聚合。
-
第2階段:將第1階段的預(yù)聚合結(jié)果作為輸入,進(jìn)行最終的聚合。
具體實(shí)現(xiàn):判斷并發(fā)度參數(shù) parallelism(配置項(xiàng)⑦) 是否大于1,如果 parallelism 大于1,生成一個(gè) [0, parallelism) 之間的隨機(jī)數(shù)作為 randomKey,在第1階段聚合 keyBy() 中,將依據(jù)分組字段(配置項(xiàng)⑥)獲取的 key 與 randomKey 拼接,生成最終的聚合 key,從而實(shí)現(xiàn)了數(shù)據(jù)隨機(jī)打散。
聚合算子?
作為一個(gè)平臺(tái)型的產(chǎn)品,我們提供了如下常見(jiàn)的聚合算子。由于采用了二次聚合邏輯,各個(gè)算子在第1階段和第2階段采用了相應(yīng)的計(jì)算策略。
對(duì)于計(jì)算結(jié)果受全部數(shù)據(jù)影響的算子,如 count-distinct(去重計(jì)數(shù)),常規(guī)思路是利用 set 的去重特性,將所有統(tǒng)計(jì)數(shù)據(jù)放在一個(gè) Set 中,最終在聚合函數(shù)的 getResult 中輸出 Set 的 size。如果統(tǒng)計(jì)數(shù)據(jù)量非常大,這個(gè) Set 對(duì)象就會(huì)非常大,對(duì)這個(gè) Set 的 I/O 操作所消耗的時(shí)間將不能接受。
對(duì)于類 MapReduce 的大數(shù)據(jù)計(jì)算框架,性能的瓶頸往往出現(xiàn)在 shuffle 階段大對(duì)象的 I/O 上,因?yàn)閿?shù)據(jù)需要序列化 / 傳輸 / 反序列化,Flink 也不例外。類似的算子還有 median 和 tp95。
為此,需要對(duì)這些算子做專門(mén)的優(yōu)化,優(yōu)化的思路就是盡量減少計(jì)算過(guò)程中使用的數(shù)據(jù)對(duì)象的大小,其中:
-
median/tp90/tp95:參考了 hive percentile_approx 的近似算法,該算法通過(guò) NumericHistogram(一種非等距直方圖)記錄數(shù)據(jù)分布,然后通過(guò)插值的方式得到相應(yīng)的 tp 值(median 是 tp50)。
-
count-distinct:采用 RoaringBitmap 算法,通過(guò)壓縮位圖的方式標(biāo)記輸入樣本,最終得到精確的去重計(jì)數(shù)結(jié)果。
-
count-distinct(近似) :采用 HyperLoglog 算法,通過(guò)基數(shù)計(jì)數(shù)的方式,得到近似的去重計(jì)數(shù)結(jié)果。該算法適用于大數(shù)據(jù)集的去重計(jì)數(shù)。
?后處理?
后處理模塊,是對(duì)第2階段聚合計(jì)算輸出數(shù)據(jù)進(jìn)行再加工,主要有2個(gè)功能:
-
復(fù)合指標(biāo)計(jì)算:對(duì)原始統(tǒng)計(jì)指標(biāo)進(jìn)行組合計(jì)算,得到新的組合指標(biāo)。例如,要統(tǒng)計(jì)登錄成功率,我們可以先分別統(tǒng)計(jì)出分母(登錄次數(shù))和分子(登錄成功的次數(shù)),然后將分子除以分母,從而得到一個(gè)新的組合指標(biāo)。配置項(xiàng)③就是用來(lái)配置組合指標(biāo)的計(jì)算規(guī)則。
-
相對(duì)指標(biāo)計(jì)算:告警規(guī)則中經(jīng)常要判斷某個(gè)指標(biāo)的相對(duì)變化情況(同比/環(huán)比)。我們利用 Flink 的state,能夠方便的計(jì)算出同比/環(huán)比指標(biāo),配置項(xiàng)④就是用來(lái)配置相對(duì)指標(biāo)規(guī)則。
異常數(shù)據(jù)的處理?
這里所說(shuō)的異常數(shù)據(jù),分為兩類:遲到的數(shù)據(jù)和提前到的數(shù)據(jù)。
遲到數(shù)據(jù)
-
對(duì)于嚴(yán)重遲到的數(shù)據(jù)(大于聚合窗口的 allowedLateness),通過(guò) sideOutputLateData 進(jìn)行收集,并通過(guò) reporter 統(tǒng)計(jì)上報(bào),從而能夠在監(jiān)控頁(yè)面進(jìn)行可視化監(jiān)控。
-
對(duì)于輕微遲到的數(shù)據(jù)(小于聚合窗口的 allowedLateness),會(huì)觸發(fā)窗口的重計(jì)算。如果每來(lái)一條遲到數(shù)據(jù)就觸發(fā)一次第 1 階段窗口的重計(jì)算,重計(jì)算結(jié)果傳導(dǎo)到第 2 階段聚合計(jì)算,就會(huì)導(dǎo)致部分?jǐn)?shù)據(jù)的重復(fù)統(tǒng)計(jì)。為了解決重復(fù)統(tǒng)計(jì)的問(wèn)題,我們?cè)诘?1 階段聚合 Trigger 中進(jìn)行了特殊處理:窗口觸發(fā)采用 FIRE_AND_PURGE(計(jì)算并清理),及時(shí)清理已經(jīng)參與過(guò)計(jì)算的數(shù)據(jù)。
提前到的數(shù)據(jù)
這部分?jǐn)?shù)據(jù)往往是數(shù)據(jù)上報(bào)端的時(shí)鐘不準(zhǔn)導(dǎo)致。在計(jì)算這些數(shù)據(jù)的 timestamp 時(shí)要人為干預(yù),避免影響整個(gè)計(jì)算流的 watermark。
sink
聚合計(jì)算得到的指標(biāo),默認(rèn)輸出到 Kafka 和時(shí)序數(shù)據(jù)庫(kù) InfluxDB。
-
kafka-sink:將指標(biāo)標(biāo)識(shí)(配置項(xiàng)①)作為 Kafka 的topic,將聚合結(jié)果發(fā)送出去,下游接收到該數(shù)據(jù)流后可以進(jìn)一步處理加工,如告警事件的生產(chǎn)等。
-
InfluxDB-sink:將指標(biāo)標(biāo)識(shí)(配置項(xiàng)①)作為時(shí)序數(shù)據(jù)庫(kù)的表名,將聚合結(jié)果持久化下來(lái),用于 API 的數(shù)據(jù)查詢、以及可視化報(bào)表展示等。
reporter
為了實(shí)時(shí)監(jiān)控各個(gè)數(shù)據(jù)源和聚合指標(biāo)的運(yùn)行情況,我們通過(guò) InfluxDB+Grafana 組合,實(shí)現(xiàn)了聚合計(jì)算全鏈路監(jiān)控:如各環(huán)節(jié)的輸入/輸出 QPS、計(jì)算耗時(shí)、消費(fèi)堆積、遲到數(shù)據(jù)量等。
總結(jié)
目前,通過(guò)該通用聚合框架,承載了網(wǎng)易云信 100+ 個(gè)不同維度的指標(biāo)計(jì)算,帶來(lái)的收益也是比較可觀的:
-
提效:采用了頁(yè)面配置化方式實(shí)現(xiàn)聚合指標(biāo)的生產(chǎn),開(kāi)發(fā)周期從天級(jí)縮短到分鐘級(jí)。沒(méi)有數(shù)據(jù)開(kāi)發(fā)經(jīng)驗(yàn)的同學(xué)也能夠自己動(dòng)手完成指標(biāo)的配置。
-
維護(hù)簡(jiǎn)單,資源利用率高:100+ 個(gè)指標(biāo)只需維護(hù) 1 個(gè) flink-job,資源消耗也從 300+ 個(gè) CU 減少到 40CU。
-
運(yùn)行過(guò)程透明:借助于全鏈路監(jiān)控,哪個(gè)計(jì)算環(huán)節(jié)有瓶頸,哪個(gè)數(shù)據(jù)源有問(wèn)題,一目了然。
?作者介紹?
圣少友,網(wǎng)易云信數(shù)據(jù)平臺(tái)資深開(kāi)發(fā)工程師,從事數(shù)據(jù)平臺(tái)相關(guān)工作,負(fù)責(zé)服務(wù)監(jiān)控平臺(tái)、數(shù)據(jù)應(yīng)用平臺(tái)、質(zhì)量服務(wù)平臺(tái)的設(shè)計(jì)開(kāi)發(fā)工作。
?延伸閱讀?
-
網(wǎng)易云信服務(wù)監(jiān)控平臺(tái)實(shí)踐
-
技術(shù)實(shí)踐 | Android 設(shè)備音視頻兼容性適配
-
技術(shù)實(shí)踐 | 網(wǎng)易云信在融合通信場(chǎng)景下的探索和實(shí)踐之 RTMPGateway 服務(wù)架構(gòu)
總結(jié)
以上是生活随笔為你收集整理的技术实践 | 如何基于 Flink 实现通用的聚合指标计算框架的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 网易云信联手长沙银行,远程视频银行系统助
- 下一篇: 给大家介绍一下:网易云信新晋音视频质量诊