Building Batch and Streaming Applications with the Flink Table and SQL API (1): Basic Concepts of Table

Published: 2025/4/5

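From the official Flink documentation we know that Flink's programming model has four layers: SQL is the highest-level API, the Table API is the middle layer, the DataStream/DataSet API is the core, and stateful stream processing is the lowest-level abstraction.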

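Of these, earlier articles covered the following:

"Flink DataSet API: usage and principles" introduced the DataSet API.

"Flink DataStream API: usage and principles" introduced the DataStream API.

"How to use timestamps in Flink: Watermark usage and principles" introduced Watermark, the foundation of the low-level implementation.

"Flink window: analysis by example" introduced the concept of windows and how they are used.

"State and fault tolerance in Flink" introduced the concept of State and the checkpoint and savepoint fault-tolerance mechanisms.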

0. Basic concepts

0.1 TableEnvironment

TableEnvironment is the central concept of the Table API and SQL integration. It is mainly responsible for:

  1. Registering a Table in the internal Catalog
  2. Registering an external Catalog
  3. Executing SQL queries
  4. Registering user-defined functions (UDFs)
  5. Converting a DataStream or DataSet into a Table
  6. Holding a reference to the BatchTableEnvironment or StreamTableEnvironment

/**
 * The base class for batch and stream TableEnvironments.
 *
 * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
 * responsible for:
 *
 * <ul>
 *     <li>Registering a Table in the internal catalog</li>
 *     <li>Registering an external catalog</li>
 *     <li>Executing SQL queries</li>
 *     <li>Registering a user-defined scalar function. For the user-defined table and aggregate
 *     function, use the StreamTableEnvironment or BatchTableEnvironment</li>
 * </ul>
 */

0.2 Catalog

Catalog: all metadata about databases and tables is stored in Flink's internal Catalog structure. It holds all Table-related metadata inside Flink, including table schema information and data source information.

/**
 * This interface is responsible for reading and writing metadata such as database/table/views/UDFs
 * from a registered catalog. It connects a registered catalog and Flink's Table API.
 */
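As a rough mental model only, a catalog can be pictured as a nested lookup from database name to table name to table metadata. The sketch below is plain Java with no Flink dependency. ToyCatalog, TableMeta, registerTable and lookup are hypothetical names chosen for illustration and are not Flink's Catalog interface.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of a catalog: database name -> table name -> metadata. NOT Flink's Catalog interface. */
class ToyCatalog {

    /** Hypothetical metadata record: column names mapped to SQL type names. */
    static final class TableMeta {
        final Map<String, String> columns = new LinkedHashMap<>();

        TableMeta column(String name, String type) {
            columns.put(name, type);
            return this;
        }
    }

    private final Map<String, Map<String, TableMeta>> databases = new LinkedHashMap<>();

    /** Stores the metadata of a table under the given database, creating the database entry if needed. */
    void registerTable(String database, String table, TableMeta meta) {
        databases.computeIfAbsent(database, k -> new LinkedHashMap<>()).put(table, meta);
    }

    /** Looks up table metadata; returns an empty Optional if the database or table is unknown. */
    Optional<TableMeta> lookup(String database, String table) {
        Map<String, TableMeta> tables = databases.get(database);
        if (tables == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(tables.get(table));
    }

    public static void main(String[] args) {
        ToyCatalog catalog = new ToyCatalog();
        catalog.registerTable("default", "WordCount",
                new TableMeta().column("word", "VARCHAR").column("frequency", "BIGINT"));
        System.out.println(catalog.lookup("default", "WordCount").isPresent());
        System.out.println(catalog.lookup("default", "Missing").isPresent());
    }
}
```

The real Catalog also covers views and UDFs, as the Javadoc above states. The toy model leaves those out to keep the lookup idea visible.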

The structure of the Catalog is as follows:

[Figure: Catalog structure diagram]

0.3 TableSource

When using the Table API, an external data source can be registered directly as a Table. This structure is called a TableSource.

/**
 * Defines an external table with the schema that is provided by {@link TableSource#getTableSchema}.
 *
 * <p>The data of a {@link TableSource} is produced as a {@code DataSet} in case of a {@code BatchTableSource}
 * or as a {@code DataStream} in case of a {@code StreamTableSource}. The type of the produced
 * {@code DataSet} or {@code DataStream} is specified by the {@link TableSource#getProducedDataType()} method.
 *
 * <p>By default, the fields of the {@link TableSchema} are implicitly mapped by name to the fields of
 * the produced {@link DataType}. An explicit mapping can be defined by implementing the
 * {@link DefinedFieldMapping} interface.
 *
 * @param <T> The return type of the {@link TableSource}.
 */

0.4 TableSink

After the data has been processed, the result has to be written to external storage. The Table API has a corresponding sink module for this, the TableSink.

/**
 * A {@link TableSink} specifies how to emit a table to an external
 * system or location.
 *
 * <p>The interface is generic such that it can support different storage locations and formats.
 *
 * @param <T> The return type of the {@link TableSink}.
 */

0.5 Table Connector

Since Flink 1.6, Flink has offered the concept of a Table Connector so that the Table API can connect to external systems through configuration and the same connections can be used from the SQL Client. Its main purpose is to separate the definition of Table Sources and Table Sinks from their use.

A Table Connector wraps the various built-in Table Sources and Table Sinks into configurable components that can be used from both the Table API and the SQL Client.

/**
 * Creates a table source and/or table sink from a descriptor.
 *
 * <p>Descriptors allow for declaring the communication to external systems in an
 * implementation-agnostic way. The classpath is scanned for suitable table factories that match
 * the desired configuration.
 *
 * <p>The following example shows how to read from a connector using a JSON format and
 * register a table source as "MyTable":
 *
 * <pre>
 * {@code
 *
 * tableEnv
 *   .connect(
 *     new ExternalSystemXYZ()
 *       .version("0.11"))
 *   .withFormat(
 *     new Json()
 *       .jsonSchema("{...}")
 *       .failOnMissingField(false))
 *   .withSchema(
 *     new Schema()
 *       .field("user-name", "VARCHAR").from("u_name")
 *       .field("count", "DECIMAL"))
 *   .registerSource("MyTable");
 * }
 * </pre>
 *
 * @param connectorDescriptor connector descriptor describing the external system
 */
TableDescriptor connect(ConnectorDescriptor connectorDescriptor);

This article focuses mainly on SQL and the Table API.

1. SQL

1.1 SQL on the DataSet API

Example:

package org.apache.flink.table.examples.java;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Simple example that shows how the Batch SQL API is used in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataSets to Tables
 *  - Register a Table under a name
 *  - Run a SQL query on the registered Table
 */
public class WordCountSQL {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
            new WC("Hello", 1),
            new WC("Ciao", 1),
            new WC("Hello", 1));

        // register the DataSet as table "WordCount"
        tEnv.registerDataSet("WordCount", input, "word, frequency");

        // run a SQL query on the Table and retrieve the result as a new Table
        Table table = tEnv.sqlQuery(
            "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

        DataSet<WC> result = tEnv.toDataSet(table, WC.class);

        result.print();
    }

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO containing a word and its respective count.
     */
    public static class WC {
        public String word;
        public long frequency;

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}
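To make the result of the query above concrete, here is a minimal plain-Java sketch (no Flink dependency) of what SELECT word, SUM(frequency) ... GROUP BY word computes for the three input elements. WordCountGroupBySketch and sumByWord are illustrative names, not part of Flink, and the sketch only mirrors the semantics of the query. It says nothing about how Flink executes it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java illustration of the GROUP BY / SUM semantics; not Flink code. */
class WordCountGroupBySketch {

    /** Immutable stand-in for the WC POJO of the example. */
    static final class WC {
        final String word;
        final long frequency;

        WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    /** Mirrors: SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word */
    static Map<String, Long> sumByWord(List<WC> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                (WC wc) -> wc.word,                           // GROUP BY word
                LinkedHashMap::new,                           // keep first-seen order of the keys
                Collectors.summingLong((WC wc) -> wc.frequency))); // SUM(frequency)
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(
                new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));
        System.out.println(sumByWord(input)); // prints {Hello=2, Ciao=1}
    }
}
```

In the Flink program the same two groups come back as WC objects, and the order in which result.print() lists them is not guaranteed.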

Here, BatchTableEnvironment is documented as:

/**
 * The {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
 * with {@link DataSet}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataSet} to a {@link Table}</li>
 *     <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataSet}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 */

BatchTableSource

/**
 * Defines an external batch table and provides access to its data.
 *
 * @param <T> Type of the {@link DataSet} created by this {@link TableSource}.
 */

BatchTableSink

/**
 * Defines an external {@link TableSink} to emit a batch {@link Table}.
 *
 * @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
 */

1.2 SQL on the DataStream API

Example code:

package org.apache.flink.table.examples.java;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Arrays;

/**
 * Simple example for demonstrating the use of SQL on a Stream Table in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataStreams to Tables
 *  - Register a Table under a name
 *  - Run a StreamSQL query on the registered Table
 */
public class StreamSQLExample {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(1L, "diaper", 4),
            new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
            new Order(2L, "pen", 3),
            new Order(2L, "rubber", 3),
            new Order(4L, "beer", 1)));

        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
        // register DataStream as Table
        tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

        // union the two tables
        Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
                        "SELECT * FROM OrderB WHERE amount < 2");

        tEnv.toAppendStream(result, Order.class).print();

        env.execute();
    }

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO.
     */
    public static class Order {
        public Long user;
        public String product;
        public int amount;

        public Order() {}

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                '}';
        }
    }
}
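The rows that the UNION ALL query above selects can be worked out by hand. The following plain-Java sketch (no Flink dependency) applies the two WHERE conditions to the same sample orders. UnionAllSketch and query are illustrative names, not Flink API. The sketch mirrors only the query semantics, and a real streaming job may emit the rows in a different order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of the UNION ALL query in StreamSQLExample; not Flink code. */
class UnionAllSketch {

    /** Immutable stand-in for the Order POJO of the example. */
    static final class Order {
        final long user;
        final String product;
        final int amount;

        Order(long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{user=" + user + ", product='" + product + "', amount=" + amount + "}";
        }
    }

    /**
     * Mirrors: SELECT * FROM tableA WHERE amount > 2
     *          UNION ALL
     *          SELECT * FROM OrderB WHERE amount < 2
     */
    static List<Order> query(List<Order> orderA, List<Order> orderB) {
        List<Order> result = new ArrayList<>();
        for (Order o : orderA) {
            if (o.amount > 2) {
                result.add(o);
            }
        }
        for (Order o : orderB) {
            if (o.amount < 2) {
                result.add(o);
            }
        }
        return result; // UNION ALL keeps duplicates, so no de-duplication step
    }

    public static void main(String[] args) {
        List<Order> orderA = Arrays.asList(
                new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2));
        List<Order> orderB = Arrays.asList(
                new Order(2L, "pen", 3), new Order(2L, "rubber", 3), new Order(4L, "beer", 1));
        query(orderA, orderB).forEach(System.out::println);
    }
}
```

For the sample data, three rows qualify: (1, beer, 3) and (1, diaper, 4) from the first input, and (4, beer, 1) from OrderB.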

Here, StreamTableEnvironment is documented as:

/**
 * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
 * {@link DataStream}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataStream} to a {@link Table}</li>
 *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataStream}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 */

StreamTableSource

/**
 * Defines an external stream table and provides read access to its data.
 *
 * @param <T> Type of the {@link DataStream} created by this {@link TableSource}.
 */

StreamTableSink

/**
 * Defines an external stream table and provides write access to its data.
 *
 * @param <T> Type of the {@link DataStream} created by this {@link TableSink}.
 */

2. Table API

Example:

package org.apache.flink.table.examples.java;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Simple example for demonstrating the use of the Table API for a Word Count in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataSets to Tables
 *  - Apply group, aggregate, select, and filter operations
 */
public class WordCountTable {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("Ciao", 1),
                new WC("Hello", 1));

        Table table = tEnv.fromDataSet(input);

        Table filtered = table
                .groupBy("word")
                .select("word, frequency.sum as frequency")
                .filter("frequency = 2");

        DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);

        result.print();
    }

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO containing a word and its respective count.
     */
    public static class WC {
        public String word;
        public long frequency;

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}
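The groupBy / select / filter chain above differs from the SQL word count in one respect: the final filter keeps only groups whose summed frequency equals 2. A minimal plain-Java sketch of that pipeline (no Flink dependency) is shown below. WordCountFilterSketch and groupSumFilter are illustrative names, not Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java illustration of groupBy("word") -> sum -> filter("frequency = 2"); not Flink code. */
class WordCountFilterSketch {

    /**
     * Sums the frequencies per word, then keeps only the words whose total equals {@code wanted}.
     * The two arrays are parallel: frequencies[i] belongs to words[i].
     */
    static Map<String, Long> groupSumFilter(String[] words, long[] frequencies, long wanted) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (int i = 0; i < words.length; i++) {
            sums.merge(words[i], frequencies[i], Long::sum); // groupBy + frequency.sum
        }
        sums.values().removeIf(total -> total != wanted);    // filter on the aggregated value
        return sums;
    }

    public static void main(String[] args) {
        String[] words = {"Hello", "Ciao", "Hello"};
        long[] frequencies = {1, 1, 1};
        System.out.println(groupSumFilter(words, frequencies, 2)); // prints {Hello=2}
    }
}
```

The filter runs after the aggregation, so it behaves like a HAVING clause in SQL: "Ciao" is dropped because its total is 1, not because of any single input row.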

3. Data conversion

  3.1 Converting between DataSet and Table

    DataSet --> Table

      By registration:

        // register the DataSet as table "WordCount"
        tEnv.registerDataSet("WordCount", input, "word, frequency");

      By conversion:

        Table table = tEnv.fromDataSet(input);

    Table --> DataSet

        DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);

  3.2 Converting between DataStream and Table

    DataStream --> Table

      By registration:

        tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

      By conversion:

        Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");

    Table --> DataStream

        DataStream<Order> stream = tEnv.toAppendStream(result, Order.class);

References

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html

[2] Flink原理、实战与性能优化 (Flink: Principles, Practice, and Performance Optimization)

Reposted from: https://www.cnblogs.com/davidwang456/p/11161621.html
