日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

發布時間:2025/3/19 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

之前有文章 《從0到1學習Flink》—— Flink 寫入數據到 Kafka 寫過 Flink 將處理后的數據后發到 Kafka 消息隊列中去,當然我們常用的消息隊列可不止這一種,還有 RocketMQ、RabbitMQ 等,剛好 Flink 也支持將數據寫入到 RabbitMQ,所以今天我們就來寫篇文章講講如何將 Flink 處理后的數據寫入到 RabbitMQ。

前提準備

安裝 RabbitMQ

這里我直接用 docker 命令安裝吧,先把 docker 在 mac 上啟動起來。

在命令行中執行下面的命令:

docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management

對這個命令不懂的童鞋可以看看我以前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/

登錄用戶名和密碼分別是:admin / admin ,登錄進去是這個樣子就代表安裝成功了:

依賴

pom.xml 中添加 Flink connector rabbitmq 的依賴如下:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId><version>${flink.version}</version> </dependency>

生產者

這里我們依舊自己寫一個工具類一直的往 RabbitMQ 中的某個 queue 中發數據,然后由 Flink 去消費這些數據。

注意按照我的步驟來一步步操作,否則可能會出現一些錯誤!

RabbitMQProducerUtil.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class RabbitMQProducerUtil {public final static String QUEUE_NAME = "zhisheng";public static void main(String[] args) throws Exception {//創建連接工廠ConnectionFactory factory = new ConnectionFactory();//設置RabbitMQ相關信息factory.setHost("localhost");factory.setUsername("admin");factory.setPassword("admin");factory.setPort(5672);//創建一個新的連接Connection connection = factory.newConnection();//創建一個通道Channel channel = connection.createChannel();// 聲明一個隊列 // channel.queueDeclare(QUEUE_NAME, false, false, false, null);//發送消息到隊列中String message = "Hello zhisheng";//我們這里演示發送一千條數據for (int i = 0; i < 1000; i++) {channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8"));System.out.println("Producer Send +'" + message + i);}//關閉通道和連接channel.close();connection.close();} }

Flink 主程序

import com.zhisheng.common.utils.ExecutionEnvUtil; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** 從 rabbitmq 讀取數據*/ public class Main {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;//這些配置建議可以放在配置文件中,然后通過 parameterTool 來獲取對應的參數值final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setVirtualHost("/").setPort(5672).setUserName("admin").setPassword("admin").build();DataStreamSource<String> zhisheng = env.addSource(new RMQSource<>(connectionConfig,"zhisheng",true,new SimpleStringSchema())).setParallelism(1);zhisheng.print();//如果想保證 exactly-once 或 at-least-once 需要把 checkpoint 開啟 // env.enableCheckpointing(10000);env.execute("flink learning connectors rabbitmq");} }

運行 RabbitMQProducerUtil 類,再運行 Main 類!

注意??:

1、RMQConnectionConfig 中設置的用戶名和密碼要設置成 admin/admin,如果你換成是 guest/guest,其實是在 RabbitMQ 里面是沒有這個用戶名和密碼的,所以就會報這個錯誤:

nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

不出意外的話應該你運行 RabbitMQProducerUtil 類后,立馬兩個運行的結果都會出來,速度還是很快的。

2、如果你在 RabbitMQProducerUtil 工具類中把注釋的那行代碼打開的話:

// 聲明一個隊列 // channel.queueDeclare(QUEUE_NAME, false, false, false, null);

就會出現這種錯誤:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)

這是因為你打開那個注釋的話,一旦你運行了該類就會創建一個叫做 zhisheng 的 Queue,當你再運行 Main 類中的時候,它又會創建這樣一個叫 zhisheng 的 Queue,然后因為已經有同名的 Queue 了,所以就有了沖突,解決方法就是把那行代碼注釋就好了。

3、該 connector(連接器)中提供了 RMQSource 類去消費 RabbitMQ queue 中的消息和確認 checkpoints 上的消息,它提供了三種不一樣的保證:

  • Exactly-once(只消費一次): 前提條件有,1 是要開啟 checkpoint,因為只有在 checkpoint 完成后,才會返回確認消息給 RabbitMQ(這時,消息才會在 RabbitMQ 隊列中刪除);2 是要使用 Correlation ID,在將消息發往 RabbitMQ 時,必須在消息屬性中設置 Correlation ID。數據源根據 Correlation ID 把從 checkpoint 恢復的數據進行去重;3 是數據源不能并行,這種限制主要是由于 RabbitMQ 將消息從單個隊列分派給多個消費者。
  • At-least-once(至少消費一次): 開啟了 checkpoint,但未使用相 Correlation ID 或 數據源是并行的時候,那么就只能保證數據至少消費一次了
  • No guarantees(無法保證): Flink 接收到數據就返回確認消息給 RabbitMQ

Sink 數據到 RabbitMQ

RabbitMQ 除了可以作為數據源,也可以當作下游,Flink 消費數據做了一些處理之后也能把數據發往 RabbitMQ,下面演示下 Flink 消費 Kafka 數據后寫入到 RabbitMQ。

public class Main1 {public static void main(String[] args) throws Exception {final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setVirtualHost("/").setPort(5672).setUserName("admin").setPassword("admin").build();//注意,換一個新的 queue,否則也會報錯data.addSink(new RMQSink<>(connectionConfig, "zhisheng001", new MetricSchema()));env.execute("flink learning connectors rabbitmq");} }

是不是很簡單?但是需要注意的是,要換一個之前不存在的 queue,否則是會報錯的。

不出意外的話,你可以看到 RabbitMQ 的監控頁面會出現新的一個 queue 出來,如下圖:

總結

本文先把 RabbitMQ 作為數據源,寫了個 Flink 消費 RabbitMQ 隊列里面的數據進行打印出來,然后又寫了個 Flink 消費 Kafka 數據后寫入到 RabbitMQ 的例子!

本文原創地址是: http://www.54tianzhisheng.cn/2019/01/20/Flink-RabbitMQ-sink/ , 未經允許禁止轉載。

關注我

微信公眾號:zhisheng

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復關鍵字:Flink 即可無條件獲取到。

更多私密資料請加入知識星球!

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

以后這個項目的所有代碼都將放在這個倉庫里,包含了自己學習 flink 的一些 demo 和博客。

本文的項目代碼在 https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-rabbitmq

相關文章

1、《從0到1學習Flink》—— Apache Flink 介紹

2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境并構建運行簡單程序入門

3、《從0到1學習Flink》—— Flink 配置文件詳解

4、《從0到1學習Flink》—— Data Source 介紹

5、《從0到1學習Flink》—— 如何自定義 Data Source ?

6、《從0到1學習Flink》—— Data Sink 介紹

7、《從0到1學習Flink》—— 如何自定義 Data Sink ?

8、《從0到1學習Flink》—— Flink Data transformation(轉換)

9、《從0到1學習Flink》—— 介紹Flink中的Stream Windows

10、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解

11、《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch

12、《從0到1學習Flink》—— Flink 項目如何運行?

13、《從0到1學習Flink》—— Flink 寫入數據到 Kafka

14、《從0到1學習Flink》—— Flink JobManager 高可用性配置

15、《從0到1學習Flink》—— Flink parallelism 和 Slot 介紹

16、《從0到1學習Flink》—— Flink 讀取 Kafka 數據批量寫入到 MySQL

17、《從0到1學習Flink》—— Flink 讀取 Kafka 數據寫入到 RabbitMQ

18、《從0到1學習Flink》—— 你上傳的 jar 包藏到哪里去了?

總結

以上是生活随笔為你收集整理的kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。