Flink 在爱奇艺广告业务的实践
本文整理自愛奇藝技術(shù)經(jīng)理韓紅根在 5 月 22 日北京站 Flink Meetup 分享的議題《Flink 在愛奇藝廣告業(yè)務(wù)的實踐》,內(nèi)容包括:
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點(diǎn)贊送 star~
一、業(yè)務(wù)場景
實時數(shù)據(jù)在廣告業(yè)務(wù)的使用場景主要可以分為四個方面:
- 數(shù)據(jù)大屏:包括曝光、點(diǎn)擊、收入等核心指標(biāo)的展示,以及故障率等監(jiān)控指標(biāo);
- 異常監(jiān)測:因為廣告投放的鏈路比較?,所以如果鏈路上發(fā)生任何波動的話,都會對整體的投放效果產(chǎn)生影響。除此之外,各個團(tuán)隊在上線過程中是否會對整體投放產(chǎn)生影響,都是通過異常監(jiān)測系統(tǒng)能夠觀測到的。我們還能夠觀測業(yè)務(wù)指標(biāo)走勢是否合理,比如在庫存正常的情況下,曝光是否有不同的波動情況,這可以用來實 時發(fā)現(xiàn)問題;
- 數(shù)據(jù)分析:主要用于數(shù)據(jù)賦能業(yè)務(wù)發(fā)展。我們可以實時分析廣告投放過程中的一些異常問題,或者基于當(dāng)前的投放效果去研究怎樣優(yōu)化,從而達(dá)到更好的效果;
- 特征工程:廣告算法團(tuán)隊主要是做一些模型訓(xùn)練,用于支持線上投放。技術(shù)特征最初大部分是離線,隨著實時的發(fā)展,開始把一些工程轉(zhuǎn)到實時。
二、業(yè)務(wù)實踐
業(yè)務(wù)實踐主要分為兩類,第一個是實時數(shù)倉,第二個是特征工程。
1. 實時數(shù)倉
1.1 實時數(shù)倉 - 目標(biāo)
實時數(shù)倉的目標(biāo)包括數(shù)據(jù)完整性、服務(wù)穩(wěn)定性和查詢能力。
- 數(shù)據(jù)完整性:在廣告業(yè)務(wù)里,實時數(shù)據(jù)主要是用于指導(dǎo)決策,比如廣告主需要根據(jù)當(dāng)前投放的實時數(shù)據(jù),指導(dǎo)后面的出價或調(diào)整預(yù)算。另外,故障率的監(jiān)控需要數(shù)據(jù)本身是穩(wěn)定的。如果數(shù)據(jù)是波動的,指導(dǎo)意義就非常差,甚至沒有什么指導(dǎo)意義。因此完整性本身是對時效性和完整性之間做了一個權(quán)衡;
- 服務(wù)穩(wěn)定性:生產(chǎn)鏈包括數(shù)據(jù)接入、計算(多層)、數(shù)據(jù)寫入、進(jìn)度服務(wù)和查詢服務(wù)。除此之外還有數(shù)據(jù)質(zhì)量,包括數(shù)據(jù)的準(zhǔn)確性以及數(shù)據(jù)趨勢是否符合預(yù)期;
- 查詢能力:在廣告業(yè)務(wù)有多種使用場景,在不同場景里可能使用了不同的 OLAP 引擎,所以查詢方式和性能的要求不一致。另外,在做數(shù)據(jù)分析的時候,除了最新最穩(wěn)定的實時數(shù)據(jù)之外,同時也會實時 + 離線做分析查詢,此外還包括數(shù)據(jù)跨源和查詢性能等要求。
1.2 實時數(shù)倉 - 挑戰(zhàn)
- 數(shù)據(jù)進(jìn)度服務(wù):需要在時效性和完整性之間做一個權(quán)衡。
- 數(shù)據(jù)穩(wěn)定性:由于生產(chǎn)鏈路比較長,中間可能會用到多種功能組件,所以端到端的服務(wù)穩(wěn)定性對整體數(shù)據(jù)準(zhǔn)確性的影響是比較關(guān)鍵的。
- 查詢性能:主要包括 OLAP 分析能力。在實際場景中,數(shù)據(jù)表包含了離線和實時,單表規(guī)模達(dá)上百列,行數(shù)也是非常大的。
1.3 廣告數(shù)據(jù)平臺架構(gòu)
上圖為廣告數(shù)據(jù)平臺基礎(chǔ)架構(gòu)圖,從下往上看:
- 底部是數(shù)據(jù)采集層,這里與大部分公司基本一致。業(yè)務(wù)數(shù)據(jù)庫主要包含了廣告主的下單數(shù)據(jù)以及投放的策略;埋點(diǎn)日志和計費(fèi)日志是廣告投放鏈路過程中產(chǎn)生的日志;
中間是數(shù)據(jù)生產(chǎn)的部分,數(shù)據(jù)生產(chǎn)的底層是大數(shù)據(jù)的基礎(chǔ)設(shè)施,這部分由公司的一個云平臺團(tuán)隊提供,其中包含 Spark / Flink 計算引擎,Babel 統(tǒng)一的管理平臺。Talos 是實時數(shù)倉服務(wù),RAP 和 OLAP 對應(yīng)不同的實時分析以及 OLAP 存儲和查詢服務(wù)。
數(shù)據(jù)生產(chǎn)的中間層是廣告團(tuán)隊包含的一些服務(wù),例如在生產(chǎn)里比較典型的離線計算和實時計算。
- 離線是比較常見的一個分層模型,調(diào)度系統(tǒng)是對生產(chǎn)出的離線任務(wù)做有效的管理和調(diào)度。
- 實時計算這邊使用的引擎也比較多,我們的實時化是從 2016 年開始,當(dāng)時選的是 Spark Streaming,后面隨著大數(shù)據(jù)技術(shù)發(fā)展以及公司業(yè)務(wù)需求產(chǎn)生了不同場景,又引入了計算引擎 Flink。
- 實時計算底層調(diào)度依賴于云計算的 Babel 系統(tǒng),除了計算之外還會伴隨數(shù)據(jù)治理,包括進(jìn)度管理,就是指實時計算里一個數(shù)據(jù)報表當(dāng)前已經(jīng)穩(wěn)定的進(jìn)度到哪個時間點(diǎn)。離線里其實就對應(yīng)一個表,有哪些分區(qū)。
- 血緣管理包括兩方面,離線包括表級別的血緣以及字段血緣。實時主要還是在任務(wù)層面的血緣。
- 至于生命周期管理,在離線的一個數(shù)倉里,它的計算是持續(xù)迭代的。但是數(shù)據(jù)保留時間非常長的話,數(shù)據(jù)量對于底層的存儲壓力就會比較大。
- 數(shù)據(jù)生命周期管理主要是根據(jù)業(yè)務(wù)需求和存儲成本之間做一個權(quán)衡。
- 質(zhì)量管理主要包括兩方面,一部分在數(shù)據(jù)接入層,判斷數(shù)據(jù)本身是否合理;另外一部分在數(shù)據(jù)出口,就是結(jié)果指標(biāo)這一層。因為我們的數(shù)據(jù)會供給其他很多團(tuán)隊使用,因此在數(shù)據(jù)出口這一層要保證數(shù)據(jù)計算沒有問題。
再上層是統(tǒng)一查詢服務(wù),我們會封裝很多接口進(jìn)行查詢。
- 因為數(shù)據(jù)化包括離線和實時,另外還有跨集群,所以在智能路由這里會進(jìn)行一些選集群、選表以及復(fù)雜查詢、拆分等核心功能。
- 查詢服務(wù)會對歷史查詢進(jìn)行熱度的統(tǒng)一管理。這樣一方面可以更應(yīng)進(jìn)一步服務(wù)生命周期管理,另一方面可以去看哪些數(shù)據(jù)對于業(yè)務(wù)的意義非常大。
- 除了生命周期管理之外,它還可以指導(dǎo)我們的調(diào)度系統(tǒng),比如哪些報表比較關(guān)鍵,在資源緊張的時候就可以優(yōu)先調(diào)度這些任務(wù)。
- 再往上是數(shù)據(jù)應(yīng)用,包括報表系統(tǒng)、Add - hoc 查詢、數(shù)據(jù)可視化、異常監(jiān)控和下游團(tuán)隊。
1.4 實時數(shù)倉 - 生產(chǎn)鏈路
數(shù)據(jù)生產(chǎn)鏈路是從時間粒度來講的,我們最開始是離線數(shù)倉鏈路,在最底層的這一行,隨著實時化需求推進(jìn),就產(chǎn)生了一個實時鏈路,整理來說,是一個典型的 Lambda 架構(gòu)。
另外,我們的一些核心指標(biāo),比如計費(fèi)指標(biāo),因為它的穩(wěn)定性對下游比較關(guān)鍵,所以我們這邊采用異路多活。異路多活是源端日志產(chǎn)生之后,在計算層和下游存儲層做了完全的冗余,在后面的查詢里做統(tǒng)一處理。
1.5 實時數(shù)倉 - 進(jìn)度服務(wù)
上文介紹了我們要求提供出去的實時數(shù)據(jù)的指標(biāo)是穩(wěn)定不變的,進(jìn)度服務(wù)實現(xiàn)的核心點(diǎn)包括時間窗口里指標(biāo)的變化趨勢,同時結(jié)合了實時計算任務(wù)本身的狀態(tài),因為在實時數(shù)倉里,很多指標(biāo)是基于時間窗口做聚合計算。
比如一個實時指標(biāo),我們輸出的指標(biāo)是 3 分鐘,也就是說 4:00 這個時間點(diǎn)的指標(biāo)的就包括了 4:00~4:03 的數(shù)據(jù),4:03 包括了 4:03~4:06 的數(shù)據(jù),其實就是指一個時間窗口的數(shù)據(jù),什么時候是對外可見的。因為在實時計算里,數(shù)據(jù)不斷進(jìn)來, 4:00 的時間窗口的數(shù)據(jù)從 4:00 開始,指標(biāo)就已經(jīng)開始產(chǎn)生了。隨著時間疊加,指標(biāo)不斷上升,最后趨于穩(wěn)定。我們基于時間窗口指標(biāo)的變化率,來判斷它是否趨于穩(wěn)定。
但如果只是基于這個點(diǎn)來看,那么它還存在一定的弊端。
因為這個結(jié)果表的計算鏈會依賴很多個計算任務(wù),如果這個鏈路上面哪個任務(wù)出現(xiàn)問題,可能會導(dǎo)致當(dāng)前的指標(biāo)雖然走勢已經(jīng)趨于正常,但是最終并不完整。所以在這基礎(chǔ)之上,我們又引入了實時計算任務(wù)狀態(tài),在指標(biāo)趨于穩(wěn)定的時候,同時去看生產(chǎn)鏈路上這些計算任務(wù)是否正常,如果是正常的話,表示任務(wù)本身時間點(diǎn)的指標(biāo)已經(jīng)穩(wěn)定,可以對外提供服務(wù)。
如果計算有卡頓、堆積,或者已經(jīng)有異常在重啟過程中,就需要繼續(xù)等待迭代處理。
1.6 實時數(shù)倉 - 查詢服務(wù)
上圖為查詢服務(wù)架構(gòu)圖。
最下方是數(shù)據(jù),里面有實時存儲引擎,包括 Druid 等。在離線中,數(shù)據(jù)在 Hive 里邊,但是在做查詢的時候,會把它們進(jìn)行 OLAP 的同步,在這邊使用的是兩種引擎。為了和 Kudu 做 union 查詢,會把它同步到 OLAP 引擎,然后上面去統(tǒng)一使用 Impala 做查詢。另外,對于使用場景里比較固定的方式,可以導(dǎo)到 Kylin 里,然后在上面做數(shù)據(jù)分析。
基于這些數(shù)據(jù),會有多個查詢節(jié)點(diǎn),再上面是一個智能路由層。從最上面查詢網(wǎng)關(guān),當(dāng)有一個查詢請求進(jìn)來,首先判斷它是不是一個復(fù)雜場景。比如在一個查詢里,如果它的時長同時跨越了離線和實時,這里就會同時使用到離線表和實時表。
另外,離線表里還有更復(fù)雜的選表邏輯,比如小時級別,天級別。經(jīng)過復(fù)雜場景分析之后,就會把最終選擇的表大概確定下來。其實在做智能路由的時候,才會去參考左邊的一些基礎(chǔ)服務(wù),比如元數(shù)據(jù)管理,當(dāng)前這些表的進(jìn)度到哪個點(diǎn)了。
對于查詢性能的優(yōu)化,在數(shù)據(jù)里,底層掃描的數(shù)據(jù)量對最終性能的影響是非常大的。所以會有一個報表降維,根據(jù)歷史的查詢?nèi)プ龇治觥1热缭谝粋€降維表包含哪些維度,可以覆蓋到百分之多少的查詢。
1.7 數(shù)據(jù)生產(chǎn) - 規(guī)劃
之前在實時數(shù)據(jù)報表生產(chǎn)里提到,它主要是基于 API 的方式實現(xiàn)的。Lambda 架構(gòu)本身有一個問題就是實時跟離線是兩個計算團(tuán)隊,對于同一個需求,需要兩個團(tuán)隊同時去開發(fā),這樣會帶來幾個問題。
- 一方面是他們的邏輯可能會發(fā)生差異,最終導(dǎo)致結(jié)果表不一致;
- 另一方面是人力成本,同時需要兩個團(tuán)隊進(jìn)行開發(fā)。
因此我們的訴求是流批一體,思考在計算層是否可以使用一個邏輯來表示同一個業(yè)務(wù)需求,比如可以同時使用流或者批的計算引擎來達(dá)到計算的效果。
在這個鏈路里邊,原始數(shù)據(jù)通過 Kafka 的方式接入進(jìn)來,經(jīng)過統(tǒng)一的 ETL 邏輯,接著把數(shù)據(jù)放在數(shù)據(jù)湖里。因為數(shù)據(jù)湖本身可以同時支持流和批的方式進(jìn)行讀寫,而且數(shù)據(jù)湖本身可以實時消費(fèi),所以它既可以做實時計算,也可以做離線計算,然后統(tǒng)一把數(shù)據(jù)再寫回數(shù)據(jù)湖。
前文提到在做查詢的時候,會使用離線跟實時做統(tǒng)一整合,所以在數(shù)據(jù)湖里寫同一個表,在存儲層面可以省去很多工作,另外也可以節(jié)省存儲空間。
1.8 數(shù)據(jù)生產(chǎn) - SQL 化
SQL 化是 Talos 實時數(shù)倉平臺提供的能力。
從頁面上來看,它包括了幾個功能,左邊是項目管理,右邊包括 Source、Transform 和 Sink。
- 有一些業(yè)務(wù)團(tuán)隊本身對于計算引擎算子非常熟,那么他們便可以做一些代碼開發(fā);
- 但是很多業(yè)務(wù)團(tuán)隊可能對引擎并不是那么了解,或者沒有強(qiáng)烈的意愿去了解,他們就可以通過這種可視化的方式,拼接出一個作業(yè)。
例如,可以拖一個 Kafka 的數(shù)據(jù)源進(jìn)來,在上面做數(shù)據(jù)過濾,然后就可以拖一個 Filter 算子達(dá)到過濾邏輯,后面可以再去做一些 Project,Union 的計算,最后輸出到某個地方就可以了。
對于能力稍微高一些的同學(xué),可以去做一些更高層面的計算。這里也可以實現(xiàn)到實時數(shù)倉的目的,在里面創(chuàng)建一些數(shù)據(jù)源,然后通過 SQL 的方式,把邏輯表示出來,最終把這個數(shù)據(jù)輸出到某種存儲。
上面是從開發(fā)層面來講,在系統(tǒng)層面上,它其實還提供了一些其他的功能,比如規(guī)則校驗,還有開發(fā)/測試/上線,在這里可以統(tǒng)一管理。此外還有監(jiān)控,對線上跑的實時任務(wù)有很多實時指標(biāo),可以通過查看這些指標(biāo)來判斷當(dāng)前的任務(wù)是不是正常的狀態(tài)。
2. 特征工程
特征工程有兩方面的需求:
- 第一個需求是實時化,因為數(shù)據(jù)價值隨著時間的遞增會越來越低。比如某用戶表現(xiàn)出來的觀影行為是喜歡看兒童內(nèi)容,平臺就會推薦兒童相關(guān)的廣告。另外,用戶在看廣告過程中,會有一些正/負(fù)反饋的行為,如果把這些數(shù)據(jù)實時迭代到特征里,就可以有效提升后續(xù)的轉(zhuǎn)化效果。
實時化的另一個重點(diǎn)是準(zhǔn)確性,之前很多特征工程是離線的,在生產(chǎn)環(huán)節(jié)里面存在計算時的數(shù)據(jù)跟投放過程中的特征有偏差,基礎(chǔ)特征數(shù)據(jù)不是很準(zhǔn)確,因此我們要求數(shù)據(jù)要更實時、更準(zhǔn)確。
特征工程的第二個需求是服務(wù)穩(wěn)定性。
- 首先是作業(yè)容錯,比如作業(yè)在異常的時候能否正常恢復(fù);
- 另外是數(shù)據(jù)質(zhì)量,在實時數(shù)據(jù)里追求端到端精確一次。
2.1 點(diǎn)擊率預(yù)估
下面是在特征實時化里的實踐,首先是點(diǎn)擊率預(yù)估的需求。
點(diǎn)擊率預(yù)估案例的背景如上所示,從投放鏈路上來說,在廣告前端用戶產(chǎn)生觀影行為,前端會向廣告引擎請求廣告,然后廣告引擎在做廣告召回粗排/精排的時候會拿到用戶特征和廣告特征。把廣告返回給前端之后,后續(xù)用戶行為可能產(chǎn)生曝光、點(diǎn)擊等行為事件,在做點(diǎn)擊率預(yù)估的時候,需要把前面請求階段的特征跟后續(xù)用戶行為流里的曝光和點(diǎn)擊關(guān)聯(lián)起來,形成一個 Session 數(shù)據(jù),這就是我們的數(shù)據(jù)需求。
落實到具體實踐的話包括兩方面:
- 一方面是 Tracking 流里曝光、點(diǎn)擊事件的關(guān)聯(lián);
- 另一方面是特征流跟用戶行為的關(guān)聯(lián)。
在實踐過程中有哪些挑戰(zhàn)?
- 第一個挑戰(zhàn)是數(shù)據(jù)量;
- 第二個挑戰(zhàn)是實時數(shù)據(jù)亂序和延遲;
- 第三個挑戰(zhàn)是精確性要求高。
在時序上來說,特征肯定是早于 Tracking,但是兩個流成功關(guān)聯(lián)率在 99% 以上的時候,這個特征需要保留多久?因為在廣告業(yè)務(wù)中,用戶可以離線下載一個內(nèi)容,在下載的時候就已經(jīng)完成了廣告請求和返回了。但是后續(xù)如果用戶在沒有網(wǎng)的情況下觀看,這個事件并不會立馬返回,只有當(dāng)狀態(tài)恢復(fù)的時候,才會有后續(xù)曝光和點(diǎn)擊事件回傳。
所以這個時候,其實特征流和 Tracking 的時間概括是非常長的。我們經(jīng)過離線的數(shù)據(jù)分析,如果兩個流的關(guān)聯(lián)率達(dá) 99% 以上,那么特征數(shù)據(jù)就需要保留比較長的時間,目前是保留 7 天,這個量級還是比較大的。
上圖為點(diǎn)擊率預(yù)測的整體架構(gòu),剛才我們提到關(guān)聯(lián)包括兩部分:
- 第一個部分是用戶行為流里曝光跟點(diǎn)擊事件的關(guān)聯(lián),這里通過 CEP 實現(xiàn)。
- 第二個部分是兩個流的關(guān)聯(lián),前面介紹特征需要保留 7 天,它的狀態(tài)較大,已經(jīng)是上百 TB。這個量級在內(nèi)存里做管理,對數(shù)據(jù)穩(wěn)定性有比較大的影響,所以我們把特征數(shù)據(jù)放在一個外部存儲 (Hbase) 里,然后和 HBase 特征做一個實時數(shù)據(jù)查詢,就可以達(dá)到這樣一個效果。
但是因為兩個流的時序本身可能是錯開的,就是說,當(dāng)曝光、點(diǎn)擊出現(xiàn)的時候,可能這個特征還沒有到,那么就拿不到這個特征。所以我們做了一個多級重試隊列,保證最終兩個流關(guān)聯(lián)的完整性。
2.2 點(diǎn)擊率預(yù)估 - 流內(nèi)事件關(guān)聯(lián)
上圖右邊是更細(xì)的講解,闡述了流內(nèi)事件關(guān)聯(lián)為什么選擇 CEP 方案。業(yè)務(wù)需求是把用戶行為流里屬于同一次廣告請求,并且是同一個廣告的曝光跟點(diǎn)擊關(guān)聯(lián)起來。曝光之后,比如 5 分鐘之內(nèi)產(chǎn)生點(diǎn)擊,作為一個正樣本,5 分鐘之后出現(xiàn)的點(diǎn)擊則拋棄不要了。
可以想象一下,當(dāng)遇到這樣的場景,通過什么樣的方案可以實現(xiàn)這樣的效果。其實在一個流里多個事件的處理,可以用窗口來實現(xiàn)。但窗口的問題是:
- 如果事件序列本身都在同一個窗口之內(nèi),數(shù)據(jù)沒有問題;
- 但是當(dāng)事件序列跨窗口的時候,是達(dá)不到正常關(guān)聯(lián)效果的。
所以當(dāng)時經(jīng)過很多技術(shù)調(diào)研后,發(fā)現(xiàn) Flink 里的 CEP 可以實現(xiàn)這樣的效果,用類似政策匹配的方式,描述這些序列需要滿足哪些匹配方式。另外它可以指定一個時間窗口,比如曝光和點(diǎn)擊間隔 15 分鐘。
上圖左邊是匹配規(guī)則的描述,begin 里定義一個曝光,實現(xiàn)曝光之后 5 分鐘之內(nèi)的點(diǎn)擊,后面是描述一個可以出現(xiàn)多次的點(diǎn)擊,within 表示關(guān)聯(lián)窗口是多長時間。
在生產(chǎn)實踐過程中,這個方案大部分情況下可以關(guān)聯(lián)上,但是在做數(shù)據(jù)對比的時候,才發(fā)現(xiàn)存在某些曝光點(diǎn)擊沒有正常關(guān)聯(lián)到。
經(jīng)過數(shù)據(jù)分析,發(fā)現(xiàn)這些數(shù)據(jù)本身的特點(diǎn)是曝光跟點(diǎn)擊的時間戳都是毫秒級別,當(dāng)它們有相同毫秒時間戳的時候,這個事件就不能正常匹配。于是我們采用一個方案,人為地對于點(diǎn)擊事件加一毫秒,進(jìn)行人工錯位,這樣就保證曝光跟點(diǎn)擊能夠成功關(guān)聯(lián)上。
2.3 點(diǎn)擊率預(yù)估-雙流關(guān)聯(lián)
前文提到特征數(shù)據(jù)需要保留 7 天,所以狀態(tài)是上百 TB。需要把數(shù)據(jù)放在一個外部存儲里,因此在做技術(shù)選型時對外部存儲有一定的要求:
- 首先支持比較高的讀寫并發(fā)能力;
- 另外它的時效性需要非常低;
- 同時因為數(shù)據(jù)要保留 7 天,所以它最好具備生命周期管理能力。
基于以上幾個點(diǎn),最終選擇了 HBase,形成上圖的解決方案。
上面一行表示通過 CEP 之后把曝光點(diǎn)擊序列關(guān)聯(lián)在一起,最下面是把特征流通過 Flink 寫到 HBase 里,去做外部狀態(tài)存儲,中間核心模塊是用于達(dá)到兩個流的關(guān)聯(lián)。拿到曝光點(diǎn)擊關(guān)聯(lián)之后去查 HBase 數(shù)據(jù),如果能夠正常查到,就會把它輸出到一個正常結(jié)果流里。而對于那些不能構(gòu)成關(guān)聯(lián)的數(shù)據(jù),做了一個多級重試隊列,在多次重試的時候會產(chǎn)生隊列降級,并且在重試的時候為了減輕對 HBase 的掃描壓力,重試 Gap 會逐級增加。
另外還有一個退出機(jī)制,因為重試不是無限進(jìn)行的。退出機(jī)制的存在原因主要包括兩個點(diǎn):
- 第一點(diǎn)是特征數(shù)據(jù)保留了 7 天,如果對應(yīng)特征是在 7 天之前,那么它本身是關(guān)聯(lián)不到的。
- 另外在廣告業(yè)務(wù)里,存在一些外部的刷量行為,比如刷曝光或刷點(diǎn)擊,但它本身并沒有真實存在的廣告請求,所以這種場景也拿不到對應(yīng)特征。
因此,退出機(jī)制意味著在重試多次之后就會過期,然后會到重試過期的數(shù)據(jù)里。
2.4 有效點(diǎn)擊
在有效點(diǎn)擊場景里,其實也是兩個流的關(guān)聯(lián),但是兩個場景里的技術(shù)選型是完全不一樣的。
首先看一下項目背景,在網(wǎng)大場景里,影片本身就是一個廣告。用戶在點(diǎn)擊之后,就會進(jìn)入到一個播放頁面。在播放頁面里,用戶可以免費(fèi)觀看 6 分鐘,6 分鐘之后想要繼續(xù)觀看,需要是會員或者購買才行,在這里需要統(tǒng)計的數(shù)據(jù)是有效點(diǎn)擊,定義是在點(diǎn)擊之后觀影時長超過 6 分鐘即可。
這種場景落實到技術(shù)上是兩個流的關(guān)聯(lián),包括了點(diǎn)擊流和播放心跳流。
- 點(diǎn)擊流比較好理解,包括用戶的曝光和點(diǎn)擊等行為,從里面篩選點(diǎn)擊事件即可。
- 播放行為流是在用戶觀看的過程,會定時地把心跳信息回傳,比如三秒鐘回傳一個心跳,表明用戶在持續(xù)觀看。在定義時長超過 6 分鐘的時候,需要把這個狀態(tài)本身做一些處理,才能滿足 6 分鐘的條件。
在這個場景里,兩個流動 Gap 相對比較小,而在電影里時長一般是兩個多小時,所以點(diǎn)擊之后的行為,Gap 基本是在三個小時以內(nèi)才能完成,因此這里本身的狀態(tài)是相對比較小的,使用 Flink 的狀態(tài)管理可以達(dá)到這樣的效果。
接下來我們看一個具體的方案。
從流上來看,綠色部分是點(diǎn)擊流,藍(lán)色部分是播放心跳流。
- 在左邊的狀態(tài)里面,一個點(diǎn)擊事件進(jìn)來之后,會對這個點(diǎn)擊做一個狀態(tài)記錄,同時會注冊一個定時器做定期清理,定時器是三個小時。因為大部分影片的時長在三小時以內(nèi),如果這個時候?qū)?yīng)的播放事件還沒有一個目標(biāo)狀態(tài),點(diǎn)擊事件基本就可以過期了。
在右邊的播放心跳流里,這個狀態(tài)是對時長做累計,它本身是一個心跳流,比如每三秒傳一個心跳過來。我們需要在這里做一個計算,看它累計播放時長是不是達(dá)到 6 分鐘了,另外也看當(dāng)前記錄是不是到了 6 分鐘。對應(yīng) Flink 里的一個實現(xiàn)就是把兩個流通過 Connect 算子關(guān)系在一起,然后可以制定一個 CoProcessFunction,在這里面有兩個核心算子。
- 第一個算子是拿到狀態(tài) 1 的流事件之后,需要做一些什么樣的處理;
- 第二個算子是拿到第 2 個流事件之后,可以自定義哪些功能。
算子給用戶提供了很多靈活性,用戶可以在里面做很多邏輯控制。相比很多的 Input Join,用戶可發(fā)揮的空間比較大。
2.5 特征工程 - 小結(jié)
針對以上案例做一個小結(jié)。現(xiàn)在雙流管理已經(jīng)非常普遍,有許多方案可以選擇,比如 Window join,Interval join,還有我們使用的 Connect + CoProcessFunction。除此之外,還有一些用戶自定義的方案。
在選型的時候,建議從業(yè)務(wù)出發(fā),去做對應(yīng)的技術(shù)選型。首先要思考多個流之間的事件關(guān)系,然后判斷出狀態(tài)是什么規(guī)模,一定程度上可以從上面很多方案里排除不可行的方案。
三、Flink 使用過程中的問題及解決
1. 容錯
在 Flink 內(nèi)部主要是通過 Checkpoint 做容錯,Checkpoint 本身是對于 Job 內(nèi)部的 Task 級別的容錯,但是當(dāng) Job 主動或異常重啟時,狀態(tài)無法從歷史狀態(tài)恢復(fù)。
因此我們這邊做了一個小的改進(jìn),就是一個作業(yè)在啟動的時候,它也會去 Checkpoint 里把最后一次成功的歷史狀態(tài)拿到,然后做初始化管理,這樣就達(dá)到狀態(tài)恢復(fù)的效果。
2. 數(shù)據(jù)質(zhì)量
Flink 本身實現(xiàn)端到端精確一次,首先需要開啟 Checkpoint 功能,并且在 Checkpoint 里指定精確一次的語義。另外,如果在下游比如 Sink 端,它本身支持事務(wù),就可以結(jié)合兩階段提交與 Checkpoint 以及下游的事務(wù)做聯(lián)動,達(dá)到端到端精確一次。
在上圖右邊就是描述了這個過程。這是一個預(yù)提交的過程,就是 Checkpoint 協(xié)調(diào)器在做 Checkpoint 的時候,會往 Source 端注入一些 Barrier 數(shù)據(jù),每個 Source 拿到 Barrier 之后會做狀態(tài)存儲,然后把完成狀態(tài)反饋給協(xié)調(diào)器。這樣每個算子拿到 Barrier,其實是做相同的一個功能。
到 Sink 端之后,它會在 Kafka 里提交一個預(yù)提交標(biāo)記,后面主要是 Kafka 本身事務(wù)機(jī)制來保證的。在所有的算子都完成 Checkpoint 之后,協(xié)調(diào)器會給所有的算子發(fā)一個 ACK,發(fā)送一個確認(rèn)狀態(tài),這時候 Sink 端做一個提交動作就可以了。
3. Sink Kafka
在之前的實踐中我們發(fā)現(xiàn),下游 Kafka 增加分區(qū)數(shù)時,新增分區(qū)無數(shù)據(jù)寫入。
原理是 FlinkKafkaProducer 默認(rèn)使用 FlinkFixedPartitioner,每個 Task 只會發(fā)送到下游對應(yīng)的一個 Partition 中,如果下游 Kafka 的 Topic 的 Partition 大于當(dāng)前任務(wù)的并行度,就會出現(xiàn)該問題。
解決辦法有兩個:
- 第一個辦法是用戶自定義一個 FlinkKafkaPartitioner;
- 另一個辦法是默認(rèn)不配置,默認(rèn)輪詢寫入各個 Partition。
4. 監(jiān)控加強(qiáng)
對于運(yùn)行中的 Flink 作業(yè),我們需要查看它本身的一些狀態(tài)。比如在 Flink UI 里面,它的很多指標(biāo)都是在 Task 粒度,沒有整體的效果。
平臺這邊對這些指標(biāo)做了進(jìn)一步的聚合,統(tǒng)一在一個頁面里面展示。
從上圖可以看到,展示信息包括反壓狀態(tài),時延情況以及運(yùn)行過程中 JobManager 和 TaskManage 的 CPU / 內(nèi)存的利用率。另外還有 Checkpoint 的監(jiān)控,比如它是否超時,最近是否有 Checkpoint 已經(jīng)失敗了,后面我們會針對這些監(jiān)控指標(biāo)做一些報警通知。
5. 監(jiān)控報警
當(dāng)實時任務(wù)運(yùn)營異常的時候,用戶是需要及時知道這個狀態(tài)的,如上圖所示,有一些報警項,包括報警訂閱人、報警級別,下面還有一些指標(biāo),根據(jù)前面設(shè)置的指標(biāo)值,如果滿足這些報警策略規(guī)則,就會給報警訂閱人推送報警,報警方式包括郵件、電話以及內(nèi)部通訊工具,從而實現(xiàn)任務(wù)異常狀態(tài)通知。
通過這種方式,當(dāng)任務(wù)異常的時候,用戶可以及時知曉這個狀態(tài),然后進(jìn)行人為干預(yù)。
6. 實時數(shù)據(jù)生產(chǎn)
最后總結(jié)一下愛奇藝廣告業(yè)務(wù)在實時鏈路生產(chǎn)上面的關(guān)鍵節(jié)點(diǎn)。
- 我們的實時是從 2016 年開始起步,當(dāng)時主要功能點(diǎn)是做一些指標(biāo)實時化,使用的是 SparkStreaming;
- 2018 年上線了點(diǎn)擊率實時特征;
- 2019 年上線了 Flink 的端到端精確到一次和監(jiān)控強(qiáng)化。
- 2020 年上線了有效點(diǎn)擊實時特征;
- 同年10月,逐步推進(jìn)實時數(shù)倉的改進(jìn),把 API 生產(chǎn)方式逐漸 SQL 化;
- 2021 年 4 月,進(jìn)行流批一體的探索,目前先把流批一體放在 ETL 實現(xiàn)。
之前我們的 ETL 實時跟離線是分別做的,通過批處理的方式,然后換到 Hive 表里邊,后面跟的是離線數(shù)倉。在實時里,經(jīng)過實時 ETL,放到 Kafka 里邊,然后去做后續(xù)的實時數(shù)倉。
先在 ETL 做流批一體的第一個好處是離線數(shù)倉時效性提升,因為數(shù)據(jù)需要做反作弊,所以我們給廣告算法提供基礎(chǔ)特征的時候,反作弊之后的時效性對于后續(xù)整體效果的提升是比較大的,所以如果把 ETL 做成統(tǒng)一實時化之后,對于后續(xù)的指導(dǎo)意義非常大。
ETL 做到流批一體之后,我們會把數(shù)據(jù)放在數(shù)據(jù)湖里面,后續(xù)離線數(shù)倉和實時數(shù)倉都可以基于數(shù)據(jù)湖實現(xiàn)。流批一體可以分為兩個階段,第一階段是先把 ETL 做到一體,另外報表端也可以放在數(shù)據(jù)湖里邊,這樣我們的查詢服務(wù)可以做到一個更新的量級。因為之前需要離線表跟實時表做一個 Union 的計算,在數(shù)據(jù)湖里面,我們通過離線和實時寫一個表就可以實現(xiàn)了。
四、未來規(guī)劃
關(guān)于未來規(guī)劃:
首先是流批一體,這里包括兩個方面:
- 第一個是 ETL 一體,目前已經(jīng)是基本達(dá)到可線上的狀態(tài)。
- 第二個是實時報表 SQL 化和數(shù)據(jù)湖的結(jié)合。
- 另外,現(xiàn)在的反作弊主要是通過離線的方式實現(xiàn),后面可能會把一些線上的反作弊模型轉(zhuǎn)成實時化,把風(fēng)險降到最低。
原文鏈接:https://developer.aliyun.com/article/786120?
版權(quán)聲明:本文內(nèi)容由阿里云實名注冊用戶自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識產(chǎn)權(quán)保護(hù)指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進(jìn)行舉報,一經(jīng)查實,本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。總結(jié)
以上是生活随笔為你收集整理的Flink 在爱奇艺广告业务的实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 京东:Flink SQL 优化实战
- 下一篇: 友邦人寿引入阿里云PolarDB云数据库