日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

聊聊流式数据湖Paimon(一)

發布時間:2023/12/29 windows 36 coder
生活随笔 收集整理的這篇文章主要介紹了 聊聊流式数据湖Paimon(一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

翻譯自 Apache Paimon官方文檔

概覽

概述

Apache Paimon (incubating) 是一項流式數據湖存儲技術,可以為用戶提供高吞吐、低延遲的數據攝入、流式訂閱以及實時查詢能力。

簡單來說,Paimon的上游是各個CDC,即changlog數據流;而其自身支持實時sink與search(下沉與查詢)changlog數據流。一般會與Flink等流式計算引擎集成使用。

流式數據湖是一種先進的數據存儲架構,專門為處理大規模實時數據流而設計。在流式數據湖中,數據以流的形式持續不斷地進入系統,而不是批量存儲后處理。

數據湖是一個存儲企業的各種各樣原始數據的大型倉庫,其中的數據可供存取、處理、分析及傳輸。

數據倉庫中的數據是經過優化后(也可以看作是結構化的數據),且與該數據倉庫支持的數據模型吻合的數據。

Paimon提供以下核心功能:

  • 統一批處理和流式處理:Paimon支持批量寫入和批量讀取,以及流式寫入更改和流式讀取表change log。
  • 數據湖:Paimon作為數據湖存儲,具有成本低、可靠性高、元數據可擴展等優點。
  • Merge Engines:Paimon支持豐富的合并引擎(Merge Engines)。默認情況下,保留主鍵的最后一個條目。您還可以使用“部分更新”或“聚合”引擎。
  • Changelog Producer:用于在數據湖中生成和跟蹤數據的變更日志(changelog);Paimon 支持豐富的 Changelog Producer,例如“lookup”和“full-compaction”;正確的changelog可以簡化流式處理管道的構造。
  • Append Only Tables:Paimon支持只追加(append only)表,自動壓縮小文件,提供有序的流式讀取。您可以使用它來替換消息隊列。

架構


架構如下所示
讀/寫:Paimon 支持多種讀/寫數據和執行 OLAP 查詢的方式。

  • 對于讀取,支持如下三種方式消費數據
    • 歷史快照(批處理模式)
    • 最新的偏移量(流模式)
    • 混合模式下讀取增量快照
  • 對于寫入,它支持來自數據庫變更日志(CDC)的流式同步或來自離線數據的批量插入/覆蓋。

生態系統:除了Apache Flink之外,Paimon還支持Apache Hive、Apache Spark、Trino等其他計算引擎的讀取。
底層存儲:Paimon 將列式文件存儲在文件系統/對象存儲上,并使用 LSM 樹結構來支持大量數據更新和高性能查詢。

統一存儲

對于 Apache Flink 這樣的流引擎,通常有三種類型的connector:

  • 消息隊列,例如 Apache Kafka,在該消息管道(pipeline)的源階段和中間階段使用,以保證延遲保持在秒級。
  • OLAP系統,例如ClickHouse,它以流方式接收處理后的數據并服務用戶的即席查詢。
  • 批量存儲,例如Apache Hive,它支持傳統批處理的各種操作,包括INSERT OVERWRITE。

Paimon 提供抽象概念的表。 它的使用方式與傳統數據庫沒有什么區別:

  • 在批處理執行模式下,它就像一個Hive表,支持Batch SQL的各種操作。 查詢它以查看最新的快照。
  • 在流執行模式下,它的作用就像一個消息隊列。 查詢它的行為就像從歷史數據永不過期的消息隊列中查詢stream changelog。

基本概念

Snapshot

snapshot捕獲table在某個時間點的狀態。 用戶可以通過最新的snapshot來訪問表的最新數據。根據時間,用戶還可以通過較早的快照訪問表的先前狀態。

Partition

Paimon 采用與 Apache Hive 相同的分區概念來分離數據。
分區是一種可選方法,可根據date, city, and department等特定列的值將表劃分為相關部分。每個表可以有一個或多個分區鍵來標識特定分區。
通過分區,用戶可以高效地操作表中的一片記錄。

Bucket

未分區表或分區表中的分區被細分為Bucket(桶),以便為可用于更有效查詢的數據提供額外的結構。
Bucket的范圍由record中的一列或多列的哈希值確定。用戶可以通過提供bucket-key選項來指定分桶列。如果未指定bucket-key選項,則主鍵(如果已定義)或完整記錄將用作存儲桶鍵。
Bucket是讀寫的最小存儲單元,因此Bucket的數量限制了最大處理并行度。 不過這個數字不應該太大,因為它會導致大量 小文件和低讀取性能。 一般來說,每個桶中建議的數據大小約為200MB - 1GB。

Consistency Guarantees

Paimon Writer 使用兩階段提交協議以原子方式將一批record提交到Table中。每次提交時最多生成兩個snapshot。
對于任意兩個同時修改table的寫入者,只要他們不修改同一個Bucket,他們的提交就可以并行發生。如果他們修改同一個Bucket,則僅保證快照隔離。也就是說,最終表狀態可能是兩次提交的混合,但不會丟失任何更改。

文件

概述

一張表的所有文件都存儲在一個基本目錄下。 Paimon 文件以分層方式組織。 下圖說明了文件布局。 從snapshot文件開始,Paimon reader可以遞歸地訪問表中的所有記錄。

Snapshot Files

所有snapshot文件都存儲在snapshot目錄中。
snapshot文件是一個 JSON 文件,包含有關此snapshot的信息,包括

  • 正在使用的Schema文件
  • 包含此snapshot的所有更改的清單列表(manifest list)

Manifest Files

所有清單(manifest)列表和清單文件都存儲在清單目錄中。
清單列表(manifest list)是清單文件名的列表。
清單文件是包含有關 LSM 數據文件和changelog文件的更改的文件。 例如對應快照中創建了哪個LSM數據文件、刪除了哪個文件。

Data Files

數據文件按分區和桶(Bucket)分組。每個Bucket目錄都包含一個 LSM 樹及其changelog文件。
目前,Paimon 支持使用 orc(默認)、parquet 和 avro 作為數據文件格式。

LSM-Trees

Paimon 采用 LSM 樹(日志結構合并樹)作為文件存儲的數據結構。 如下簡要介紹

Sorted Runs

LSM 樹將文件組織成多個 sorted runs。 sorted runs由一個或多個數據文件組成,并且每個數據文件恰好屬于一個 sorted runs。
數據文件中的記錄按其主鍵排序。 在 sorted runs中,數據文件的主鍵范圍永遠不會重疊。

如圖所示的,不同的 sorted runs可能具有重疊的主鍵范圍,甚至可能包含相同的主鍵。查詢LSM樹時,必須合并所有 sorted runs,并且必須根據用戶指定的合并引擎和每條記錄的時間戳來合并具有相同主鍵的所有記錄。
寫入LSM樹的新記錄將首先緩存在內存中。當內存緩沖區滿時,內存中的所有記錄將被順序并刷新到磁盤,并創建一個新的 sorted runs。

Compaction

當越來越多的記錄寫入LSM樹時,sorted runs的數量將會增加。由于查詢LSM樹需要將所有 sorted runs合并起來,太多 sorted runs將導致查詢性能較差,甚至內存不足。
為了限制 sorted runs的數量,我們必須偶爾將多個 sorted runs合并為一個大的 sorted runs。 這個過程稱為壓縮。
然而,壓縮是一個資源密集型過程,會消耗一定的CPU時間和磁盤IO,因此過于頻繁的壓縮可能會導致寫入速度變慢。 這是查詢和寫入性能之間的權衡。 Paimon 目前采用了類似于 Rocksdb 通用壓縮的壓縮策略。
默認情況下,當Paimon將記錄追加到LSM樹時,它也會根據需要執行壓縮。 用戶還可以選擇在專用壓縮作業中執行所有壓縮。

可以將 sorted runs 理解為多個有序的Data File組成的一個有序文件。

Changelog表是創建表時的默認表類型。用戶可以在表中插入、更新或刪除記錄。
主鍵由一組列組成,這些列包含每個記錄的唯一值。Paimon通過對每個bucket中的主鍵進行排序來實現數據排序,允許用戶通過對主鍵應用過濾條件來實現高性能。
通過在變更日志表上定義主鍵,用戶可以訪問以下特性。

Bucket

桶(Bucket)是進行讀寫操作的最小存儲單元,每個桶目錄包含一個LSM樹。

Fixed Bucket

配置一個大于0的桶,使用Fixed bucket模式,根據Math.abs(key_hashcode % numBuckets)來計算記錄的桶。
重新縮放桶只能通過離線進程進行。桶的數量過多會導致小文件過多,桶的數量過少會導致寫性能不佳。

Dynamic Bucket

配置'Bucket'='-1'。 先到達的key會落入舊的bucket,新的key會落入新的bucket,bucket和key的分布取決于數據到達的順序。 Paimon 維護一個索引來確定哪個鍵對應哪個桶。
Paimon會自動擴大桶的數量。

  • Option1: 'dynamic-bucket.target-row-num':控制一個桶的目標行數。
  • Option2:'dynamic-bucket.initial-buckets':控制初始化bucket的數量。
Normal Dynamic Bucket Mode

當更新不跨分區(沒有分區,或者主鍵包含所有分區字段)時,動態桶模式使用 HASH 索引來維護從鍵到桶的映射,它比固定桶模式需要更多的內存。
如下:

  • 一般來說,沒有性能損失,但會有一些額外的內存消耗,一個分區中的 1 億個條目多占用 1 GB 內存,不再活動的分區不占用內存。
  • 對于更新率較低的表,建議使用此模式,以顯著提高性能。
Cross Partitions Upsert Dynamic Bucket Mode

當需要跨分區upsert(主鍵不包含所有分區字段)時,Dynamic Bucket模式直接維護鍵到分區和桶的映射,使用本地磁盤,并在啟動流寫作業時通過讀取表中所有現有鍵來初始化索引 。 不同的合并引擎有不同的行為:

  1. Deduplicate:刪除舊分區中的數據,并將新數據插入到新分區中。
  2. PartialUpdate & Aggregation:將新數據插入舊分區。
  3. FirstRow:如果有舊值,則忽略新數據。

性能:對于數據量較大的表,性能會有明顯的損失。而且,初始化需要很長時間。
如果你的upsert不依賴太舊的數據,可以考慮配置索引TTL來減少索引和初始化時間:
'cross-partition-upsert.index-ttl':rocksdb索引和初始化中的TTL,這樣可以避免維護太多索引而導致性能越來越差。
但請注意,這也可能會導致數據重復。

Merge Engines

當Paimon sink收到兩條或更多具有相同主鍵的記錄時,它會將它們合并為一條記錄以保持主鍵唯一。 通過指定merge-engine屬性,用戶可以選擇如何將記錄合并在一起。

Deduplicate

deduplicate合并引擎是默認的合并引擎。 Paimon 只會保留最新的記錄,并丟棄其他具有相同主鍵的記錄。
具體來說,如果最新的記錄是DELETE記錄,則所有具有相同主鍵的記錄都將被刪除。

Partial Update

通過指定 'merge-engine' = 'partial-update',用戶可以通過多次更新來更新記錄的列,直到記錄完成。 這是通過使用同一主鍵下的最新數據逐一更新值字段來實現的。 但是,在此過程中不會覆蓋空值。
如下所示:

  • <1, 23.0, 10, NULL>-
  • <1, NULL, NULL, 'This is a book'>
  • <1, 25.2, NULL, NULL>

假設第一列是主鍵key,那么最后的結果是 <1, 25.2, 10, 'This is a book'>

Sequence Group

序列字段并不能解決多流更新的部分更新表的亂序問題,因為多流更新時序列字段可能會被另一個流的最新數據覆蓋。
因此我們引入了部分更新表的序列組(Sequence Group)機制。 它可以解決:

  1. 多流更新時出現混亂。 每個流定義其自己的序列組。
  2. 真正的部分更新,而不僅僅是非空更新。

如下所示:

CREATE TABLE T (
  k INT,
  a INT,
  b INT,
  g_1 INT,
  c INT,
  d INT,
  g_2 INT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine'='partial-update',
  'fields.g_1.sequence-group'='a,b',
  'fields.g_2.sequence-group'='c,d'
);

INSERT INTO T VALUES (1, 1, 1, 1, 1, 1, 1);

-- g_2 is null, c, d should not be updated
INSERT INTO T VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));

