1.18.3.Flink Catalog介绍、Catalog 定义、Catalog 的实现、Catalog 使用举例
1.18.3.Flink Catalog介紹
1.18.3.1.引言
1.18.3.2.Catalog 定義
1.18.3.3.Catalog 的實現(xiàn)
1.18.3.4.Catalog 使用舉例
1.18.3.Flink Catalog介紹
1.18.3.1.引言
以下轉(zhuǎn)自:http://legendtkl.com/2020/07/26/flink-catalog/
這篇文章我們介紹了一下 Flink 的 Catalog,基于 Flink 1.11,熟悉 Flink 或者 Spark 等大數(shù)據(jù)引擎的同學(xué)應(yīng)該都知道這兩個計算引擎都有一個共同的組件叫 Catalog。下面是 Flink 的 Catalog 的官方定義。
Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫、表、分區(qū)、視圖以及數(shù)據(jù)庫或其他外部系統(tǒng)中存儲的函數(shù)和信息。數(shù)據(jù)處理最關(guān)鍵的方面之一是管理元數(shù)據(jù)。 元數(shù)據(jù)可以是臨時的,例如臨時表、或者通過TableEnvironment注冊的 UDF。 元數(shù)據(jù)也可以是持久化的,例如 Hive Metastore 中的元數(shù)據(jù)。 Catalog提供了一個統(tǒng)一的API,用于管理元數(shù)據(jù),并使其可以從Table API和SQL查詢語句中來訪問。簡單來說,Catalog 就是元數(shù)據(jù)管理中心,其中元數(shù)據(jù)包括數(shù)據(jù)庫、表、表結(jié)構(gòu)等信息。
1.18.3.2.Catalog 定義
Flink 的 Catalog 相關(guān)代碼定義在 catalog.java 文件中,是一個 interface,如下。
/*** This interface is responsible for reading and writing metadata such as database/table/views/UDFs* from a registered catalog. It connects a registered catalog and Flink's Table API.*/ @PublicEvolving public interface Catalog {... }既然是interface,我們來看一下支持的操作。
我們可以將這些接口做一個簡單的分類。
Database 相關(guān)操作
?getDefaultDataBase:獲取默認(rèn)的 database
?getDatabase:獲取特定的 database
?listDatabases:列出所有的 database
?databaseExists:判斷 database 是否存在
?createDatabases:創(chuàng)建 database
?dropDatabases:刪除 database
?alterDatabases:修改 database
Table 相關(guān)操作,一般都會有個參數(shù)是database
?listTables:列出所有的 table 和 view
?getTable:獲取指定的 table 或者 view
?tableExist:判斷 table 或者 view 是否存在
?dropTable:刪除 table 或者 view
?createTable:創(chuàng)建 table 或者 view
?renameTable:重命名 table 或者 view
?alterTable:修改 table 或者 view
View 相關(guān)操作,除了和 table 共用方法外,還有一個獨有的方法。
?listViews:列出所有的 view
Partition 相關(guān)操作,partition 是 table 的一個屬性,所以參數(shù)一般都會帶有 table 信息。
?listPartition:列出 table 的所有 partition
?getPartition:獲取指定的 partition
?partitionExist:判斷 parition 是否存在
?createPartition:創(chuàng)建 partition
?dropPartition:刪除 partition
?alterPartition:修改 parition
Function 相關(guān)操作,這里的 function 知道的是用戶自定義的 function,也就是 Udf。
?listFunctions:列出所有的 function
?getFunction:獲取指定的 func
?functionExist:判斷 function 是否存在
?dropFunction:刪除 function
?alterFunction:修改 function
1.18.3.3.Catalog 的實現(xiàn)
從上圖我們可以看到 Catalog 的最終實現(xiàn)有三個類:
?HiveCatalog:使用 Hive 的元數(shù)據(jù)來作為 Flink 的 HiveCatalog
?GenericInMemoryCatalog:使用內(nèi)存實現(xiàn) Catalog
?JdbcCatalog:使用其他支持 jdbc 協(xié)議的關(guān)系型數(shù)據(jù)庫來存儲元數(shù)據(jù)
?PostgresCatalog:使用 Postgres 數(shù)據(jù)庫來作為 Catalog 存儲元數(shù)據(jù)
1.18.3.4.Catalog 使用舉例
下面的示例是 Flink SQL 使用 Catalog 的示例。
TableEnvironment tableEnv = ...// Create a HiveCatalog Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");// Register the catalog tableEnv.registerCatalog("myhive", catalog);// Create a catalog database tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");// Create a catalog table tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");tableEnv.listTables(); // should return the tables in current catalog and database.下面是 api 的方式來使用 Catalog
import org.apache.flink.table.api.*; import org.apache.flink.table.catalog.*; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.descriptors.Kafka;TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());// Create a HiveCatalog Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");// Register the catalog tableEnv.registerCatalog("myhive", catalog);// Create a catalog database catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));// Create a catalog table TableSchema schema = TableSchema.builder().field("name", DataTypes.STRING()).field("age", DataTypes.INT()).build();catalog.createTable(new ObjectPath("mydb", "mytable"),new CatalogTableImpl(schema,new Kafka().version("0.11").....startFromEarlist().toProperties(),"my comment"),false);List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"1.18.3.5.自定義Catalog
Catalog 是可擴(kuò)展的,用戶可以通過實現(xiàn) Catalog 接口來開發(fā)自定義 Catalog。 想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實現(xiàn)自定義的 Catalog 之外,還需要為這個 Catalog 實現(xiàn)對應(yīng)的 CatalogFactory 接口。
CatalogFactory 定義了一組屬性,用于 SQL CLI 啟動時配置 Catalog。 這組屬性集將傳遞給發(fā)現(xiàn)服務(wù),在該服務(wù)中,服務(wù)會嘗試將屬性關(guān)聯(lián)到 CatalogFactory 并初始化相應(yīng)的 Catalog 實例。
1.18.3.6.總結(jié)
這篇文章寫的比較簡單,相當(dāng)于自己的學(xué)習(xí)筆記,下一篇文章我們比較一下Spark 的 Catalog實現(xiàn)。
總結(jié)
以上是生活随笔為你收集整理的1.18.3.Flink Catalog介绍、Catalog 定义、Catalog 的实现、Catalog 使用举例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 1.18.2.10 解释表:Table.
- 下一篇: 1.18.5.流式概念、动态表(Dyna