Using the Flink Table/SQL API to Build Batch and Streaming Applications (1): Basic Concepts of Table
From Flink's official documentation, we know that Flink's programming model consists of four layers: SQL is the highest-level API, the Table API is the middle layer, the DataStream/DataSet API is the core, and the stateful stream processing layer is the underlying implementation.
Earlier posts in this series covered the other layers:

- "Flink DataSet API: usage and internals" introduced the DataSet API
- "Flink DataStream API: usage and internals" introduced the DataStream API
- "How to use timestamps in Flink: Watermark usage and internals" introduced Watermarks, the foundation of the underlying implementation
- "Flink window examples analyzed" introduced the concept and usage of windows
- "State and fault tolerance in Flink" introduced the concept of State and the checkpoint/savepoint fault-tolerance mechanisms
0. Basic Concepts
0.1 TableEnvironment

TableEnvironment is the core concept of the Table API and SQL integration. It is mainly responsible for:

1. Registering a Table in the internal catalog
2. Registering an external catalog
3. Executing SQL queries
4. Registering a user-defined function (UDF)
5. Converting a DataStream or DataSet into a Table
6. Holding a reference to a BatchTableEnvironment or StreamTableEnvironment

/**
 * The base class for batch and stream TableEnvironments.
 *
 * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
 * responsible for:
 *
 * <ul>
 *     <li>Registering a Table in the internal catalog</li>
 *     <li>Registering an external catalog</li>
 *     <li>Executing SQL queries</li>
 *     <li>Registering a user-defined scalar function. For the user-defined table and aggregate
 *         function, use the StreamTableEnvironment or BatchTableEnvironment</li>
 * </ul>
 */
0.2 Catalog

Catalog: all metadata about databases and tables is stored in Flink's internal Catalog. It holds all of Flink's Table-related metadata, including table schemas, data source information, and so on.

/**
 * This interface is responsible for reading and writing metadata such as database/table/views/UDFs
 * from a registered catalog. It connects a registered catalog and Flink's Table API.
 */
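As a rough mental model only, a catalog can be pictured as a nested mapping from database names to table names to schema metadata. The class below is invented for illustration and is not Flink's actual Catalog API:

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of the catalog idea: names map to table metadata.
// This is NOT Flink's Catalog interface, just a conceptual sketch.
public class ToyCatalog {

    // database name -> (table name -> schema description)
    private final Map<String, Map<String, String>> databases = new HashMap<>();

    public void registerTable(String db, String table, String schema) {
        databases.computeIfAbsent(db, k -> new HashMap<>()).put(table, schema);
    }

    public String getSchema(String db, String table) {
        return databases.getOrDefault(db, Map.of()).getOrDefault(table, null);
    }

    public static void main(String[] args) {
        ToyCatalog catalog = new ToyCatalog();
        catalog.registerTable("default_database", "WordCount", "word VARCHAR, frequency BIGINT");
        System.out.println(catalog.getSchema("default_database", "WordCount"));
        // word VARCHAR, frequency BIGINT
    }
}
```

The real Catalog additionally tracks views, UDFs, and connector properties, but the lookup structure is the same idea.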
0.3 TableSource

When using the Table API, an external data source can be registered directly as a Table structure. This structure is called a TableSource.
/**
 * Defines an external table with the schema that is provided by {@link TableSource#getTableSchema}.
 *
 * <p>The data of a {@link TableSource} is produced as a {@code DataSet} in case of a {@code BatchTableSource}
 * or as a {@code DataStream} in case of a {@code StreamTableSource}. The type of the produced
 * {@code DataSet} or {@code DataStream} is specified by the {@link TableSource#getProducedDataType()} method.
 *
 * <p>By default, the fields of the {@link TableSchema} are implicitly mapped by name to the fields of
 * the produced {@link DataType}. An explicit mapping can be defined by implementing the
 * {@link DefinedFieldMapping} interface.
 *
 * @param <T> The return type of the {@link TableSource}.
 */

0.4 TableSink
After processing completes, the result needs to be written to external storage. The Table API has a corresponding sink module for this, called TableSink.
/**
 * A {@link TableSink} specifies how to emit a table to an external
 * system or location.
 *
 * <p>The interface is generic such that it can support different storage locations and formats.
 *
 * @param <T> The return type of the {@link TableSink}.
 */

0.5 Table Connector
After Flink 1.6, to let the Table API connect to external systems in a configuration-driven way that can also be used from the SQL Client, Flink introduced the concept of the Table Connector. Its main purpose is to separate the definition of Table Sources and Table Sinks from their use.

Table Connectors wrap the various built-in Table Sources and Table Sinks into configurable components that can be used from both the Table API and the SQL Client.
/**
 * Creates a table source and/or table sink from a descriptor.
 *
 * <p>Descriptors allow for declaring the communication to external systems in an
 * implementation-agnostic way. The classpath is scanned for suitable table factories that match
 * the desired configuration.
 *
 * <p>The following example shows how to read from a connector using a JSON format and
 * register a table source as "MyTable":
 *
 * <pre>
 * {@code
 *
 * tableEnv
 *   .connect(
 *     new ExternalSystemXYZ()
 *       .version("0.11"))
 *   .withFormat(
 *     new Json()
 *       .jsonSchema("{...}")
 *       .failOnMissingField(false))
 *   .withSchema(
 *     new Schema()
 *       .field("user-name", "VARCHAR").from("u_name")
 *       .field("count", "DECIMAL"))
 *   .registerSource("MyTable");
 * }
 * </pre>
 *
 * @param connectorDescriptor connector descriptor describing the external system
 */
TableDescriptor connect(ConnectorDescriptor connectorDescriptor);
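To see why this descriptor style decouples the definition of a source/sink from its use, the fluent pattern can be mimicked in dependency-free Java. Every class and method name below is invented for illustration and is not a Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy fluent "descriptor" builder illustrating how Table Connectors separate
// the *description* of an external system from the code that consumes it.
// All names here are invented for illustration; they are not Flink APIs.
public class ToyConnectDescriptor {

    private String connector;
    private String format;
    private final List<String> fields = new ArrayList<>();

    public ToyConnectDescriptor connect(String connectorName) {
        this.connector = connectorName;
        return this;
    }

    public ToyConnectDescriptor withFormat(String formatName) {
        this.format = formatName;
        return this;
    }

    public ToyConnectDescriptor field(String name, String type) {
        fields.add(name + " " + type);
        return this;
    }

    // "Registering" just renders the accumulated configuration; in Flink,
    // this is where a matching table factory would be looked up.
    public String registerSource(String tableName) {
        return tableName + ": " + connector + "/" + format + " " + fields;
    }

    public static void main(String[] args) {
        String registered = new ToyConnectDescriptor()
            .connect("kafka-0.11")
            .withFormat("json")
            .field("user-name", "VARCHAR")
            .field("count", "DECIMAL")
            .registerSource("MyTable");
        System.out.println(registered);
        // MyTable: kafka-0.11/json [user-name VARCHAR, count DECIMAL]
    }
}
```

Because the descriptor only accumulates configuration, the same description can back a source, a sink, or a SQL Client YAML entry.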
This post focuses on SQL and the Table API.
1. SQL
1.1 SQL on the DataSet API

Example:
package org.apache.flink.table.examples.java;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Simple example that shows how the Batch SQL API is used in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataSets to Tables
 *  - Register a Table under a name
 *  - Run a SQL query on the registered Table
 */
public class WordCountSQL {

    public static void main(String[] args) throws Exception {

        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
            new WC("Hello", 1),
            new WC("Ciao", 1),
            new WC("Hello", 1));

        // register the DataSet as table "WordCount"
        tEnv.registerDataSet("WordCount", input, "word, frequency");

        // run a SQL query on the Table and retrieve the result as a new Table
        Table table = tEnv.sqlQuery(
            "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

        DataSet<WC> result = tEnv.toDataSet(table, WC.class);

        result.print();
    }

    /**
     * Simple POJO containing a word and its respective count.
     */
    public static class WC {
        public String word;
        public long frequency;

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}

Here, BatchTableEnvironment is:
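To clarify what the query itself computes, the GROUP BY aggregation in the SQL above is equivalent to the following dependency-free Java sketch (illustration only, no Flink involved):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java equivalent of:
//   SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word
public class GroupBySumSketch {

    public static Map<String, Long> wordCount(String[][] rows) {
        // word -> running sum of frequency, in first-seen order
        Map<String, Long> sums = new LinkedHashMap<>();
        for (String[] row : rows) {
            sums.merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[][] input = {{"Hello", "1"}, {"Ciao", "1"}, {"Hello", "1"}};
        System.out.println(wordCount(input)); // {Hello=2, Ciao=1}
    }
}
```

Flink distributes exactly this kind of per-key accumulation across the cluster instead of running it in one hash map.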
/**
 * The {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
 * with {@link DataSet}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataSet} to a {@link Table}</li>
 *     <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataSet}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 */

BatchTableSource:

/**
 * Defines an external batch table and provides access to its data.
 *
 * @param <T> Type of the {@link DataSet} created by this {@link TableSource}.
 */

BatchTableSink:

/**
 * Defines an external {@link TableSink} to emit a batch {@link Table}.
 *
 * @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
 */
1.2 SQL on the DataStream API

Example:
package org.apache.flink.table.examples.java;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Arrays;

/**
 * Simple example for demonstrating the use of SQL on a Stream Table in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataStreams to Tables
 *  - Register a Table under a name
 *  - Run a StreamSQL query on the registered Table
 */
public class StreamSQLExample {

    public static void main(String[] args) throws Exception {

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(1L, "diaper", 4),
            new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
            new Order(2L, "pen", 3),
            new Order(2L, "rubber", 3),
            new Order(4L, "beer", 1)));

        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");

        // register DataStream as Table
        tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

        // union the two tables
        Table result = tEnv.sqlQuery(
            "SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
            "SELECT * FROM OrderB WHERE amount < 2");

        tEnv.toAppendStream(result, Order.class).print();

        env.execute();
    }

    /**
     * Simple POJO.
     */
    public static class Order {
        public Long user;
        public String product;
        public int amount;

        public Order() {}

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                '}';
        }
    }
}

Here, StreamTableEnvironment is:
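The union query above keeps rows from tableA with amount > 2 plus rows from OrderB with amount < 2. That predicate logic can be sketched without Flink as follows (rows are reduced to {user, amount} pairs for brevity):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java equivalent of:
//   SELECT * FROM tableA WHERE amount > 2
//   UNION ALL
//   SELECT * FROM OrderB WHERE amount < 2
public class UnionAllSketch {

    public static List<int[]> unionAll(List<int[]> a, List<int[]> b) {
        List<int[]> out = new ArrayList<>();
        for (int[] row : a) {               // row = {user, amount}
            if (row[1] > 2) out.add(row);
        }
        for (int[] row : b) {
            if (row[1] < 2) out.add(row);   // UNION ALL keeps duplicates
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> orderA = List.of(new int[]{1, 3}, new int[]{1, 4}, new int[]{3, 2});
        List<int[]> orderB = List.of(new int[]{2, 3}, new int[]{2, 3}, new int[]{4, 1});
        unionAll(orderA, orderB).forEach(r -> System.out.println(r[0] + "," + r[1]));
    }
}
```

In the streaming job, each matching row is emitted as soon as it arrives; the per-row filter requires no state, which is why toAppendStream suffices here.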
/**
 * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
 * {@link DataStream}s.
 *
 * <p>A TableEnvironment can be used to:
 * <ul>
 *     <li>convert a {@link DataStream} to a {@link Table}</li>
 *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
 *     <li>scan a registered table to obtain a {@link Table}</li>
 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
 *     <li>convert a {@link Table} into a {@link DataStream}</li>
 *     <li>explain the AST and execution plan of a {@link Table}</li>
 * </ul>
 */

StreamTableSource:

/**
 * Defines an external stream table and provides read access to its data.
 *
 * @param <T> Type of the {@link DataStream} created by this {@link TableSource}.
 */

StreamTableSink:

/**
 * Defines an external stream table and provides write access to its data.
 *
 * @param <T> Type of the {@link DataStream} created by this {@link TableSink}.
 */
2. Table API

Example:
package org.apache.flink.table.examples.java;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

/**
 * Simple example for demonstrating the use of the Table API for a Word Count in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataSets to Tables
 *  - Apply group, aggregate, select, and filter operations
 */
public class WordCountTable {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
            new WC("Hello", 1),
            new WC("Ciao", 1),
            new WC("Hello", 1));

        Table table = tEnv.fromDataSet(input);

        Table filtered = table
            .groupBy("word")
            .select("word, frequency.sum as frequency")
            .filter("frequency = 2");

        DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);

        result.print();
    }

    /**
     * Simple POJO containing a word and its respective count.
     */
    public static class WC {
        public String word;
        public long frequency;

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}

3. Data Conversion
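The chained calls groupBy → select(sum) → filter correspond to grouping, aggregating, then filtering on the aggregate. A dependency-free sketch of the same semantics:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java equivalent of:
//   table.groupBy("word").select("word, frequency.sum as frequency").filter("frequency = 2")
public class TableApiSketch {

    public static Map<String, Long> groupSumFilter(String[] words, long[] freqs, long keep) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (int i = 0; i < words.length; i++) {
            sums.merge(words[i], freqs[i], Long::sum);  // groupBy + frequency.sum
        }
        sums.values().removeIf(v -> v != keep);          // filter("frequency = 2")
        return sums;
    }

    public static void main(String[] args) {
        Map<String, Long> result = groupSumFilter(
            new String[]{"Hello", "Ciao", "Hello"}, new long[]{1, 1, 1}, 2L);
        System.out.println(result); // {Hello=2}
    }
}
```

Note that the filter runs on the aggregated value, which is why it must come after the select in the Table API chain.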
3.1 Converting between DataSet and Table

DataSet --> Table

By registration:

// register the DataSet as table "WordCount"
tEnv.registerDataSet("WordCount", input, "word, frequency");

By conversion:

Table table = tEnv.fromDataSet(input);

Table --> DataSet

DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);

3.2 Converting between DataStream and Table

DataStream --> Table

By registration:

tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

By conversion:

Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");

Table --> DataStream

DataStream<Order> resultStream = tEnv.toAppendStream(result, Order.class);
References

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html
[2] Flink原理、实战与性能优化 (Flink: Principles, Practice, and Performance Optimization)
Reposted from: https://www.cnblogs.com/davidwang456/p/11161621.html