日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > windows >内容正文

windows

聊聊流式数据湖Paimon(四)

發(fā)布時(shí)間:2023/12/29 windows 31 coder
生活随笔 收集整理的這篇文章主要介紹了 聊聊流式数据湖Paimon(四) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Partial Update

--FlinkSQL參數(shù)設(shè)置 set `table.dynamic-table-options.enabled` = `true`; SET `env.state.backend` = `rocksdb`; SET `execution.checkpointing.interval` = `60000`; SET `execution.checkpointing.tolerable-failed-checkpoints` = `3`; SET `execution.checkpointing.min-pause` = `60000`; --創(chuàng)建Paimon catalog CREATE CATALOG paimon WITH ( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:9083', 'warehouse' = 'hdfs://paimon', 'table.type' = 'EXTERNAL' ); --創(chuàng)建Partial update結(jié)果表 CREATE TABLE if not EXISTS paimon.dw.order_detail ( `order_id` string, `product_type` string, `plat_name` string, `ref_id` bigint, `start_city_name` string, `end_city_name` string, `create_time` timestamp(3), `update_time` timestamp(3), `dispatch_time` timestamp(3), `decision_time` timestamp(3), `finish_time` timestamp(3), `order_status` int, `binlog_time` bigint, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'bucket' = '20', -- 指定20個(gè)bucket 'bucket-key' = 'order_id', -- 記錄排序字段 'sequence.field' = 'binlog_time', -- 選擇 full-compaction ,在compaction后產(chǎn)生完整的changelog 'changelog-producer' = 'full-compaction', -- compaction 間隔時(shí)間 'changelog-producer.compaction-interval' = '2 min', 'merge-engine' = 'partial-update', -- 忽略DELETE數(shù)據(jù),避免運(yùn)行報(bào)錯(cuò) 'partial-update.ignore-delete' = 'true' ); INSERT INTO paimon.dw.order_detail -- order_info表提供主要字段 SELECT order_id, product_type, plat_name, ref_id, cast(null as string) as start_city_name, cast(null as string) as end_city_name, create_time, update_time, dispatch_time, decision_time, finish_time, order_status, binlog_time FROM paimon.ods.order_info /*+ OPTIONS ('scan.mode'='latest') */ union all -- order_address表提供城市字段 SELECT order_id, cast(null as string) as product_type, cast(null as string) as plat_name, cast(null as bigint) as ref_id, start_city_name, end_city_name, cast(null as timestamp(3)) as create_time, cast(null as timestamp(3)) as update_time, cast(null as timestamp(3)) as dispatch_time, cast(null as timestamp(3)) as decision_time, cast(null as timestamp(3)) as finish_time, cast(null as int) as order_status, binlog_time FROM paimon.ods.order_address /*+ OPTIONS ('scan.mode'='latest') */ ;

完整的Changlog

Paimon中的表被多流填充數(shù)據(jù)且打?qū)捑S度后,支持流讀、批讀的方式提供完整的Changelog給下游。

Sequence-Group

配置:'fields.G.sequence-group'='A,B'
由字段G控制是否更新字段A, B;總得來說,G的值如果為null或比更新值大將不更新A,B;如下單測

public void testSequenceGroup() {
    sql(
            "CREATE TABLE SG ("
                    + "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
                    + " WITH ("
                    + "'merge-engine'='partial-update', "
                    + "'fields.g_1.sequence-group'='a,b', "
                    + "'fields.g_2.sequence-group'='c,d');");

    sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1)");

    // g_2 should not be updated
    sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT))");

    // select *
    assertThat(sql("SELECT * FROM SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 1, 1, 1));

    // projection
    assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(1, 1));

    // g_1 should not be updated
    sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3)");

    assertThat(sql("SELECT * FROM SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 3, 3, 3));

    // d should be updated by null
    sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT))");
    sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT))");
    sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4)");

    assertThat(sql("SELECT a, b FROM SG")).containsExactlyInAnyOrder(Row.of(4, 4));
    assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(5, null));
}

其作用是:

  1. 在多個(gè)數(shù)據(jù)流更新期間的無序問題。每個(gè)數(shù)據(jù)流都定義自己的序列組。
  2. 真正的部分更新,而不僅僅是非空值的更新。
  3. 接受刪除記錄來撤銷部分列。

