Flink SQL Hands-On Development
1. Basics
Flink SQL consists of the Table API and the SQL API; it is a high-level library built on top of Flink Core that makes structured data processing with SQL quick and convenient.
- Workflow
After SQL and Table API programs enter Flink, they are converted into a unified representation, the logical plan; the catalog supplies the metadata used for subsequent optimization. The logical plan is the entry point of optimization: after a series of rules, Flink optimizes the initial logical plan into a physical plan, the physical plan is translated into Transformations by the code generator, and finally it is converted into a job graph.
There is no separate path for stream and batch processing in this pipeline, because the optimization process and building blocks are shared between streaming and batch.
- Programming model
Create the Flink SQL execution environment.
Define the data source as a table.
Run the SQL query.
Write the query result to the target table (a minimal sketch of these steps follows).
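A minimal end-to-end sketch of these four steps, assuming the datagen and print connectors that also appear later in these notes (the table names src and snk are placeholders):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class QuickStartSketch {
public static void main(String[] args) {
// 1. Create the Flink SQL execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. Define the data source as a table (mock rows from the datagen connector)
tableEnv.executeSql("create table src(id int, name string) with (" +
"'connector'='datagen','rows-per-second'='1'," +
"'fields.id.min'='1','fields.id.max'='10','fields.name.length'='5')");
// 3. Run a SQL query and 4. write its result into a target (print) table
tableEnv.executeSql("create table snk(id int, name string) with ('connector'='print')");
tableEnv.executeSql("insert into snk select id, upper(name) from src");
}
}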
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<flink.version>1.15.2</flink.version>
<scala.version>2.12.2</scala.version>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink client -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Web UI for local runs -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink and Kafka integration -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<!-- State backend -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.21</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON format dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- CSV format dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CDC dependency -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- Flink filesystem connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink on Hive -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.41</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
</dependencies>
- emp.txt data
{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":351619200000,"sal":1250.0,"comm":500.0,"deptno":30}
{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":354988800000,"sal":2975.0,"comm":null,"deptno":20}
{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":370454400000,"sal":1250.0,"comm":1400.0,"deptno":30}
{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":357494400000,"sal":2850.0,"comm":null,"deptno":30}
{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":360864000000,"sal":2450.0,"comm":null,"deptno":10}
{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":553100400000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":null,"hiredate":374774400000,"sal":5000.0,"comm":null,"deptno":10}
{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":368726400000,"sal":1500.0,"comm":0.0,"deptno":30}
{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":553100400000,"sal":1100.0,"comm":null,"deptno":20}
{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":376156800000,"sal":950.0,"comm":null,"deptno":30}
{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":376156800000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":380563200000,"sal":1300.0,"comm":null,"deptno":10}
- Java code
public static void main(String[] args) throws Exception {
// Quick start
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
// Read the text file and convert it into a Table
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
.map(lines ->JSONObject.parseObject(lines, Emp.class));
// Convert the Java objects into a Table
// Note that the hiredate timestamp in Emp is of type Long
// {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
Table table = tableEnv.fromDataStream(source);
table.select(Expressions.$("*")).execute().print();
}
2. Flink SQL Programming Overview
- Execution environment
TableEnvironment is the core concept of the Table API and SQL. It is responsible for:
- Registering Tables in the internal catalog
- Registering external catalogs
- Loading pluggable modules
- Executing SQL queries
- Registering user-defined functions (scalar, table, aggregate)
- Converting between DataStream and Table
A Table is bound to a specific TableEnvironment; tables from different TableEnvironments cannot be used in the same query.
Whether the input source is streaming or batch, Table API and SQL queries are translated into DataStream programs.
A Table object is identified as Catalog.DB.Table (a short sketch of identifier resolution follows).
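A short sketch of how these identifiers resolve, reusing the tableEnv from the quick start above (default_catalog and default_database are Flink's default names; t_emp is assumed to be registered already):
// Set the current catalog and database; unqualified names resolve against them
tableEnv.useCatalog("default_catalog");
tableEnv.useDatabase("default_database");
// "t_emp" resolves to default_catalog.default_database.t_emp
tableEnv.sqlQuery("select * from t_emp");
// A fully qualified identifier bypasses the current catalog/database settings
tableEnv.sqlQuery("select * from default_catalog.default_database.t_emp");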
- Creation method 1
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
- Creation method 2
EnvironmentSettings build = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
TableEnvironment TabEnv = TableEnvironment.create(build);
- Creating tables
An identifier consists of three parts: the catalog name, the database name, and the object name.
If the catalog or database is not specified, the current defaults are used.
A Table can be virtual (a view) or a regular table. Views are temporary and stored in memory; they disappear when the session ends. Tables represent physically persisted external data.
Table categories: temporary tables (exist only within the Flink session); permanent tables (metadata stored in a catalog); shadowing (when a temporary table has the same name as a permanent table, the permanent table cannot be accessed while the temporary table exists; dropping the temporary table makes the permanent table accessible again). A small sketch of shadowing follows.
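A small sketch of the shadowing behavior, assuming tabEnv and the Emp stream source from the example below, and a permanent table named t_emp that already exists in the current catalog (not shown here):
// A temporary view with the same name shadows the permanent table
Table empTable = tabEnv.fromDataStream(source);
tabEnv.createTemporaryView("t_emp", empTable);
// This query now reads the temporary view, not the permanent table
tabEnv.sqlQuery("select * from t_emp").execute().print();
// Dropping the temporary view makes the permanent table visible again
tabEnv.dropTemporaryView("t_emp");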
- Example
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
Table table = tabEnv.fromDataStream(source);
//Table table = tabEnv.fromDataStream(source,$("deptno").as("dno")); // select a specific column and give it an alias
tabEnv.createTemporaryView("t_emp",table);
tabEnv.sqlQuery("select * from t_emp").execute().print();
- Converting a DataStream into a Table
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
// Select a specific column and set an alias
Table table = tabEnv.fromDataStream(source,$("deptno").as("dno"));
- createTemporaryView
Creates a temporary view (temporary table). The first argument is the registered table name ([catalog.db.]tableName); the second argument can be a Table or a DataStream; the third argument specifies the column names (optional).
Table table = tabEnv.fromDataStream(source);
//Table table = tabEnv.fromDataStream(source,$("deptno").as("dno")); // select a specific column and give it an alias
tabEnv.createTemporaryView("t_emp",table);
=========================================================================================
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
// Set an alias and specify which column to select
tabEnv.createTemporaryView("t_emp",source,$("deptno").as("dd"));
tabEnv.sqlQuery("select * from t_emp").execute().print();
- Data types
- Atomic types: the data types supported by DataStream are also supported by Table, i.e., basic types and generic types (Integer, Double, String, and so on).
- Tuple types: fields are numbered from f0 (f0, f1, f2, ...); all fields can be reordered, and a subset of the fields can be projected.
- POJO types: Flink also supports "composite types" composed of multiple data types, the most typical being plain Java objects (POJOs). When converting a POJO DataStream into a Table, if no field names are given, the field names of the original POJO type are used directly. POJO fields can be reordered, projected, and renamed.
- Row type: Flink also defines a more general data type for relational tables, the Row, which is the basic organizational form of data in a Table. Its length is fixed and the type of each field cannot be inferred, so concrete type information must be declared when it is used.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.readTextFile("data/dept.txt");
// "Reordering fields" means the order of the selected fields can be customized
StreamTableEnvironment.create(environment).fromDataStream(source,$("f1")).execute().print();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
DataStreamSource<Row> source = environment.fromElements(Row.ofKind(RowKind.INSERT, "張三", 20)
, Row.ofKind(RowKind.INSERT, "李四", 25)
//RowKind.UPDATE_BEFORE acts as a marker
, Row.ofKind(RowKind.UPDATE_BEFORE, "yy", 12)
, Row.ofKind(RowKind.UPDATE_AFTER, "aaa", 18));
Table table = tabEnv.fromChangelogStream(source);
table.execute().print();
- Querying tables
The Table API is a language-integrated query API for Scala and Java. Unlike SQL, Table API queries are not specified as strings but built up step by step in the host language.
table.groupBy(...).select(...): groupBy(...) specifies how the table is grouped, and select(...) is the projection over the grouped table.
//{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
Table table = StreamTableEnvironment.create(environment).fromDataStream(source);
table.where($("deptno").isEqual(10)).select($("ename"), $("job")).execute().print();
table.groupBy($("deptno")).select($("deptno"),$("sal").avg().as("sal_avg")).execute().print();
- SQL syntax
The StreamTableEnvironment object has two commonly used methods: sqlQuery() and executeSql().
- sqlQuery() is mainly used to query data, and Table objects can be mixed into the query.
- executeSql() can be used for inserts, deletes, updates, and queries alike.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tbl = StreamTableEnvironment.create(environment);
tbl.createTemporaryView("t_emp_demo",source);
String sql="select deptno,avg(sal) " +
" from t_emp_demo " +
" group by deptno ";
tbl.executeSql(sql).print();
=========================================================================================
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
Table empTable = tableEnvironment.fromDataStream(source);
tableEnvironment.sqlQuery("select * from "+empTable).execute().print();
- Output tables
insertInto: a Table is emitted by writing it to a TableSink. TableSink is a generic interface covering:
- Various file formats (such as CSV, Apache Parquet, Apache Avro)
- Storage systems (such as JDBC, Apache HBase, Apache Cassandra, Elasticsearch)
- Message queue systems (such as Apache Kafka, RabbitMQ)
- Writing to the console (print)
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.createTemporaryView("t_emp_d",source);
Table tableSource = tblEnv.fromDataStream(source, $("empno"), $("ename"), $("job"));
String sql=
"create table t_emp_r(" +
"empno Integer," +
"ename String," +
"job String) " +
"with ( " +
"'connector'='print')";
tblEnv.executeSql(sql);
tableSource.insertInto("t_emp_r").execute();
//t_emp_r cannot be queried as a source table; it can only be used as a sink
// tblEnv.executeSql("select * from t_emp_r").print();
environment.execute();
3. Flink SQL Connectors
- Writing to Kafka
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String sqlSource="create table kafka_source( " +
"deptno int," +
"dname String," +
"loc String)" +
"with (" +
"'connector'='kafka'," +
"'topic'='flink_kafka_source'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='flink-zwf'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='csv')";
tblEnv.executeSql(sqlSource);
String sqlSink="create table kafka_sink( " +
"deptno int," +
"dname String," +
"loc String)" +
"with (" +
"'connector'='kafka'," +
"'topic'='flink_kafka_sink'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='flink-zwf'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
tblEnv.executeSql(sqlSink);
// Query data from one table and insert it into another
tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").execute();
- Viewing the execution plan
tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").printExplain();
- Converting a Table into a DataStream
To convert a Table into a DataStream, call toDataStream() on the table environment directly:
tableEnv.toDataStream(table).print();
- toChangelogStream
For a table that involves update operations, do not try to convert it directly into a DataStream and print it; instead, record its changelog.
A table that is being updated thus becomes a stream of changelog records, which can be converted into a stream and printed.
Rules: an INSERT is encoded as an add message; a DELETE is encoded as a retract message; an UPDATE is encoded as a retract message for the changed row plus an add message for the updated row.
tableEnv.toChangelogStream(table).print();
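A short sketch of these rules, assuming the tableEnv, environment, and t_emp view from the earlier examples: a grouped aggregation produces an updating table, and toChangelogStream() exposes the +I / -U / +U records:
// A grouped aggregation yields an updating (changelog) table
Table deptCount = tableEnv.sqlQuery("select deptno, count(*) as cnt from t_emp group by deptno");
// Inserts arrive as +I; each update arrives as a -U (retract) record plus a +U (add) record
tableEnv.toChangelogStream(deptCount).print();
environment.execute();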
- JDBC connector
Flink supports connecting to multiple databases through dialects, such as MySQL, Oracle, PostgreSQL, and Derby (Derby is typically used for testing). The table below lists the type mappings from relational database types to Flink SQL data types; the mapping makes it easier to define JDBC tables in Flink.
- Common data type mappings
- Dependencies
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.15.4</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
- Example
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String jdbcSQL=
"create table jdbc_scott_emp(" +
"empno int," +
"ename string," +
"job string," +
"mgr int," +
"hiredate date," +
"sal double," +
"comm double," +
"deptno int)" +
"with (" +
"'connector'='jdbc'," +
"'url'='jdbc:mysql://master:3306/scott?serverTimeZone=Asia/Shanghai'," +
"'table-name'='emp'," +
"'driver'='com.mysql.cj.jdbc.Driver'," +
"'username'='root'," +
"'password'='Root@123456.')";
tblEnv.executeSql(jdbcSQL);
tblEnv.sqlQuery("select * from jdbc_scott_emp").execute().print();
- SQL statements (inserting data into a JDBC table, using a JDBC table as a dimension table in a temporal join)
-- Write data from another table "T" into the JDBC table
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
-- The JDBC table used as a dimension table in a temporal table join
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
- DataGen SQL connector
Used to generate mock data; the DataGen connector reads rows produced according to the configured generation rules.
Complex types (ARRAY, MAP, ROW) are not supported; use computed columns to construct them.
Connector options
- Example
// Randomly generate data according to the configured rules
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String SqlStr="CREATE TABLE datagen (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
" 'fields.f_random_str.length'='10'\n" +
")";
tblEnv.executeSql(SqlStr);
tblEnv.sqlQuery("select * from datagen").execute().print();
- Upsert Kafka SQL connector
Because Flink performs streaming computation, records with the same key keep arriving. When writing to Kafka, the value for an existing key is continuously updated (UPDATE, marked -U/+U); a key not seen before is inserted (INSERT, marked +I); a null value marks the key as deleted (DELETE, marked -D). As a sink, the upsert-kafka connector consumes a changelog stream: it writes INSERT/UPDATE_AFTER data as normal Kafka messages and writes DELETE data as Kafka messages with a null value (indicating that the message for the corresponding key is deleted). Flink partitions the data by the values of the primary key columns, guaranteeing per-key ordering, so update/delete messages for the same key land in the same partition.
- Example
// Use datagen to mock the data
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String dataGen=
"create table t_dataGen(" +
"deptno int," +
"salnum int," +
"ts AS localtimestamp," +
"WATERMARK FOR ts AS ts" +
") with ( " +
"'connector'='datagen'," +
"'rows-per-second'='2'," +
"'fields.deptno.min'='88'," +
"'fields.deptno.max'='99'," +
"'fields.salnum.min'='10'," +
"'fields.salnum.max'='20')";
tblEnv.executeSql(dataGen);
// tblEnv.sqlQuery("select deptno,sum(salnum) as salnum from t_dataGen group by deptno").execute().print();
// Kafka sink side
String kafkaSink="create table upsert_kafka_num(" +
"deptno int," +
"salnum int," +
"primary key(deptno) not enforced)" +
"with(" +
"'connector'='upsert-kafka'," +
"'topic'='upsert_kafka'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'key.format'='csv'," +
"'value.format'='json')";
tblEnv.executeSql(kafkaSink);
// Insert data
tblEnv.executeSql("insert into upsert_kafka_num select deptno,sum(salnum) as salnum from t_dataGen group by deptno");
- FileSystem connector
File systems fall into two categories: the local file system and external file systems.
Local file system: Flink natively supports the file system of the local machine, including any NFS or SAN drive mounted into the local file system. It works by default without extra configuration. Local files are referenced with the file:// URI scheme.
External file systems: common ones include HDFS, ClickHouse, and HBase; these file systems can and must be used as plugins.
To use an external file system, before starting Flink copy the corresponding JAR file from the opt directory into a folder under the plugins directory of the Flink distribution.
- Local file test
public static void main(String[] args) {
// Set up the environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String sqlDemo="create table t_dept_d(" +
"deptno int," +
"dname string," +
"loc string)" +
"with(" +
"'connector'='filesystem'," +
"'path'='data/dept.txt'," +
"'format'='csv'" +
")";
tblEnv.executeSql(sqlDemo);
tblEnv.sqlQuery("select * from t_dept_d").execute().print();
}
- HDFS distributed file system test
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<!-- Put additional configuration files such as core-site.xml, hdfs-site.xml, and yarn-site.xml into the resources directory -->
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String hdfsSql="create table dfs_dept(" +
"deptno int," +
"dname string," +
"loc string)" +
"with (" +
"'connector'='filesystem'," +
"'path'='hdfs://hdfs-zwf/dept.txt'," +
"'format'='csv')";
tblEnv.executeSql(hdfsSql);
tblEnv.sqlQuery("select * from dfs_dept").execute().print();
4. Schema Structure
- Physical columns
Physical columns: columns that come from the schema of the external storage system itself, for example:
- Fields in the key and value of Kafka messages
- Columns of a MySQL table
- Columns of a Hive table
- Fields in a Parquet file
- Computed columns
Computed columns: apply a SQL expression on top of physical columns and define the result of the expression as a new column.
// Method 1: SQL API
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String sqlStr="create table upsert_info(" +
"deptno int," +
"salnum2 as salnum*100,"+ // computed column
"salnum int)" +
"with (" +
"'connector'='kafka'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'topic'='upsert_kafka'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from upsert_info").execute().print();
// Method 2: Table API
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("deptno", DataTypes.INT())
.column("salnum",DataTypes.INT())
.columnByExpression("salpluns","salnum*100")
.build()).option("connector","kafka")
.option("topic","upsert_kafka")
.option("scan.startup.mode","earliest-offset")
.option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
.format("json").build());
tblEnv.sqlQuery("select * from kafka_dept").execute().print();
- Metadata columns
Metadata columns: metadata about the external system that the connector retrieves from the external storage system.
For a Kafka message, the payload in the usual sense is in the record's key and value, but Kafka also carries metadata such as the partition, offset, and timestamp. Flink connectors can retrieve and expose this metadata, allowing users to define it as columns of the Flink SQL table.
// Method 1: SQL API
String sqlStr="create table upsert_info(" +
"deptno int," +
"salnum2 as salnum*100," + // computed column
"event_time timestamp_ltz(3) metadata from 'timestamp',"+ // metadata column
"salnum int)" +
"with (" +
"'connector'='kafka'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'topic'='upsert_kafka'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from upsert_info").execute().print();
// Method 2: Table API
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("deptno", DataTypes.INT())
.column("salnum",DataTypes.INT())
//metadata column
.columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(2),"timestamp",true)
.columnByMetadata("k_offset",DataTypes.INT(),"offset",true)
.build()).option("connector","kafka")
.option("topic","upsert_kafka")
.option("scan.startup.mode","earliest-offset")
.option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
.format("json").build());
tblEnv.sqlQuery("select * from kafka_dept").execute().print();
- Primary key constraints
Single-column primary key syntax:
// SQL API
id INT PRIMARY KEY NOT ENFORCED,
name STRING
// Table Api
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("deptno", DataTypes.INT())
// Set the primary key column
.primaryKey("deptno")
.column("salnum",DataTypes.INT())
//metadata column
.columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(2),"timestamp",true)
.columnByMetadata("k_offset",DataTypes.INT(),"offset",true)
.build()).option("connector","kafka")
Multi-column primary key syntax:
-- SQL API
id,
name,
PRIMARY KEY(id,name) NOT ENFORCED
//Table API
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("deptno", DataTypes.INT())
// Set the primary key columns
.primaryKey("deptno","event_time")
.column("salnum",DataTypes.INT())
//metadata column
.columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(2),"timestamp",true)
.columnByMetadata("k_offset",DataTypes.INT(),"offset",true)
.build()).option("connector","kafka")
// Method 1: SQL API
String sqlStr="create table upsert_info(" +
"deptno int," +
"event_time timestamp_ltz(3) metadata from 'timestamp',"+ // metadata column
"dname string," +
"loc string," +
"primary key(deptno,loc) not enforced)" +
"with (" +
"'connector'='upsert-kafka'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'topic'='flink_kafka_source'," +
"'key.format'='csv'," +
"'value.format'='json')";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from upsert_info").execute().print();
Note: with the kafka connector a primary key cannot be defined, but with the upsert-kafka connector a primary key is mandatory. Primary key columns must not contain null values.
In upsert-kafka mode, neither the key nor the value may be null; otherwise parsing fails with the CSV format.
5. Flink SQL Formats
Connector: when integrating with external storage, different format components are needed depending on the data format in the external storage.
Format component: it tells the connector how to parse the data in the external storage and map it to the table schema.
Basic usage steps:
- Add the format component's JAR dependency
- Specify the format component's name
- Set the options required by the format component
- Formats supported by Flink SQL
- Example
<!-- JSON format dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- CSV format dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
- Example
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
)
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
6. Flink Watermarks
Dynamic tables are the core concept of Flink's Table API and SQL support for streaming data. Unlike the static tables that represent batch data, dynamic tables change over time. They can be queried just like static batch tables. Querying a dynamic table yields a continuous query. A continuous query never terminates, and its result is a dynamic table. The query continuously updates its (dynamic) result table to reflect changes on its (dynamic) input table. In essence, a continuous query on a dynamic table is very similar to the query that defines a materialized view.
Note that the result of a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input table.
The biggest difference from tables in components such as Spark and Hive is that tables in Flink SQL are dynamic tables. Flink's core is the processing of bounded or unbounded data streams, a continuous streaming process.
- Continuous queries
A continuous query is evaluated on the dynamic table and produces a new dynamic table. Unlike a batch query, a continuous query never terminates and updates its result table according to updates on its input table. At any point in time, the result of a continuous query is semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input table.
- Event time
In the CREATE TABLE DDL, add a column and define the event-time attribute with a WATERMARK statement.
The WATERMARK statement defines the watermark generation expression: it marks an existing event-timestamp column as the event-time attribute and specifies the watermark delay on top of it.
// Watermark with a 5-second delay
String eventTime="create table proc_dept_tbl(" +
"deptno int," +
"dname string," +
"loc string," +
"ts timestamp_ltz(3) metadata from 'timestamp'," +
"watermark for ts as ts-interval '5' second" + // ts is the event-time attribute; the watermark is delayed by 5s
")with( " +
"'connector'='kafka'," +
"'topic'='flink_kafka_sink'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(eventTime);
tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();
//Table API
tblEnv.createTable("t_water_mark", TableDescriptor.forConnector("kafka")
.option("topic","flink_kafka_sink")
.option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
.option("properties.group.id","zwf")
.option("scan.startup.mode","earliest-offset")
.format("json")
.schema(Schema.newBuilder()
.column("deptno",DataTypes.INT())
.column("dname",DataTypes.STRING())
.column("loc",DataTypes.STRING())
.columnByMetadata("ts",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)
.watermark("ts","ts-interval '5' second").build()).build());
tblEnv.sqlQuery("select deptno,dname,ts from t_water_mark").execute().print();
- Processing time
When defining a processing-time attribute, an extra column must be declared specifically to hold the current processing time.
In the CREATE TABLE DDL, an additional column can be added whose value is given by the built-in PROCTIME() function to designate the processing-time attribute; its return type is TIMESTAMP_LTZ.
- Example
// Flink SQL processing time
String procTime="create table proc_dept_tbl(" +
"deptno int," +
"dname string," +
"loc string," +
"pt as proctime()" + // pt is the processing-time attribute
")with( " +
"'connector'='kafka'," +
"'topic'='flink_kafka_sink'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(procTime);
tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();
// Run with the Table API
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
Table table = tblEnv.fromDataStream(source, Schema.newBuilder()
.column("empno",DataTypes.INT())
.column("ename", DataTypes.STRING())
.column("job",DataTypes.STRING())
.column("mgr",DataTypes.INT())
.column("hiredate",DataTypes.BIGINT())
.column("sal",DataTypes.DOUBLE())
.column("comm",DataTypes.DOUBLE())
.column("deptno",DataTypes.INT())
.columnByExpression("ts","proctime()")
.build());
tblEnv.sqlQuery("select empno,ename,ts from"+table.toString()).execute().print();
- Defining time attributes on a DataStream
The processing-time attribute can also be defined when converting a DataStream into a table. When calling fromDataStream() to create the table, the .proctime() suffix can be used to designate the processing-time attribute field.
Because processing time is system time and the original data has no such field, the processing-time attribute must never be defined on an existing field; it can only be defined at the very end of the schema, as an extra logical field.
// Quick start
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// environment.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
// Read the text file and convert it into a Table
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
.map(lines ->JSONObject.parseObject(lines, Emp.class));
// Convert the Java objects into a Table
// Note that the hiredate timestamp in Emp is of type Long
// {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
Table table = tableEnv.fromDataStream(source,$("empno"),$("ename"),$("ts").proctime());
table.select($("*")).execute().print();
7. Flink SQL Window TVFs
- Windowing table-valued functions (TVFs)
- Flink currently provides the following windows:
- Hopping (sliding) windows
- Tumbling windows
- Cumulative windows
- Session windows
- In addition to all columns of the original table, the return value of a window TVF adds 3 extra columns that describe the window:
  - window_start: the start time of the window
  - window_end: the end time of the window
  - window_time: the window end time minus 1 ms
- Tumbling windows
Tumbling windows are defined exactly as in the DataStream API: fixed-length, time-aligned, non-overlapping windows, generally used for periodic statistics.
TUMBLE(TABLE data, DESCRIPTOR(timecol), size[, offset]) has three required parameters:
data: a table parameter; the table must contain a time attribute column.
timecol: a descriptor indicating which time attribute column of the data should be mapped to the tumbling windows.
size: the size of the tumbling window.
- Example
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
// Generate random gid (10 to 20) and sales (1 to 9)
// ts uses the local timestamp; the watermark is ts delayed by 5s
tblEnv.executeSql("CREATE TABLE t_goods (\n" +
" gid INT,\n" +
" sales INT,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.gid.min'='10',\n" +
" 'fields.gid.max'='20',\n" +
" 'fields.sales.min'='1',\n" +
" 'fields.sales.max'='9'\n" +
")");
// tblEnv.sqlQuery("select * from t_goods").execute().print();
// Tumbling window: compute once every 5 seconds
// String tumbleWin="select * from table(tumble(table t_goods,descriptor(ts),interval '5' second))";
// tblEnv.sqlQuery(tumbleWin).execute().print();
// Total sales per gid within each time window
tblEnv.sqlQuery(
"select window_start,window_end,gid,sum(sales) as sum_sales " +
"from table(tumble(table t_goods,descriptor(ts),interval '5' second))" +
"group by window_start,window_end,gid"
).execute().print();
- Sliding windows
Hopping windows are also called "sliding windows".
The HOP function assigns windows that cover rows within the size interval and shifts each window based on the time attribute column.
The HOP function takes the following required parameters: HOP(TABLE data, DESCRIPTOR(timecol), slide, size[, offset])
- data: a table value; a table with a timestamp column.
- slide: the duration between the starts of consecutive hopping windows.
- size: the duration specifying the width of the hopping window; size must be an integer multiple of slide.
- Example
// Sliding-window table-valued function
// Generate random gid (10 to 20) and sales (1 to 10)
String datagen="create table t_datagen(" +
"gid int," +
"sales int," +
"ts as localtimestamp," +
"watermark for ts as ts-interval '5' second" +
") with (" +
"'connector'='datagen'," +
"'rows-per-second'='10'," +
"'fields.gid.min'='10'," +
"'fields.gid.max'='20'," +
"'fields.sales.min'='1'," +
"'fields.sales.max'='10')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(datagen);
// tblEnv.sqlQuery("select * from t_datagen").execute().print();
// Window size 15s, sliding every 3s
tblEnv.sqlQuery("select gid,sum(sales),window_start,window_end from table(hop(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
- Cumulative windows
The CUMULATE function assigns elements to windows that cover rows from an initial step interval and expand by one more step at a time (keeping the window start fixed), up to the maximum window size.
You can think of CUMULATE as first applying a TUMBLE window with the maximum window size and then splitting each tumbling window into several windows that share the same window start but whose window ends differ by the step.
Cumulative windows therefore do overlap, and they do not have a fixed size.
The CUMULATE function signature is CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size); its required parameters are:
- data: a table parameter; the table must contain a time attribute column.
- timecol: the time attribute column to use.
- step: the duration by which the window size increases between the ends of consecutive cumulative windows.
- size: the duration of the maximum width of the cumulative windows; size must be an integer multiple of step.
- Example
// Cumulative window
// Window table-valued function
// Generate random gid (10 to 20) and sales (1 to 10)
String datagen="create table t_datagen(" +
"gid int," +
"sales int," +
"ts as localtimestamp," +
"watermark for ts as ts-interval '5' second" +
") with (" +
"'connector'='datagen'," +
"'rows-per-second'='10'," +
"'fields.gid.min'='10'," +
"'fields.gid.max'='20'," +
"'fields.sales.min'='1'," +
"'fields.sales.max'='10')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(datagen);
// Fire every 3 seconds and accumulate toward the 15-second maximum, e.g. [19:45:00, 19:45:03), [19:45:00, 19:45:06), ... up to [19:45:00, 19:45:15)
tblEnv.sqlQuery("select window_start,window_end,gid,sum(sales) as sales_sum from table(cumulate(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
- Grouping and deduplication
GROUP BY + DISTINCT means grouping plus deduplication, which is exactly what UV statistics need.
- Example
// Website statistics: UV = unique visitors, PV = page views
String websiteSQL="create table wbSiteNum(" +
"gid int," +
"url string," +
"ts as localtimestamp," +
"watermark for ts as ts-interval '5' second" +
")with(" +
"'connector'='datagen'," +
"'fields.gid.min'='1000'," +
"'fields.gid.max'='2000'," +
"'fields.url.length'='10')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(websiteSQL);
// tblEnv.sqlQuery("select * from wbSiteNum").execute().print();
tblEnv.sqlQuery(
"select count(distinct gid) as uv,count(url) as pv\n" +
"from wbSiteNum"
).execute().print();
8. Flink SQL Aggregate Functions
- Group aggregation
What is usually called aggregation in SQL is implemented with built-in functions such as SUM, MAX, MIN, AVG, and COUNT.
Its characteristic is that multiple input rows are computed down to a single value, a many-to-one transformation. For example, the code below counts the input data; more often, we specify grouping keys with a GROUP BY clause so that the data is aggregated per group on some field.
- Example
// Sum per group
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
"FROM (VALUES\n" +
" ('省1','市1','縣1',100),\n" +
" ('省1','市2','縣2',101),\n" +
" ('省1','市2','縣1',102),\n" +
" ('省2','市1','縣4',103),\n" +
" ('省2','市2','縣1',104),\n" +
" ('省2','市2','縣1',105),\n" +
" ('省3','市1','縣1',106),\n" +
" ('省3','市2','縣1',107),\n" +
" ('省3','市2','縣2',108),\n" +
" ('省4','市1','縣1',109),\n" +
" ('省4','市2','縣1',110))\n" +
"AS t_person_num(pid, cid, xid,num)\n" +
"GROUP BY pid;").execute().print();
- rollup
ROLLUP rolls dimensions up, from fine granularity to coarse granularity.
// Sum per group; rollup(pid,cid,xid) aggregates from fine to coarse: (pid,cid,xid) -> (pid,cid) -> (pid) -> ()
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
"FROM (VALUES\n" +
" ('省1','市1','縣1',100),\n" +
" ('省1','市2','縣2',101),\n" +
" ('省1','市2','縣1',102),\n" +
" ('省2','市1','縣4',103),\n" +
" ('省2','市2','縣1',104),\n" +
" ('省2','市2','縣1',105),\n" +
" ('省3','市1','縣1',106),\n" +
" ('省3','市2','縣1',107),\n" +
" ('省3','市2','縣2',108),\n" +
" ('省4','市1','縣1',109),\n" +
" ('省4','市2','縣1',110))\n" +
"AS t_person_num(pid, cid, xid,num)\n" +
"GROUP BY rollup(pid,cid,xid)").execute().print();
- cube
CUBE groups by every combination of the dimensions (the "cube" principle); for example, (col1, col2, col3) yields 2^3 grouping sets.
tableEnvironment.sqlQuery("SELECT pid, cid, xid, sum(num) AS total\n" +
"FROM (VALUES\n" +
" ('省1','市1','縣1',100),\n" +
" ('省1','市2','縣2',101),\n" +
" ('省1','市2','縣1',102),\n" +
" ('省2','市1','縣4',103),\n" +
" ('省2','市2','縣1',104),\n" +
" ('省2','市2','縣1',105),\n" +
" ('省3','市1','縣1',106),\n" +
" ('省3','市2','縣1',107),\n" +
" ('省3','市2','縣2',108),\n" +
" ('省4','市1','縣1',109),\n" +
" ('省4','市2','縣1',110))\n" +
"AS t_person_num(pid, cid, xid, num)\n" +
"GROUP BY CUBE(pid, cid, xid)").execute().print();
- Grouping Sets
GROUPING SETS defines custom dimension groupings; the example below defines four groupings: (pid, cid, xid), (pid, cid), (pid), ().
// Custom dimension groupings
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
"FROM (VALUES\n" +
" ('省1','市1','縣1',100),\n" +
" ('省1','市2','縣2',101),\n" +
" ('省1','市2','縣1',102),\n" +
" ('省2','市1','縣4',103),\n" +
" ('省2','市2','縣1',104),\n" +
" ('省2','市2','縣1',105),\n" +
" ('省3','市1','縣1',106),\n" +
" ('省3','市2','縣1',107),\n" +
" ('省3','市2','縣2',108),\n" +
" ('省4','市1','縣1',109),\n" +
" ('省4','市2','縣1',110))\n" +
"AS t_person_num(pid, cid, xid,num)\n" +
"GROUP BY GROUPING SETS ((pid, cid, xid),(pid, cid),(pid), ())").execute().print();
9. Window Functions (OVER Aggregation)
For example, taking each row as the reference point, we can compute the average of all data within the hour before it, or the average of the previous 10 rows. It is as if a window were opened on each row to collect data for the statistic; this is what is meant by a "window function" (OVER aggregation).
Group aggregation and window TVF aggregation are both "many-to-one": after grouping, each group yields exactly one aggregated result.
An OVER aggregation, in contrast, performs an aggregation for every single row, so the number of rows does not shrink after aggregation; it is a "many-to-many" relationship.
- Basic syntax
SELECT
<aggregate function> OVER ( [PARTITION BY <field 1>[, <field 2>, ...]] ORDER BY <time attribute field> <window range>)
, ...
FROM ...
- OVER(): the keyword is preceded by an aggregate function, which is applied over the window defined by OVER. Its clauses are:
  - 1. PARTITION BY (optional)
  - Specifies the partitioning key, similar to GROUP BY grouping; this part is optional.
  - 2. ORDER BY (required)
  - An OVER window is a range of data extended from the current row; the range can be chosen based on time or on row count.
  - In Flink stream processing, only ascending order by a time attribute is currently supported, so the field after ORDER BY must be a defined time attribute.
  - 3. Window range:
  - For an OVER aggregation, the window range must also be specified, i.e., how far back to extend for the aggregation.
  - The range is defined with BETWEEN <lower bound> AND <upper bound>, i.e., "from the lower bound to the upper bound".
  - Currently the upper bound can only be CURRENT ROW, i.e., the range is "from some earlier row up to the current row".
  - The range can be based on time or on the number of rows, so you also choose between two modes:
- Row intervals (rows intervals)
  - A row interval is prefixed with ROWS: it directly specifies how many rows to select, counting back from the current row.
  - Example, selecting the 5 rows before the current row: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
- Range intervals (range intervals, a time-based range)
  - A range interval is prefixed with RANGE: it selects a range based on the time field given in ORDER BY, usually a period of time before the current row's timestamp.
  - Example, selecting the data within 1 hour before the current row: RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
- Example
// Execution environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
// Execute SQL
tableEnvironment.executeSql("CREATE TABLE t_goods (\n" +
" gid STRING,\n" +
" type INT,\n" +
" price INT,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.gid.length'='10',\n" +
" 'fields.type.min'='1',\n" +
" 'fields.type.max'='5',\n" +
" 'fields.price.min'='1',\n" +
" 'fields.price.max'='9'\n" +
")");
// Average price of each type over the 10 seconds preceding the current row
tableEnvironment.sqlQuery(
"select tg.*,avg(price) over(partition by type order by ts range between interval '10' second preceding and current row) as price_avg\n" +
"from t_goods tg"
).execute().print();
// Average price of each type over the 10 rows preceding the current row
tableEnvironment.sqlQuery(
"select tg.*,avg(price) over(partition by type order by ts rows between 10 preceding and current row) as price_avg\n" +
"from t_goods tg"
).execute().print();
- TopN
In Flink SQL, Top-N is implemented with an OVER aggregation plus a filter condition.
ROW_NUMBER() assigns every row a rank after sorting, exposed as row_num; filtering in the outer query with row_num <= N yields the Top-N result according to the sort field.
Flink SQL provides a dedicated optimized implementation of this OVER aggregation. Only in the Top-N scenario may the ORDER BY of the OVER window use sort fields other than the time attribute; Top-N must strictly follow the pattern below, otherwise the Flink SQL optimizer cannot recognize it. Also, the Table API currently does not support ROW_NUMBER(), so Top-N can only be implemented with the SQL API.
SELECT ... FROM ( SELECT ...,
ROW_NUMBER() OVER ( [PARTITION BY <field 1>[, <field 2>...]] ORDER BY <sort field 1> [asc|desc][, <sort field 2> [asc|desc]...] ) AS row_num FROM ...)
WHERE row_num <= N [AND <other conditions>]
- Example
// Ranking with a window function
String dataGenDemo="create table t_datagen(" +
"gid string," +
"price int," +
"type int," +
"ts as localtimestamp," +
"watermark for ts as ts-interval '10' second" +
")with(" +
"'connector'='datagen'," +
"'fields.gid.length'='10'," +
"'rows-per-second'='10'," +
"'fields.price.min'='100'," +
"'fields.price.max'='999'," +
"'fields.type.min'='1'," +
"'fields.type.max'='1')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(dataGenDemo);
// tblEnv.sqlQuery("select * from t_datagen").execute().print();
String topNStr="select * from\n" +
"(select d.*,row_number() over(partition by type order by price desc) as row_num from\n"+
"t_datagen d) where row_num<=3";
tblEnv.sqlQuery(topNStr).execute().print();
=========================================================================================
// 5-second tumbling window: the top 3 products of each type in every window
String topNWin=" select *\n" +
" from(\n" +
" select *,row_number() over(partition by type order by price desc) as row_num\n" +
" from table(tumble(table t_datagen,descriptor(ts),interval '5' second))\n" +
" ) where row_num<=3";
tblEnv.sqlQuery(topNWin).execute().print();
=========================================================================================
- Window Top-N
// For each 10-second window, the top 3 types by total sales
String topNWinSql="select * " +
" from(select type,t_price,window_start,window_end,row_number() over(partition by window_start,window_end order by t_price desc) as row_num\n" +
" from (\n" +
" select type,window_start,window_end,sum(price) as t_price\n" +
" from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
" group by type,window_start,window_end\n" +
" ))where row_num<=3";
tblEnv.sqlQuery(topNWinSql).execute().print();
// For each 10-second window, the top 3 products by sales within each type
String topNWinSql="select * " +
" from (select gid,type,window_start,window_end,row_number() over(partition by window_start,window_end,type,gid order by price desc) as row_num\n" +
" from (\n" +
" select *\n" +
" from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
" ) )" +
"where row_num<=3";
tblEnv.sqlQuery(topNWinSql).execute().print();
10. Join Queries
Consistent with standard SQL, Flink SQL's regular joins are divided into inner joins and outer joins; the difference is whether the result contains rows that do not satisfy the join condition. Currently only equality conditions are supported as join conditions, i.e., the expression after the ON keyword must be a logical expression testing equality between fields of the two tables.
- Equi inner join: returns all combinations of rows from the two tables that satisfy the join condition (joining dynamic tables)
// Generate two data streams
String dataStr="create table dataGen_demo(" +
"gid string," +
"type int," +
"price int," +
"ts1 as localtimestamp," +
"watermark for ts1 as ts1-interval '5' second" +
") with (" +
"'connector'='datagen'," +
"'rows-per-second'='1'," +
"'fields.gid.length'='10'," +
"'fields.type.min'='1'," +
"'fields.type.max'='30'," +
"'fields.price.min'='100'," +
"'fields.price.max'='999')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(dataStr);
// tblEnv.sqlQuery("select * from dataGen_demo").execute().print();
String dataStr1="create table dataGen_demo1(" +
"type int," +
"tname string," +
"price int," +
"ts2 as localtimestamp," +
"watermark for ts2 as ts2-interval '5' second" +
") with (" +
"'connector'='datagen'," +
"'rows-per-second'='1'," +
"'fields.tname.length'='10'," +
"'fields.type.kind'='sequence'," +
"'fields.type.start'='1'," +
"'fields.type.end'='50'," +
"'fields.price.min'='300'," +
"'fields.price.max'='400')";
tblEnv.executeSql(dataStr1);
tblEnv.sqlQuery("select * from dataGen_demo inner join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
- Equi outer join
LEFT JOIN: left outer join. All rows of the left table are emitted; unmatched rows wait in state for a match, and once a match arrives, the previously emitted unmatched row is retracted and the matched result is emitted instead.
RIGHT JOIN: right outer join. All rows of the right table are emitted; unmatched rows wait in state for a match, and once a match arrives, the previously emitted unmatched row is retracted and the matched result is emitted instead.
FULL JOIN: rows of both tables are emitted whether or not they match; whenever either side finds a match in state, the previously emitted unmatched row is retracted and the matched result is emitted.
tblEnv.sqlQuery("select * from dataGen_demo left join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
tblEnv.sqlQuery("select * from dataGen_demo full join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
- Interval joins
A join of two streams corresponds to a join of two tables in SQL; it is a join type specific to stream processing.
Flink SQL does not yet support window joins, but interval joins are implemented: in addition to the Cartesian product of rows from the two streams that satisfy the join condition, there is an extra time-interval constraint.
Syntax: an interval join does not use the JOIN keyword; simply list the two joined tables after FROM, separated by a comma. The join condition is defined in the WHERE clause with an equality expression. Filtering the cross join with WHERE behaves much like INNER JOIN ... ON ...; within the WHERE clause, a time-interval constraint is appended to the join condition with AND.
String dataStr1="create table dataGen_demo1(" +
"type int," +
"tname string," +
"price int," +
"ts2 as localtimestamp," +
"watermark for ts2 as ts2-interval '5' second" +
") with (" +
"'connector'='datagen'," +
"'rows-per-second'='1'," +
"'fields.tname.length'='10'," +
"'fields.type.kind'='sequence'," +
"'fields.type.start'='1'," +
"'fields.type.end'='50'," +
"'fields.price.min'='300'," +
"'fields.price.max'='400')";
tblEnv.executeSql(dataStr1);
tblEnv.sqlQuery("select * from dataGen_demo d,dataGen_demo1 g where d.type=g.type and d.ts1 between g.ts2-interval '5' second and g.ts2+interval '5' second").execute().print();
11. Flink SQL Client
Flink provides a SQL Client; with it we can write SQL directly in the console and submit jobs, much like Hive's beeline.
The Flink SQL Client can run against a standalone cluster or a YARN cluster; the job submission commands differ slightly.
- Standalone cluster (normal mode)
## Start the cluster (assumes the Flink environment variables are already configured)
start-cluster.sh
## Start the client
sql-client.sh embedded
- YARN cluster
Prerequisite: the Hadoop YARN cluster must be up.
Every time a yarn-session is started, Flink creates a /tmp/.yarn-properties-root file recording the Application ID of the most recently submitted YARN session. Note: the YARN session and the SQL Client must be started by the same user.
## Start YARN session mode (assumes the Flink environment variables are already configured)
yarn-session.sh -n 3 -jm 1024 -tm 1024
## Start the client; it must run on the same server node as the command above
sql-client.sh embedded -s yarn-session
## Test in the client console
select 'hello world'; # test that the connection works
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name; # test data
# Run the following commands in the client
# Results are shown in a dedicated paginated table view; follow the hints at the bottom of the screen to page back and forth and to return to the SQL prompt
SET sql-client.execution.result-mode = table;
# Changelog view: shows inserts (I), deletes (D), and updates (U)
SET sql-client.execution.result-mode = changelog;
# Output close to a traditional database client, without a dedicated view
SET sql-client.execution.result-mode = tableau;
- Installing dependencies
If third-party dependencies are needed when running the SQL Client, the JARs used by the project must be placed into the lib directory of the Flink installation.
For example: flink-connector-kafka_2.11-1.13.2.jar adds Kafka read/write support.
12. Flink SQL Official Documentation
- Table API
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/tableapi/
The Table API is a unified relational API for batch and stream processing: Table API queries can run on batch or streaming input without any code changes. The Table API is a superset of the SQL language and is specially designed for Apache Flink. It is integrated with the Scala, Java, and Python languages. Table API queries are defined in an embedded style in Java, Scala, or Python, with IDE support such as autocompletion and syntax validation, instead of being specified as string values like regular SQL.
- SQL API
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/overview/
Flink supports SQL comprising the Data Definition Language (DDL), the Data Manipulation Language (DML), and the query language. Flink's SQL support is based on Apache Calcite, which implements the SQL standard.
13. Flink SQL Functions
In SQL we can wrap up data transformation operations and invoke them uniformly inside SQL queries; these are functions.
Flink's Table API and SQL also provide function support. The two differ slightly in how functions are called:
- In the Table API, functions are invoked as method calls on data objects.
- In SQL, the function name is referenced directly and the data is passed in as arguments.
- The Table API is embedded in the Java language; many methods need to be added to classes separately, so relatively few functions are supported at present.
Official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/functions/overview/
- Function types
Functions in Flink are classified along two dimensions:
- One dimension: system functions vs. catalog functions.
- The other dimension: temporary functions vs. persistent functions.
- This yields four kinds of functions: temporary system functions, system functions, temporary catalog functions, and catalog functions.
A function can be referenced in two ways, precise and ambiguous: a precise reference lets the user address a function across catalogs and databases, i.e., the catalog and database are specified; an ambiguous reference omits the catalog and database and uses the current defaults. The sketch below illustrates both.
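A brief sketch of the two reference styles, assuming a StreamTableEnvironment named tblEnv; my_udf, t_emp, and the catalog/database names are placeholders (UDF registration itself is covered in the following sections):
// Ambiguous reference: resolved against the current catalog and database
tblEnv.sqlQuery("select my_udf(ename) from t_emp");
// Precise reference: catalog and database are spelled out explicitly
tblEnv.sqlQuery("select default_catalog.default_database.my_udf(ename) from t_emp");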
- System functions
System functions, also called built-in functions, are function modules implemented in the system in advance. They can be called directly by their fixed names to perform the desired transformation. They fall into two broad classes: scalar functions and aggregate functions.
Function categories: scalar functions, aggregate functions, time interval units and time point identifiers, and column functions.
- Scalar functions (a brief illustration follows):
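A brief illustration with a few built-in scalar functions (UPPER, CHAR_LENGTH, and CONCAT are standard built-ins; t_emp is the view registered earlier in these notes, and tblEnv is its table environment):
tblEnv.sqlQuery(
"select upper(ename) as ename_upper, " + // uppercase a string
"char_length(job) as job_len, " + // string length
"concat(ename, '_', job) as tag " + // string concatenation
"from t_emp").execute().print();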
- User-defined functions
Flink's Table API and SQL provide several interfaces for user-defined functions, defined as abstract classes.
The main kinds of UDFs at present are:
- Scalar functions: map an input scalar value to a new scalar value.
- Table functions: map a scalar value to one or more new rows, i.e., expand it into a table.
- Aggregate functions: map the scalar values of multiple rows to a new scalar value.
- Table aggregate functions: map the scalar values of multiple rows to one or more new rows.
- UDF scalar functions
How to define: create a class that extends the abstract class ScalarFunction and implement an evaluation method named eval().
The behavior of the scalar function is determined by this evaluation method; it must be public and must be named eval.
The eval method can be overloaded multiple times, and any data type can be used for its parameters and return value. After writing the class, register it with the table environment and it can be called directly in SQL.
- Code
import org.apache.flink.table.functions.ScalarFunction;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 21:34
*/
// User-defined scalar function
public class ScalarUDFDemo extends ScalarFunction {
// The evaluation method must be public and named eval; here it takes a String and returns a String
public String eval(String input) {
// Append the length of the string to the string itself
return input.concat(String.valueOf(input.length()));
}
}
// Create mock data
// Execution environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
// Execute SQL
tableEnvironment.executeSql("CREATE TABLE t_datagen (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
// tableEnvironment.sqlQuery("select * from t_datagen").execute().print();
// Method 1: call the function inline via the Table API
// tableEnvironment.from("t_datagen").select(call(ScalarUDFDemo.class, $("f_random_str"))).execute().print();
// Method 2: register the function and call it in SQL
tableEnvironment.createTemporarySystemFunction("sfsl",ScalarUDFDemo.class);
tableEnvironment.sqlQuery("select sfsl(f_random_str) from t_datagen").execute().print();
- UDF table functions
How to define:
To implement a user-defined table function, create a class that extends the abstract class TableFunction; it must also implement an evaluation method named eval.
Unlike a scalar function, the TableFunction class itself has a generic parameter T, which is the type of the rows returned by the table function.
The eval() method has no return type and contains no return statement; it emits the output rows by calling collect().
- Data
1,尋夢環游記,喜劇:8_動畫:7_冒險:3_音樂:9_家庭:6
2,至愛梵高,劇情:8_傳記:7_動畫:3
3,小丑回魂,劇情:6_兒童:7_恐怖:9
- Example code
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 21:53
*/
/**
* Row<type STRING, score INT>: the output fields are named type and score, with types STRING and INT respectively
*/
@FunctionHint(output = @DataTypeHint("Row<type STRING,score INT>"))
public class UDFTableFunction extends TableFunction<Row> {
// The input is a String
/**
* 喜劇:8_動畫:7_冒險:3_音樂:9_家庭:6
* @param line
*/
public void eval(String line){
String[] split = line.split("_");
for (String s : split) {
String[] v = s.split(":");
collect(Row.of(v[0],Integer.parseInt(v[1])));
}
}
}
=========================================================================================
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
// Read the file with the filesystem connector
String fs="create table t_movie(" +
"id int," +
"name string," +
"types string" +
") with (" +
"'connector'='filesystem'," +
"'path'='data/movie.txt'," +
"'format'='csv')";
// Read the data with SQL
tblEnv.executeSql(fs);
// tblEnv.sqlQuery("select * from t_movie").execute().print();
//Table API
tblEnv
.from("t_movie")
.joinLateral(call(UDFTableFunction.class, $("types")).as("type", "score"))
.select($("id"),$("name"),$("type"),$("score"))
.execute()
.print();
//SQL API
tblEnv.createTemporarySystemFunction("tbl_f",UDFTableFunction.class);
tblEnv.sqlQuery("select id,name,type,score from t_movie ,lateral table(tbl_f(types))").execute().print();
- UDF aggregate functions
How to define:
- A user-defined aggregate function must extend the abstract class AggregateFunction.
- AggregateFunction has two generic parameters: T is the result type of the aggregation, and ACC is the type of the intermediate accumulator state.
- Every AggregateFunction must implement the following methods:
- createAccumulator(): creates the accumulator. It takes no parameters and its return type is the accumulator type ACC.
- accumulate(): the core method that performs the aggregation, called for every incoming row. Its first parameter is fixed: the current accumulator, of type ACC, representing the current intermediate state of the aggregation.
- getValue(): produces the final result. Its input parameter is the accumulator of type ACC and its output type is T. For complex types, Flink's type inference may fail to derive the correct result.
- Code
package com.zwf.udf;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 22:31
*/
/**
* AggregateFunction<Double, Tuple2<Integer,Integer>>: the output type is Double and the intermediate state type is Tuple2<Integer,Integer>
* The three methods getValue(), createAccumulator(), and accumulate() must be implemented
*/
public class UDFAggregationDemo extends AggregateFunction<Double, Tuple2<Integer,Integer>> {
/**
* Logic that produces the final output value
* @param integerIntegerTuple2
* @return
*/
@Override
public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) {
if (integerIntegerTuple2.f0==0){
return 0.0;
}
return integerIntegerTuple2.f0*1.0/integerIntegerTuple2.f1;
}
/**
*
* @return the initialized intermediate state (accumulator)
*/
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0,0);
}
// The inputs are two int values
/**
* Without the @FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")}) annotation,
* the input columns passed in must carry a NOT NULL constraint
* @param acc
* @param weight
* @param price
*/
@FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")})
public void accumulate(Tuple2<Integer,Integer> acc ,Integer weight,Integer price){
acc.f0+=weight*price;
acc.f1+=weight;
}
}
========================================================================================
package com.zwf.flinkSQL;
import com.zwf.udf.UDFAggregationDemo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 22:43
*/
public class UDFDemo3 {
public static void main(String[] args) {
// Execution environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
// Execute SQL
tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
" id INT,\n" +
" type INT,\n" +
" weight INT,\n" +
" price INT\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.id.kind'='sequence',\n" +
" 'fields.id.start'='1',\n" +
" 'fields.id.end'='1000',\n" +
" 'fields.type.min'='1',\n" +
" 'fields.type.max'='3',\n" +
" 'fields.weight.min'='10',\n" +
" 'fields.weight.max'='20',\n" +
" 'fields.price.min'='100',\n" +
" 'fields.price.max'='200'\n" +
")");
tableEnvironment.createTemporarySystemFunction("aggre", UDFAggregationDemo.class);
tableEnvironment.sqlQuery("select type,aggre(weight,price) from t_order group by type").execute().print();
}
}
- UDF table aggregate functions
A user-defined table aggregate function (UDTAGG) aggregates one or more rows (i.e., a table) into another table, and the result table can contain multiple rows and columns.
How to define:
- createAccumulator(): creates the accumulator; used the same way as in AggregateFunction.
- accumulate(): the core aggregation method; used the same way as in AggregateFunction.
- emitValue(): emits the final result after all input rows have been processed. It corresponds to getValue() in AggregateFunction, except that emitValue has no return type and takes two parameters: the first is the accumulator of type ACC, and the second is the collector out used to emit the output, of type Collector.
- Code
package com.zwf.udf;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 22:56
*/
/**
* TableAggregateFunction<out, acc>: out is the output type, acc is the intermediate state type
*/
public class TableAggregateUDF extends TableAggregateFunction<String, Tuple3<Integer,Integer,Boolean>> {
/**
* Initialize the intermediate state
* @return
*/
@Override
public Tuple3<Integer, Integer, Boolean> createAccumulator() {
return Tuple3.of(0,0,false);
}
/**
*
* @param acc the intermediate state
* @param price the input value
*/
public void accumulate(Tuple3<Integer,Integer,Boolean> acc,Integer price){
if(price>acc.f0){
acc.f0=price;
acc.f1=acc.f0;
acc.f2=true;
}else if (price>acc.f1){
acc.f1=price;
acc.f2=true;
}else {
acc.f2=false;
}
}
/**
*
* @param acc the intermediate state
* @param out the output collector
*/
public void emitValue(Tuple3<Integer, Integer, Boolean> acc, Collector<String> out){
if(acc.f2){
acc.f2=false;
out.collect("First[" + acc.f0 + "]Second[" + acc.f1 + "]");
}
}
}
=========================================================================================
package com.zwf.flinkSQL;
import com.zwf.udf.TableAggregateUDF;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 23:06
*/
public class UDFDemo4 {
public static void main(String[] args) {
// execution environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
// create the datagen source table with DDL
tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
" id INT,\n" +
" type INT,\n" +
" price INT\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.id.kind'='sequence',\n" +
" 'fields.id.start'='1',\n" +
" 'fields.id.end'='1000',\n" +
" 'fields.type.min'='1',\n" +
" 'fields.type.max'='3',\n" +
" 'fields.price.min'='100',\n" +
" 'fields.price.max'='200'\n" +
")");
// plain query
// tableEnvironment.sqlQuery("select * from t_order").execute().print();
// register the function
tableEnvironment.createTemporarySystemFunction("tafop", TableAggregateUDF.class);
// table aggregate functions are currently only usable from the Table API (flatAggregate), not from SQL queries
tableEnvironment.from("t_order")
.groupBy($("type"))
.flatAggregate(call("tafop", $("price")).as("top_prices"))
.select($("type"), $("top_prices"))
.execute().print();
}
}
14、FlinkSQL CDC
CDC, short for Change Data Capture, captures committed changes from a database and delivers them downstream for further consumption.
- Flink CDC
In earlier data synchronization setups, getting database data in real time usually meant relying on a third-party tool such as Canal or Debezium to capture the database change log, publish the changes to a Kafka message queue, and then have another component such as Flink or Spark consume the Kafka data, process it and send the results to downstream systems.
In the new architecture, Flink consumes the database's incremental log directly, replacing the separate ingestion layer, processes the data, and sends the results straight downstream.
How it works: when the MySQL CDC source starts, it acquires a global read lock (FLUSH TABLES WITH READ LOCK), which blocks other writes to the database. It then reads the current binlog position and the schema of the databases and tables, after which it releases the global read lock. It then scans the tables and reads the binlog from the previously recorded position. Flink periodically takes checkpoints to record the binlog position; if a failure occurs, the job restarts and resumes from the binlog position of the last completed checkpoint, which gives exactly-once semantics.
Advantages: works out of the box and is easy to get started with; fewer components to maintain, a simpler real-time pipeline and lower deployment cost; smaller end-to-end latency.
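For reference, the same source can also be used from the DataStream API. The sketch below is a minimal example assuming the flink-connector-mysql-cdc 2.3 dependency shown later in this section and the same host and credentials as the SQL example that follows; it simply prints every change event as a Debezium-style JSON string.
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MySqlCdcDataStreamSketch {
    public static void main(String[] args) throws Exception {
        // every change record is deserialized into a Debezium-style JSON string
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("192.168.147.120")   // assumed host/credentials, same as the SQL example below
                .port(3306)
                .databaseList("scott")
                .tableList("scott.dept")
                .username("root")
                .password("Root@123456.")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoints record the binlog position used for failure recovery, as described above
        env.enableCheckpointing(5000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc-source").print();
        env.execute("mysql-cdc-datastream-sketch");
    }
}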
- Changelog
Flink SQL has a complete built-in changelog mechanism, so ingesting CDC data only requires converting it into a representation Flink understands, which makes CDC support and integration straightforward.
The refactored TableSource emits the RowData data structure, each instance representing one row. Every RowData carries a piece of metadata called the RowKind.
RowKind covers insert, update-before, update-after and delete, which maps very closely onto the binlog concepts of a database.
The JSON collected by Debezium contains the old row, the new row and source metadata; ingesting Debezium JSON therefore just means converting that raw JSON into RowData that Flink understands.
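For the classic Kafka-based pipeline, Debezium JSON can be consumed directly with the Kafka connector and the debezium-json format (both already among the dependencies). A minimal DDL sketch; the topic name and broker address are assumptions for illustration:
-- each Debezium change event on the topic is turned into RowData with the matching RowKind
CREATE TABLE dept_debezium (
  deptno INT,
  dname STRING,
  loc STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'scott.dept.changes',                   -- assumed topic name
  'properties.bootstrap.servers' = 'node1:9092',    -- assumed broker address
  'properties.group.id' = 'flink-cdc-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);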
- MySQL CDC
Official documentation:
https://github.com/ververica/flink-cdc-connectors
Inserts and updates in the MySQL database are captured in real time, processed in Flink, and forwarded downstream. The list of currently supported databases is maintained in the documentation linked above.
- Modify the MySQL configuration file (vim /etc/my.cnf)
# server ID
server_id=12345
log_bin=/var/lib/mysql/mysql-bin
# keep binlogs for 30 days
expire_logs_days=30
# must be ROW
binlog_format=ROW
binlog_cache_size=16M
max_binlog_size=100M
max_binlog_cache_size=256M
relay_log_recovery=1
# must be FULL; this parameter is available since MySQL 5.7
binlog_row_image=FULL
binlog_do_db=scott
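After restarting MySQL, the binlog settings can be verified from any MySQL client; a quick sanity-check sketch:
-- confirm that the binlog is enabled and uses ROW format with full row images
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
-- show the current binlog file and position
SHOW MASTER STATUS;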
- Create the database table
DROP TABLE IF EXISTS `dept`;
CREATE TABLE `dept` (
`deptno` int(11) NOT NULL,
`dname` varchar(255) DEFAULT NULL,
`loc` varchar(255) DEFAULT NULL,
PRIMARY KEY (`deptno`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- insert the data only after the Flink job is running
INSERT INTO `dept` VALUES ('10', 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES ('20', 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES ('30', 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES ('40', 'OPERATIONS', 'BOSTON');
- pom.xml
<!-- Flink CDC dependency -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- the MySQL driver must be version 8.0.27 or higher -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
- Implementation
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
// create the CDC source table
tableEnvironment.executeSql("CREATE TABLE flink_cdc_dept (\n" +
" deptno INT,\n" +
" dname STRING,\n" +
" loc STRING,\n" +
" PRIMARY KEY(deptno) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = '192.168.147.120',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = 'Root@123456.',\n" +
" 'database-name' = 'scott',\n" +
" 'table-name' = 'dept')");
// simple continuous query over the changelog
tableEnvironment.sqlQuery("select * from flink_cdc_dept").execute().print();
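To do something more useful than printing, the changelog can be written to an upsert-capable sink. A minimal sketch continuing the snippet above and using the JDBC connector already declared in the pom; the target table dept_copy and its connection settings are assumptions for illustration:
// assumed target table; the jdbc connector performs upserts when a primary key is declared
tableEnvironment.executeSql("CREATE TABLE dept_copy (\n" +
        "  deptno INT,\n" +
        "  dname STRING,\n" +
        "  loc STRING,\n" +
        "  PRIMARY KEY(deptno) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'url' = 'jdbc:mysql://192.168.147.120:3306/scott',\n" +
        "  'table-name' = 'dept_copy',\n" +
        "  'username' = 'root',\n" +
        "  'password' = 'Root@123456.'\n" +
        ")");
// continuously mirror the captured changes into the target table
tableEnvironment.executeSql("INSERT INTO dept_copy SELECT * FROM flink_cdc_dept");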
15、Flink SQL On Hive
A Catalog provides metadata such as databases, tables, partitions, views, and the functions and other information stored in a database or other external systems.
Metadata can be temporary, such as temporary tables or UDFs registered through the TableEnvironment, or persistent, such as the metadata in a Hive Metastore. The Catalog offers a unified API for managing metadata and making it accessible from the Table API and SQL queries.
GenericInMemoryCatalog: an in-memory implementation; all metadata is available only for the lifetime of the session.
JdbcCatalog: connects Flink to a relational database over JDBC. Postgres Catalog and MySQL Catalog are currently the only two JDBC Catalog implementations.
HiveCatalog: serves both as persistent storage for native Flink metadata and as an interface for reading and writing existing Hive metadata.
User-defined Catalog: implement the corresponding CatalogFactory interface to develop a custom Catalog.
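As an illustration of a non-Hive catalog, a minimal JdbcCatalog registration might look like the sketch below. JdbcCatalog here is org.apache.flink.connector.jdbc.catalog.JdbcCatalog; the database name and credentials reuse the assumed values from the CDC example, and the MySQL variant of the catalog may require a newer Flink/connector version than the 1.15.2 used in this project, so treat this strictly as a sketch.
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// catalog name, default database, username, password, base JDBC URL (without a database suffix)
JdbcCatalog jdbcCatalog = new JdbcCatalog("my_jdbc", "scott", "root", "Root@123456.", "jdbc:mysql://192.168.147.120:3306");
tableEnv.registerCatalog("my_jdbc", jdbcCatalog);
tableEnv.useCatalog("my_jdbc");
tableEnv.executeSql("SHOW TABLES").print();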
- Connecting to a Hive cluster
The metadata in the Flink catalog is persisted into the metadata database behind the Hive Metastore. With the Hive integration in place, Flink can read and write Hive tables directly, much like using Spark SQL or Impala to work with data in Hive.
- pom.xml
<!-- Flink On Hive-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
- Connecting to the Hive cluster (code)
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
// register the HiveCatalog so the Hive Metastore metadata becomes available to Flink
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
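Once the catalog is active, existing Hive tables can be queried directly; switching the dialect (org.apache.flink.table.api.SqlDialect) also enables Hive-style DDL/DML. A short follow-up sketch, with emp as a hypothetical Hive table name:
// use the default database registered above and list its tables
tableEnv.useDatabase("mydatabase");
tableEnv.executeSql("SHOW TABLES").print();
// optionally switch to the Hive dialect for Hive-style DDL/DML
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// query an existing Hive table (hypothetical name)
tableEnv.executeSql("SELECT * FROM emp LIMIT 10").print();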
- For related configuration parameters, refer to the Flink Hive integration documentation.
16、Flink SQL query optimization
Flink provides two optimizers:
- RBO (rule-based optimizer)
- CBO (cost-based optimizer)
Optimization techniques include:
- Subquery decorrelation based on Apache Calcite
- Projection pushdown
- Partition pruning
- Predicate pushdown
- Constant folding
- Sub-plan deduplication to avoid repeated computation
- Rewriting special subqueries into left semi-joins / left anti-joins
- Optional join reordering, enabled via table.optimizer.join-reorder-enabled
The optimizer bases its decisions not only on the plan itself but also on rich statistics available from the data sources and on fine-grained per-operator costs (e.g. IO, CPU, network and memory). The resulting plan can be inspected as shown in the sketch below.
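A minimal sketch for inspecting what the optimizer produces, reusing the tableEnvironment and the t_order table from the earlier examples:
// prints the abstract syntax tree, the optimized physical plan and the execution plan
System.out.println(tableEnvironment.explainSql(
        "SELECT type, SUM(price) FROM t_order GROUP BY type"));
// the same information is also available directly in SQL
tableEnvironment.executeSql(
        "EXPLAIN PLAN FOR SELECT type, SUM(price) FROM t_order GROUP BY type").print();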
- Constant folding
Constant folding pre-computes constant expressions (addition, subtraction, multiplication, division, etc.) in the SQL so they are not re-evaluated for every row at execution time. Before folding: 1 + 2 + t1.value; after folding: 3 + t1.value.
- Predicate pushdown
Filters are pushed down to the data source in the FROM clause so that only the relevant data is read, which reduces the scan range and improves query efficiency.
- Projection pushdown (column pruning)
Projection pushdown avoids loading columns that are not needed: only the columns referenced by the query are read. Since loading unused columns is wasted work, the projection is pushed down towards the source; with columnar storage the benefit is even larger, because only the required columns have to be loaded.
- Hash Join
When joining two tables, the larger table is first filtered down so that less data takes part in the join; the join itself is then executed with a strategy such as sort-merge join, hash join or broadcast hash join, reducing the size of the intermediate (Cartesian-product-like) result.
- Transformation Tree
- Performance tuning
MiniBatch aggregation: the core idea is to buffer a batch of input records inside the aggregation operator. When the buffered records are processed, each key needs only one state access, which greatly reduces state overhead and improves throughput. The trade-off is some extra latency, because records are buffered instead of being processed immediately; it is a throughput-versus-latency trade-off. See the configuration sketch below.
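A minimal configuration sketch for enabling MiniBatch aggregation (Configuration is org.apache.flink.configuration.Configuration; the latency and batch-size values are illustrative, and tableEnvironment is assumed to be created as in the earlier examples):
Configuration conf = tableEnvironment.getConfig().getConfiguration();
// enable mini-batch buffering inside the aggregation operators
conf.setString("table.exec.mini-batch.enabled", "true");
// flush the buffer at least every 5 seconds ...
conf.setString("table.exec.mini-batch.allow-latency", "5 s");
// ... or once 5000 records have been buffered, whichever comes first
conf.setString("table.exec.mini-batch.size", "5000");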
- Local-Global aggregation
Local-Global aggregation addresses data skew by splitting an aggregation into two phases: a local aggregation upstream followed by a global aggregation downstream, similar to the Combine + Reduce pattern in MapReduce. See the option sketch below.
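Local-Global aggregation builds on MiniBatch; a sketch of the relevant planner option:
// two-phase (local + global) aggregation; requires mini-batch to be enabled (see the previous sketch)
tableEnvironment.getConfig().getConfiguration()
        .setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");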
- Split distinct aggregation
The column being de-duplicated is first spread across partitions with a hash shuffle (grouping on MOD(HASH_CODE(...), N)), the distinct aggregation is computed per bucket, and the partial results are then summed up, as the rewritten query below shows.
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
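Instead of rewriting the query by hand, the planner can perform this split automatically; a sketch of the corresponding option, assuming the same tableEnvironment as before:
// let the planner split skewed COUNT(DISTINCT ...) aggregations into two levels automatically
tableEnvironment.getConfig().getConfiguration()
        .setString("table.optimizer.distinct-agg.split.enabled", "true");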
- Distinct aggregation with FILTER
Conditional distinct counts are expressed with FILTER clauses on the same distinct column instead of separate aggregations, and the final grouping is done once:
SELECT
  day,
  COUNT(DISTINCT user_id) AS total_uv,
  COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
  COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
17、SQL date and time conversion
-- The most common chore in Flink SQL is time format conversion, e.g. converting various representations into TIMESTAMP(3).
-- Common values and their types:
--   now()              -> BIGINT millisecond epoch value, e.g. 1403006911000 or 1528257600000
--   localtimestamp     -> TIMESTAMP(3) with millisecond precision, e.g. 1636272032500
--   TIMESTAMP without an explicit precision means TIMESTAMP(6); TIMESTAMP(3) and TIMESTAMP(9) are also available
-- Conversion function signatures used in this section:
--   TIMESTAMP(9) TO_TIMESTAMP(BIGINT time)
--   TIMESTAMP(9) TO_TIMESTAMP(STRING time)
--   TIMESTAMP(9) TO_TIMESTAMP(STRING time, STRING format)
--   BIGINT TIMESTAMP_TO_MS(TIMESTAMP time)
--   BIGINT TIMESTAMP_TO_MS(STRING time, STRING format)
-- Typical expressions:
--   CAST(TO_TIMESTAMP(log_time) AS TIMESTAMP(3))   -- with log_time = now()
--   TO_DATE(CAST(LOCALTIMESTAMP AS VARCHAR))
--   FROM_UNIXTIME(TIMESTAMP_TO_MS(localtimestamp) / 1000, 'yyyy-MM-dd HH:mm:ss') AS event_time   -- 6 o'clock to 6 o'clock window
--   time_pt AS CAST(TO_TIMESTAMP(eventTime - 6 * 3600 * 1000) AS TIMESTAMP(3))                   -- shifted by 6 hours
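As a small self-contained complement, the sketch below uses only built-in Flink SQL functions; the events table and its ts_ms column are hypothetical, and TO_TIMESTAMP_LTZ has been available since Flink 1.13:
-- convert a millisecond epoch column into timestamp types and a formatted date string
SELECT
  ts_ms,
  TO_TIMESTAMP_LTZ(ts_ms, 3)                  AS event_time_ltz,   -- TIMESTAMP_LTZ(3)
  TO_TIMESTAMP(FROM_UNIXTIME(ts_ms / 1000))   AS event_time,       -- TIMESTAMP(3) via a formatted string
  CAST(LOCALTIMESTAMP AS TIMESTAMP(3))        AS processing_time,
  DATE_FORMAT(TO_TIMESTAMP(FROM_UNIXTIME(ts_ms / 1000)), 'yyyy-MM-dd') AS event_date
FROM events;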