Flink: Reading from Kafka and MySQL as Sources
Requirements

Read data from Kafka, read data from MySQL, and then merge the two streams for processing.
Environment
- Flink 1.8.2
Implementation
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

    String topic = "mytest";
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "swarm-manager:9092");
    properties.setProperty("group.id", "test");
    // No key/value serializer settings needed here: SimpleStringSchema
    // handles deserialization on the consumer side.

    FlinkKafkaConsumer<String> flinkKafkaConsumer =
            new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);

    // Kafka source
    DataStreamSource<String> kafkaDataSource = executionEnvironment.addSource(flinkKafkaConsumer);
    SingleOutputStreamOperator<Tuple3<Long, String, String>> logData =
            kafkaDataSource.map(new MapFunction<String, Tuple3<Long, String, String>>() {
                ....
            });

    // MySQL source
    DataStreamSource<HashMap<String, String>> mysqlData =
            executionEnvironment.addSource(new MySqlSource());
}
```

The code above sets up both sources, reading from Kafka and MySQL respectively.
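The body of the map operator is elided above. A minimal sketch of what such a parse might look like, assuming log lines of the form `timestamp\tdomain\tpayload` (the field layout is an assumption, not from the original):

```java
// Hypothetical log-line parser. The field layout (timestamp\tdomain\tpayload)
// is an assumption; adapt it to the actual Kafka message format.
public class LogLineParser {
    // Returns {timestamp, domain, payload}, mirroring the
    // Tuple3<Long, String, String> produced by the map operator.
    public static Object[] parse(String line) {
        String[] parts = line.split("\t", 3);
        return new Object[] { Long.parseLong(parts[0]), parts[1], parts[2] };
    }

    public static void main(String[] args) {
        Object[] t = parse("1566789000\texample.com\tINFO");
        System.out.println(t[0] + " " + t[1] + " " + t[2]);
    }
}
```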
The MySQL data source (`MySqlSource`) is configured as follows:
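The original `MySqlSource` implementation is not reproduced here. A minimal sketch built on Flink's `RichSourceFunction`, assuming a `user_domain_config` table with `domain` and `user_id` columns (the table name, column names, JDBC URL, and credentials are all assumptions):

```java
// Sketch of a custom MySQL source that emits the full domain -> userId
// mapping as one HashMap per poll. Connection details are placeholders.
public class MySqlSource extends RichSourceFunction<HashMap<String, String>> {

    private volatile boolean running = true;
    private Connection connection;
    private PreparedStatement ps;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "root");
        ps = connection.prepareStatement(
                "SELECT domain, user_id FROM user_domain_config");
    }

    @Override
    public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
        while (running) {
            HashMap<String, String> map = new HashMap<>();
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    map.put(rs.getString("domain"), rs.getString("user_id"));
                }
            }
            ctx.collect(map);
            Thread.sleep(60_000); // re-read the table every minute
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (ps != null) ps.close();
        if (connection != null) connection.close();
    }
}
```

Emitting the whole table as one `HashMap` keeps the downstream logic simple: the connected operator can just replace its cached copy on each update.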
Connecting the two sources:
```java
// The first type parameter of CoFlatMapFunction is the element type of logData,
// the second is the element type of mysqlData, the third is the output type.
SingleOutputStreamOperator<String> connectData = logData.connect(mysqlData)
        .flatMap(new CoFlatMapFunction<Tuple3<Long, String, String>, HashMap<String, String>, String>() {

            HashMap<String, String> userDomainMap = new HashMap<>();

            // Handles elements from logData
            @Override
            public void flatMap1(Tuple3<Long, String, String> value, Collector<String> collector) throws Exception {
                String domain = value.f1;
                String userId = userDomainMap.getOrDefault(domain, "");
                System.out.println("userID:" + userId);
                collector.collect(value.f0 + "\t" + value.f1 + "\t" + value.f2 + "\t" + userId);
            }

            // Handles elements from mysqlData
            @Override
            public void flatMap2(HashMap<String, String> value, Collector<String> collector) throws Exception {
                userDomainMap = value;
            }
        });

connectData.setParallelism(1).print();
```

`connect` joins the two streams: `flatMap2` keeps the latest MySQL snapshot in `userDomainMap`, and `flatMap1` uses it to enrich each log record with the matching user id.
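Outside Flink, the logic of the two callbacks can be sketched in plain Java (the class and method names here are illustrative, not part of the Flink API):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the CoFlatMapFunction logic: one side refreshes the
// domain -> userId map, the other side enriches log records from it.
public class Enrichment {
    private Map<String, String> userDomainMap = new HashMap<>();

    // Mirrors flatMap2: replace the mapping with the latest MySQL snapshot.
    public void updateMapping(Map<String, String> snapshot) {
        userDomainMap = snapshot;
    }

    // Mirrors flatMap1: append the matching userId (empty if unknown).
    public String enrich(long ts, String domain, String payload) {
        String userId = userDomainMap.getOrDefault(domain, "");
        return ts + "\t" + domain + "\t" + payload + "\t" + userId;
    }

    public static void main(String[] args) {
        Enrichment e = new Enrichment();
        Map<String, String> snapshot = new HashMap<>();
        snapshot.put("example.com", "u42");
        e.updateMapping(snapshot);
        System.out.println(e.enrich(1566789000L, "example.com", "INFO"));
    }
}
```

Note that in the real operator the two callbacks race: log records arriving before the first MySQL snapshot get an empty user id, exactly as `getOrDefault` does here.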
Summary

The above covers reading from Kafka and MySQL as separate sources in Flink and merging the two streams with `connect`. Hopefully it helps with similar problems.