Iceberg 在基于 Flink 的流式数据入库场景中的应用
本文以流式數據入庫的場景為基礎,介紹引入 Iceberg 作為落地格式和嵌入 Flink sink 的收益,并分析了當前可實現的框架及要點。
應用場景
流式數據入庫,是大數據和數據湖的典型應用場景。上游的流式數據,如日志,或增量修改,通過數據總線,經過必要的處理后,匯聚并存儲于數據湖,供下游的應用(如報表或者商業智能分析)使用。
上述的應用場景通常有如下的痛點,需要整個流程不斷的優化:
- 支持流式數據寫入,并保證端到端的不重不丟(即 exactly-once);
- 盡量減少中間環節,能支持更實時(甚至是 T+0)的讀取或導出,給下游提供更實時更準確的基礎數據;
- 支持 ACID,避免臟讀等錯誤發生;
- 支持修改已落地的數據,雖然大數據和數據湖長于處理靜態的或者緩慢變化的數據,即讀多寫少的場景,但方便的修改功能可以提升用戶體驗,避免用戶因為極少的修改,手動更換整個數據文件,甚至是重新導出;
- 支持修改表結構,如增加或者變更列;而且變更不要引起數據的重新組織。
引入 Iceberg 作為 Flink sink
為了解決上述痛點,我們引入了 Iceberg 作為數據落地的格式。Iceberg 支持 ACID 事務、修改和刪除、獨立于計算引擎、支持表結構和分區方式動態變更等特性,很好的滿足我們的需求。
同時,為了支持流式數據的寫入,我們引入 Flink 作為流式處理框架,并將 Iceberg 作為 Flink sink。
下文主要介紹 Flink Iceberg sink 的實現框架和要點。但在這之前,需要先介紹一些實現中用到的 Flink 基本概念。
Flink 基本概念
從 Flink 的角度如何理解"流"和"批"
Flink 使用 DataFrame API 來統一的處理流和批數據。
Stream, Transformation 和 Operator
一個 Flink 程序由 stream 和 transformation 組成:
- Stream: Transformation 之間的中間結果數據;
- Transformation:對(一個或多個)輸入 stream 進行操作,輸出(一個或多個)結果 stream。
當 Flink 程序執行時,其被映射成 Streaming Dataflow,由如下的部分組成:
- Source (operator):接收外部輸入給 Flink;
- Transformation (operator):中間對 stream 做的任何操作;
- Sink (operator):Flink 輸出給外部。
下圖為 Flink 官網的示例,展示了一個以 Kafka 作為輸入 Source,經過中間兩個 transformation,最終通過 sink 輸出到 Flink 之外的過程。
State, Checkpoint and Snapshot
Flink 依靠 checkpoint 和基于 snapshot 的恢復機制,保證程序 state 的一致性,實現容錯。
Checkpoint 是對分布式的數據流,以及所有 operator 的 state,打 snapshot 的過程。
■ State
一個 operator 的 state,即它包含的所有用于恢復當前狀態的信息,可分為兩類:
- 系統 state:如 operator 中對數據的緩存。
- 用戶自定義 state:和用戶邏輯相關,可以利用 Flink 提供的 managed state,如 ValueState、ListState,來存儲。
State 的存儲位置,可以分為:
- Local:內存,或者本地磁盤
- State backend:遠端的持久化存儲,如 HDFS。
如下圖所示:
■ Checkpoint
Flink 做 checkpoint 的過程如下:
如下圖所示:
■ Barrier
Barrier 是 Flink 做分布式 snapshot 的重要概念。它作為一個系統標記,被插入到數據流中,隨真實數據一起,按照數據流的方向,從上游向下游傳遞。
由于每個 barrier 唯一對應 checkpoint id,所以數據流中的 record 實際被 barrier 分組,如下圖所示,barrier n 和 barrier n-1 之間的 record,屬于 checkpoint n。
Barrier 的作用是在分布式的數據流中,將 operator 的多個輸入流按照 checkpoint對齊(align),如下圖所示:
Flink Iceberg sink
了解了上述 Flink 的基本概念,這些概念又是如何被應用和映射到 Flink Iceberg sink 當中的呢?
總體框架
如圖,Flink Iceberg sink 有兩個主要模塊和兩個輔助模塊組成:
實現要點
■ Writer
■ Committer
試用 Flink Iceberg sink
社區上?https://github.com/apache/incubator-iceberg/pull/856?提供了可以試用的原型代碼。下載該 patch 放入 master 分支,編譯并構建即可。如下的程序展示了如何將該 sink 嵌入到 Flink 數據流中:
// Configurate catalog org.apache.hadoop.conf.Configuration hadoopConf =new org.apache.hadoop.conf.Configuration(); hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,META_STORE_URIS); hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_WAREHOUSE);Catalog icebergCatalog = new HiveCatalog(hadoopConf);// Create Iceberg table Schema schema = new Schema(... ); PartitionSpec partitionSpec = builderFor(schema)... TableIdentifier tableIdentifier =TableIdentifier.of(DATABASE_NAME, TABLE_NAME); // If needed, check the existence of table by loadTable() and drop it // before creating it icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);// Obtain an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Enable checkpointing env.enableCheckpointing(...);// Add Source DataStream<Map<String, Object>> dataStream =env.addSource(source, typeInformation);// Configure Ieberg sink Configuration conf = new Configuration(); conf.setString(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_URIS); conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME); conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);// Append Iceberg sink to data stream IcebergSinkAppender<Map<String, Object>> appender =new IcebergSinkAppender<Map<String, Object>>(conf, "test").withSerializer(MapAvroSerializer.getInstance()).withWriterParallelism(1); appender.append(dataStream);// Trigger the execution env.execute("Sink Test");后續規劃
Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作為中間格式;以及在各種失敗的情況下是否仍能保證端到端的 exactly-once;按固定時長做 checkpoint,在高低峰時生成不同大小的 DataFile,是否對后續讀不友好等。這些問題都在我們的后續規劃中,也會全數貢獻給社區。
參考資料:
[1] Iceberg 官網:
https://iceberg.apache.org/
[2] Flink 1.10文 檔:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 設計文檔:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容錯機制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html
?
?
# 社區活動推薦 #
普惠全球開發者,這一次,格外與眾不同!首個 Apache 頂級項目在線會議 Flink Forward 全球直播中文精華版來啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海內外一線廠商,經典 Flink 應用場景,最新功能、未來規劃一覽無余。點擊下方鏈接可了解更多大會詳情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的Iceberg 在基于 Flink 的流式数据入库场景中的应用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里云峰会|数据库也能自动驾驶?DAS全
- 下一篇: 菜鸟网络宣布推出物流加速上云行动“鲲鹏计