日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Iceberg 在基于 Flink 的流式数据入库场景中的应用

發布時間:2024/8/23 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Iceberg 在基于 Flink 的流式数据入库场景中的应用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文以流式數據入庫的場景為基礎,介紹引入 Iceberg 作為落地格式和嵌入 Flink sink 的收益,并分析了當前可實現的框架及要點。

應用場景

流式數據入庫,是大數據和數據湖的典型應用場景。上游的流式數據,如日志,或增量修改,通過數據總線,經過必要的處理后,匯聚并存儲于數據湖,供下游的應用(如報表或者商業智能分析)使用。

上述的應用場景通常有如下的痛點,需要整個流程不斷的優化:

  • 支持流式數據寫入,并保證端到端的不重不丟(即 exactly-once);
  • 盡量減少中間環節,能支持更實時(甚至是 T+0)的讀取或導出,給下游提供更實時更準確的基礎數據;
  • 支持 ACID,避免臟讀等錯誤發生;
  • 支持修改已落地的數據,雖然大數據和數據湖長于處理靜態的或者緩慢變化的數據,即讀多寫少的場景,但方便的修改功能可以提升用戶體驗,避免用戶因為極少的修改,手動更換整個數據文件,甚至是重新導出;
  • 支持修改表結構,如增加或者變更列;而且變更不要引起數據的重新組織。

引入 Iceberg 作為 Flink sink

為了解決上述痛點,我們引入了 Iceberg 作為數據落地的格式。Iceberg 支持 ACID 事務、修改和刪除、獨立于計算引擎、支持表結構和分區方式動態變更等特性,很好的滿足我們的需求。

同時,為了支持流式數據的寫入,我們引入 Flink 作為流式處理框架,并將 Iceberg 作為 Flink sink。

下文主要介紹 Flink Iceberg sink 的實現框架和要點。但在這之前,需要先介紹一些實現中用到的 Flink 基本概念。

Flink 基本概念

從 Flink 的角度如何理解"流"和"批"

Flink 使用 DataFrame API 來統一的處理流和批數據。

Stream, Transformation 和 Operator

一個 Flink 程序由 stream 和 transformation 組成:

  • Stream: Transformation 之間的中間結果數據;
  • Transformation:對(一個或多個)輸入 stream 進行操作,輸出(一個或多個)結果 stream。

當 Flink 程序執行時,其被映射成 Streaming Dataflow,由如下的部分組成:

  • Source (operator):接收外部輸入給 Flink;
  • Transformation (operator):中間對 stream 做的任何操作;
  • Sink (operator):Flink 輸出給外部。

下圖為 Flink 官網的示例,展示了一個以 Kafka 作為輸入 Source,經過中間兩個 transformation,最終通過 sink 輸出到 Flink 之外的過程。

State, Checkpoint and Snapshot

Flink 依靠 checkpoint 和基于 snapshot 的恢復機制,保證程序 state 的一致性,實現容錯。

Checkpoint 是對分布式的數據流,以及所有 operator 的 state,打 snapshot 的過程。

■ State

一個 operator 的 state,即它包含的所有用于恢復當前狀態的信息,可分為兩類:

  • 系統 state:如 operator 中對數據的緩存。
  • 用戶自定義 state:和用戶邏輯相關,可以利用 Flink 提供的 managed state,如 ValueState、ListState,來存儲。

State 的存儲位置,可以分為:

  • Local:內存,或者本地磁盤
  • State backend:遠端的持久化存儲,如 HDFS。

如下圖所示:

■ Checkpoint

