日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

springboot集成flink-cdc

發布時間:2024/3/13 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springboot集成flink-cdc 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 前文
    • (1)什么是CDC
    • (2)Flink-CDC是什么
    • (3)Flink-CDC 特性
  • CDC與Flink畢業版本
  • Springboot項目整合Flink-CDC
    • (1)說明
    • (2)引入依賴
    • (3)接入springboot項目
      • 創建監聽類 實現 ApplicationRunner
      • 自定義數據讀取解析器
      • 變更對象
      • 自定義sink 交由spring管理

前文

(1)什么是CDC

CDC:全稱是 Change Data Capture,即數據變更捕獲技術,具體的含義是 通過識別和捕獲對數據庫中的數據所做的更改(包括數據或數據表的插入、更新、刪除;數據庫結構的變更調整等),然后將這些更改按發生的順序完整記錄下來,并實時通過中間技術橋梁(消息中間件、TCP等等)將變更順序消息傳送到下游流程或系統的過程。

(2)Flink-CDC是什么

CDC Connectors for Apache Flink ?是一組用于Apache Flink ?的源連接器,使用變更數據捕獲 (CDC) 從不同數據庫獲取變更。用于 Apache Flink ?的 CDC 連接器將 Debezium 集成為捕獲數據更改的引擎。所以它可以充分發揮 Debezium 的能力。

白話的意思是,Flink-CDC 一個成型的cdc技術實現(Debezium)的包裝,我前面也使用過Debezium,并編寫了一個簡略的博客,感興趣的可以戳下方連接去看一下

springboot+debezium捕獲數據庫變更(mysql、sql-server、mongodb、oracle…)