SELECT * FROM T; -- output 1, 2, 2, 2, 1, 1, 1

-- g_1 is smaller, a, b should not be updated
INSERT INTO T VALUES (1, 3, 3, 1, 3, 3, 3);

SELECT * FROM T; -- output 1, 2, 2, 2, 3, 3, 3

對于 sequence-group,有效的比較數據類型包括:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ。

Aggregation

可以為輸入字段指定聚合函數,支持聚合中的所有函數。
如下所示:

CREATE TABLE T (
          k INT,
          a INT,
          b INT,
          c INT,
          d INT,
          PRIMARY KEY (k) NOT ENFORCED
) WITH (
     'merge-engine'='partial-update',
     'fields.a.sequence-group' = 'b',
     'fields.b.aggregate-function' = 'first_value',
     'fields.c.sequence-group' = 'd',
     'fields.d.aggregate-function' = 'sum'
 );
INSERT INTO T VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
INSERT INTO T VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);


SELECT * FROM T; -- output 1, 2, 1, 2, 3
Default Value

如果無法保證數據的順序,僅通過覆蓋空值的方式寫入字段,則讀表時未覆蓋的字段將顯示為空。

CREATE TABLE T (
                  k INT,
                  a INT,
                  b INT,
                  c INT,
                  PRIMARY KEY (k) NOT ENFORCED
) WITH (
     'merge-engine'='partial-update'
     );
