日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink 靠什么征服饿了么工程师?

發布時間:2024/9/3 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink 靠什么征服饿了么工程师? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

阿里妹導讀:本文將為大家展示餓了么大數據平臺在實時計算方面所做的工作,以及計算引擎的演變之路,你可以借此了解Strom、Spark、Flink的優缺點。如何選擇一個合適的實時計算引擎?Flink憑借何種優勢成為餓了么首選?本文將帶你一一解開謎題。

本文作者:易偉平

整理:姬平&鄭寧

平臺現狀

下面是目前餓了么平臺現狀架構圖:

來源于多個數據源的數據寫到kafka里,計算引擎主要是Storm,Spark和Flink,計算引擎出來的結果數據再落地到各種存儲上。

目前Storm任務大概有100多個,Spark任務有50個左右,Flink暫時還比較少。

目前我們集群規模每天數據量有60TB,計算次數有1000000000,節點有400個。這里要提一下,Spark和Flink都是on yarn的,其中Flink onyarn主要是用作任務間jobmanager隔離, Storm是standalone模式。

應用場景

1.一致性語義

在講述我們應用場景之前,先強調實時計算一個重要概念, 一致性語義:

1) at-most-once:即fire and forget,我們通常寫一個java的應用,不去考慮源頭的offset管理,也不去考慮下游的冪等性的話,就是簡單的at-most-once,數據來了,不管中間狀態怎樣,寫數據的狀態怎樣,也沒有ack機制。

2) at-least-once: 重發機制,重發數據保證每條數據至少處理一次。

3) exactly-once: 使用粗Checkpoint粒度控制來實現exactly-once,我們講的exactly-once大多數指計算引擎內的exactly-once,即每一步的operator內部的狀態是否可以重放;上一次的job如果掛了,能否從上一次的狀態順利恢復,沒有涉及到輸出到sink的冪等性概念。

4) at-least-one + idempotent = exactly-one:如果我們能保證說下游有冪等性的操作,比如基于mysql實現 update on duplicate key;或者你用es, cassandra之類的話,可以通過主鍵key去實現upset的語義, 保證at-least-once的同時,再加上冪等性就是exactly-once。

2. Storm

餓了么早期都是使用Storm,16年之前還是Storm,17年才開始有Sparkstreaming, Structed-streaming。Storm用的比較早,主要有下面幾個概念:

1) 數據是tuple-based

2) 毫秒級延遲

3) 主要支持java, 現在利用apache beam也支持python和go。

4) Sql的功能還不完備,我們自己內部封裝了typhon,用戶只需要擴展我們的一些接口,就可以使用很多主要的功能;flux是Storm的一個比較好的工具,只需要寫一個yaml文件,就可以描述一個Storm任務,某種程度上說滿足了一些需求,但還是要求用戶是會寫java的工程師,數據分析師就使用不了。

★ 2.1 總結

1) 易用性:因為使用門檻高,從而限制了它的推廣。

2)StateBackend:更多的需要外部存儲,比如redis之類的kv存儲。

3) 資源分配方面:用worker和slot提前設定的方式,另外由于優化點做的較少,引擎吞吐量相對比較低一點。

3. Sparkstreaming

有一天有個業務方過來提需求說 我們能不能寫個sql,幾分鐘內就可以發布一個實時計算任務。 于是我們開始做Sparkstreaming。它的主要概念如下:

1) Micro-batch:需要提前設定一個窗口,然后在窗口內處理數據。

2) 延遲是秒級級別,比較好的情況是500ms左右。

3) 開發語言是java和scala。

4)streaming SQL,主要是我們的工作,我們希望提供streaming SQL的平臺。

特點:

1) Spark生態和SparkSQL: 這是Spark比較好的地方,技術棧是統一的,SQL,圖計算,machine learning的包都是可以互調的。因為它先做的是批處理,和Flink不一樣,所以它天然的實時和離線的api是統一的。

2) Checkpointon hdfs。

3) onyarn:Spark是屬于hadoop生態體系,和yarn集成度高。

4) 高吞吐: 因為它是Micro-batch的方式,吞吐也是比較高的。

下面給大家大致展示一下我們平臺用戶快速發布一個實時任務的操作頁面,它需要哪些步驟。我們這里不是寫DDL和DML語句,而是ui展示頁面的方式。

頁面里面會讓用戶選一些必要的參數, 首先會選哪一個kafka集群,每個分區消費多少,反壓也是默認開啟的。消費位置需要讓用戶每次去指定,有可能用戶下一次重寫實時任務的時候,可以根據業務需求去選擇offset消費點。

中間就是讓用戶描述pipeline。 SQL就是kafka的多個topic,輸出選擇一個輸出表,SQL把上面消費的kafka DStream注冊成表,然后寫一串pipeline,最后我們幫用戶封裝了一些對外sink(剛剛提到的各種存儲都支持,如果存儲能實現upsert語義的話,我們都是支持了的)。

★ 3.1 MultiStream-Join