Changelog-Producer

Paimon通過Changelog-Producer支持生成changelog,并支持下游以流讀、批讀的形式讀取changelog。
Changelog的生成有多種方式,input、lookup、full-compaction;其生成代價(jià)是由低到高。

None

不查找舊值,不額外寫Changelog;但會(huì)下游任務(wù)中通過ChangelogNormalize算子補(bǔ)足Changelog。

Input

不查找舊值,額外寫Changelog;適用與CDC的數(shù)據(jù)源。

Lookup

查找舊值,額外寫Changelog;如果不是CDC數(shù)據(jù)源,需要通過LookupCompaction查找舊值,即在 compaction 的過程中, 會(huì)去向高層查找本次新增 key 的舊值, 如果沒有查找到, 那么本次的就是新增 key, 如果有查找到, 那么就生成完整的 UB 和 UA 消息。

Full-Compaction

查找舊值,額外寫Changelog;在 full compact 的過程中, 其實(shí)數(shù)據(jù)都會(huì)被寫到最高層, 所以所有 value 的變化都是可以推演出來的.

數(shù)據(jù)一致性

數(shù)據(jù)版本

通過Flink的checkpoint機(jī)制,生成Snapshot并標(biāo)記版本,即,一個(gè)Snapshot對(duì)應(yīng)數(shù)據(jù)的一個(gè)版本。
比如 Job-A 基于 Table-A 的 Snapshot-20 產(chǎn)出了 Table-B 的 Snapshot-11。Job-B 基于 Table-A 的Snapshot-20產(chǎn)出了 Table-C 的 Snapshot-15。那么 Job-C 的查詢就應(yīng)該基于 Table-B 的 Snapshot-11 和 Table-C 的 Snapshot-15 進(jìn)行計(jì)算,明確了數(shù)據(jù)版本,從而實(shí)現(xiàn)計(jì)算的一致性。

生成的snapshot-xx,就是數(shù)據(jù)的版本號(hào)。

數(shù)據(jù)對(duì)齊

將 Checkpoint 插入到兩個(gè) Snapshot 的數(shù)據(jù)之間。如果當(dāng)前的 Snapshot 還沒有完全被消費(fèi),這個(gè) Checkpoint 的觸發(fā)會(huì)被推遲,從而實(shí)現(xiàn)按照 Snapshot 對(duì)數(shù)據(jù)進(jìn)行劃分和對(duì)齊。

實(shí)現(xiàn)分為兩個(gè)部分。

  • 在提交階段,需要去血緣關(guān)系表中查詢上下游表的一致性版本,并且基于查詢結(jié)果給對(duì)應(yīng)的上游表設(shè)置起始的消費(fèi)位置。
  • 在運(yùn)行階段,按照消費(fèi)的 Snapshot 來協(xié)調(diào) Checkpoint,在 Flink 的 Checkpoint Coordinator 向 Source 發(fā)出 Checkpoint 的請(qǐng)求時(shí),會(huì)強(qiáng)制要求將 Checkpoint 插入到兩個(gè) Snapshot 的數(shù)據(jù)之間。如果當(dāng)前的 Snapshot 還沒有完全被消費(fèi),這個(gè) Checkpoint 的觸發(fā)會(huì)被推遲,從而實(shí)現(xiàn)按照 Snapshot 對(duì)數(shù)據(jù)進(jìn)行劃分和處理。

數(shù)據(jù)血緣

概念

數(shù)據(jù)從產(chǎn)生到消費(fèi)的整個(gè)流轉(zhuǎn)過程中所經(jīng)歷的各種轉(zhuǎn)換、處理和流動(dòng)的軌跡。數(shù)據(jù)血緣提供了數(shù)據(jù)的來源、去向以及中間處理過程的透明度,幫助用戶理解數(shù)據(jù)如何在系統(tǒng)中被處理和移動(dòng),以及數(shù)據(jù)是如何從原始狀態(tài)轉(zhuǎn)化為最終的可消費(fèi)形態(tài)。

實(shí)現(xiàn)

在checkpoint的提交時(shí)將數(shù)據(jù)的血緣關(guān)系寫入到System Table,記錄血緣關(guān)系。

總結(jié)

以上是生活随笔為你收集整理的聊聊流式数据湖Paimon(四)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。