INSERT INTO T VALUES (1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1);

SELECT * FROM T; -- output 1, 1, null, 1

如果希望讀表時未被覆蓋的字段有默認值而不是null,則需要fields.name.default-value

CREATE TABLE T (
    k INT,
    a INT,
    b INT,
    c INT,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'merge-engine'='partial-update',
    'fields.b.default-value'='0'
);

INSERT INTO T VALUES (1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
INSERT INTO T VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1);

SELECT * FROM T; -- output 1, 1, 0, 1

Aggregation

有時用戶只關心聚合結果。 聚合 合并引擎根據聚合函數將同一主鍵下的各個值字段與最新數據一一聚合。
每個不屬于主鍵的字段都可以被賦予一個聚合函數,由 fields.<field-name>.aggregate-function 表屬性指定,否則它將使用 last_non_null_value 聚合作為默認值。 例如,請考慮下表定義。

CREATE TABLE MyTable (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.price.aggregate-function' = 'max',
    'fields.sales.aggregate-function' = 'sum'
);

price字段將通過 max 函數聚合,sales字段將通過 sum 函數聚合。 給定兩個輸入記錄 <1, 23.0, 15> 和 <1, 30.2, 20>,最終結果將是 <1, 30.2, 35>。
當前支持的聚合函數和數據類型有:

  • sum:支持 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT 和 DOUBLE。
  • min/max:支持 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ。
  • last_value / last_non_null_value:支持所有數據類型。
  • listagg:支持STRING數據類型。
  • bool_and / bool_or:支持BOOLEAN數據類型。
  • first_value/first_not_null_value:支持所有數據類型。

