日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink cdc 2.1.0发布测试

發(fā)布時間:2024/3/13 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink cdc 2.1.0发布测试 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

依賴:

<!-- <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.1.0</version><scope>provided</scope></dependency>--> <dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.1.0</version><scope>provided</scope> </dependency>

1,最簡單的代碼:

package com.ververica.cdc.connectors.mysql.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.junit.Ignore; import org.junit.Test;/** Example Tests for {@link MySqlSource}. */ public class MySqlSourceExampleTest extends MySqlSourceTestBase {@Test@Ignore("Test ignored because it won't stop and is used for manual test")public void testConsumingAllEvents() throws Exception {inventoryDatabase.createAndInitialize();MySqlSource<String> mySqlSource =MySqlSource.<String>builder().hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).databaseList(inventoryDatabase.getDatabaseName()).tableList(inventoryDatabase.getDatabaseName() + ".products").username(inventoryDatabase.getUsername()).password(inventoryDatabase.getPassword()).serverId("5401-5404").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true) // output the schema changes as well.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);// set the source parallelism to 4env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource").setParallelism(4).print().setParallelism(1);env.execute("Print MySQL Snapshot + Binlog");} }

includeSchemaChanges(true)?是開啟了,表結(jié)構(gòu)變更感知

我這里只分析mysql?cdc的一些升級:

  • 支持所有 MySQL 數(shù)據(jù)類型

    包括枚舉類型、數(shù)組類型、地理信息類型等復雜類型。

  • 支持 metadata column

    用戶可以在 Flink DDL 中通過 db_name STRING METADATA FROM 'database_name' 的方式來訪問庫名(database_name)、表名(table_name)、變更時間(op_ts)等 meta 信息。這對分庫分表場景的數(shù)據(jù)集成非常使用。

  • 支持并發(fā)讀取的 DataStream API

    在 2.0 版本中,無鎖算法,并發(fā)讀取等功能只在 SQL API 上透出給用戶,而 DataStream API 未透出給用戶,2.1 版本支持了 DataStream API,可通過 MySqlSourceBuilder 創(chuàng)建數(shù)據(jù)源。用戶可以同時捕獲多表數(shù)據(jù),借此搭建整庫同步鏈路。同時通過 MySqlSourceBuilder#includeSchemaChanges 還能捕獲 schema 變更。

  • 支持 currentFetchEventTimeLag,currentEmitEventTimeLag,sourceIdleTime 監(jiān)控指標

    這些指標遵循 FLIP-33 [1] 的連接器指標規(guī)范,可以查看 FLIP-33 獲取每個指標的含義。其中,currentEmitEventTimeLag 指標記錄的是 Source 發(fā)送一條記錄到下游節(jié)點的時間點和該記錄在 DB 里產(chǎn)生時間點差值,用于衡量數(shù)據(jù)從 DB 產(chǎn)生到離開 Source 節(jié)點的延遲。用戶可以通過該指標判斷 source 是否進入了 binlog 讀取階段:

    • 即當該指標為 0 時,代表還在全量歷史讀取階段;

    • 當大于 0 時,則代表進入了 binlog 讀取階段。

正常讀取數(shù)據(jù):

?debug一下結(jié)構(gòu)變更之后:

name1? -> name2

?

Struct的完整結(jié)構(gòu):