(3)Flink-CDC 特性

  • 支持讀取數據庫快照,即使發生故障也能繼續讀取binlog,一次處理。

  • DataStream API 的 CDC 連接器,用戶可以在單個作業中使用多個數據庫和表的更改,而無需部署 Debezium 和 Kafka。

  • Table/SQL API 的 CDC 連接器,用戶可以使用 SQL DDL 創建 CDC 源來監控單個表的更改。

  • CDC與Flink畢業版本

    下表顯示了 Flink? CDC 連接器和 Flink? 之間的版本映射:

    Flink ? CDC 版本Flink?版本_
    1.0.01.11.*
    1.1.01.11.*
    1.2.01.12.*
    1.3.01.12.*
    1.4.01.13.*
    2.0.*1.13.*
    2.1.*1.13.*
    2.2.*1.13.* , 1.14.*

    Springboot項目整合Flink-CDC

    (1)說明

    按常理來說,一個正常的flink-job 最終我們并不會集成到springboot項目中,我們會直接編寫一個maven項目,在發布時使用flink程序來啟動任務

    比如官網示例:

    本文即要使用flink-cdc進行數據變更捕獲 (可以視作為一個flink-job),但又要契合我們的springboot項目,使用spring的特性,因此,我們需要轉換一下思路,轉換成什么樣子呢?就是不要將這個flink-cdc作為一個job 使用flink程序進行發布提交,我們就當它在我們開發時一樣,作為一個本地項目,main方法啟動

    (2)引入依賴

    flink客戶端版本使用 1.13.6 cdc 版本使用 2.0.0

    <properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><!--mysql -cdc--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version></dependency></dependencies>

    (3)接入springboot項目

    無法簡單的使用main方法來啟動cdc 作業,因為如果這樣的話,我們就無法與spring完美的契合

    因此我們可以利用springboot的特性, 實現 ApplicationRunner 將flink-cdc 作為一個項目啟動時需要運行的分支子任務即可

    創建監聽類 實現 ApplicationRunner

    package com.leilei.mysql;import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component;/*** @author lei* @create 2022-08-25 13:42* @desc mysql變更監聽**/ @Component public class MysqlEventListener implements ApplicationRunner {private final DataChangeSink dataChangeSink;public MysqlEventListener(DataChangeSink dataChangeSink) {this.dataChangeSink = dataChangeSink;}@Overridepublic void run(ApplicationArguments args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();DataStream<DataChangeInfo> streamSource = env.addSource(dataChangeInfoMySqlSource, "mysql-source").setParallelism(1);streamSource.addSink(dataChangeSink);env.execute("mysql-stream-cdc");}/*** 構造變更數據源** @param* @return DebeziumSourceFunction<DataChangeInfo>* @author lei* @date 2022-08-25 15:29:38*/private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {return MySqlSource.<DataChangeInfo>builder().hostname("10.50.40.145").port(3306).databaseList("paas_common_db").tableList("paas_common_db.base_business_driver_score_*").username("root").password("cdwk-3g-145")/**initial初始化快照,即全量導入后增量導入(檢測更新數據寫入)* latest:只進行增量導入(不讀取歷史變化)* timestamp:指定時間戳進行數據導入(大于等于指定時間錯讀取數據)*/.startupOptions(StartupOptions.latest()).deserializer(new MysqlDeserialization()).serverTimeZone("GMT+8").build();} }

    自定義數據讀取解析器

    我這里解析為一個數據變更對象

    package com.leilei.mysql;import com.alibaba.fastjson.JSON; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import com.alibaba.fastjson.JSONObject;import java.util.List; import java.util.Optional;/*** @author lei* @create 2022-08-25 13:43* @desc mysql消息讀取自定義序列化**/ public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String CREATE = "CREATE";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String UPDATE = "UPDATE";/**** 反序列化數據,轉為變更JSON對象* @param sourceRecord* @param collector* @return void* @author lei* @date 2022-08-25 14:44:31*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct struct = (Struct) sourceRecord.value();final Struct source = struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo = new DataChangeInfo();dataChangeInfo.setBeforeData( getJsonObject(struct, BEFORE).toJSONString());dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());//5.獲取操作類型 CREATE UPDATE DELETEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);String type = operation.toString().toUpperCase();int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;dataChangeInfo.setEventType(eventType);dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x->Integer.parseInt(x.toString())).orElse(0));dataChangeInfo.setDatabase(database);dataChangeInfo.setTableName(tableName);dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));//7.輸出數據collector.collect(dataChangeInfo);}/**** 從袁術數據獲取出變更之前或之后的數據* @param value* @param fieldElement* @return JSONObject* @author lei* @date 2022-08-25 14:48:13*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<DataChangeInfo> getProducedType() {return TypeInformation.of(DataChangeInfo.class);} }

    變更對象

    import lombok.Data;/*** @author lei* @create 2022-08-25 14:33* @desc 數據變更對象**/ @Data public class DataChangeInfo {/*** 變更前數據*/private String beforeData;/*** 變更后數據*/private String afterData;/*** 變更類型 1新增 2修改 3刪除*/private Integer eventType;/*** binlog文件名*/private String fileName;/*** binlog當前讀取點位*/private Integer filePos;/*** 數據庫名*/private String database;/*** 表名*/private String tableName;/*** 變更時間*/private Long changeTime;}

    自定義sink 交由spring管理

    package com.leilei.mysql;import lombok.extern.log4j.Log4j2; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.springframework.stereotype.Component;/*** @author lei* @create 2022-08-25 14:01* @desc**/ @Component @Log4j2 public class DataChangeSink implements SinkFunction<DataChangeInfo> {@Overridepublic void invoke(DataChangeInfo value, Context context) {log.info("收到變更原始數據:{}", value);// todo 數據處理;因為此sink也是交由了spring管理,您想進行任何操作都非常簡單} }

    當然,以上僅僅只是整合思路,如果你想使用flink-cdc 進行數據同步或日志記錄等,結合您自身的需求進行調整接口,以上內容,大的架子是沒問題的

    如果遇到問題,可以先從官網QA尋找:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

    項目源碼:springboot-flink-cdc

    總結

    以上是生活随笔為你收集整理的springboot集成flink-cdc的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。