只有 sum 支持撤回(UPDATE_BEFORE 和 DELETE),其他聚合函數不支持撤回。 如果允許某些函數忽略撤回消息,可以配置:'fields.${field_name}.ignore-retract'='true'

First Row

通過指定 'merge-engine' = 'first-row',用戶可以保留同一主鍵的第一行。 它與Deduplicate合并引擎不同,在First Row合并引擎中,它將生成僅insert changelog。

  1. First Row合并引擎必須與查找變更日志生成器一起使用。
  2. 不能指定sequence.field。
  3. 不接受 DELETE 和 UPDATE_BEFORE 消息。 可以配置 first-row.ignore-delete 來忽略這兩種記錄。

這對于替代流計算中的log deduplication有很大的幫助。

Changelog Producers

流式查詢會不斷產生最新的變化。
通過在創建表時指定更改changelog-producer表屬性,用戶可以選擇從表文件生成的更改模式。

None

默認情況下,不會將額外的changelog producer應用于表的寫入器。 Paimon source只能看到跨snapshot的合并更改,例如刪除了哪些鍵以及某些鍵的新值是什么。
但是,這些合并的更改無法形成完整的changelog,因為我們無法直接從中讀取鍵的舊值。 合并的更改要求消費者“記住”每個鍵的值并重寫這些值而不看到舊的值。 然而,一些消費者需要舊的值來確保正確性或效率。
考慮一個消費者計算某些分組鍵的總和(可能不等于主鍵)。 如果消費者只看到一個新值5,它無法確定應該將哪些值添加到求和結果中。 例如,如果舊值為 4,則應在結果中加 1。 但如果舊值是 6,則應依次從結果中減去 1。 舊的value對于這些類型的消費者來說很重要。
總而言之,沒有一個changelog producer最適合數據庫系統等使用者。 Flink 還有一個內置的“規范化”運算符,可以將每個鍵的值保留在狀態中。 很容易看出,這種操作符的成本非常高,應該避免使用。 (可以通過“scan.remove-normalize”強制刪除“normalize”運算符。)

