
Flink: Reading from Kafka and MySQL as Sources


Requirement

Read data from Kafka, read data from MySQL, and then merge the two streams for processing in a single Flink job.

Environment

  • Flink 1.8.2
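
The snippets below also assume the matching connector artifacts are on the classpath: for Flink 1.8.2 that means the Kafka connector that provides FlinkKafkaConsumer (flink-connector-kafka) and a MySQL JDBC driver such as mysql-connector-java. The exact artifact coordinates are an assumption here, so verify them against your build file.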

Implementation

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

    String topic = "mytest";
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "swarm-manager:9092");
    properties.setProperty("group.id", "test");
    // A consumer needs *deserializers*, not serializers; SimpleStringSchema below
    // already handles deserialization, so these two lines are optional.
    properties.setProperty("key.deserializer", StringDeserializer.class.getName());
    properties.setProperty("value.deserializer", StringDeserializer.class.getName());
    FlinkKafkaConsumer<String> flinkKafkaConsumer =
            new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);

    // Kafka source
    DataStreamSource<String> kafkaDataSource = executionEnvironment.addSource(flinkKafkaConsumer);
    SingleOutputStreamOperator<String> kafkaData = kafkaDataSource.map(new MapFunction<String, String>() {
        ....
    });

    // MySQL source
    DataStreamSource<HashMap<String, String>> mysqlData = executionEnvironment.addSource(new MySqlSource());
}

The above wires up both sources, reading from Kafka and from MySQL.
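The body of the map above is elided in the original. For the connect step further down to compile, the Kafka stream has to be parsed into a Tuple3<Long, String, String> (referred to as logData later). A minimal, purely hypothetical sketch of such a parse step, assuming each record is a tab-separated line of timestamp, domain, and payload:

// Hypothetical parse step (not in the original): assumes each Kafka record
// is a tab-separated line "timestamp<TAB>domain<TAB>payload".
SingleOutputStreamOperator<Tuple3<Long, String, String>> logData =
        kafkaData.map(new MapFunction<String, Tuple3<Long, String, String>>() {
            @Override
            public Tuple3<Long, String, String> map(String value) throws Exception {
                String[] fields = value.split("\t");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });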
The MySQL source is implemented as follows:

package com.vincent.project;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;

public class MySqlSource extends RichParallelSourceFunction<HashMap<String, String>> {
    private PreparedStatement ps;
    private Connection connection;

    // Called once per subtask to establish the connection
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = getConnection();
        String sql = "select user_id, domain from user_domain_config";
        ps = this.connection.prepareStatement(sql);
        System.out.println("open");
    }

    @Override
    public void close() throws Exception {
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    private Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            String url = "jdbc:mysql://swarm-manager:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8";
            String username = "root";
            String password = "123456";
            con = DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }

    /**
     * The key part of this class: read every row from the MySQL table
     * and wrap the result set in a single Map before emitting it.
     * @param sourceContext
     * @throws Exception
     */
    @Override
    public void run(SourceContext<HashMap<String, String>> sourceContext) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        HashMap<String, String> map = new HashMap<>();
        while (resultSet.next()) {
            String user_id = resultSet.getString("user_id");
            String domain = resultSet.getString("domain");
            map.put(domain, user_id);
        }
        sourceContext.collect(map);
    }

    @Override
    public void cancel() {
    }
}
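
One caveat: because MySqlSource extends RichParallelSourceFunction, every parallel subtask runs its own copy of run(), so with a parallelism greater than 1 the table would be read and the map emitted once per subtask. A small sketch (repeating the addSource call from the main method) of how the source could be pinned to a single instance:

// Pin the MySQL source to one subtask so the table is read exactly once;
// a RichParallelSourceFunction otherwise runs once per parallel subtask.
DataStreamSource<HashMap<String, String>> mysqlData =
        executionEnvironment.addSource(new MySqlSource());
mysqlData.setParallelism(1);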

Connecting the two sources:

// The CoFlatMapFunction's first type parameter is logData's element type,
// the second is the MySQL element type, and the third is the output type.
SingleOutputStreamOperator<String> connectData = logData.connect(mysqlData)
        .flatMap(new CoFlatMapFunction<Tuple3<Long, String, String>, HashMap<String, String>, String>() {
            HashMap<String, String> userDomainMap = new HashMap<>();

            // Handles elements from logData
            @Override
            public void flatMap1(Tuple3<Long, String, String> longStringStringTuple3, Collector<String> collector) throws Exception {
                String domain = longStringStringTuple3.f1;
                String userId = userDomainMap.getOrDefault(domain, "");
                System.out.println("userID:" + userId);
                collector.collect(longStringStringTuple3.f0 + "\t" + longStringStringTuple3.f1 + "\t" + longStringStringTuple3.f2 + "\t" + userId);
            }

            // Handles elements from MySQL
            @Override
            public void flatMap2(HashMap<String, String> stringStringHashMap, Collector<String> collector) throws Exception {
                userDomainMap = stringStringHashMap;
            }
        });
connectData.setParallelism(1).print();

Note that the operation joining the two sources is connect (not a "connector"): it keeps the two input types separate and routes elements from logData to flatMap1 and elements from the MySQL stream to flatMap2, letting the two callbacks share state such as userDomainMap.
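
Finally, none of the snippets above actually starts the job: as with any DataStream program, nothing runs until execute() is called on the environment. The job name below is just an illustrative choice:

// Submit the job graph; without this call the pipeline never runs.
executionEnvironment.execute("flink-kafka-mysql-connect");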

Summary

To recap: the Kafka stream is consumed with FlinkKafkaConsumer, the MySQL table is exposed as a stream via a custom RichParallelSourceFunction, and the two streams are joined with connect plus a CoFlatMapFunction that caches the MySQL lookup map and uses it to enrich each log record.
