【详谈 Delta Lake 】系列技术专题 之 特性(Features)
簡介: 本文翻譯自大數(shù)據(jù)技術(shù)公司 Databricks 針對數(shù)據(jù)湖 Delta Lake 的系列技術(shù)文章。眾所周知,Databricks 主導著開源大數(shù)據(jù)社區(qū) Apache Spark、Delta Lake 以及 ML Flow 等眾多熱門技術(shù),而 Delta Lake 作為數(shù)據(jù)湖核心存儲引擎方案給企業(yè)帶來諸多的優(yōu)勢。本系列技術(shù)文章,將詳細展開介紹 Delta Lake。
前言
本文翻譯自大數(shù)據(jù)技術(shù)公司 Databricks 針對數(shù)據(jù)湖 Delta Lake 系列技術(shù)文章。眾所周知,Databricks 主導著開源大數(shù)據(jù)社區(qū) Apache Spark、Delta Lake 以及 ML Flow 等眾多熱門技術(shù),而 Delta Lake 作為數(shù)據(jù)湖核心存儲引擎方案給企業(yè)帶來諸多的優(yōu)勢。
此外,阿里云和 Apache Spark 及 Delta Lake 的原廠 Databricks 引擎團隊合作,推出了基于阿里云的企業(yè)版全托管 Spark 產(chǎn)品——Databricks 數(shù)據(jù)洞察,該產(chǎn)品原生集成企業(yè)版 Delta Engine 引擎,無需額外配置,提供高性能計算能力。有興趣的同學可以搜索` Databricks 數(shù)據(jù)洞察`或`阿里云 Databricks `進入官網(wǎng),或者直接訪問https://www.aliyun.com/product/bigdata/spark 了解詳情。
譯者:張鵬(卓昇),阿里云計算平臺事業(yè)部技術(shù)專家
Delta Lake 技術(shù)系列 - 特性(Features)
——使用 Delta Lake 穩(wěn)定的特性來可靠的管理您的數(shù)據(jù)
目錄
- Chapter-01 ?為什么使用 Delta Lake 的 MERGE 功能?
- Chapter-02 ?使用 Python API 在 Delta Lake 數(shù)據(jù)表上進行簡單,可靠的更新和刪除操作
- Chapter-03 ?大型數(shù)據(jù)湖的 Time Travel 功能
- Chapter-04 ?輕松克隆您的 Delta Lake 以方便測試,數(shù)據(jù)共享以及進行重復的機器學習
- Chapter-05 ?在 Apache Spark 上的 Delta Lake 中啟用 Spark SQL 的 DDL 和 DML 語句
本文介紹內(nèi)容
Delta Lake 系列電子書由 Databricks 出版,阿里云計算平臺事業(yè)部大數(shù)據(jù)生態(tài)企業(yè)團隊翻譯,旨在幫助領(lǐng)導者和實踐者了解 Delta Lake 的全部功能以及它所處的場景。在本文 Delta Lake 系列 - 特性( Features )中,重點介紹 Delta Lake 的特性。
后續(xù)
讀完本文后,您不僅可以了解 Delta Lake 提供了那些特性,還可以理解這些的特性是如何帶來實質(zhì)性的性能改進的。
什么是 Delta Lake?
Delta Lake 是一個統(tǒng)一的數(shù)據(jù)管理系統(tǒng),為云上數(shù)據(jù)湖帶來數(shù)據(jù)可靠性和快速分析。Delta Lake 運行在現(xiàn)有數(shù)據(jù)湖之上,并且與 Apache Spark 的 API 完全兼容。
在 Databricks 中,我們看到了 Delta Lake 如何為數(shù)據(jù)湖帶來可靠性、高性能和生命周期管理。我們的客戶已經(jīng)驗證,Delta Lake 解決了以下挑戰(zhàn):從復雜的數(shù)據(jù)格式中提取數(shù)據(jù)、很難刪除符合要求的數(shù)據(jù)、以及為了進行數(shù)據(jù)捕獲從而修改數(shù)據(jù)所帶來的問題。
通過使用 Delta Lake,您可以加快高質(zhì)量數(shù)據(jù)導入數(shù)據(jù)湖的速度,團隊也可以在安全且可擴展云服務上快速使用這些數(shù)據(jù)。
Chapter-01 為什么使用 Delta Lake 的 MERGE 功能?
Delta Lake 是在 Apache Spark 之上構(gòu)建的下一代引擎,支持 MERGE 命令,該命令使您可以有效地在數(shù)據(jù)湖中上傳和刪除記錄。
MERGE 命令大大簡化了許多通用數(shù)據(jù)管道的構(gòu)建方式-所有重寫整個分區(qū)的低效且復雜的多跳步驟現(xiàn)在都可以由簡單的 MERGE 查詢代替。
這種更細粒度的更新功能簡化了如何為各種用例(從變更數(shù)據(jù)捕獲到 GDPR )構(gòu)建大數(shù)據(jù)管道的方式。您不再需要編寫復雜的邏輯來覆蓋表同時克服快照隔離的不足。
隨著數(shù)據(jù)的變化,另一個重要的功能是在發(fā)生錯誤寫入時能夠進行回滾。 Delta Lake 還提供了帶有時間旅行特性的回滾功能,因此如果您合并不當,則可以輕松回滾到早期版本。
在本章中,我們將討論需要更新或刪除現(xiàn)有數(shù)據(jù)的常見用例。我們還將探討新增和更新固有的挑戰(zhàn),并說明 MERGE 如何解決這些挑戰(zhàn)。
什么時候需要 upserts?
在許多常見場景中,都需要更新或刪除數(shù)據(jù)湖中的現(xiàn)有數(shù)據(jù):
- 遵守通用數(shù)據(jù)保護法規(guī)(GDPR):隨著 GDPR 中數(shù)據(jù)遺忘規(guī)則(也稱為數(shù)據(jù)擦除)的推出,組織必須根據(jù)要求刪除用戶的信息。數(shù)據(jù)擦除還包括刪除數(shù)據(jù)湖中的用戶信息。
- 更改傳統(tǒng)數(shù)據(jù)庫中獲得的數(shù)據(jù):在面向服務的體系結(jié)構(gòu)中,典型的 web 和移動應用程序采用微服務架構(gòu),這些微服務架構(gòu)一般是基于具有低延遲性能的傳統(tǒng) SQL/NoSQL 數(shù)據(jù)庫搭建的。組織面臨的最大挑戰(zhàn)之一是將許多孤立的數(shù)據(jù)系統(tǒng)建立連接,因此數(shù)據(jù)工程師建立了管道,可以將所有數(shù)據(jù)源整合到中央數(shù)據(jù)湖中以加快分析。這些管道必須定期讀取傳統(tǒng) SQL/NoSQL 表所做的更改,并將其應用于數(shù)據(jù)湖中的對應表中。此類更改可以支持多種形式:變化緩慢的表,所有插入/更新/刪除數(shù)據(jù)的數(shù)據(jù)變更等。
- 會話化:從產(chǎn)品分析,到目標廣告,再到預測性維護的許多領(lǐng)域,將多個事件分組為一個會話是常見的例子。建立連續(xù)的應用來跟蹤會話并記錄寫入數(shù)據(jù)湖的結(jié)果是非常困難的,因為數(shù)據(jù)湖經(jīng)常因為追加的數(shù)據(jù)而進行優(yōu)化。
- 重復數(shù)據(jù)刪除:常見的數(shù)據(jù)管道用例是通過追加數(shù)據(jù)的方式來將系統(tǒng)日志收集到 Delta Lake 表中。但是數(shù)據(jù)源通常會生成重復記錄,并且需要下游刪除重復數(shù)據(jù)來處理它們。
為什么對數(shù)據(jù)湖的 upserts 在傳統(tǒng)上具有挑戰(zhàn)性
由于數(shù)據(jù)湖基本上是基于文件的,它們經(jīng)常針對新增數(shù)據(jù)而不是更改現(xiàn)有數(shù)據(jù)進行優(yōu)化。因此構(gòu)建上述用例一直是具有挑戰(zhàn)性的。
用戶通常會讀取整個表(或分區(qū)的子集),然后將其覆蓋。因此,每個組織都嘗試通過編寫復雜的查詢 SQL,Spark 等方式來重新造輪子,來滿足他們的需求。這種方法的特點是:
- 低效:為了更新很少的記錄而讀取和重寫整個分區(qū)(或整個表)會導致管道運行緩慢且成本高昂。手動調(diào)整表布局以及優(yōu)化查詢是很繁瑣的,而且需要深厚的領(lǐng)域知識。
- 有可能出錯:手寫代碼來修改數(shù)據(jù)很容易出現(xiàn)邏輯和人為錯誤。例如,多個管道在沒有任何事務支持的情況下同時修改同一張表可能會導致不可預測的數(shù)據(jù)不一致,在最壞的情況下有可能會導致數(shù)據(jù)丟失。通常,即使是單一的手寫管道也可能由于業(yè)務邏輯中的錯誤,從而導致數(shù)據(jù)損壞。
- 難以維護:從根本上來說,這類手寫代碼難以理解,跟蹤和維護。從長遠來看,僅此一項就會顯著增加組織和基礎(chǔ)設(shè)施成本。
介紹 Delta Lake 中 MERGE 命令
使用 Delta Lake,您可以使用以下 MERGE 命令輕松解決上述用例,并且不會遇到任何上述問題:
讓我們通過一個簡單的示例來了解如何使用 MERGE。 假設(shè)您有一個變化緩慢的用戶數(shù)據(jù)表,該表維護著諸如地址之類的用戶信息。 此外您還有一個現(xiàn)有用戶和新用戶的新地址表。 要將所有新地址合并到主用戶表中,可以運行以下命令:
MERGE INTO users USING updates ON users.userId = updates.userId WHEN MATCHED THEN UPDATE SET address = updates.addresses WHEN NOT MATCHED THENINSERT (userId, address) VALUES (updates.userId, updates.address)這完全符合語法的要求-對于現(xiàn)有用戶(即 MATCHED 子句),它將更新 address 列,對于新用戶(即 NOT MATCHED 子句),它將插入所有列。 對于具有 TB 規(guī)模的大型數(shù)據(jù)表,Delta Lake MERGE 操作比覆蓋整個分區(qū)或表要快N個數(shù)量級,因為 Delta Lake 僅讀取相關(guān)文件并更新它們。 具體來說,Delta Lake 的 MERGE 命令具有以下優(yōu)勢:
- 細粒度:該操作以文件而不是分區(qū)的粒度重寫數(shù)據(jù),這樣解決了重寫分區(qū),使用 MSCK 更新 Hive 元數(shù)據(jù)庫等所有復雜問題。
- 高效:Delta Lake 的數(shù)據(jù) skip 功能使 MERGE 在查找要重寫的文件方面更高效,從而無需手動優(yōu)化管道。 此外 Delta Lake 對所有 I/O 和處理過程進行了優(yōu)化,使得 MERGE 進行所有數(shù)據(jù)的讀寫速度明顯快于 Apache Spark 中的類似操作。
- 事務性:Delta Lake 使用樂觀并發(fā)控制來確保并發(fā)寫入程序使用 ACID 事務來正確更新數(shù)據(jù),同時并發(fā)讀取程序始終會看到一致的數(shù)據(jù)快照。
下圖是 MERGE 與手寫管道的直觀對比。
使用 MERGE 簡化用例
遵守 GDPR 而刪除數(shù)據(jù)
遵守 GDPR 的“被遺忘權(quán)”條款對數(shù)據(jù)湖中的數(shù)據(jù)進行任何處理都不容易。您可以使用示例代碼來設(shè)置一個簡單的定時計劃作業(yè),如下所示,刪除所有選擇退出服務的用戶。
MERGE INTO users USING opted_out_users ON opted_out_users.userId = users.userId WHEN MATCHED THEN DELETE數(shù)據(jù)庫中的數(shù)據(jù)變更應用
您可以使用 MERGE 語法輕松地將外部數(shù)據(jù)庫的所有數(shù)據(jù)更改(更新,刪除,插入)應用到 Delta Lake 表中,如下所示:
MERGE INTO users USING ( SELECT userId, latest.address AS address, latest.deleted AS deleted FROM ( SELECT userId, MAX(struct(TIME, address, deleted)) AS latest FROM changes GROUP BY userId ) ) latestChange ON latestChange.userId = users.userId WHEN MATCHED AND latestChange.deleted = TRUE THEN DELETE WHEN MATCHED THEN UPDATE SET address = latestChange.address WHEN NOT MATCHED AND latestChange.deleted = FALSE THEN INSERT (userId, address) VALUES (userId, address)從 streaming 管道更新會話信息
如果您有流事件的數(shù)據(jù)流入,并且想要對流事件數(shù)據(jù)進行會話化,同時增量更新會話并將其存儲在 Delta Lake 表中,則可以使用結(jié)構(gòu)化數(shù)據(jù)流和 MERGE 中的 foreachBatch 來完成此操作。 例如,假設(shè)您有一個結(jié)構(gòu)化流數(shù)據(jù)框架,該框架為每個用戶計算更新的 session 信息。 您可以在所有會話應用中啟動流查詢,更新數(shù)據(jù)到 Delta Lake 表中,如下所示(Scala 語言)。
streamingSessionUpdatesDF.writeStream .foreachBatch { (microBatchOutputDF: DataFrame, batchId: Long) => microBatchOutputDF.createOrReplaceTempView(“updates”) microBatchOutputDF.sparkSession.sql(s””” MERGE INTO sessions USING updates ON sessions.sessionId = updates.sessionId WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * “””) }.start()Chapter-02 使用Python API在Delta Lake數(shù)據(jù)表上進行簡單,可靠的更新和刪除操作
在本章中,我們將演示在飛機時刻表的場景中,如何在 Delta Lake 中使用 Python 和新的 Python API。 我們將展示如何新增,更新和刪除數(shù)據(jù),如何使用 time travle 功能來查詢舊版本數(shù)據(jù),以及如何清理較舊的版本。
Delta Lake 使用入門
Delta Lake 軟件包可以通過 PySpark 的--packages 選項來進行安裝。在我們的示例中,我們還將演示在 VACUUM 文件和 Apache Spark 中執(zhí)行 Delta Lake SQL 命令的功能。 由于這是一個簡短的演示,因此我們還將啟用以下配置:
spark.databricks.delta.retentionDurationCheck.enabled=false允許我們清理文件的時間短于默認的保留時間7天。 注意,這僅是對于 SQL 命令 VACUUM 是必需的。
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension在 Apache Spark 中啟用 Delta Lake SQL 命令;這對于 Python 或 Scala API 調(diào)用不是必需的。
# Using Spark Packages ./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf “spark. databricks.delta.retentionDurationCheck.enabled=false” --conf “spark. sql.extensions=io.delta.sql.DeltaSparkSessionExtension”Delta Lake 數(shù)據(jù)的加載和保存
這次將使用準時飛行數(shù)據(jù)或離港延誤數(shù)據(jù),這些數(shù)據(jù)是從 RITA BTS 航班離崗統(tǒng)計中心生成的;這些數(shù)據(jù)的一些示例包括 2014 Flight Departure Performance via d3.js Crossfilter 和 針對Apache Spark的具有圖形化結(jié)構(gòu)的準時飛行數(shù)據(jù)。 在 PySpark 中,首先讀取數(shù)據(jù)集。
# Location variables tripdelaysFilePath = “/root/data/departuredelays.csv” pathToEventsTable = “/root/deltalake/departureDelays.delta”# Read flight delay data departureDelays = spark.read \ .option(“header”, “true”) \ .option(“inferSchema”, “true”) \ .csv(tripdelaysFilePath)接下來,我們將離港延遲數(shù)據(jù)保存到 Delta Lake 表中。 在保存的過程中,我們能夠利用它的優(yōu)勢功能,包括 ACID 事務,統(tǒng)一批處理,streaming 和 time travel。
# Save flight delay data into Delta Lake format departureDelays \ .write \ .format(“delta”) \ .mode(“overwrite”) \ .save(“departureDelays.delta”)注意,這種方法類似于保存 Parquet 數(shù)據(jù)的常用方式。 現(xiàn)在您將指定格式(“delta”)而不是指定格式(“parquet”)。如果要查看基礎(chǔ)文件系統(tǒng),您會注意到為 Delta Lake 的離港延遲表創(chuàng)建了四個文件。
/departureDelays.delta$ ls -l . .. _delta_log part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet Part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet現(xiàn)在,讓我們重新加載數(shù)據(jù),但是這次我們的數(shù)據(jù)格式將由 Delta Lake 支持。
# Load flight delay data in Delta Lake format delays_delta = spark \ .read \ .format(“delta”) \ .load(“departureDelays.delta”) # Create temporary view delays_delta.createOrReplaceTempView(“delays_delta”)# How many flights are between Seattle and San Francisco spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()運行結(jié)果:
最后,我們確定了從西雅圖飛往舊金山的航班數(shù)量;在此數(shù)據(jù)集中,有1698個航班。
立馬轉(zhuǎn)換到 Delta Lake
如果您有現(xiàn)成的 Parquet 表,則可以將它們轉(zhuǎn)換為 Delta Lake 格式,從而無需重寫表。 如果要轉(zhuǎn)換表,可以運行以下命令。
from delta.tables import *# Convert non partitioned parquet table at path ‘/path/to/table’ deltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/ table`”)# Convert partitioned parquet table at path ‘/path/to/table’ and partitioned by integer column named ‘part’ partitionedDeltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/table`”, “part int”)刪除我們的航班數(shù)據(jù)
要從傳統(tǒng)的數(shù)據(jù)湖表中刪除數(shù)據(jù),您將需要:
從上面的查詢中可以看到,我們刪除了所有準時航班和早班航班(更多信息,請參見下文),從西雅圖到舊金山的航班有837班延誤。 如果您查看文件系統(tǒng),會注意到即使刪除了一些數(shù)據(jù),還是有更多文件。
/departureDelays.delta$ ls -l _delta_log part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet在傳統(tǒng)的數(shù)據(jù)湖中,刪除是通過重寫整個表(不包括要刪除的值)來執(zhí)行的。 使用 Delta Lake,可以通過有選擇地寫入包含要刪除數(shù)據(jù)的文件的新版本來執(zhí)行刪除操作,同時僅將以前的文件標記為已刪除。 這是因為 Delta Lake 使用多版本并發(fā)控制(MVCC)對表執(zhí)行原子操作:例如,當一個用戶正在刪除數(shù)據(jù)時,另一用戶可能正在查詢之前的版本。這種多版本模型還使我們能夠回溯時間(即 time travel)并查詢以前的版本,這個功能稍后我們將看到。
更新我們的航班數(shù)據(jù)
要更新傳統(tǒng)數(shù)據(jù)湖表中的數(shù)據(jù),您需要:
代替上面的步驟,使用 Delta Lake 我們可以通過運行 UPDATE 語句來簡化此過程。 為了顯示這一點,讓我們更新所有從底特律到西雅圖的航班。
# Update all flights originating from Detroit to now be originating from Seattle deltaTable.update(“origin = ‘DTW’”, { “origin”: “’SEA’” } )# How many flights are between Seattle and San Francisco spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()如今底特律航班已被標記為西雅圖航班,現(xiàn)在我們有986航班從西雅圖飛往舊金山。如果您要列出您的離崗延遲文件系統(tǒng)(即 $ ../departureDelays/ls -l),您會注意到現(xiàn)在有11個文件(而不是刪除文件后的8個文件和表創(chuàng)建后的4個文件)。
合并我們的航班數(shù)據(jù)
使用數(shù)據(jù)湖時,常見的情況是將數(shù)據(jù)連續(xù)追加到表中。這通常會導致數(shù)據(jù)重復(您不想再次將其插入表中),需要插入的新行以及一些需要更新的行。 使用 Delta Lake,所有這些都可以通過使用合并操作(類似于 SQL MERGE 語句)來實現(xiàn)。
讓我們從一個樣本數(shù)據(jù)集開始,您將通過以下查詢對其進行更新,插入或刪除重復數(shù)據(jù)。
# What flights between SEA and SFO for these date periods spark.sql(“select * from delays_delta where origin = ‘SEA’ and destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()該查詢的輸出如下表所示。 請注意,已添加顏色編碼以清楚地標識哪些行是已刪除的重復數(shù)據(jù)(藍色),已更新的數(shù)據(jù)(黃色)和已插入的數(shù)據(jù)(綠色)。
接下來,讓我們生成自己的 merge_table,其中包含將插入,更新或刪除重復的數(shù)據(jù)。具體看以下代碼段
items = [(1010710, 31, 590, ‘SEA’, ‘SFO’), (1010521, 10, 590, ‘SEA’, ‘SFO’), (1010822, 31, 590, ‘SEA’, ‘SFO’)] cols = [‘date’, ‘delay’, ‘distance’, ‘origin’, ‘destination’] merge_table = spark.createDataFrame(items, cols) merge_table.toPandas()在上表(merge_table)中,有三行不同的日期值:
使用 Delta Lake,可以通過合并語句輕松實現(xiàn),具體看下面代碼片段。
# Merge merge_table with flights deltaTable.alias(“flights”) \ .merge(merge_table.alias(“updates”),”flights.date = updates.date”) \ .whenMatchedUpdate(set = { “delay” : “updates.delay” } ) \ .whenNotMatchedInsertAll() \ .execute() # What flights between SEA and SFO for these date periods spark.sql(“select * from delays_delta where origin = ‘SEA’ and destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()一條語句即可有效完成刪除重復數(shù)據(jù),更新和插入這三個操作。
查看數(shù)據(jù)表歷史記錄
如前所述,在我們進行每個事務(刪除,更新)之后,在文件系統(tǒng)中創(chuàng)建了更多文件。 這是因為對于每個事務,都有不同版本的 Delta Lake 表。
這可以通過使用 DeltaTable.history() 方法看到,如下所示。
注意,您還可以使用 SQL 執(zhí)行相同的任務:
spark.sql(“DESCRIBE HISTORY ‘” + pathToEventsTable + “’”).show()
如您所見,對于每個操作(創(chuàng)建表,刪除和更新),都有三行代表表的不同版本(以下為簡化版本,以幫助簡化閱讀):
回溯數(shù)據(jù)表的歷史
借助 Time Travel,您可以查看帶有版本或時間戳的 Delta Lake 表。要查看歷史數(shù)據(jù),請指定版本或時間戳選項。 在以下代碼段中,我們將指定版本選項。
# Load DataFrames for each version dfv0 = spark.read.format(“delta”).option(“versionAsOf”, 0).load(“departureDelays.delta”) dfv1 = spark.read.format(“delta”).option(“versionAsOf”, 1).load(“departureDelays.delta”) dfv2 = spark.read.format(“delta”).option(“versionAsOf”, 2).load(“departureDelays.delta”)# Calculate the SEA to SFO flight counts for each version of history cnt0 = dfv0.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count() cnt1 = dfv1.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count() cnt2 = dfv2.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()# Print out the value print(“SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s” % (cnt0, cnt1, cnt2))## Output SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986無論是用于治理,風險管理,合規(guī)(GRC)還是錯誤時進行回滾,Delta Lake 表都包含元數(shù)據(jù)(例如,記錄操作員刪除的事實)和數(shù)據(jù)(例如,實際刪除的行)。但是出于合規(guī)性或大小原因,我們?nèi)绾蝿h除數(shù)據(jù)文件?
使用 vacuum 清理舊版本的數(shù)據(jù)表
默認情況下,Delta Lake vacuum 方法將刪除所有超過7天參考時間的行(和文件)。如果要查看文件系統(tǒng),您會注意到表的11個文件。
/departureDelays.delta$ ls -l _delta_logpart-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet Part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet要刪除所有文件,以便僅保留當前數(shù)據(jù)快照,您可以 vacuum 方法指定一個較小的值(而不是默認保留7天)。
# Remove all files older than 0 hours old. deltaTable.vacuum(0) Note, you perform the same task via SQL syntax: ? # Remove all files older than 0 hours old spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)清理完成后,當您查看文件系統(tǒng)時,由于歷史數(shù)據(jù)已被刪除,您會看到更少的文件。
/departureDelays.delta$ ls -l _delta_log part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet請注意,運行 vacuum 之后,回溯到比保留期更早的版本的功能將會失效。
Chapter-03 大型數(shù)據(jù)湖的 Time Travel 功能
Delta Lake 提供 Time Travel 功能。 Delta Lake 是一個開源存儲層,可為數(shù)據(jù)湖帶來可靠性。 Delta Lake 提供 ACID 事務,可伸縮的元數(shù)據(jù)處理,以及批流一體數(shù)據(jù)處理。 Delta Lake 在您現(xiàn)有的數(shù)據(jù)湖之上運行,并且與 Apache Spark API 完全兼容。
使用此功能,Delta Lake 會自動對您存儲在數(shù)據(jù)湖中的大數(shù)據(jù)進行版本控制,同時您可以訪問該數(shù)據(jù)的任何歷史版本。這種臨時數(shù)據(jù)管理可以簡化您的數(shù)據(jù)管道,包括簡化審核,在誤寫入或刪除的情況下回滾數(shù)據(jù)以及重現(xiàn)實驗和報告。
您的組織最終可以在一個干凈,集中化,版本化的云上大數(shù)據(jù)存儲庫上實現(xiàn)標準化,以此進行分析。
更改數(shù)據(jù)的常見挑戰(zhàn)
- 審核數(shù)據(jù)更改:審核數(shù)據(jù)更改對于數(shù)據(jù)合規(guī)性以及簡單的調(diào)試(以了解數(shù)據(jù)如何隨時間變化)都至關(guān)重要。在這種情況下,傳統(tǒng)數(shù)據(jù)系統(tǒng)都轉(zhuǎn)向大數(shù)據(jù)技術(shù)和云服務。
- 重現(xiàn)實驗和報告:在模型訓練期間,數(shù)據(jù)科學家對給定的數(shù)據(jù)集執(zhí)行不同參數(shù)的各種實驗。當科學家在一段時間后重新訪問實驗以重現(xiàn)模型時,通常源數(shù)據(jù)已被上游管道修改。很多時候他們不知道這些上游數(shù)據(jù)發(fā)生了更改,因此很難重現(xiàn)他們的實驗。一些科學家和最好的工程師通過創(chuàng)建數(shù)據(jù)的多個副本來進行實踐,從而增加了存儲量的費用。對于生成報告的分析師而言,情況也是如此。
- 回滾:數(shù)據(jù)管道有時會向下游消費者寫入臟數(shù)據(jù)。發(fā)生這種情況的原因可能是基礎(chǔ)架構(gòu)不穩(wěn)定或者混亂的數(shù)據(jù)或者管道中的 Bug 等問題。對目錄或表進行簡單追加的管道,可以通過基于日期的分區(qū)輕松完成回滾。隨著更新和刪除,這可能變得非常復雜,數(shù)據(jù)工程師通常必須設(shè)計復雜的管道來應對這種情況。
使用Time Travel功能
Delta Lake 的 time travel 功能簡化了上述用例的數(shù)據(jù)管道構(gòu)建。Delta Lake 中的 Time Travel 極大地提高了開發(fā)人員的生產(chǎn)力。它有助于:
- 數(shù)據(jù)科學家可以更好地管理實驗
- 數(shù)據(jù)工程師簡化了管道同時可以回滾臟數(shù)據(jù)
- 數(shù)據(jù)分析師可以輕松地分析報告
企業(yè)最終可以在干凈,集中化,版本化的云存儲中的大數(shù)據(jù)存儲庫上建立標準化,在此基礎(chǔ)上進行數(shù)據(jù)分析。我們很高興看到您將能夠使用此功能完成工作。
當您寫入 Delta Lake 表或目錄時,每個操作都會自動進行版本控制。您可以通過兩種不同的方式訪問數(shù)據(jù)的不同版本:
使用時間戳
Scala 語法
您可以將時間戳或日期字符串作為 DataFrame 閱讀器的選項來提供:
val df = spark.read .format(“delta”) . option(“timestampAsOf”, “2019-01-01”) .load(“/path/to/my/table”) df = spark.read \ .format(“delta”) \ .option(“timestampAsOf”, “2019-01-01”) \ .load(“/path/to/my/table”) SQL語法 SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01” SELECT count(*) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01 01:30:00.000”如果您無權(quán)訪問閱讀器的代碼庫,您可以將輸入?yún)?shù)傳遞給該庫以讀取數(shù)據(jù),通過將 yyyyMMddHHmmssSSS 格式的時間戳傳遞給表來進行數(shù)據(jù)回滾:
val inputPath = “/path/to/my/table@20190101000000000” val df = loadData(inputPath) // Function in a library that you don’t have access to def loadData(inputPath : String) : DataFrame = { spark.read .format(“delta”) .load(inputPath) } inputPath = “/path/to/my/table@20190101000000000” df = loadData(inputPath)# Function in a library that you don’t have access to def loadData(inputPath): return spark.read \ .format(“delta”) \ .load(inputPath) }使用版本號
在 Delta Lake 中,每次寫入都有一個版本號,您也可以使用該版本號來進行回溯。
Scala語法
val df = spark.read .format(“delta”) .option(“versionAsOf”, “5238”) .load(“/path/to/my/table”)val df = spark.read .format(“delta”) .load(“/path/to/my/table@v5238”)Python語法
df = spark.read \.format(“delta”) \ .option(“versionAsOf”, “5238”) \ .load(“/path/to/my/table”)df = spark.read \.format(“delta”) \ .load(“/path/to/my/table@v5238”)SQL語法
SELECT count(*) FROM my_table VERSION AS OF 5238審核數(shù)據(jù)變更
您可以使用 DESCRIBE HISTORY 命令或通過 UI 來查看表更改的歷史記錄。
重做實驗和報告
Time travel 在機器學習和數(shù)據(jù)科學中也起著重要作用。模型和實驗的可重復性是數(shù)據(jù)科學家的關(guān)鍵考慮因素,因為他們通常在投入生產(chǎn)之前會創(chuàng)建數(shù)百個模型,并且在那個耗時的過程中,有可能想回到之前早期的模型。 但是由于數(shù)據(jù)管理通常與數(shù)據(jù)科學工具是分開的,因此確實很難實現(xiàn)。
Databricks 將 Delta Lake 的 Time Travel 功能與 MLflow(機器學習生命周期的開源平臺)相集成來解決可重復實驗的問題。 為了重新進行機器學習培訓,您只需將帶有時間戳的 URL 路徑作為 MLflow 參數(shù)來跟蹤每個訓練作業(yè)的數(shù)據(jù)版本。
這使您可以返回到較早的設(shè)置和數(shù)據(jù)集以重現(xiàn)較早的模型。 您無需與上游團隊就數(shù)據(jù)進行協(xié)調(diào),也不必擔心為不同的實驗克隆數(shù)據(jù)。 這就是統(tǒng)一分析的力量,數(shù)據(jù)科學與數(shù)據(jù)工程緊密結(jié)合在一起。
回滾
Time travel 可以在產(chǎn)生臟數(shù)據(jù)的情況下方便回滾。 例如,如果您的 GDPR 管道作業(yè)有一個意外刪除用戶信息的 bug,您可以用下面方法輕松修復管道:
INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111 You can also fix incorrect updates as follows: MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *如果您只想回滾到表的之前版本,則可以使用以下任一命令來完成:
RESTORE TABLE my_table VERSION AS OF [version_number] RESTORE TABLE my_table TIMESTAMP AS OF [timestamp]固定視圖的不斷更新跨多個下游作業(yè)的 Delta Lake 表
通過 AS OF 查詢,您現(xiàn)在可以為多個下游作業(yè)固定不斷更新的 Delta Lake 表的快照??紤]一種情況,其中 Delta Lake 表正在不斷更新,例如每15秒更新一次,并且有一個下游作業(yè)會定期從此 Delta Lake 表中讀取數(shù)據(jù)并更新不同的目標表。 在這種情況下,通常需要一個源 Delta Lake 表的一致視圖,以便所有目標表都反映相同的狀態(tài)。
現(xiàn)在,您可以按照下面的方式輕松處理這種情況:
version = spark.sql(“SELECT max(version) FROM (DESCRIBE HISTORY my_table)”).collect()# Will use the latest version of the table for all operations belowdata = spark.table(“my_table@v%s” % version[0][0]data.where(“event_type = e1”).write.jdbc(“table1”)data.where(“event_type = e2”).write.jdbc(“table2”) ...data.where(“event_type = e10”).write.jdbc(“table10”)時間序列分析查詢變得簡單
Time travel 還簡化了時間序列分析。例如,如果您想了解上周添加了多少新客戶,則查詢可能是一個非常簡單的方式,如下所示:
SELECT count(distinct userId) - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)) FROM my_tableChapter-04 輕松克隆您的 Delta Lake 以方便測試,數(shù)據(jù)共享以及重復進行機器學習
Delta Lake 有一個表克隆的功能,可以輕松進行測試,共享和重新創(chuàng)建表以實現(xiàn) ML 的多次訓練。在數(shù)據(jù)湖或數(shù)據(jù)倉庫中創(chuàng)建表的副本有幾種實際用途。但是考慮到數(shù)據(jù)湖中表的數(shù)據(jù)量及其增長速度,進行表的物理副本是一項昂貴的操作。
借助表克隆,Delta Lake 現(xiàn)在使該過程更簡單且更省成本。
什么是克隆?
克隆是源表在給定時間點的副本。它們具有與源表相同的元數(shù)據(jù):相同表結(jié)構(gòu),約束,列描述,統(tǒng)計信息和分區(qū)。但是它們是一個單獨的表,具有單獨的體系或歷史記錄。對克隆所做的任何更改只會影響克隆表,而不會影響源表。由于快照隔離,在克隆過程中或之后發(fā)生的源表更改也不會反映到克隆表中。在 Delta Lake 中,我們有兩種克隆方式:淺克隆或深克隆。
淺克隆
淺克隆(也稱為零拷貝)僅復制要克隆的表的元數(shù)據(jù);表本身的數(shù)據(jù)文件不會被復制。這種類型的克隆不會創(chuàng)建數(shù)據(jù)的另一物理副本,從而將存儲成本降至最低。淺克隆很便宜,而且創(chuàng)建起來非???。
這些克隆表自己不作為數(shù)據(jù)源,而是依賴于它們的源文件作為數(shù)據(jù)源。如果刪除了克隆表所依賴的源文件,例如使用 VACUUM,則淺克隆可能會變得不可用。因此,淺克隆通常用于短期使用案例,例如測試和實驗。
深克隆
淺克隆非常適合短暫的用例,但某些情況下需要表數(shù)據(jù)的獨立副本。深克隆會復制源表的元數(shù)據(jù)和數(shù)據(jù)文件全部信息。從這個意義上講,它的功能類似于使用 CTAS 命令(CREATE TABLE .. AS ... SELECT ...)進行復制。但是由于它可以按指定版本復制原始表,因此復制起來更簡單,同時您無需像使用 CTAS 一樣重新指定分區(qū),約束和其他信息。此外它更快,更健壯,也可以針對故障使用增量方式進行工作。
使用深克隆,我們將復制額外的元數(shù)據(jù),例如 streaming 應用程序事務和 COPY INTO 事務。因此您可以在深克隆之后繼續(xù)運行 ETL 應用程序。
克隆的適用場景?
有時候我希望有一個克隆人來幫助我做家務或魔術(shù)。但是我們這里不是在談論人類克隆。在許多情況下,您需要數(shù)據(jù)集的副本-用于探索,共享或測試 ML 模型或分析查詢。以下是一些客戶用例的示例。
用生產(chǎn)表進行測試和試驗
當用戶需要測試其數(shù)據(jù)管道的新版本時,他們通常依賴一些測試數(shù)據(jù)集,這些測試數(shù)據(jù)跟其生產(chǎn)環(huán)境中的數(shù)據(jù)還是有很大不同。數(shù)據(jù)團隊可能也想嘗試各種索引技術(shù),以提高針對海量表的查詢性能。這些實驗和測試想在生產(chǎn)環(huán)境進行,就得冒影響線上數(shù)據(jù)和用戶的風險。
為測試或開發(fā)環(huán)境拷貝線上數(shù)據(jù)表可能需要花費數(shù)小時甚至數(shù)天的時間。此外,開發(fā)環(huán)境保存所有重復的數(shù)據(jù)會產(chǎn)生額外的存儲成本-設(shè)置反映生產(chǎn)數(shù)據(jù)的測試環(huán)境會產(chǎn)生很大的開銷。 對于淺克隆,這是微不足道的:
-- SQL CREATE TABLE delta.`/some/test/location` SHALLOW CLONE prod.events# Python DeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=True)// Scala DeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=true)在幾秒鐘內(nèi)創(chuàng)建完表的淺克隆之后,您可以開始運行管道的副本以測試新代碼,或者嘗試在不同維度上優(yōu)化表,可以看到查詢性能提高了很多很多。 這些更改只會影響您的淺克隆,而不會影響原始表。
暫存對生產(chǎn)表的重大更改
有時,您可能需要對生產(chǎn)表進行一些重大更改。 這些更改可能包含許多步驟,并且您不希望其他用戶看到您所做的更改,直到您完成所有工作。 淺克隆可以在這里為您提供幫助:
-- SQL CREATE TABLE temp.staged_changes SHALLOW CLONE prod.events; DELETE FROM temp.staged_changes WHERE event_id is null; UPDATE temp.staged_changes SET change_date = current_date() WHERE change_date is null; ... -- Perform your verifications對結(jié)果滿意后,您有兩種選擇。 如果未對源表進行任何更改,則可以用克隆替換源表。如果對源表進行了更改,則可以將更改合并到源表中。
-- If no changes have been made to the source REPLACE TABLE prod.events CLONE temp.staged_changes; -- If the source table has changed MERGE INTO prod.events USING temp.staged_changes ON events.event_id <=> staged_changes.event_id WHEN MATCHED THEN UPDATE SET *; -- Drop the staged table DROP TABLE temp.staged_changes;機器學習結(jié)果的可重復性
訓練出有效的 ML 模型是一個反復的過程。在調(diào)整模型不同部分的過程中,數(shù)據(jù)科學家需要根據(jù)固定的數(shù)據(jù)集來評估模型的準確性。
這是很難做到的,特別是在數(shù)據(jù)不斷被加載或更新的系統(tǒng)中。 在訓練和測試模型時需要一個數(shù)據(jù)快照。 此快照支持了 ML 模型的重復訓練和模型治理。
我們建議利用 Time Travel 在一個快照上運行多個實驗;在 Machine Learning Data Lineage With MLflow and Delta Lake 中可以看到一個實際的例子。
當您對結(jié)果感到滿意并希望將數(shù)據(jù)存檔以供以后檢索時(例如,下一個黑色星期五),可以使用深克隆來簡化歸檔過程。 MLflow 與 Delta Lake 的集成非常好,并且自動記錄功能(mlflow.spark.autolog()方法)將告訴您使用哪個數(shù)據(jù)表版本進行了一組實驗。
# Run your ML workloads using Python and then DeltaTable.forName(spark, “feature_store”).cloneAtVersion(128, “feature_ store_bf2020”)數(shù)據(jù)遷移
出于性能或管理方面的原因,可能需要將大量表移至新的專用存儲系統(tǒng)。原始表將不再接收新的更新,并且將在以后的某個時間點停用和刪除。深度克隆使海量表的復制更加健壯和可擴展。
-- SQL CREATE TABLE delta.`zz://my-new-bucket/events` CLONE prod.events; ALTER TABLE prod.events SET LOCATION ‘zz://my-new-bucket/events’;由于借助深克隆,我們復制了流應用程序事務和 COPY INTO 事務,因此您可以從遷移后停止的確切位置繼續(xù)ETL應用程序!
資料共享
在一個組織中,來自不同部門的用戶通常都在尋找可用于豐富其分析或模型的數(shù)據(jù)集。您可能希望與組織中的其他用戶共享數(shù)據(jù)。 但不是建立復雜的管道將數(shù)據(jù)移動到另一個里,而是創(chuàng)建相關(guān)數(shù)據(jù)集的副本通常更加容易和經(jīng)濟。這些副本以供用戶瀏覽和測試數(shù)據(jù)來確認其是否適合他們的需求而不影響您自己生產(chǎn)系統(tǒng)的數(shù)據(jù)。在這里深克隆再次起到關(guān)鍵作用。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE data_science.events CLONE prod.events;數(shù)據(jù)存檔
出于監(jiān)管或存檔的目的,表中的所有數(shù)據(jù)需要保留一定的年限,而活動表則將數(shù)據(jù)保留幾個月。如果您希望盡快更新數(shù)據(jù),但又要求將數(shù)據(jù)保存幾年,那么將這些數(shù)據(jù)存儲在一個表中并進行 time travel 可能會變得非常昂貴。
在這種情況下,每天,每周,每月歸檔數(shù)據(jù)是一個更好的解決方案。深克隆的增量克隆功能將在這里為您提供真正的幫助。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE archive.events CLONE prod.events;請注意,與源表相比此表將具有獨立的歷史記錄,因此根據(jù)您的存檔頻率,源表和克隆表上的 time travel 查詢可能會返回不同的結(jié)果。
看起來真棒!有問題嗎?
這里只是重申上述一些陷阱,請注意以下幾點:
- 克隆是在你的快照上進行的。對克隆開始后的源表變化不會反映在克隆中。
- 淺克隆不像深克隆那樣是自包含的表。如果在源表中刪除了數(shù)據(jù)(例如通過 VACUUM),那么您的淺克隆可能無法使用。
- 克隆與源表具有獨立的歷史記錄。在源表和克隆表上的 time travel 查詢可能不會返回相同的結(jié)果。
- 淺克隆不復制流事務或?qū)⒏北緩椭频皆獢?shù)據(jù)。使用深層克隆來遷移表,可以從上次暫停的地方繼續(xù)進行 ETL 處理。
我該如何使用?
淺克隆和深克隆支持數(shù)據(jù)團隊在測試和管理其新型云數(shù)據(jù)湖和倉庫如何開展新功能。表克隆可以幫助您的團隊對其管道實施生產(chǎn)級別的測試,微調(diào)索引以實現(xiàn)最佳查詢性能,創(chuàng)建表副本以進行共享-所有這些都以最小的開銷和費用實現(xiàn)。如果您的組織需要這樣做,我們希望您能嘗試克隆表并提供反饋意見-我們期待聽到您將來的新用例和擴展。
Chapter-05 在 Apache Spark 3.0 上的 Delta Lake 中啟用 Spark SQL DDL 和 DML 功能
Delta Lake 0.7.0 的發(fā)布與 Apache Spark 3.0 的發(fā)布相吻合,從而啟用了一組新功能,這些功能使用了 Delta Lake 的 SQL 功能進行了簡化。以下是一些關(guān)鍵功能。
在 Hive Metastore 中定義表支持 SQL DDL 命令
現(xiàn)在,您可以在 Hive Metastore 中定義 Delta 表,并在創(chuàng)建(或替換)表時在所有 SQL 操作中使用表名。
創(chuàng)建或替換表
-- Create table in the metastore CREATE TABLE events (date DATE, eventId STRING, eventType STRING, data STRING) USING DELTA PARTITIONED BY (date) LOCATION ‘/delta/events’ -- If a table with the same name already exists, the table is replaced with the new configuration, else it is created CREATE OR REPLACE TABLE events (date DATE, eventId STRING,eventType STRING,data STRING) USING DELTA PARTITIONED BY (date) LOCATION ‘/delta/events’顯式更改表架構(gòu)
-- Alter table and schema ALTER TABLE table_name ADD COLUMNS (col_name data_type[COMMENT col_comment] [FIRST|AFTER colA_name], ...)您還可以使用 Scala / Java / Python API:
- DataFrame.saveAsTable(tableName) 和 DataFrameWriterV2 APIs。
- DeltaTable.forName(tableName) 這個 API 用于創(chuàng)建 io.delta.tables.DeltaTable 實例,對于在 Scala/Java/Python 中執(zhí)行 Update/Delete/Merge 操作是非常有用。
支持 SQL 插入,刪除,更新和合并
通過 Delta Lake Tech Talks,最常見的問題之一是何時可以在 Spark SQL 中使用 DML 操作(如刪除,更新和合并)?不用再等了,這些操作現(xiàn)在已經(jīng)可以在 SQL 中使用了! 以下是有關(guān)如何編寫刪除,更新和合并(使用 Spark SQL 進行插入,更新,刪除和重復數(shù)據(jù)刪除操作)的示例。
-- Using append mode, you can atomically add new data to an existing Delta table INSERT INTO events SELECT * FROM newEvents -- To atomically replace all of the data in a table, you can use overwrite mode INSERT OVERWRITE events SELECT * FROM newEvents-- Delete events DELETE FROM events WHERE date < ‘2017-01-01’-- Update events UPDATE events SET eventType = ‘click’ WHERE eventType = ‘click’-- Upsert data to a target Delta -- table using merge MERGE INTO events USING updatesON events.eventId = updates.eventId WHEN MATCHED THEN UPDATESET events.data = updates.data WHEN NOT MATCHED THEN INSERT (date, eventId, data)VALUES (date, eventId, data)值得注意的是,Delta Lake 中的合并操作比標準 ANSI SQL 語法支持更高級的語法。例如,合并支持
- 刪除操作-刪除與源數(shù)據(jù)行匹配的目標。 例如,“...配對后刪除...”
- 帶有子句條件的多個匹配操作-當目標和數(shù)據(jù)行匹配時具有更大的靈活性。 例如:
- 星形語法-用于使用名稱相似的源列來設(shè)置目標列值的簡寫。 例如:
自動和增量式 Presto/Athena 清單生成
正如 Query Delta Lake Tables From Presto and Athena, Improved Operations Concurrency,andMergePerformance 文章中所述,Delta Lake 支持其他處理引擎通過 manifest 文件來讀取 Delta Lake。manifest 文件包含清單生成時的最新版本。如上一章所述,您將需要:
- 生成 Delta Lake 清單文件
- 配置 Presto 或 Athena 讀取生成的清單
- 手動重新生成(更新)清單文件
Delta Lake 0.7.0的新增功能是使用以下命令自動更新清單文件:
ALTER TABLE delta.`pathToDeltaTable` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true )通過表屬性文件來配置表
通過使用 ALTER TABLE SET TBLPROPERTIES,您可以在表上設(shè)置表屬性,可以啟用,禁用或配置 Delta Lake 的許多功能,就像自動清單生成那樣。例如使用表屬性,您可以使用 delta.appendOnly=true 阻止 Delta 表中數(shù)據(jù)的刪除和更新。
您還可以通過以下屬性輕松控制 Delta Lake 表保留的歷史記錄:
- delta.logRetentionDuration:控制表的歷史記錄(即事務日志歷史記錄)保留的時間。默認情況下會保留30天的歷史記錄,但是您可能需要根據(jù)自己的要求(例如GDPR歷史記錄上下文)更改此值。 ?
- delta.deletedFileRetentionDuration:控制文件成為 VACUUM 的候選時必須在多久被刪除。默認情況下會刪除7天以上的數(shù)據(jù)文件。
從 Delta Lake 0.7.0 開始,您可以使用 ALTER TABLE SET TBLPROPERTIES 來配置這些屬性。
ALTER TABLE delta.`pathToDeltaTable` SET TBLPROPERTIES(delta.logRetentionDuration = “interval “delta.deletedFileRetentionDuration = “interval “ )在 Delta Lake 表中提交支持添加用戶定義的元數(shù)據(jù)
您可以指定自定義的字符串來作為元數(shù)據(jù),通過 Delta Lake 表操作進行的提交,也可以使用DataFrameWriter選項userMetadata,或者 SparkSession 的配置spark.databricks.delta.commitInfo。 userMetadata。
在以下示例中,我們將根據(jù)每個用戶請求從數(shù)據(jù)湖中刪除一個用戶(1xsdf1)。為確保我們將用戶的請求與刪除相關(guān)聯(lián),我們還將 DELETE 請求 ID 添加到了 userMetadata中。
SET spark.databricks.delta.commitInfo.userMetadata={ “GDPR”:”DELETE Request 1x891jb23” }; DELETE FROM user_table WHERE user_id = ‘1xsdf1’當查看用戶表(user_table)的歷史記錄操作時,可以輕松地在事務日志中標識關(guān)聯(lián)的刪除請求。
其他亮點
Delta Lake 0.7.0 版本的其他亮點包括:
- 支持 Azure Data Lake Storage Gen2-Spark 3.0 已經(jīng)支持 Hadoop 3.2 庫,也被 Azure Data Lake Storage Gen2 支持。
- 改進了對流式一次觸發(fā)的支持-使用 Spark 3.0,我們確保一次觸發(fā)(Trigger.Once)在單個微批處理中處理 Delta Lake 表中的所有未完成數(shù)據(jù),即使使用 DataStreamReader 選項 maxFilesPerTriggers 速度受限。
在 AMA 期間,關(guān)于結(jié)構(gòu)化流和使用 trigger.once 的問題又很多。
有關(guān)更多信息,一些解釋此概念的有用資源包括:
- 每天運行一次流作業(yè),可節(jié)省10倍的成本
- 超越 Lambda:引入Delta架構(gòu):特別是成本與延遲的對比
后續(xù)
您已經(jīng)了解了 Delta Lake 及其特性,以及如何進行性能優(yōu)化,本系列還包括其他內(nèi)容:
- Delta Lake 技術(shù)系列-基礎(chǔ)和性能
- Delta Lake 技術(shù)系列-Lakehouse
- Delta Lake 技術(shù)系列-Streaming
- Delta Lake 技術(shù)系列-客戶用例(Use Case)
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的【详谈 Delta Lake 】系列技术专题 之 特性(Features)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 唯品会:在 Flink 容器化与平台化上
- 下一篇: Vite + React 组件开发实践