Input

通過指定 'changelog- Producer' = 'input',Paimon Writer依賴他們的輸入作為完整changelog的來源。 所有輸入記錄將保存在單獨的changelog file中,并由 Paimon source提供給消費者。
當 Paimon 編寫者的輸入是完整的changelog(例如來自數據庫 CDC)或由 Flink 狀態計算生成時,可以使用input changelog producer.

Lookup

如果您的輸入無法生成完整的changelog,但想擺脫昂貴的標準化運算符,則可以考慮使用'lookup' changelog producer.
通過指定'changelog- Producer' = 'lookup',Paimon將在提交數據寫入之前通過'lookup'生成changelog。

Lookup 會將數據緩存在內存和本地磁盤上,您可以使用以下選項來調整性能:

Lookup changelog- Producer 支持changelog- Producer.row-deduplicate以避免為同一記錄生成-U、+U changelog。

Full Compaction

如果你覺得“lookup”的資源消耗太大,可以考慮使用“full-compaction”changelog Producer,它可以解耦數據寫入和changelog生成,更適合高延遲的場景(例如10分鐘) )。
通過指定 'changelog- Producer' = 'full-compaction',Paimon 將比較完全壓縮之間的結果并生成差異作為changelog。changelog的延遲受到完全壓縮頻率的影響。
通過指定 full-compaction.delta-commits 表屬性,在增量提交(檢查點 checkpoint)后將不斷觸發 full compaction。 默認情況下設置為 1,因此每個檢查點都會進行完全壓縮并生成change log。

Full-compaction changelog- Producer 支持changelog- Producer.row-deduplicate 以避免為同一記錄生成-U、+U 變更日志。

Sequence Field

默認情況下,主鍵表根據輸入順序確定合并順序(最后輸入的記錄將是最后合并的)。 然而在分布式計算中,會存在一些導致數據混亂的情況。 這時,可以使用時間字段作為sequence.field,例如:

CREATE TABLE MyTable (
    pk BIGINT PRIMARY KEY NOT ENFORCED,
    v1 DOUBLE,
    v2 BIGINT,
    dt TIMESTAMP
) WITH (
    'sequence.field' = 'dt'
);

無論輸入順序如何,具有最大sequence.field 值的記錄將是最后合并的記錄。
Sequence Auto Padding:
當記錄更新或刪除時,sequence.field必須變大,不能保持不變。 對于-U和+U,它們的序列字段必須不同。 如果您無法滿足此要求,Paimon 提供了自動填充序列字段的選項。

  1. 'sequence.auto-padding' = 'row-kind-flag':如果對-U和+U使用相同的值,就像Mysql Binlog中的“op_ts”(數據庫中進行更改的時間)一樣。 建議使用自動填充行類型標志,它會自動區分-U(-D)和+U(+I)。
  2. 精度不夠:如果提供的sequence.field不滿足精度,比如大約秒或毫秒,可以將sequence.auto-padding設置為秒到微或毫秒到微,這樣序列號的精度 將由系統彌補到微秒。
  3. 復合模式:例如“second-to-micro,row-kind-flag“,首先將micro添加到第二個,然后填充row-kind標志。

Row Kind Field

默認情況下,主鍵表根據輸入行確定行類型。 您還可以定義“rowkind.field”以使用字段來提取行類型。
有效的行類型字符串應為“+I”、“-U”、“+U”或“-D”。

總結

以上是生活随笔為你收集整理的聊聊流式数据湖Paimon(一)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。