日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

1.18.3.Flink Catalog介绍、Catalog 定义、Catalog 的实现、Catalog 使用举例

發(fā)布時間:2024/9/27 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 1.18.3.Flink Catalog介绍、Catalog 定义、Catalog 的实现、Catalog 使用举例 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1.18.3.Flink Catalog介紹
1.18.3.1.引言
1.18.3.2.Catalog 定義
1.18.3.3.Catalog 的實現(xiàn)
1.18.3.4.Catalog 使用舉例

1.18.3.Flink Catalog介紹

1.18.3.1.引言

以下轉(zhuǎn)自:http://legendtkl.com/2020/07/26/flink-catalog/

這篇文章我們介紹了一下 Flink 的 Catalog,基于 Flink 1.11,熟悉 Flink 或者 Spark 等大數(shù)據(jù)引擎的同學(xué)應(yīng)該都知道這兩個計算引擎都有一個共同的組件叫 Catalog。下面是 Flink 的 Catalog 的官方定義。

Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫、表、分區(qū)、視圖以及數(shù)據(jù)庫或其他外部系統(tǒng)中存儲的函數(shù)和信息。數(shù)據(jù)處理最關(guān)鍵的方面之一是管理元數(shù)據(jù)。 元數(shù)據(jù)可以是臨時的,例如臨時表、或者通過TableEnvironment注冊的 UDF。 元數(shù)據(jù)也可以是持久化的,例如 Hive Metastore 中的元數(shù)據(jù)。 Catalog提供了一個統(tǒng)一的API,用于管理元數(shù)據(jù),并使其可以從Table API和SQL查詢語句中來訪問。

簡單來說,Catalog 就是元數(shù)據(jù)管理中心,其中元數(shù)據(jù)包括數(shù)據(jù)庫、表、表結(jié)構(gòu)等信息。

1.18.3.2.Catalog 定義

Flink 的 Catalog 相關(guān)代碼定義在 catalog.java 文件中,是一個 interface,如下。

/*** This interface is responsible for reading and writing metadata such as database/table/views/UDFs* from a registered catalog. It connects a registered catalog and Flink's Table API.*/ @PublicEvolving public interface Catalog {... }

既然是interface,我們來看一下支持的操作。

我們可以將這些接口做一個簡單的分類。

  • Database 相關(guān)操作
    ?getDefaultDataBase:獲取默認(rèn)的 database
    ?getDatabase:獲取特定的 database
    ?listDatabases:列出所有的 database
    ?databaseExists:判斷 database 是否存在
    ?createDatabases:創(chuàng)建 database
    ?dropDatabases:刪除 database
    ?alterDatabases:修改 database

  • Table 相關(guān)操作,一般都會有個參數(shù)是database
    ?listTables:列出所有的 table 和 view
    ?getTable:獲取指定的 table 或者 view
    ?tableExist:判斷 table 或者 view 是否存在
    ?dropTable:刪除 table 或者 view
    ?createTable:創(chuàng)建 table 或者 view
    ?renameTable:重命名 table 或者 view
    ?alterTable:修改 table 或者 view

  • View 相關(guān)操作,除了和 table 共用方法外,還有一個獨有的方法。
    ?listViews:列出所有的 view

  • Partition 相關(guān)操作,partition 是 table 的一個屬性,所以參數(shù)一般都會帶有 table 信息。
    ?listPartition:列出 table 的所有 partition
    ?getPartition:獲取指定的 partition
    ?partitionExist:判斷 parition 是否存在
    ?createPartition:創(chuàng)建 partition
    ?dropPartition:刪除 partition
    ?alterPartition:修改 parition

  • Function 相關(guān)操作,這里的 function 知道的是用戶自定義的 function,也就是 Udf。
    ?listFunctions:列出所有的 function
    ?getFunction:獲取指定的 func
    ?functionExist:判斷 function 是否存在
    ?dropFunction:刪除 function
    ?alterFunction:修改 function

  • 1.18.3.3.Catalog 的實現(xiàn)

    從上圖我們可以看到 Catalog 的最終實現(xiàn)有三個類:
    ?HiveCatalog:使用 Hive 的元數(shù)據(jù)來作為 Flink 的 HiveCatalog
    ?GenericInMemoryCatalog:使用內(nèi)存實現(xiàn) Catalog
    ?JdbcCatalog:使用其他支持 jdbc 協(xié)議的關(guān)系型數(shù)據(jù)庫來存儲元數(shù)據(jù)
    ?PostgresCatalog:使用 Postgres 數(shù)據(jù)庫來作為 Catalog 存儲元數(shù)據(jù)

    1.18.3.4.Catalog 使用舉例

    下面的示例是 Flink SQL 使用 Catalog 的示例。

    TableEnvironment tableEnv = ...// Create a HiveCatalog Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");// Register the catalog tableEnv.registerCatalog("myhive", catalog);// Create a catalog database tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");// Create a catalog table tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");tableEnv.listTables(); // should return the tables in current catalog and database.

    下面是 api 的方式來使用 Catalog

    import org.apache.flink.table.api.*; import org.apache.flink.table.catalog.*; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.descriptors.Kafka;TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());// Create a HiveCatalog Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");// Register the catalog tableEnv.registerCatalog("myhive", catalog);// Create a catalog database catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));// Create a catalog table TableSchema schema = TableSchema.builder().field("name", DataTypes.STRING()).field("age", DataTypes.INT()).build();catalog.createTable(new ObjectPath("mydb", "mytable"),new CatalogTableImpl(schema,new Kafka().version("0.11").....startFromEarlist().toProperties(),"my comment"),false);List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"

    1.18.3.5.自定義Catalog

    Catalog 是可擴(kuò)展的,用戶可以通過實現(xiàn) Catalog 接口來開發(fā)自定義 Catalog。 想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實現(xiàn)自定義的 Catalog 之外,還需要為這個 Catalog 實現(xiàn)對應(yīng)的 CatalogFactory 接口。

    CatalogFactory 定義了一組屬性,用于 SQL CLI 啟動時配置 Catalog。 這組屬性集將傳遞給發(fā)現(xiàn)服務(wù),在該服務(wù)中,服務(wù)會嘗試將屬性關(guān)聯(lián)到 CatalogFactory 并初始化相應(yīng)的 Catalog 實例。

    1.18.3.6.總結(jié)

    這篇文章寫的比較簡單,相當(dāng)于自己的學(xué)習(xí)筆記,下一篇文章我們比較一下Spark 的 Catalog實現(xiàn)。

    總結(jié)

    以上是生活随笔為你收集整理的1.18.3.Flink Catalog介绍、Catalog 定义、Catalog 的实现、Catalog 使用举例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。