Flink CDC 2.1.0 release test
Dependency:

<!--
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>
-->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>

1. The simplest example code:
package com.ververica.cdc.connectors.mysql.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.junit.Ignore;
import org.junit.Test;

/** Example Tests for {@link MySqlSource}. */
public class MySqlSourceExampleTest extends MySqlSourceTestBase {

    @Test
    @Ignore("Test ignored because it won't stop and is used for manual test")
    public void testConsumingAllEvents() throws Exception {
        inventoryDatabase.createAndInitialize();
        MySqlSource<String> mySqlSource =
                MySqlSource.<String>builder()
                        .hostname(MYSQL_CONTAINER.getHost())
                        .port(MYSQL_CONTAINER.getDatabasePort())
                        .databaseList(inventoryDatabase.getDatabaseName())
                        .tableList(inventoryDatabase.getDatabaseName() + ".products")
                        .username(inventoryDatabase.getUsername())
                        .password(inventoryDatabase.getPassword())
                        .serverId("5401-5404")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .includeSchemaChanges(true) // output the schema changes as well
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000);
        // set the source parallelism to 4
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource")
                .setParallelism(4)
                .print()
                .setParallelism(1);
        env.execute("Print MySQL Snapshot + Binlog");
    }
}

includeSchemaChanges(true) enables schema change awareness: DDL changes on the captured tables are emitted as records as well.
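Note that MySqlSourceTestBase, MYSQL_CONTAINER, inventoryDatabase and UniqueDatabase are test utilities from the flink-cdc repository, so the class above only runs inside that test module. For reference, a minimal standalone sketch of the same job against an existing MySQL instance could look like the following; the hostname, port, credentials, database name and table name are placeholder assumptions to replace with your own:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlCdcStandaloneExample {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource =
                MySqlSource.<String>builder()
                        .hostname("localhost")              // placeholder host
                        .port(3306)                         // placeholder port
                        .databaseList("bi_dev")             // database(s) to capture
                        .tableList("bi_dev.test")           // table(s) to capture
                        .username("flink_cdc")              // placeholder user
                        .password("flink_cdc_pwd")          // placeholder password
                        .serverId("5401-5404")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .includeSchemaChanges(true)         // also emit DDL events
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing so the source can track binlog offsets
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource")
                .setParallelism(4)
                .print()
                .setParallelism(1);
        env.execute("Print MySQL Snapshot + Binlog");
    }
}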
Here I will only analyze some of the MySQL CDC upgrades:
- Support for all MySQL data types
  Including complex types such as enum, array, and geospatial types.
- Support for metadata columns
  In Flink DDL, users can access metadata such as the database name (database_name), table name (table_name), and change timestamp (op_ts) via syntax like db_name STRING METADATA FROM 'database_name'. This is very useful for data integration in sharded-database / sharded-table scenarios (see the Table API sketch right after this list).
- Parallel reading in the DataStream API
  In version 2.0, the lock-free algorithm and parallel reading were only exposed through the SQL API, not the DataStream API. Version 2.1 adds DataStream API support: the source is created through MySqlSourceBuilder, users can capture multiple tables at once and use that to build a whole-database synchronization pipeline, and schema changes can also be captured via MySqlSourceBuilder#includeSchemaChanges.
- New monitoring metrics: currentFetchEventTimeLag, currentEmitEventTimeLag, sourceIdleTime
  These metrics follow the FLIP-33 [1] connector metrics specification; see FLIP-33 for the meaning of each metric. currentEmitEventTimeLag records the difference between the time the Source emits a record downstream and the time the record was produced in the database, i.e. the latency from creation in the DB to leaving the Source node. Users can use it to tell whether the source has entered the binlog reading phase:
  - When the metric is 0, the source is still reading the full historical snapshot;
  - When it is greater than 0, the source has entered the binlog reading phase.
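As referenced in the metadata column item above, a sketch of such a Flink DDL registered through the Java Table API might look like the following. The Flink table name, the connection options, and the two mapped payload columns (taken from the bi_dev.test table used later in this post) are assumptions for illustration, not an official snippet:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlCdcMetadataColumnExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // db_name / tbl_name / op_ts are metadata columns exposed by the mysql-cdc connector.
        tEnv.executeSql(
                "CREATE TABLE test_cdc (\n"
                        + "  db_name STRING METADATA FROM 'database_name' VIRTUAL,\n"
                        + "  tbl_name STRING METADATA FROM 'table_name' VIRTUAL,\n"
                        + "  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,\n"
                        + "  id INT,\n"
                        + "  name2 STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'flink_cdc',\n"
                        + "  'password' = 'flink_cdc_pwd',\n"
                        + "  'database-name' = 'bi_dev',\n"
                        + "  'table-name' = 'test'\n"
                        + ")");

        // The metadata columns can be selected like ordinary columns,
        // which is what makes sharded-table integration convenient.
        tEnv.executeSql("SELECT db_name, tbl_name, op_ts, id, name2 FROM test_cdc").print();
    }
}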
Reading data normally:

Now debug what happens after a schema change: rename the column name1 -> name2.

The complete Struct received is:
Struct{source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1637117644411,db=bi_dev,table=test,server_id=1921684100,gtid=ef6f9e15-1218-11ec-997f-968db1336f14:2840388,file=mysql-bin.000056,pos=509499737,row=0},historyRecord={"source":{"file":"mysql-bin.000056","pos":509499737,"server_id":1921684100},"position":{"transaction_id":null,"ts_sec":1637117644,"file":"mysql-bin.000056","pos":509499965,"gtids":"ef6f9e15-1218-11ec-997f-968db1336f14:1-2840387","server_id":1921684100},"databaseName":"bi_dev","ddl":"ALTER TABLE `bi_dev`.`test` \r\nCHANGE COLUMN `name1` `name2` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL AFTER `id`","tableChanges":[{"type":"ALTER","id":"\"bi_dev\".\"test\"","table":{"defaultCharsetName":"utf8mb4","primaryKeyColumnNames":["id"],"columns":[{"name":"id","jdbcType":4,"typeName":"INT","typeExpression":"INT","charsetName":null,"length":11,"position":1,"optional":false,"autoIncremented":true,"generated":true},{"name":"name2","jdbcType":12,"typeName":"VARCHAR","typeExpression":"VARCHAR","charsetName":"utf8mb4","length":255,"position":2,"optional":true,"autoIncremented":false,"generated":false},{"name":"date4","jdbcType":91,"typeName":"DATE","typeExpression":"DATE","charsetName":null,"position":3,"optional":true,"autoIncremented":false,"generated":false},{"name":"datetime1","jdbcType":93,"typeName":"DATETIME","typeExpression":"DATETIME","charsetName":null,"position":4,"optional":true,"autoIncremented":false,"generated":false},{"name":"timestamp1","jdbcType":2014,"typeName":"TIMESTAMP","typeExpression":"TIMESTAMP","charsetName":null,"position":5,"optional":true,"autoIncremented":true,"generated":true}]}}]}}
The DDL statement captured in the change is:
"databaseName":"bi_dev","ddl":"ALTER TABLE `bi_dev`.`test`? CHANGE COLUMN `name1` `name2` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL AFTER `id`"
We run this statement again, slightly modified, renaming the current name2 column to name3, and check the effect.

The detailed tableChanges after the modification:
[
    {
        "type":"ALTER",
        "id":"\"bi_dev\".\"test\"",
        "table":{
            "defaultCharsetName":"utf8mb4",
            "primaryKeyColumnNames":[
                "id"
            ],
            "columns":[
                {
                    "name":"id",
                    "jdbcType":4,
                    "typeName":"INT",
                    "typeExpression":"INT",
                    "charsetName":null,
                    "length":11,
                    "position":1,
                    "optional":false,
                    "autoIncremented":true,
                    "generated":true
                },
                {
                    "name":"name2",
                    "jdbcType":12,
                    "typeName":"VARCHAR",
                    "typeExpression":"VARCHAR",
                    "charsetName":"utf8mb4",
                    "length":255,
                    "position":2,
                    "optional":true,
                    "autoIncremented":false,
                    "generated":false
                },
                {
                    "name":"date4",
                    "jdbcType":91,
                    "typeName":"DATE",
                    "typeExpression":"DATE",
                    "charsetName":null,
                    "position":3,
                    "optional":true,
                    "autoIncremented":false,
                    "generated":false
                },
                {
                    "name":"datetime1",
                    "jdbcType":93,
                    "typeName":"DATETIME",
                    "typeExpression":"DATETIME",
                    "charsetName":null,
                    "position":4,
                    "optional":true,
                    "autoIncremented":false,
                    "generated":false
                },
                {
                    "name":"timestamp1",
                    "jdbcType":2014,
                    "typeName":"TIMESTAMP",
                    "typeExpression":"TIMESTAMP",
                    "charsetName":null,
                    "position":5,
                    "optional":true,
                    "autoIncremented":true,
                    "generated":true
                }
            ]
        }
    }
]
So metadata (schema) change operations are indeed emitted as their own records, and their payload has to be parsed differently from ordinary data change records, so the consumer must branch on the record type (a sketch follows below). Follow-up code will add the complete parsing logic; once a metadata operation is detected, the corresponding schema change can then be applied to the downstream Doris table. To be continued.
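A minimal sketch of that branching, based only on the JSON shapes captured above: schema-change records carry a historyRecord field (whose value is itself a JSON string containing ddl and tableChanges), while ordinary data-change records carry op / before / after. The class name and the Doris handling stub are hypothetical; verify the exact field names against your own output.

import org.apache.flink.api.common.functions.MapFunction;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Splits the JSON strings produced by JsonDebeziumDeserializationSchema into
 * schema-change events and ordinary data-change events.
 */
public class SchemaChangeAwareParser implements MapFunction<String, String> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public String map(String record) throws Exception {
        JsonNode root = MAPPER.readTree(record);

        if (root.has("historyRecord")) {
            // Schema change event: historyRecord is itself a JSON string.
            JsonNode history = MAPPER.readTree(root.get("historyRecord").asText());
            String ddl = history.path("ddl").asText();
            JsonNode tableChanges = history.path("tableChanges");
            // TODO: map the DDL / tableChanges onto an ALTER of the downstream Doris table.
            return "SCHEMA_CHANGE ddl=" + ddl + " tableChanges=" + tableChanges;
        }

        // Ordinary data change event: op is c / u / d / r (create / update / delete / read).
        String op = root.path("op").asText();
        return "DATA op=" + op + " after=" + root.path("after");
    }
}

It would be wired in between the source and the sink in the job shown earlier, e.g. env.fromSource(...).map(new SchemaChangeAwareParser()).print().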