聊聊流式数据湖Paimon(四)
Partial Update
--FlinkSQL參數(shù)設(shè)置
set
`table.dynamic-table-options.enabled` = `true`;
SET
`env.state.backend` = `rocksdb`;
SET
`execution.checkpointing.interval` = `60000`;
SET
`execution.checkpointing.tolerable-failed-checkpoints` = `3`;
SET
`execution.checkpointing.min-pause` = `60000`;
--創(chuàng)建Paimon catalog
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://localhost:9083',
'warehouse' = 'hdfs://paimon',
'table.type' = 'EXTERNAL'
);
--創(chuàng)建Partial update結(jié)果表
CREATE TABLE if not EXISTS paimon.dw.order_detail (
`order_id` string,
`product_type` string,
`plat_name` string,
`ref_id` bigint,
`start_city_name` string,
`end_city_name` string,
`create_time` timestamp(3),
`update_time` timestamp(3),
`dispatch_time` timestamp(3),
`decision_time` timestamp(3),
`finish_time` timestamp(3),
`order_status` int,
`binlog_time` bigint,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'bucket' = '20',
-- 指定20個(gè)bucket
'bucket-key' = 'order_id',
-- 記錄排序字段
'sequence.field' = 'binlog_time',
-- 選擇 full-compaction ,在compaction后產(chǎn)生完整的changelog
'changelog-producer' = 'full-compaction',
-- compaction 間隔時(shí)間
'changelog-producer.compaction-interval' = '2 min',
'merge-engine' = 'partial-update',
-- 忽略DELETE數(shù)據(jù),避免運(yùn)行報(bào)錯(cuò)
'partial-update.ignore-delete' = 'true'
);
INSERT INTO
paimon.dw.order_detail
-- order_info表提供主要字段
SELECT
order_id,
product_type,
plat_name,
ref_id,
cast(null as string) as start_city_name,
cast(null as string) as end_city_name,
create_time,
update_time,
dispatch_time,
decision_time,
finish_time,
order_status,
binlog_time
FROM
paimon.ods.order_info
/*+ OPTIONS ('scan.mode'='latest') */
union
all
-- order_address表提供城市字段
SELECT
order_id,
cast(null as string) as product_type,
cast(null as string) as plat_name,
cast(null as bigint) as ref_id,
start_city_name,
end_city_name,
cast(null as timestamp(3)) as create_time,
cast(null as timestamp(3)) as update_time,
cast(null as timestamp(3)) as dispatch_time,
cast(null as timestamp(3)) as decision_time,
cast(null as timestamp(3)) as finish_time,
cast(null as int) as order_status,
binlog_time
FROM
paimon.ods.order_address
/*+ OPTIONS ('scan.mode'='latest') */
;
完整的Changlog
Paimon中的表被多流填充數(shù)據(jù)且打?qū)捑S度后,支持流讀、批讀的方式提供完整的Changelog給下游。
Sequence-Group
配置:'fields.G.sequence-group'='A,B'
由字段G控制是否更新字段A, B;總得來說,G的值如果為null或比更新值大將不更新A,B;如下單測
public void testSequenceGroup() {
sql(
"CREATE TABLE SG ("
+ "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
+ " WITH ("
+ "'merge-engine'='partial-update', "
+ "'fields.g_1.sequence-group'='a,b', "
+ "'fields.g_2.sequence-group'='c,d');");
sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1)");
// g_2 should not be updated
sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT))");
// select *
assertThat(sql("SELECT * FROM SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 1, 1, 1));
// projection
assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(1, 1));
// g_1 should not be updated
sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3)");
assertThat(sql("SELECT * FROM SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 3, 3, 3));
// d should be updated by null
sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT))");
sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT))");
sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4)");
assertThat(sql("SELECT a, b FROM SG")).containsExactlyInAnyOrder(Row.of(4, 4));
assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(5, null));
}
其作用是:
- 在多個(gè)數(shù)據(jù)流更新期間的無序問題。每個(gè)數(shù)據(jù)流都定義自己的序列組。
- 真正的部分更新,而不僅僅是非空值的更新。
- 接受刪除記錄來撤銷部分列。
Changelog-Producer
Paimon通過Changelog-Producer支持生成changelog,并支持下游以流讀、批讀的形式讀取changelog。
Changelog的生成有多種方式,input、lookup、full-compaction;其生成代價(jià)是由低到高。
None
不查找舊值,不額外寫Changelog;但會(huì)下游任務(wù)中通過ChangelogNormalize算子補(bǔ)足Changelog。
Input
不查找舊值,額外寫Changelog;適用與CDC的數(shù)據(jù)源。
Lookup
查找舊值,額外寫Changelog;如果不是CDC數(shù)據(jù)源,需要通過LookupCompaction查找舊值,即在 compaction 的過程中, 會(huì)去向高層查找本次新增 key 的舊值, 如果沒有查找到, 那么本次的就是新增 key, 如果有查找到, 那么就生成完整的 UB 和 UA 消息。
Full-Compaction
查找舊值,額外寫Changelog;在 full compact 的過程中, 其實(shí)數(shù)據(jù)都會(huì)被寫到最高層, 所以所有 value 的變化都是可以推演出來的.
數(shù)據(jù)一致性
數(shù)據(jù)版本
通過Flink的checkpoint機(jī)制,生成Snapshot并標(biāo)記版本,即,一個(gè)Snapshot對(duì)應(yīng)數(shù)據(jù)的一個(gè)版本。
比如 Job-A 基于 Table-A 的 Snapshot-20 產(chǎn)出了 Table-B 的 Snapshot-11。Job-B 基于 Table-A 的Snapshot-20產(chǎn)出了 Table-C 的 Snapshot-15。那么 Job-C 的查詢就應(yīng)該基于 Table-B 的 Snapshot-11 和 Table-C 的 Snapshot-15 進(jìn)行計(jì)算,明確了數(shù)據(jù)版本,從而實(shí)現(xiàn)計(jì)算的一致性。
生成的snapshot-xx,就是數(shù)據(jù)的版本號(hào)。
數(shù)據(jù)對(duì)齊
將 Checkpoint 插入到兩個(gè) Snapshot 的數(shù)據(jù)之間。如果當(dāng)前的 Snapshot 還沒有完全被消費(fèi),這個(gè) Checkpoint 的觸發(fā)會(huì)被推遲,從而實(shí)現(xiàn)按照 Snapshot 對(duì)數(shù)據(jù)進(jìn)行劃分和對(duì)齊。
實(shí)現(xiàn)分為兩個(gè)部分。
- 在提交階段,需要去血緣關(guān)系表中查詢上下游表的一致性版本,并且基于查詢結(jié)果給對(duì)應(yīng)的上游表設(shè)置起始的消費(fèi)位置。
- 在運(yùn)行階段,按照消費(fèi)的 Snapshot 來協(xié)調(diào) Checkpoint,在 Flink 的 Checkpoint Coordinator 向 Source 發(fā)出 Checkpoint 的請(qǐng)求時(shí),會(huì)強(qiáng)制要求將 Checkpoint 插入到兩個(gè) Snapshot 的數(shù)據(jù)之間。如果當(dāng)前的 Snapshot 還沒有完全被消費(fèi),這個(gè) Checkpoint 的觸發(fā)會(huì)被推遲,從而實(shí)現(xiàn)按照 Snapshot 對(duì)數(shù)據(jù)進(jìn)行劃分和處理。
數(shù)據(jù)血緣
概念
數(shù)據(jù)從產(chǎn)生到消費(fèi)的整個(gè)流轉(zhuǎn)過程中所經(jīng)歷的各種轉(zhuǎn)換、處理和流動(dòng)的軌跡。數(shù)據(jù)血緣提供了數(shù)據(jù)的來源、去向以及中間處理過程的透明度,幫助用戶理解數(shù)據(jù)如何在系統(tǒng)中被處理和移動(dòng),以及數(shù)據(jù)是如何從原始狀態(tài)轉(zhuǎn)化為最終的可消費(fèi)形態(tài)。
實(shí)現(xiàn)
在checkpoint的提交時(shí)將數(shù)據(jù)的血緣關(guān)系寫入到System Table,記錄血緣關(guān)系。
總結(jié)
以上是生活随笔為你收集整理的聊聊流式数据湖Paimon(四)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 《原神》千步拦射角分间第三关通关攻略
- 下一篇: 为什么要实践 A+ES & CQRS ?