雖然剛剛滿足一般無狀態批次內的計算要求,但就有用戶想說, 我想做流的join怎么辦, 早期的Spark1.5可以參考Spark-streamingsql這個開源項目把 DStream注冊為一個表,然后對這個表做join的操作,但這只支持1.5之前的版本,Spark2.0推出structured streaming之后項目就廢棄了。我們有一個tricky的方式:

讓Sparkstreaming去消費多個topic,但是我根據一些條件把消費的DStream里面的每個批次RDD轉化為DataFrame,這樣就可以注冊為一張表,根據特定的條件,切分為兩張表,就可以簡單的做個join,這個join的問題完全依賴于本次消費的數據,它們join的條件是不可控的,是比較tricky的方式。比如說下面這個例子,消費兩個topic,然后簡單通過filer條件,拆成兩個表,然后就可以做個兩張表的join,但它本質是一個流。

★ 3.2 Exactly-once

exactly-once需要特別注意一個點:

我們必須要求數據sink到外部存儲后,offset才能commit,不管是到zk,還是mysql里面,你最好保證它在一個transaction里面,而且必須在輸出到外部存儲(這里最好保證一個upsert語義,根據unique key來實現upset語義)之后,然后這邊源頭driver再根據存儲的offeset去產生kafka RDD,executor再根據kafka每個分區的offset去消費數據。如果滿足這些條件,就可以實現端到端的exactly-once. 這是一個大前提。

★ 3.3 總結

1) Stateful Processing SQL ( <2.x mapWithState、updateStateByKey):我們要實現跨批次帶狀態的計算的話,在1.X版本,我們通過這兩個接口去做,但還是需要把這個狀態存到hdfs或者外部去,實現起來比較麻煩一點。

2) Real Multi-Stream Join:沒辦法實現真正的多個流join的語義。

3)End-To-End Exactly-Once Semantics:它的端到端的exactly-once語義實現起來比較麻煩,需要sink到外部存儲后還需要手動的在事務里面提交offset。

4. STRUCTUREDSTREAMING

我們調研然后并去使用了Spark2.X之后帶狀態的增量計算。下面這個圖是官方網站的:

所有的流計算都參照了Google的 data flow,里面有個重要的概念:數據的processing time和event time,即數據的處理時間和真正的發生時間有個gap。于是流計算領域還有個watermark,當前進來的事件水位需要watermark來維持,watermark可以指定時間delay的范圍,在延遲窗口之外的數據是可以丟棄的,在業務上晚到的數據也是沒有意義的。

下面是structuredstreaming的架構圖:

這里面就是把剛才Sparkstreaming講exactly-once的步驟1,2,3都實現了,它本質上還是分批的batch方式,offset自己維護,狀態存儲用的hdfs,對外的sink沒有做類似的冪等操作,也沒有寫完之后再去commit offset,它只是再保證容錯的同時去實現內部引擎的exactly-once。

★ 4.1 特點

1) Stateful Processing SQL&DSL:可以滿足帶狀態的流計算

2) Real Multi-Stream Join:可以通過Spark2.3實現多個流的join,多個流的join做法和Flink類似,你需要先定義兩個流的條件(主要是時間作為一個條件),比如說有兩個topic的流進來,然后你希望通過某一個具體的schema中某個字段(通常是event time)來限定需要buffer的數據,這樣可以實現真正意義上的流的join。

3)比較容易實現端到端的exactly-once的語義,只需要擴展sink的接口支持冪等操作是可以實現exactly-once的。

特別說一下,Structuredstreaming和原生的streaming的api有一點區別,它create表的Dataframe的時候,是需要指定表的schema的,意味著你需要提前指定schema。另外它的watermark是不支持SQL的,于是我們加了一個擴展,實現完全寫sql,可以從左邊到右邊的轉換(下圖),我們希望用戶不止是程序員,也希望不會寫程序的數據分析師等同學也能用到。

★ 4.2 總結

1) Trigger(Processing Time、 Continuous ):2.3之前主要基于processing Time,每個批次的數據處理完了立馬觸發下一批次的計算。2.3推出了record by record的持續處理的trigger。

2)Continuous Processing (Only Map-Like Operations):目前它只支持map like的操作,同時sql的支持度也有些限制。

3) LowEnd-To-End Latency With Exactly-Once Guarantees:端到端的exactly-once的保證需要自己做一些額外的擴展, 我們發現kafka0.11版本提供了事務的功能,是可以從基于這方面考慮從而去實現從source到引擎再到sink,真正意義上的端到端的exactly-once。

4) CEP(Drools):我們發現有業務方需要提供cep 這樣復雜事件處理的功能,目前我們的語法無法直接支持,我們讓用戶使用規則引擎Drools,然后跑在每個executor上面,依靠規則引擎功能去實現cep。

于是基于以上幾個Spark structuredstreaming的特點和缺點,我們考慮使用Flink來做這些事情。

5.Flink

Flink目標是對標Spark,流這塊是領先比較多,它野心也比較大,圖計算,機器學習等它都有,底層也是支持yarn,tez等。對于社區用的比較多的存儲,Flink社區官方都支持比較好,相對來說。

Flink的框架圖:

Flink中的JobManager,相當于Spark的driver角色,taskManger相當于executor,里面的task也有點類似Spark的那些task。 不過Flink用的rpc是akka,同時Flink core自定義了內存序列化框架,另外task無需像Spark每個stage的task必須相互等待而是處理完后即往下游發送數據。

Flink binary data處理operator:

?

Spark的序列化用戶一般會使用kryo或者java默認的序列化,同時也有Tungsten項目對Spark程序做一jvm層面以及代碼生成方面的優化。相對于Spark,Flink自己實現了基于內存的序列化框架,里面維護著key和pointer的概念,它的key是連續存儲,在cpu層面會做一些優化,cache miss概率極低。比較和排序的時候不需要比較真正的數據,先通過這個key比較,只有當它相等的時候,才會從內存中把這個數據反序列化出來,再去對比具體的數據,這是個不錯的性能優化點。

Flink task chain:

Task中operatorchain,是比較好的概念。如果上下游數據分布不需要重新shuffle的話,比如圖中source是kafka source,后面跟的map只是一個簡單的數據filter,我們把它放在一個線程里面,就可以減少線程上下文切換的代價。

并行度概念

比如說這里面會有5個task,就會有幾個并發線程去跑,chain起來的話放在一個線程去跑就可以提升數據傳輸性能。Spark是黑盒的,每個operator無法設并發度,而Flink可以對每個operator設并發度,這樣可以更靈活一點,作業運行起來對資源利用率也更高一點。

Spark 一般通過Spark.default.parallelism來調整并行度,有shuffle操作的話,并行度一般是通Spark.sql.shuffle.partitions參數來調整,實時計算的話其實應該調小一點,比如我們生產中和kafka的partition數調的差不多,batch在生產上會調得大一點,我們設為1000,左邊的圖我們設并發度為2,最大是10,這樣首先分2個并發去跑,另外根據key做一個分組的概念,最大分為10組,就可以做到把數據盡量的打散。

State & Checkpoint

因為Flink的數據是一條條過來處理,所以Flink中的每條數據處理完了立馬發給下游,而不像spark,需要等該operator所在的stage所有的task都完成了再往下發。

Flink有粗粒度的checkpoint機制,以非常小的代價為每個元素賦予一個snapshot概念,只有當屬于本次snapshot的所有數據都進來后才會觸發計算,計算完后,才把buffer數據往下發,目前Flink sql沒有提供控制buffer timeout的接口,即我的數據要buffer多久才往下發。可以在構建Flink context時,指定buffer timeout為0,處理完的數據才會立馬發下去,不需要等達到一定閾值后再往下發。

Backend默認是維護在jobmanager內存,我們更多使用的的是寫到hdfs上,每個operator的狀態寫到rocksdb上,然后異步周期增量同步到外部存儲。

容錯

圖中左半部分的紅色節點發生了failover,如果是at-least-once,則其最上游把數據重發一次就好;但如果是exactly-once,則需要每個計算節點從上一次失敗的時機重放。

Exactly Once Two-Phase Commit

Flink1.4之后有兩階段提交來支持exactly-once.它的概念是從上游kafka消費數據后,每一步都會發起一次投票,來記錄狀態,通過checkpoint的屏障來處理標記,只有最后再寫到kafka(0.11之后的版本),只有最后完成之后,才會把每一步的狀態讓jobmanager中的cordinator去通知可以固化下來,這樣實現exactly-once。

Savepoints

還有一點Flink比較好的就是,基于它的checkpoint來實現savepoint功能。業務方需要每個應用恢復節點不一樣,希望恢復到的版本也是可以指定的,這是比較好的。這個savepoint不只是數據的恢復,也有計算狀態的恢復。

特點:

1) Trigger (Processing Time、 Event Time、IngestionTime):對比下,Flink支持的流式語義更豐富,不僅支持Processing Time, 也支持Event time和Ingestion Time。

2)Continuous Processing & Window:支持純意義上的持續處理,recordby record的,window也比Spark處理的好。

3) Low End-To-End Latency With Exactly-Once Guarantees:因為有兩階段提交,用戶是可以選擇在犧牲一定吞吐量的情況下,根據業務需求情況來調整來保證端到端的exactly-once。

4) CEP:支持得好。

5) Savepoints:可以根據業務的需求做一些版本控制。

也有做的還不好的:

1)SQL (Syntax Function、Parallelism):SQL功能還不是很完備,大部分用戶是從hive遷移過來,Spark支持hive覆蓋率達到99%以上。 SQL函數不支持,目前還無法對單個operator做并行度的設置。

2) ML、Graph等:機器學習,圖計算等其他領域比Spark要弱一點,但社區也在著力持續改進這個問題。

我們期待和你一起,把Flink建設得更好,幫助更多開發者。

大數據計算引擎,你pick哪個?

訪問:http://cn.mikecrm.com/d0nUFOK?from=singlemessage&isappinstalled=0

參與問卷調研,約需15分鐘,認真答題的小伙伴還有機會獲得定制禮品。

?

每天一篇技術文章,

看不過癮?

關注“阿里巴巴機器智能”微信公眾號

發現更多AI干貨。

總結

以上是生活随笔為你收集整理的Flink 靠什么征服饿了么工程师?的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。