Flink Example: Flink-CDC

Version Information

Product                  Version
Flink                    1.11.1
flink-cdc-connectors     1.1.0
Java                     1.8.0_231
MySQL                    5.7.16

Note: officially, only MySQL 5.7 and 8 are supported at the moment, but the author also ran a quick test against MariaDB 10.0.38 (roughly equivalent to MySQL 5.6). Inserts, deletes, updates, and aggregations all worked, although unknown issues cannot be ruled out.

Maven Dependencies

The dependency section of pom.xml:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.1</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
        <type>test-jar</type>
    </dependency>
    <!-- Flink-CDC -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>

Replication Configuration and Data Preparation

  • Shut down the MySQL service.
  • On the MySQL node whose changes should be captured, add the following configuration (the MySQL documentation can be used as a reference):
[mysqld]
# ... existing configuration above ...
# added for CDC
server-id        = 12345
log-bin          = mysql-bin
# must be ROW
binlog_format    = ROW
# must be FULL; this parameter only exists since MySQL 5.7
binlog_row_image = FULL
expire_logs_days = 10
  • Start the MySQL service.
  • Use the following statement to check the binlog-related variables:
SHOW VARIABLES LIKE '%binlog%';
  • Create the database, table, and data used for the test:
CREATE DATABASE db_inventory_cdc;

CREATE TABLE tb_products_cdc (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(64),
    description VARCHAR(128)
);

INSERT INTO tb_products_cdc VALUES
    (DEFAULT, 'zhangsan', 'aaa'),
    (DEFAULT, 'lisi', 'bbb'),
    (DEFAULT, 'wangwu', 'ccc');
  • Create the user Flink will use for replication and grant it the required privileges (the MySQL documentation can be used as a reference; a JDBC sanity-check sketch follows this list):
-- create the user that will own the replication privileges
CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
-- grant the privileges needed for replication
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
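Before moving on to Flink, it can be worth a quick sanity check that the new user can connect and that the binlog settings actually took effect. The following JDBC sketch is not part of the original article; it assumes MySQL is reachable on localhost:3306 and that a MySQL JDBC driver (for example mysql-connector-java) is on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/** Hypothetical helper: connect as flinkuser and print the binlog-related settings. */
public class BinlogConfigCheck {

    public static void main(String[] args) throws Exception {
        // assumption: MySQL runs locally on the default port
        String url = "jdbc:mysql://localhost:3306/db_inventory_cdc?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "flinkuser", "flinkpassword");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN "
                     + "('log_bin', 'binlog_format', 'binlog_row_image')")) {
            while (rs.next()) {
                // expected: log_bin = ON, binlog_format = ROW, binlog_row_image = FULL
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}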

Using Flink-CDC

  • For the sql-client approach, the official flink-cdc-connectors documentation already provides an example.
  • The programmatic approach is convenient for packaging and submitting a jar; an example is shown below (a DataStream API alternative is sketched after this list):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Description: Flink-CDC test
 * Date: 2020/9/16 14:03
 *
 * @author ALion
 */
public class FlinkCDCSQLTest {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        // source table backed by the MySQL binlog
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'flinkuser',\n" +
                " 'password' = 'flinkpassword',\n" +
                " 'database-name' = 'db_inventory_cdc',\n" +
                " 'table-name' = 'tb_products_cdc'\n" +
                ")";

        // sink table that prints its changelog to stdout
        String sinkDDL =
                "CREATE TABLE tb_sink (\n" +
                " name STRING,\n" +
                " countSum BIGINT,\n" +
                " PRIMARY KEY (name) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";

        // a simple aggregation over the change stream
        String transformSQL =
                "INSERT INTO tb_sink " +
                "SELECT name, COUNT(1) " +
                "FROM mysql_binlog " +
                "GROUP BY name";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        // Block until the streaming job terminates, so the program keeps running
        // while the initial snapshot and subsequent binlog changes are printed.
        result.getJobClient().get()
                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
                .get();
    }
}
  • Run the job.
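Besides the SQL DDL shown above, flink-connector-mysql-cdc also ships a DataStream-level source. The sketch below is not from the original article; it follows the builder API documented for the 1.x ververica releases (class names changed in later versions) and reuses the same database and credentials as assumptions.

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkCDCStreamTest {

    public static void main(String[] args) throws Exception {
        // CDC source: reads an initial snapshot, then streams binlog changes
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("db_inventory_cdc")
                .tableList("db_inventory_cdc.tb_products_cdc")
                .username("flinkuser")
                .password("flinkpassword")
                // emit every Debezium SourceRecord in its toString() form
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(sourceFunction)
                .print();   // one line per change event

        env.execute("mysql-cdc-datastream");
    }
}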

A Simple Test

  • For a simple test, start modifying the data in the MySQL table:
-- test statements to run against MySQL while watching the Flink job's output
INSERT INTO tb_products_cdc VALUES (DEFAULT, 'lisi', 'ddd');
DELETE FROM tb_products_cdc WHERE id = 4;
UPDATE tb_products_cdc SET name = 'wangwu' WHERE id = 2;
  • After each statement, check how the Flink job's output changes (a retract-stream variant for inspecting the aggregate is sketched below).
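If you would rather inspect the aggregate's changelog from inside the program than read the print sink's console output, one option (again only a sketch under the same assumptions, not part of the original article) is to run the aggregation as a query and convert it to a retract stream: every change to a count arrives as a (false, oldRow) retraction followed by a (true, newRow) insertion.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkCDCRetractStreamTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // same CDC source table as in the main example
        tableEnv.executeSql(
                "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'flinkuser',\n" +
                " 'password' = 'flinkpassword',\n" +
                " 'database-name' = 'db_inventory_cdc',\n" +
                " 'table-name' = 'tb_products_cdc'\n" +
                ")");

        Table aggregated = tableEnv.sqlQuery(
                "SELECT name, COUNT(1) AS countSum FROM mysql_binlog GROUP BY name");

        // boolean flag in the output: true = insert/accumulate, false = retraction of an earlier row
        tableEnv.toRetractStream(aggregated, Row.class).print();

        env.execute("inspect-cdc-aggregate");
    }
}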
