基于 Apache Hudi 构建流批一体系统实践
1. 前言
當前公司的大數據實時鏈路如下圖,數據源是MySQL數據庫,然后通過Binlog Query的方式消費或者直接客戶端采集到Kafka,最終通過基于Spark/Flink實現的批流一體計算引擎處理,最后輸出到下游對應的存儲。
2. 模型特征架構的演進
2.1 第一代架構
廣告業務發展初期,為了提升策略迭代效率,整理出一套通用的特征生產框架,該框架由三部分組成:特征統計、特征推送和特征獲取模型訓練。如下圖所示:
?客戶端以及服務端數據先通過統一服務Sink到HDFS上?基于基HDFS數據,統計特定維度的總量、分布等統計類特征并推送到Codis中?從Codis中獲取特征小時維度模型增量Training,讀取HDFS文件進行天級別增量Training
該方案能夠滿足算法的迭代,但是有以下幾個問題
?由于Server端直接Put本地文件到HDFS上無法做到根據事件時間精準分區,導致數據源不同存在口徑問題?不可控的小文件、空文件問題?數據格式單一,只支持json格式?用戶使用成本較高,特征抽取需要不斷的Coding?整個架構擴展性較差
為解決上述問題,我們對第一代架構進行了演進和改善,構建了第二代批流一體架構(另外該架構升級也是筆者在餓了么進行架構升級的演進路線)。
2.2 第二代架構
2.2.1 批流一體平臺的構建
首先將數據鏈路改造為實時架構,將Spark Structured Streaming(下文統一簡稱SS)與Flink SQL語法統一,同時實現與Flink SQL語法大體上一致的批流一體架構,并且做了一些功能上的增強與優化。
為什么有了Flink還需要支持SS呢?主要有以下幾點原因
?Spark生態相對更完善,當然現在Flink也做的非常好了?用戶使用習慣問題,有些用戶對從Spark遷移到Flink沒有多大訴求?SS Micro Batch引擎的抽象做批流統一更加絲滑?相比Flink純內存的計算模型,在延遲不敏感的場景Spark更友好
這里舉一個例子,比如批流一體引擎SS與Flink分別創建Kafka table并寫入到ClickHouse,語法分別如下
Spark Structured Streaming語法如下
--Spark Structured StreamingCREATE STREAM spark ( ad_id STRING, ts STRING, event_ts as to_timestamp(ts)) WITH ('connector' = 'kafka','topic' = 'xx','properties.bootstrap.servers'='xx','properties.group.id'='xx','startingOffsets'='earliest','eventTimestampField' = 'event_ts','watermark' = '60 seconds','format'='json');create SINK ck( ad_id STRING, ts STRING, event_ts timestamp) WITH( 'connector'='jdbc', 'url'='jdbc:clickhouse://host:port/db', 'table-name'='table', 'username'='user', 'password'='pass', 'sink.buffer-flush.max-rows'='10', 'sink.buffer-flush.interval' = '5s', 'sink.parallelism' = '3' 'checkpointLocation'= 'checkpoint_path',);insert into ck select * from spark ;Flink SQL語法如下
CREATE TABLE flink ( ad_id STRING, ts STRING, event_ts as to_timestamp(ts) )WITH ('connector' = 'kafka','topic' = 'xx','properties.bootstrap.servers'='xx','properties.group.id'='xx','scan.topic-partition-discovery.interval'='300s','format' = 'json');CREATE TABLE ck ( ad_id VARCHAR, ts VARCHAR, event_ts timestamp(3) PRIMARY KEY (ad_id) NOT ENFORCED) WITH ('connector'='jdbc', 'url'='jdbc:clickhouse://host:port/db','table-name'='table','username'='user','password'='pass','sink.buffer-flush.max-rows'='10','sink.buffer-flush.interval' = '5s','sink.parallelism' = '3');insert into ck select * from flink ;2.2.2 模型特征處理新架構
新的模型特征處理采用批流一體的架構,上游對接數據源還是Kafka,模型主要有兩個訴求
?支持增量讀取方式減少模型更新的實效性?利用CDC來實現特征的回補
整個流程如下圖
2.2.3 Hudi、Delta還是Iceberg
3個項目都是目前活躍的開源數據湖方案,feature to feature的展開詳細說篇幅太長,大致列舉一下各自的優缺點。
其實通過對比可以發現各有優缺點,但往往會因為訴求不同,在實際落地生產時3種選型會存在同時多個共存的情況,為什么我們在模型特征的場景最終選擇了Hudi呢?主要有以下幾點
?國內Hudi社區非常活躍,問題可以很快得到解決?Hudi對Spark2的支持更加友好,公司算法還是Spark2為主?算法希望有增量查詢的能力,而增量查詢能力是Hudi原生主打的能力,與我們的場景非常匹配?Hudi非常適合CDC場景,對CDC場景支持非常完善
2.2.4 方案上線
我們計劃用Spark跟Flink雙跑,通過數據質量以及資源成本來選擇合適的計算引擎。選擇的一個case是廣告曝光ed流跟用戶點擊Click流Join之后落地到Hudi,然后算法增量查詢抽取特征更新模型。
2.2.4.1 Flink方案
最初我們用的是Flink 1.12.2 + Hudi 0.8.0,但是實際上發現任務跑起來并不順利,使用master最新代碼0.9.0-SNAPSHOT之后任務可以按照預期運行,運行的Flink SQL如下
CREATE TABLE ed ( `value` VARCHAR, ts as get_json_object(`value`,'$.ts'), event_ts as to_timestamp(ts), WATERMARK FOR event_ts AS event_ts - interval '1' MINUTE, proctime AS PROCTIME())WITH ('connector' = 'kafka','topic' = 'ed','scan.startup.mode' = 'group-offsets','properties.bootstrap.servers'='xx','properties.group.id'='xx','scan.topic-partition-discovery.interval'='100s','scan.startup.mode'='group-offsets','format'='schemaless');CREATE TABLE click ( req_id VARCHAR, ad_id VARCHAR, ts VARCHAR, event_ts as to_timestamp(ts), WATERMARK FOR event_ts AS event_ts - interval '1' MINUTE, proctime AS PROCTIME())WITH ('connector' = 'kafka','topic' = 'click','properties.bootstrap.servers'='xx','scan.startup.mode' = 'group-offsets','properties.bootstrap.servers'='xx','properties.group.id'='xx','scan.topic-partition-discovery.interval'='100s','format'='json');CREATE TABLE hudi(uuid VARCHAR,ts VARCHAR,json_info VARCHAR, is_click INT,dt VARCHAR,`hour` VARCHAR,PRIMARY KEY (uuid) NOT ENFORCED)PARTITIONED BY (dt,`hour`)WITH ( 'connector' = 'hudi', 'path' = 'hdfs:///xx', 'write.tasks' = '10', 'write.precombine.field'='ts', 'compaction.tasks' = '1', 'table.type' = 'COPY_ON_WRITE' );insert into hudi SELECT concat(req_id, ad_id) uuid, date_format(event_ts,'yyyyMMdd') AS dt, date_format(event_ts,'HH') `hour`, concat(ts, '.', cast(is_click AS STRING)) AS ts, json_info,is_clickFROM (SELECT t1.req_id,t1.ad_id,t1.ts,t1.json_info, if(t2.req_id <> t1.req_id,0,1) as is_click, ROW_NUMBER() OVER (PARTITION BY t1.req_id,t1.ad_id,t1.ts ORDER BY if(t2.req_id <> t1.req_id,0,1) DESC) as row_num FROM (select ts,event_ts,map_info['req_id'] req_id,map_info['ad_id'] ad_id, `value` as json_info from ed,LATERAL TABLE(json_tuple(`value`,'req_id','ad_id')) as T(map_info)) t1 LEFT JOIN click t2 ON t1.req_id=t1.req_id and t1.ad_id=t2.ad_id and t2.event_ts between t1.event_ts - INTERVAL '10' MINUTE and t1.event_ts + INTERVAL '4' MINUTE ) a where a.row_num=1;標注:上述SQL中有幾處與官方SQL不一致,主要是實現了統一規范Schema為一列的Schemaless的Format、與Spark/Hive語義基本一致的get_json_object以及json_tuple UDF,這些都是在批流一體引擎做的功能增強的一小部分。
但是在運行一周后,面臨著業務上線Delay的壓力以及暴露出來的兩個問題讓我們不得不先暫時放棄Flink方案
?任務反壓的問題(無論如何去調整資源似乎都會出現嚴重的反壓,雖然最終我們通過在寫入Hudi之前增加一個upsert-kafka的中間流程解決了,但鏈路過長這并不是我們預期內的)?還有一點是任務存在丟數據的風險,對比Spark方案發現Flink會有丟數據的風險
標注:這個case并非Flink集成Hudi不夠,國內已經有很多使用Flink引擎寫入Hudi的實踐,但在我們場景下因為為了確保上線時間,沒有太多時間細致排查問題。實際上我們這邊Kafka -> Hive鏈路有95%的任務都使用Flink替代了Spark Structured Streaming(SS)
2.2.4.2 Spark方案
由于沒有在Hudi官方網站上找到SS集成的說明,一開始筆者快速實現了SS與Hudi的集成,但是在通讀Hudi代碼之后發現其實社區早已有了SS的完整實現,另外咨詢社區同學leesf之后給出的反饋是當前SS的實現也很穩定。稍作適配SS版本的任務也在一天之內上線了,任務SQL如下
CREATE STREAM ed ( value STRING, ts as get_json_object(value,'$.ts'), event_ts as to_timestamp(get_json_object(value,'$.ts'))) WITH ('connector' = 'kafka','topic' = 'ed','properties.bootstrap.servers'='xx','properties.group.id'='xx','startingOffsets'='earliest','minPartitions' = '60','eventTimestampField' = 'event_ts','maxOffsetsPerTrigger' = '250000', 'watermark' = '60 seconds','format'='schemaless');CREATE STREAM click ( req_id STRING, ad_id STRING, ts STRING, event_ts as to_timestamp(ts)) WITH ('connector' = 'kafka','topic' = 'click','properties.bootstrap.servers'='xxxx'properties.group.id'='dw_ad_algo_naga_dsp_ed_click_rt','startingOffsets'='earliest','maxOffsetsPerTrigger' = '250000','eventTimestampField' = 'event_ts','minPartitions' = '60','watermark' = '60 seconds','format'='json');--可以動態注冊python、java、scala udfcreate python function py_f with ( 'code' = 'def apply(self,m): return 'python_{}'.format(m)','methodName'= 'apply','dataType' = 'string');create SINK hudi(uuid STRING,dt STRING,hour STRING,ts STRING,json_info STRING, is_click INT) WITH ( 'connector'='hudi', 'hoodie.table.name' = 'ed_click', 'path' ='hdfs:///xx', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'hoodie.datasource.write.precombine.field' = 'ts', 'hoodie.datasource.write.operation' = 'upsert', 'hoodie.datasource.write.partitionpath.field' = 'dt,hour', 'hoodie.datasource.write.keygenerator.class'= 'org.apache.hudi.keygen.ComplexKeyGenerator', 'hoodie.datasource.write.table.type' = 'COPY_ON_WRITE', 'hoodie.datasource.write.hive_style_partitioning'='true', 'hoodie.datasource.write.streaming.ignore.failed.batch'='false', 'hoodie.keep.min.commits'='120', 'hoodie.keep.max.commits'='180', 'hoodie.cleaner.commits.retained'='100', --'hoodie.datasource.write.insert.drop.duplicates' = 'true', --'hoodie.fail.on.timeline.archiving'='false', --'hoodie.datasource.hive_sync.table'='true', -- 'hoodie.datasource.hive_sync.database'='ods_test', -- 'hoodie.datasource.hive_sync.table'='ods_test_hudi_test2', -- 'hoodie.datasource.hive_sync.use_jdbc'='false', -- 'hoodie.datasource.meta.sync.enable' ='true', -- 'hoodie.datasource.hive_sync.partition_fields'='dt,hour', -- 'hoodie.datasource.hive_sync.partition_extractor_class'='org.apache.hudi.hive.MultiPartKeysValueExtractor', 'trigger'='30', 'checkpointLocation'= 'checkpoint_path');INSERT INTO hudiSELECT concat(req_id, ad_id) uuid, date_format(ts,'yyyyMMdd') dt, date_format(ts,'HH') hour, concat(ts, '.', cast(is_click AS STRING)) AS ts, json_info, is_clickFROM ( SELECT t1.req_id, t1.ad_id, t1.ts, t1.json_info, IF(t2.req_id is null, 0, 1) AS is_click FROM (select ts,event_ts,req_id,ad_id,value as json_info from ed lateral view json_tuple(value,'req_id','ad_id') tt as req_id,ad_id) t1 LEFT JOIN click t2 ON t1.req_id = t2.req_id AND t1.ad_id = t2.ad_id AND t2.event_ts BETWEEN t1.event_ts - INTERVAL 10 MINUTE AND t1.event_ts + INTERVAL 4 MINUTE ) tmp;標注:Spark批流一體引擎在流語法上盡量與Flink對齊,同時我們實現了python/java/scala多語言udf的動態注冊以方便用戶使用
3. 新方案收益
通過鏈路架構升級,基于Flink/Spark + Hudi的新的流批一體架構帶來了如下收益
?構建在Hudi上的批流統一架構純SQL化極大的加速了用戶的開發效率?Hudi在COW以及MOR不同場景的優化讓用戶有了更多的讀取方式選擇,增量查詢讓算法可以實現分鐘級別的模型更新,這也是用戶的強烈訴求?利用SS以及Flink的事件時間語義抹平了口徑上的Gap?Hudi自動Compact機制+小文件智能處理,對比第一版實現甚至對比需要手動Compact無疑極大的減輕了工程負擔
4. 踩過的坑
?寫Hudi重試失敗導致數據丟失風險。解決辦法:hoodie.datasource.write.streaming.ignore.failed.batch設置為false,不然Task會間隔hoodie.datasource.write.streaming.retry.interval.ms(默認2000)重試hoodie.datasource.write.streaming.retry.count(默認3)?增量查詢Range太大,導致算法任務重試1小時之前的數據獲取到空數據。解決辦法:調大保留版本數對應參數為hoodie.keep.min.commits、hoodie.keep.max.commits調大cleanup retention版本數對應參數為hoodie.cleaner.commits.retained?Upsert模式下數據丟失問題。解決辦法:hoodie.datasource.write.insert.drop.duplicates設置為false,這個參數會將已經存在index的record丟棄,如果存在update的record會被丟棄?Spark讀取hudi可能會存在path not exists的問題,這個是由于cleanup導致的,解決辦法:調整文件版本并進行重試讀取
總結
以上是生活随笔為你收集整理的基于 Apache Hudi 构建流批一体系统实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 机器学习从入门到精通50讲(九)-基于
- 下一篇: 产品经验谈:推荐系统实战案例-如何寻找有