RabbitMQ入门4:生产者、消费者演示;多个消费者平均压力、公平派遣;
說明:
(1)內容說明:
? ? ? ? ? ●?這兒我們會創建一個項目,演示RabbitMQ最基礎的內容;
通過,這個最簡單的例子,先了解:如何使用RabbitMQ,如何連接RabbitMQ,如何發送消息,如何接收消息等最最基礎的內容;
? ? ? ? ? ● 然后,會演示多個消費者平均壓力的內容;
目錄
一:第一個生產者和消費者;
0.創建一個maven項目rabbitmq,演示用;
1.引入RabbitMQ的Java客戶端的,依賴;
2.第一個生產者;
3.第一個消費者;
4.瞅一眼RabbitMQ管理后臺;
二:?根據消息內容的不同,采取不同的處理策略;(這兒演示的是一種思路)
三:當消息相對較多時,多個消費者平均壓力;?
1.多個消費者,平均壓力:引入;?
2.公平派遣;
一:第一個生產者和消費者;
0.創建一個maven項目rabbitmq,演示用;
?RabbitMQ支持多語言,其中就包括Java;同時,RabbitMQ的API豐富,我們可以利用RabbitMQ針對Java提供的客戶端的一系列API,來完成操作;
1.引入RabbitMQ的Java客戶端的,依賴;
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-nop</artifactId><version>1.7.29</version></dependency></dependencies>說明:
(1)依賴說明;
2.第一個生產者;
Send類:
package helloworld;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** 描述:發送消息的類:連接到RabbitMQ的服務端,然后發送一條消息,然后退出;*/ public class Send {//我們發送消息時,需要指定要發到哪里去;所以,我們要指定隊列的名字;所以,這兒我們定義隊列的名字;//這個名字可隨便取,待會在接收的消息時候,要使用這個隊列;private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {//1.創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//2.設置RabbitMQ的地址(即RabbitMQ的服務端的地址)//這里面填寫是RabbitMQ服務端所在服務器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后,要想連接RabbitMQ的服務端,我么還需要通過一個用戶才行;// 所以,這兒我們使用【前面我們設置的,能夠在其他服務器上訪問RabbitMQ所在服務器的,admin用戶】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:記得要放開RabbitMQ部署的服務器的,5672端口;//3.建立連接Connection connection = connectionFactory.newConnection();//4.獲得Channel信道(我們大部分的操作,都是在信道上完成的;有了信道后,我們就可以進行操作了)Channel channel = connection.createChannel();//5.聲明隊列(有了隊列之后,我們就可以發布消息了)//參數說明:第一個參數(queue):隊列名;// 第二個參數(durable):這個隊列是否需要持久(即,服務重啟后,這個隊列是否需要還存在;這兒我們根據自己的需求,設為了false;)//第三個參數(exclusive):這個隊列是否獨有(即,這個隊列是不是僅能給這個連接使用;這兒我們設為了false)//第四個參數(autoDelete):這個隊列是否需要自動刪除(即,在隊列沒有使用的時候,是否需要自動刪除;這兒我們設為了false)//第五個參數(arguments);channel.queueDeclare(QUEUE_NAME, false, false, false, null);//6.發布消息String message = "測試的消息";//參數說明:第一個參數(exchange)是交換機,這兒我們暫時不深入了解;// 第二個參數(routingKey)是路由鍵,這兒我們就寫成隊列的名字;//第三個參數(props),消息除了消息體外,還要有props作為它的配置;// 第四個參數(body)消息的內容,要求是byte[]類型的,同時,需要指定編碼類型channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息發送成功了:" + message);//7.關閉連接:先關閉channel信道,然后關閉connection連接;channel.close();connection.close();}}說明:
(0)RabbitMQ有三個所謂的“端”:這兒梳理一下;
? ? ? ? ? ● 服務端:就是我們安裝了RabbitMQ的Linux系統;我們把RabbitMQ啟動后,這個服務器中的RabbitMQ就是服務端;
? ? ? ? ? ● 管理后臺:RabbitMQ的管理后臺,就是我們在【RabbitMQ入門3:RabbitMQ管理后臺,簡介;】中,演示的在web端查看、管理RabbitMQ的一些配置的地方;(PS:要想在網頁上訪問管理后臺,那么部署RabbitMQ的服務器,就要開發RabbitMQ的15672端口)
? ? ? ? ? ● 客戶端:比如,在這兒,我們在我們的Java項目中,引入RabbitMQ提供的Java客戶端后,我們就可以通過客戶端去操作RabbitMQ了;(PS:要想在遠端服務器,通過客戶端訪問RabbitMQ服務,那么部署RabbitMQ的服務器,就要開發RabbitMQ的5672端口)
(1)看注釋;這兒的連接RabbitMQ的套路,都是相對固定的,后面如有需要,我們似乎也可以創建一個工具類;
(2)我們要想在其他服務器,通過客戶端訪問RabbitMQ,那么部署RabbitMQ的服務器,就要開發RabbitMQ的5672端口;關于Linux防火墻設置,可以參考【Linux進階六:【firewall-cmd】防火墻設置;(以【對外開放Tomcat】為例來演示)】;
(3)如果我們的RabbitMQ是安裝在本機的話,就可以設置本機地址,然后其默認會使用guest用戶去登錄,所以設置用戶就可以省略;
(4)運行結果:
3.第一個消費者;
Recv類:
package helloworld;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** 描述:接收消息的類:連接到RabbitMQ的服務端,然后接收消息;這個接收消息的類,會持續運行;*/ public class Recv {//我們這兒想要接收的消息,就是Send類發送到“hello”這個隊列中的消息;// 所以,在接收消息的時候,我們也要使用到“hello”這個隊列;private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {//1.創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//2.設置RabbitMQ的地址(即RabbitMQ的服務端的地址)//這里面填寫是RabbitMQ服務端所在服務器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后,要想連接RabbitMQ的服務端,我么還需要通過一個用戶才行;// 所以,這兒我們使用【前面我們設置的,能夠在其他服務器上訪問RabbitMQ所在服務器的,admin用戶】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:記得要放開RabbitMQ部署的服務器的,5672端口;//3.建立連接Connection connection = connectionFactory.newConnection();//4.獲得Channel信道(我們大部分的操作,都是在信道上完成的;有了信道后,我們就可以進行操作了)Channel channel = connection.createChannel();//5.聲明隊列:因為這兒想要接收的消息,就是Send類發送到“hello”這個隊列中的消息;所以,這兒聲明的隊列和Send中聲明的隊列是一樣的;//參數說明:第一個參數(queue):隊列名;// 第二個參數(durable):這個隊列是否需要持久(即,服務重啟后,這個隊列是否需要還存在;這兒我們根據自己的需求,設為了false;)//第三個參數(exclusive):這個隊列是否獨有(即,這個隊列是不是僅能給這個連接使用;這兒我們設為了false)//第四個參數(autoDelete):這個隊列是否需要自動刪除(即,在隊列沒有使用的時候,是否需要自動刪除;這兒我們設為了false)//第五個參數(arguments);channel.queueDeclare(QUEUE_NAME, false, false, false, null);//6.接收消息,并消費//參數說明:第一個參數(queue)隊列名;// 第二個參數(autoAck),是否去自動的確認收到;即,這兒接收到消息之后,是否需要通知消息發送者;//第三個參數(callback),消息收到后的處理channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(message);}});} }說明:
(1)接收消息的時候,我們不需要關閉channel和connection,這個類會一直運行,即其會一直看隊列中有沒有數據,有的話就拿過來消費;
(2)看注釋;
(3)我們接收到消息后,具體消息怎么處理,寫在了handleDelivery()方法中;
(4)運行結果;
4.瞅一眼RabbitMQ管理后臺;
二:?根據消息內容的不同,采取不同的處理策略;(這兒演示的是一種思路)
在實際開發中,有時可能需要根據消息內容的不同,采取不同的處理策略;本篇博客,就來演示一下;
NewTask類:
package workqueues;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** 描述:發送消息的類:其會發送*/ public class NewTask {//我們發送消息時,需要指定要發到哪里去;所以,我們要指定隊列的名字;所以,這兒我們定義隊列的名字;//這個名字可隨便取,待會在接收的消息時候,要使用這個隊列;private final static String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//1.創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//2.設置RabbitMQ的地址(即RabbitMQ的服務端的地址)//這里面填寫是RabbitMQ服務端所在服務器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后,要想連接RabbitMQ的服務端,我么還需要通過一個用戶才行;// 所以,這兒我們使用【前面我們設置的,能夠在其他服務器上訪問RabbitMQ所在服務器的,admin用戶】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:記得要放開RabbitMQ部署的服務器的,5672端口;//3.建立連接Connection connection = connectionFactory.newConnection();//4.獲得Channel信道(我們大部分的操作,都是在信道上完成的;有了信道后,我們就可以進行操作了)Channel channel = connection.createChannel();//5.聲明隊列(有了隊列之后,我們就可以發布消息了)//參數說明:第一個參數(queue):隊列名;// 第二個參數(durable):這個隊列是否需要持久(即,服務重啟后,這個隊列是否需要還存在;這兒我們根據自己的需求,設為了true;)//第三個參數(exclusive):這個隊列是否獨有(即,這個隊列是不是僅能給這個連接使用;這兒我們設為了false)//第四個參數(autoDelete):這個隊列是否需要自動刪除(即,在隊列沒有使用的時候,是否需要自動刪除;這兒我們設為了false)//第五個參數(arguments);channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);//6.發布消息;這兒我們模擬發送10條消息(消息內容是,如"1...","2...","3..."……這樣的)for (int i = 0; i < 10; i++) {String message = i + "...";//參數說明:第一個參數(exchange)是交換機,這兒我們暫時不深入了解;// 第二個參數(routingKey)是路由鍵,這兒我們就寫成隊列的名字;//第三個參數(props),消息除了消息體外,還要有props作為它的配置;// 第四個參數(body)消息的內容,要求是byte[]類型的,同時,需要指定編碼類型channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("消息發送成功了:" + message);}//7.關閉連接:先關閉channel信道,然后關閉connection連接;channel.close();connection.close();} }說明:
(1)看注釋;
(2)類內容說明;
Worker類:
package workqueues;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** 描述:接收消息的類:連接到RabbitMQ的服務端,然后接收消息;這個接收消息的類,會持續運行;*/ public class Worker {//我們這兒想要接收的消息,就是NewTask類發送到“task_queue”這個隊列中的消息;// 所以,在接收消息的時候,我們也要使用到“task_queue”這個隊列;private final static String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//1.創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//2.設置RabbitMQ的地址(即RabbitMQ的服務端的地址)//這里面填寫是RabbitMQ服務端所在服務器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后,要想連接RabbitMQ的服務端,我么還需要通過一個用戶才行;// 所以,這兒我們使用【前面我們設置的,能夠在其他服務器上訪問RabbitMQ所在服務器的,admin用戶】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:記得要放開RabbitMQ部署的服務器的,5672端口;//3.建立連接Connection connection = connectionFactory.newConnection();//4.獲得Channel信道(我們大部分的操作,都是在信道上完成的;有了信道后,我們就可以進行操作了)Channel channel = connection.createChannel();//5.聲明隊列:因為這兒想要接收的消息,就是Send類發送到“hello”這個隊列中的消息;所以,這兒聲明的隊列和Send中聲明的隊列是一樣的;//參數說明:第一個參數(queue):隊列名;// 第二個參數(durable):這個隊列是否需要持久(即,服務重啟后,這個隊列是否需要還存在;這兒我們根據自己的需求,設為了true;)//第三個參數(exclusive):這個隊列是否獨有(即,這個隊列是不是僅能給這個連接使用;這兒我們設為了false)//第四個參數(autoDelete):這個隊列是否需要自動刪除(即,在隊列沒有使用的時候,是否需要自動刪除;這兒我們設為了false)//第五個參數(arguments);channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println("開始接收消息");//6.接收消息,并消費//參數說明:第一個參數(queue)隊列名;// 第二個參數(autoAck),是否去自動的確認收到;即,這兒接收到消息之后,是否需要通知消息發送者;//第三個參數(callback),消息收到后的處理channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到了消息" + message);try{doWork(message);}finally {System.out.println("完成消息處理");}}});}/*** 工具方法:處理消息;* @param task*/private static void doWork(String task) {//根據具體消息內容的不同,去處理消息;// 即,如果消息中有'.'的話,那么我們就讓其處理速度慢1秒;(PS:這兒僅僅是為了演示用的,玩具式程序)// 那么,這樣一來,就會出現這個效果:如果消息中沒有'.',處理的就會很快;如果有'.',處理速度就會慢的多;char[] chars = task.toCharArray();for (char ch : chars) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}} }說明:
(1)看注釋;
(2)類內容說明;
(3)運行結果;
三:當消息相對較多時,多個消費者平均壓力;?
可以看到,在(二:?根據消息內容的不同,采取不同的處理策略;(這兒演示的是一種思路))中,單靠Worker處理10條信息,需要10秒多;試想,我們再創建一個Worker,10條消息的處理速度就會提升;
1.多個消費者,平均壓力:引入;?
一個直接的想法就是,既然我們想讓多個Worker一起來接收并處理消息,那么我們可不可以再運行一次Worker類?
進行一下配置,讓Worker類可以有多個實例并行運行;
那么,此時的效果;
默認情況下,如果有多個Worker的話,那么這多個Worker會并行工作;RabbitMQ會根據已啟動worker和消息的情況,按順序把每個消息發送給下一個Worker,在消息數量上是平均分配的;即,比如上面有10條消息,兩個消費者,那么無論兩個消費者處理能力如何,每個消費者都會收到5條消息;
但是,這種平均,是任務量的平均分配,而不一定是真實工作量(壓力)的分配;比如,下面的案例;
此時,再發送消息,觀察效果:
而,為了解決這種,純按數量和順序分配,卻沒有按工作量(壓力)平均分配的問題;就是下面公平派遣的內容了;
2.公平派遣;
公平派遣是在有多個消費者,而且是循環調度的情況下,來說的;公平派遣機制下,RabbitMQ會根據消費者的壓力,來決定是否派遣;
要想實現公平派遣,還需要加入消息確認機制;主要目的是,消費者處理完消息后,發送一個確認消息,這樣一來,RabbitMQ就知道你處理完了,然后就會給你發送下一個消息;
修改Worker這個Consumer類的內容如下:
package workqueues;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** 描述:接收消息的類:連接到RabbitMQ的服務端,然后接收消息;這個接收消息的類,會持續運行;*/ public class Worker {//我們這兒想要接收的消息,就是NewTask類發送到“task_queue”這個隊列中的消息;// 所以,在接收消息的時候,我們也要使用到“task_queue”這個隊列;private final static String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//1.創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//2.設置RabbitMQ的地址(即RabbitMQ的服務端的地址)//這里面填寫是RabbitMQ服務端所在服務器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后,要想連接RabbitMQ的服務端,我么還需要通過一個用戶才行;// 所以,這兒我們使用【前面我們設置的,能夠在其他服務器上訪問RabbitMQ所在服務器的,admin用戶】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:記得要放開RabbitMQ部署的服務器的,5672端口;//3.建立連接Connection connection = connectionFactory.newConnection();//4.獲得Channel信道(我們大部分的操作,都是在信道上完成的;有了信道后,我們就可以進行操作了)final Channel channel = connection.createChannel();//5.聲明隊列:因為這兒想要接收的消息,就是Send類發送到“hello”這個隊列中的消息;所以,這兒聲明的隊列和Send中聲明的隊列是一樣的;//參數說明:第一個參數(queue):隊列名;// 第二個參數(durable):這個隊列是否需要持久(即,服務重啟后,這個隊列是否需要還存在;這兒我們根據自己的需求,設為了true;)//第三個參數(exclusive):這個隊列是否獨有(即,這個隊列是不是僅能給這個連接使用;這兒我們設為了false)//第四個參數(autoDelete):這個隊列是否需要自動刪除(即,在隊列沒有使用的時候,是否需要自動刪除;這兒我們設為了false)//第五個參數(arguments);channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println("開始接收消息");//這句話的意思,這個消費者最希望處理的消息的數量;那么效果就是,這個消費者在處理完一個消息之前,是不會接收下一個消息的;channel.basicQos(1);//6.接收消息,并消費//參數說明:第一個參數(queue)隊列名;// 第二個參數(autoAck),是否去自動的確認收到;即,這兒接收到消息之后,是否需要通知消息發送者;//為了演示公平派遣,我們這兒改成了false,即我么手動卻發送處理了完成的消息//第三個參數(callback),消息收到后的處理channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到了消息" + message);try{doWork(message);}finally {System.out.println("完成消息處理");//消息處理完成后,去手動確認;//第一個參數(deliveryTag)這個參數是固定的;//第二參數(multiple)意思是,我們是否同時多個消息一起確認,這兒我們不需要,所以設為了false;channel.basicAck(envelope.getDeliveryTag(), false);}}});}/*** 工具方法:處理消息;* @param task*/private static void doWork(String task) {//根據具體消息內容的不同,去處理消息;// 即,如果消息中有'.'的話,那么我們就讓其處理速度慢1秒;(PS:這兒僅僅是為了演示用的,玩具式程序)// 那么,這樣一來,就會出現這個效果:如果消息中沒有'.',處理的就會很快;如果有'.',處理速度就會慢的多;char[] chars = task.toCharArray();for (char ch : chars) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}} }說明:
(1)修改內容說明;
(2)注意:此時,一定要記得主動發送【消息處理完成的通知】;否則,RabbitMQ就不知道,這個消息是否被處理完了,其就會認為沒有被處理完,于是后續的消息就會得不到處理,越積越多,消耗內存;
(3)運行效果;此時,我們重開兩個Worker,然后運行NewTask,發送10條消息;
?
總結
以上是生活随笔為你收集整理的RabbitMQ入门4:生产者、消费者演示;多个消费者平均压力、公平派遣;的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 分享代码
- 下一篇: 怎么在html中加校验,如何通过W3C验