Flink 做 checkpoint 的過程如下:

  • Checkpoint coordinator 首先發送 barrier 給 source。
  • Source 做 snapshot,完成后向 coordinator 確認。
  • Source 向下游發送 barrier。
  • 下游 operator 收到所有上游的 barrier 后,做 snapshot,完成后向 coordinator 確認。
  • 繼續往下游發送 barrier,直到 sink。
  • Sink 通知 coordinator 自己完成 checkpoint。
  • Coordinator 確認本周期 snapshot 做完。
  • 如下圖所示:

    ■ Barrier

    Barrier 是 Flink 做分布式 snapshot 的重要概念。它作為一個系統標記,被插入到數據流中,隨真實數據一起,按照數據流的方向,從上游向下游傳遞。

    由于每個 barrier 唯一對應 checkpoint id,所以數據流中的 record 實際被 barrier 分組,如下圖所示,barrier n 和 barrier n-1 之間的 record,屬于 checkpoint n。

    Barrier 的作用是在分布式的數據流中,將 operator 的多個輸入流按照 checkpoint對齊(align),如下圖所示:

    Flink Iceberg sink

    了解了上述 Flink 的基本概念,這些概念又是如何被應用和映射到 Flink Iceberg sink 當中的呢?

    總體框架

    如圖,Flink Iceberg sink 有兩個主要模塊和兩個輔助模塊組成:

    實現要點

    ■ Writer

  • 在當前的實現中,Java 的 Map 作為每條記錄,輸入給 writer。內部邏輯先將其轉化為作為中間格式的 Avro IndexedRecord,而后通過 Iceberg 里的 Parquet 相關 API,累積的寫入 DataFile。
  • 使用 Avro 作為中間格式是一個臨時方案,為簡化適配,并最大限度的利用現有邏輯。但長期來看,使用中間格式會影響處理效率,社區也在試圖通過 ISSUE-870 來去掉 Avro,進而使用 Iceberg 內建的數據類型作為輸入,同時也需要加入一個到 Flink 內建數據類型的轉換器。
  • 在做 checkpoint 的過程中,發送 writer 自己的 barrier 到下游的 committer 之前,關閉單個 Parquet 文件,構建 DataFile,并發送 DataFile 的信息給下游。
  • ■ Committer

  • 全局唯一的 Committer 在收到上游所有 writer 的 barrier 以后,將收到的 DataFile 的信息填入 manifest file,并使用 ListState 把 manifest file 作為用戶自定義的 state,保存于 snapshot 中。
  • 當 checkpoint 完成以后,通過 merge append 將 manifest file 提交給 Iceberg。Iceberg 內部通過后續的一系列操作完成 commit。最終讓新加入的數據對其他的讀任務可見。
  • 試用 Flink Iceberg sink

    社區上?https://github.com/apache/incubator-iceberg/pull/856?提供了可以試用的原型代碼。下載該 patch 放入 master 分支,編譯并構建即可。如下的程序展示了如何將該 sink 嵌入到 Flink 數據流中:

    // Configurate catalog org.apache.hadoop.conf.Configuration hadoopConf =new org.apache.hadoop.conf.Configuration(); hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,META_STORE_URIS); hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_WAREHOUSE);Catalog icebergCatalog = new HiveCatalog(hadoopConf);// Create Iceberg table Schema schema = new Schema(... ); PartitionSpec partitionSpec = builderFor(schema)... TableIdentifier tableIdentifier =TableIdentifier.of(DATABASE_NAME, TABLE_NAME); // If needed, check the existence of table by loadTable() and drop it // before creating it icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);// Obtain an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Enable checkpointing env.enableCheckpointing(...);// Add Source DataStream<Map<String, Object>> dataStream =env.addSource(source, typeInformation);// Configure Ieberg sink Configuration conf = new Configuration(); conf.setString(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_URIS); conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME); conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);// Append Iceberg sink to data stream IcebergSinkAppender<Map<String, Object>> appender =new IcebergSinkAppender<Map<String, Object>>(conf, "test").withSerializer(MapAvroSerializer.getInstance()).withWriterParallelism(1); appender.append(dataStream);// Trigger the execution env.execute("Sink Test");

    后續規劃

    Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作為中間格式;以及在各種失敗的情況下是否仍能保證端到端的 exactly-once;按固定時長做 checkpoint,在高低峰時生成不同大小的 DataFile,是否對后續讀不友好等。這些問題都在我們的后續規劃中,也會全數貢獻給社區。

    參考資料:

    [1] Iceberg 官網:
    https://iceberg.apache.org/
    [2] Flink 1.10文 檔:
    https://ci.apache.org/projects/flink/flink-docs-release-1.10/
    [3] Neflix 提供的 Flink Iceberg connector 原型:
    https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
    [4] Flink Iceberg sink 設計文檔:
    https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
    [5] Flink 容錯機制(checkpoint) :
    https://www.cnblogs.com/starzy/p/11439988.html

    ?

    ?

    # 社區活動推薦 #

    普惠全球開發者,這一次,格外與眾不同!首個 Apache 頂級項目在線會議 Flink Forward 全球直播中文精華版來啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海內外一線廠商,經典 Flink 應用場景,最新功能、未來規劃一覽無余。點擊下方鏈接可了解更多大會詳情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx

    原文鏈接
    本文為云棲社區原創內容,未經允許不得轉載。

    總結

    以上是生活随笔為你收集整理的Iceberg 在基于 Flink 的流式数据入库场景中的应用的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 3d动漫精品啪啪一区二区下载 | 中国黄色在线视频 | 涩色视频| 久草超碰在线 | 亚洲精品久久久久久无码色欲四季 | 久久精品夜色噜噜亚洲a∨ 中文字幕av网 | 日韩黄色网| 黄色亚洲网站 | 91捆绑91紧缚调教91 | 美女被草网站 | 国产精品久久久久无码av色戒 | 18成人在线 | 亚洲精品自拍 | 国产精品情侣 | 伊人久在线 | 521a人成v香蕉网站 | 精品视频一区二区在线观看 | 亚洲丁香花色 | 一区二区国产电影 | 久久久久国产精品人妻 | 男男大尺度 | 日韩三级在线观看 | 日韩一级二级三级 | 亚洲一区网站 | 亚洲一区二区免费视频 | 国产伦理av | 美女隐私免费看 | 久久久综合精品 | av中字在线| 黄色一级片毛片 | 日本三级中国三级99人妇网站 | 国产精品嫩草影院桃色 | 国产一卡二卡在线播放 | 欧美自拍亚洲 | 成人污视频 | 日韩国产欧美一区二区三区 | 超碰97在线资源站 | 骚狐网站| 91网站在线免费观看 | 污视频软件在线观看 | 乡村性满足hd | 国产精品一区二区在线播放 | 91丨porny丨尤物 | 成人a级大片 | 免费精品视频在线 | 超碰在线网站 | 一本色道久久综合狠狠躁的推荐 | 91久久一区二区三区 | 日韩激情小说 | 亚洲一区在线观 | 日本a视频在线观看 | 久久夜色精品国产欧美乱极品 | 国产精品久久久av | 成人污污视频在线观看 | 天天干天天草 | 久久久久亚洲av成人片 | 欧美国产三级 | 伊人久操 | 日本做爰高潮又黄又爽 | 国产一级片精品 | av在线资源站 | 97免费在线观看 | 国产中文欧美日韩在线 | 日韩成人在线免费视频 | 538在线精品 | 国产伦精品一区二区三区四区免费 | 国产青青青| 国产在线观看无码免费视频 | 欧美福利一区二区 | 成年人在线观看网站 | 欧美一级一区二区三区 | 女十八毛片 | av五月天在线 | 草莓视频在线观看入口w | 阿v免费视频 | 91黄色影视 | 国产毛片电影 | 国产亚洲精品久久久久婷婷瑜伽 | 免费中文字幕日韩欧美 | av激情久久 | 中文字幕一区二区三区四区视频 | 真实偷拍激情啪啪对白 | 又色又爽又高潮免费视频国产 | 日日碰狠狠添天天爽无码av | 国产一区二区三区小说 | 黄色香蕉视频 | 能看的av网站 | 精品人妻人人做人人爽 | 欧美三级黄色大片 | 精品一级少妇久久久久久久 | melody在线高清免费观看 | 亚洲精品嫩草 | 九九九九精品九九九九 | 久久久久久久久久久久久女过产乱 | 涩涩视频在线播放 | 粉嫩av一区二区 | 少妇性l交大片7724com | 嫩草www | 欧美精品一区在线观看 |