Struct{source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1637117644411,db=bi_dev,table=test,server_id=1921684100,gtid=ef6f9e15-1218-11ec-997f-968db1336f14:2840388,file=mysql-bin.000056,pos=509499737,row=0},historyRecord={"source":{"file":"mysql-bin.000056","pos":509499737,"server_id":1921684100},"position":{"transaction_id":null,"ts_sec":1637117644,"file":"mysql-bin.000056","pos":509499965,"gtids":"ef6f9e15-1218-11ec-997f-968db1336f14:1-2840387","server_id":1921684100},"databaseName":"bi_dev","ddl":"ALTER TABLE `bi_dev`.`test` \r\nCHANGE COLUMN `name1` `name2` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL AFTER `id`","tableChanges":[{"type":"ALTER","id":"\"bi_dev\".\"test\"","table":{"defaultCharsetName":"utf8mb4","primaryKeyColumnNames":["id"],"columns":[{"name":"id","jdbcType":4,"typeName":"INT","typeExpression":"INT","charsetName":null,"length":11,"position":1,"optional":false,"autoIncremented":true,"generated":true},{"name":"name2","jdbcType":12,"typeName":"VARCHAR","typeExpression":"VARCHAR","charsetName":"utf8mb4","length":255,"position":2,"optional":true,"autoIncremented":false,"generated":false},{"name":"date4","jdbcType":91,"typeName":"DATE","typeExpression":"DATE","charsetName":null,"position":3,"optional":true,"autoIncremented":false,"generated":false},{"name":"datetime1","jdbcType":93,"typeName":"DATETIME","typeExpression":"DATETIME","charsetName":null,"position":4,"optional":true,"autoIncremented":false,"generated":false},{"name":"timestamp1","jdbcType":2014,"typeName":"TIMESTAMP","typeExpression":"TIMESTAMP","charsetName":null,"position":5,"optional":true,"autoIncremented":true,"generated":true}]}}]}}

修改語句為:

"databaseName":"bi_dev","ddl":"ALTER TABLE `bi_dev`.`test`? CHANGE COLUMN `name1` `name2` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL AFTER `id`"

我們執(zhí)行這個語句,稍微修改一下 ,將目前的name2?修改成name3:

?查看效果:

修改之后的詳細情況:

?[
????{
????????"type":"ALTER",
????????"id":"\"bi_dev\".\"test\"",
????????"table":{
????????????"defaultCharsetName":"utf8mb4",
????????????"primaryKeyColumnNames":[
????????????????"id"
????????????],
????????????"columns":[
????????????????{
????????????????????"name":"id",
????????????????????"jdbcType":4,
????????????????????"typeName":"INT",
????????????????????"typeExpression":"INT",
????????????????????"charsetName":null,
????????????????????"length":11,
????????????????????"position":1,
????????????????????"optional":false,
????????????????????"autoIncremented":true,
????????????????????"generated":true
????????????????},
????????????????{
????????????????????"name":"name2",
????????????????????"jdbcType":12,
????????????????????"typeName":"VARCHAR",
????????????????????"typeExpression":"VARCHAR",
????????????????????"charsetName":"utf8mb4",
????????????????????"length":255,
????????????????????"position":2,
????????????????????"optional":true,
????????????????????"autoIncremented":false,
????????????????????"generated":false
????????????????},
????????????????{
????????????????????"name":"date4",
????????????????????"jdbcType":91,
????????????????????"typeName":"DATE",
????????????????????"typeExpression":"DATE",
????????????????????"charsetName":null,
????????????????????"position":3,
????????????????????"optional":true,
????????????????????"autoIncremented":false,
????????????????????"generated":false
????????????????},
????????????????{
????????????????????"name":"datetime1",
????????????????????"jdbcType":93,
????????????????????"typeName":"DATETIME",
????????????????????"typeExpression":"DATETIME",
????????????????????"charsetName":null,
????????????????????"position":4,
????????????????????"optional":true,
????????????????????"autoIncremented":false,
????????????????????"generated":false
????????????????},
????????????????{
????????????????????"name":"timestamp1",
????????????????????"jdbcType":2014,
????????????????????"typeName":"TIMESTAMP",
????????????????????"typeExpression":"TIMESTAMP",
????????????????????"charsetName":null,
????????????????????"position":5,
????????????????????"optional":true,
????????????????????"autoIncremented":true,
????????????????????"generated":true
????????????????}
????????????]
????????}
????}
]

所以一定添加了對元數(shù)據(jù)修改的操作,數(shù)據(jù)解析也不一樣了,要添加判斷,后續(xù)代碼會添加完整的解析代碼,?然后監(jiān)控元數(shù)據(jù)操作之后,針對下游的doris表進行元數(shù)據(jù)修改,未完。?

總結(jié)

以上是生活随笔為你收集整理的Flink cdc 2.1.0发布测试的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。