RabbitMQ初步学习(Mac)
1.RabbitMQ學習:
1.簡介
2.安裝
3.使用
- 3.1.創(chuàng)建簡單列隊
- 3.2.創(chuàng)建工作列隊
- 3.3.創(chuàng)建訂閱列隊
- 3.4.創(chuàng)建路由列隊
- 3.5.創(chuàng)建主題列隊
- 3.6.事務
- 3.7.確認模式
- 3.7.1.同步確認
- 3.7.2.異步確認
- 使用springBoot 簡單的實現(xiàn)AMQP
使用springBoot實現(xiàn)AMQP更多模式
2.MQ簡介:
? 在計算機科學中,消息隊列(英語:Message queue)是一種進程間通信或同一進程的不同線程間的通信方式,軟件的序列用來處理一系列的輸入,通常是來自用戶的。消息隊列提供了異步的通信協(xié)議,每一個序列中的記錄包含了詳細說明的數據,包含發(fā)生的時間,輸入設備的種類,以及特定的輸入參數,也就是說:消息的發(fā)送者和接收者不需要同時與消息隊列交互。消息保存在隊列中,直到接收者取回它。
2.1、實現(xiàn):
消息隊列常常保存在鏈表結構中。擁有權限的進程才可以向消息隊列中寫入或讀取消息
目前,有很多消息隊列有很多的實現(xiàn),包括 JBoss Messing、JORAM、Apache、ActiveMQ、SunPoen Message Queue、IBM MQ、Apache Qpid和HTTPSQS
當前使用較多的消息隊列有:RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等而部分數據庫如:Redis,Mysql,以及phxsql也可以實現(xiàn)消息隊列的功能。
2.2、特點:
MQ是消費者-生產者模型中的一個典型代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取或者訂閱隊列中的消息。MQ和JMS類似,但不同的是JMS是SUN JAVA消息中間件服務的一個標準和API定義,而MQ則是遵循了AMQP協(xié)議的具體實現(xiàn)和產品。
注意:
1.AMQP,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。基于此協(xié)議的客戶端與中間件可傳遞消息,并不受客戶端/中間件影響,不同的開發(fā)語言等條件的限制。
2.AMS,即java消息服務(java Message Service)應用程序接口,是一個java平臺中關于面向消息中間件的API,用于在兩個應用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供服務。常見的消息列隊,大部分都實現(xiàn)了JMI API 如:ActiveMQ,Redis以及RabbitMQ 等
2.3、優(yōu)缺點:
優(yōu)點:
應用耦合、異步處理、流量削峰
-
解耦:
傳統(tǒng)模式
傳統(tǒng)模式缺點:
系統(tǒng)間耦合性太強,如上圖所示,系統(tǒng)A在代碼中直接調用系統(tǒng)B和系統(tǒng)C的代碼,如果將D系統(tǒng)接入,系統(tǒng)A還需要修改代碼,太麻煩!
中間件模式:
中間件模式優(yōu)點:
將消息寫入列隊,需要消息的系統(tǒng)自己從消息列隊中訂閱,從而系統(tǒng)A不需要做如何修改。
-
異步
傳統(tǒng)模式:
傳統(tǒng)模式缺點:
? 一些非必要的業(yè)務邏輯以同步的方式運行,太耗費時間。
中間件模式:
中間件模式優(yōu)點:
? 使用消息隊列發(fā)送消息,減少耗時。
-
削峰
傳統(tǒng)模式:
傳統(tǒng)模式缺點:
并發(fā)量大的時候,所有的請求直接懟到數據庫,造成數據庫連接異常。
中間件模式:
中間件模式的優(yōu)點:系統(tǒng)A慢慢的按照數據庫能處理的并發(fā)量,從消息隊列中慢慢拉取消息。在生產中,這個短暫的最高峰期積壓是允許的。
缺點:
系統(tǒng)可用性低、系統(tǒng)復雜性增加
2.4、使用場景:
? 消息列隊,是分布式系統(tǒng)中重要的組件,其通用的使用場景可以簡單的描述為:當不需要立即獲取結果,但是并發(fā)量又需要進行控制的時候,差不多就是需要使用消息列隊的時候。
? 在項目中,將一些無需及時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節(jié)省了服務器請求的響應時間,從而提高了系統(tǒng)的吞吐量。
2.5、為什么使用RabbitMQ:
? AMQP,即Advanced Meassage Queueing Protocol,高級消息列隊協(xié)議,是應用層的一個開發(fā)標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息的使用者的存在,反之亦然。
? AMQP的主要特征是面向消息、列隊、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。
? RabbitMQ是一個開源的AMQP實現(xiàn),服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于分布式系統(tǒng)中存儲轉發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。
? 總結如下
3.安裝:
3.1、安裝erlang:(Mac系統(tǒng))
brew install erlang
erlang對應Rabbit版本:
測試erlang是否安裝成功:
3.2、安裝rabbitMQ:(Mac系統(tǒng))
官網下載地址:https://www.rabbitmq.com/download.html
開啟RabbitMQ圖形化管理界面插件:rabbitmq-plugins enable rabbitmq_management、關閉RabbitMQ圖形化管理界面插件:rabbitmq-plugins disable rabbitmq_management
- 使用rabbitmq-plugins list指令查看 rabbitmq 的插件啟動情況:
開啟RabbitMQ服務rabbitmq-service、關閉RabbitMq服務rabbitmqctl stop
在瀏覽器訪問localhost:15672進入rabbitmq圖形界面管理登陸系統(tǒng):
默認用戶名:guest ,默認密碼: guest
登陸之后進入rabbitmq圖形界面管理系統(tǒng):
4.使用RabbitMQ:
4.1、添加一個名稱為/web的虛擬主機:
創(chuàng)建成功:
每次創(chuàng)建虛擬主機guest用戶會默認加入虛擬主機
4.2、添加一個名稱為web的用戶:
添加成功:
4.3、將web用戶添加到虛擬主機:
添加成功:
4.4、使用java代碼實現(xiàn)AMQP:
導入依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.4.3</version></dependency>4.4.1、創(chuàng)建簡單列隊:
簡單列隊:生產者將消息發(fā)送到“hello”隊列。消費者從該隊列接收消息。
4.4.1.1:創(chuàng)建簡單列隊生產者:
/*** 簡單隊列生產者* @author haoruijie*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");//端口號connectionFactory.setPort(5672);//用戶名connectionFactory.setUsername("web");//用戶密碼connectionFactory.setPassword("web");//虛擬主機名connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){/*** 第一個參數(queue)綁定隊列* 第二個參數(durable)持久化* 第三個參數(Exclusive)排他隊列* 第四個參數(Auto-delete)自動刪除*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//準備消息String message = "Hello world";//發(fā)送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println(message);}} }啟動生產者服務:
消息堵塞:
4.4.1.2:創(chuàng)建簡單列隊消費者:
/*** 普通隊列消費者* @author haoruijie*/ public class Recv {//定義隊列名稱private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");//端口號connectionFactory.setPort(5672);//用戶名connectionFactory.setUsername("web");//用戶密碼connectionFactory.setPassword("web");//虛擬主機名connectionFactory.setVirtualHost("/web");//創(chuàng)建信道Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定隊列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{});} }啟動消費者:
消費者消費消息:
4.4.2:創(chuàng)建工作列隊
工作列隊:(一個生產者對應多個消費者,但是只能有一個消費者獲得消息!!!)
4.4.2.1:創(chuàng)建工作列隊-生產者:
/*** 工作隊列-生產者* @author haoruijie*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "work_fair";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){channel.queueDeclare(QUEUE_NAME,false,false,false,null);//準備消息String message = "Hello world";//發(fā)送消息for (int i = 0; i < 20; i++) {channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes(StandardCharsets.UTF_8));System.out.println(message+i);}}} }啟動生產者:
消息堵塞:
4.4.2.2:創(chuàng)建工作列隊-消費者01
/*** 工作隊列-公平-消費者* @author haoruijie*/ public class Recv01 {//定義隊列名稱private static final String QUEUE_NAME = "work_fair";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定隊列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//限制消費1條消息,消費完在繼續(xù)消費下一天消息(限流)int prefetchCount = 1;channel.basicQos(prefetchCount);//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);//消費者01接收一條消息后休眠10毫秒try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag->{});} }啟動消費者:每個消費者每隔10 毫秒取一條消息
消費者01:成功取得列隊消息->
消費者02:成功取得列隊消息->
(消費者02代碼和消費者01一樣)
4.4.3:發(fā)布訂閱列隊
一個生產者將消息首先發(fā)送到交換器,交換器綁定到多個隊列,然后被監(jiān)聽該隊列的消費者所接收并消費。
4.4.3.1:創(chuàng)建發(fā)布列隊-生產者:
/*** 發(fā)布/訂閱隊列-生產者*/ public class Send {//定義發(fā)布隊列名稱private static final String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){/*** 綁定交換機* 1.交換機名稱* 2.交換機的類型,fanout就是廣播 (只要)*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//準備消息String message = "Hello world";//發(fā)送消息channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));System.out.println(message);}} }啟動成功:成功將消息綁定到交換機上
4.4.3.2:創(chuàng)建發(fā)布列隊-消費者:
/*** 發(fā)布/訂閱隊列-消費者* @author haoruijie*/ public class Recv01 {//定義訂閱隊列名稱private static final String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//聲明隊列,排他隊列String queue = channel.queueDeclare().getQueue();//隊列和交換機綁定channel.queueBind(queue,EXCHANGE_NAME,"");//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(queue,true,deliverCallback,consumerTag->{});} }啟動訂閱消費者:所有訂閱消費者都可以獲得消息。
4.4.4:路由列隊:
生產者將消息發(fā)送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產者發(fā)送的消息會指定一個路由key,那么消息只會發(fā)送到相應key相同的隊列,接著監(jiān)聽該隊列的消費者消費消息。
也就是讓消費者有選擇性的接收消息。
4.4.4.1:創(chuàng)建路由列隊-生產者:
/*** 路由隊列-生產者*/ public class Send {//定義隊列名稱private static final String EXCHANGE_NAME = "exchange_direct";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){/*** 綁定交換機* 1.交換機名稱* 2.交換機的類型,dorect是路由*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//準備消息String message = "Hello world";/*** 發(fā)送消息* 交換機名* 交換機key*/channel.basicPublish(EXCHANGE_NAME,"orange",null,message.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"green",null,message.getBytes(StandardCharsets.UTF_8));}} }啟動路由生產者:
會將消息發(fā)送到名為EXCHANGE_NAME的交換機中,分別將消息key設置為orange和green
4.4.4.2:創(chuàng)建路由列隊-消費者:
/*** 路由隊列-消費者*/ public class Secv01 {//定義隊列名稱private static final String EXCHANGE_NAME = "exchange_direct";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明隊列,排他隊列String queue = channel.queueDeclare().getQueue();//隊列和交換機綁定channel.queueBind(queue,EXCHANGE_NAME,"black");channel.queueBind(queue,EXCHANGE_NAME,"green");//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(queue,true,deliverCallback,consumerTag->{});} }啟動消費者:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-HXBlxAVB-1634903973672)(/Users/haoruijie/Library/Application Support/typora-user-images/image-20211022135436669.png)]
消費者01會取到交換機名為EXCHANGE_NAME,key值為green和orange 而消費者02只會收到key值為orange的消息
4.4.5:主題路由列隊:(使用最多的模式,通過模糊匹配,使得操作更加自如)
通過通配符模式來判斷路由key通俗的來講就是模糊匹配。
*.匹配一個字符 #.匹配所有字符
4.4.5.1:創(chuàng)建主題路由列隊-生產者:
/*** 主題-路由隊列-生產者* @author haoruijie*/ public class Send {//定義隊列名稱private static final String EXCHANGE_NAME = "exchange_topic";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){/*** 綁定交換機* 1.交換機名稱* 2.交換機的類型,TOPIC主題路由列隊*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//準備消息String message1 = "Hello world1";String message2 = "Hello world2";String message3 = "Hello world3";//設置交換機key值String routingKey1 = "quick.orange.rabbit";String routingKey2 = "lazy.pink.rabbit";String routingKey3 = "quick.hello.male";//發(fā)送消息channel.basicPublish(EXCHANGE_NAME,routingKey1,null,message1.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,routingKey2,null,message2.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,routingKey3,null,message3.getBytes(StandardCharsets.UTF_8));}} }啟動路由生產者:
會將消息發(fā)送到名為EXCHANGE_NAME的交換機中,分別將消息key設置為routingKey1、routingKey2和routingKey3
4.4.5.2:創(chuàng)建主題路由列隊-消費者:
/*** 主題-路由隊列-消費者* @author haoruijie*/ public class Recv01 {//定義隊列名稱private static final String EXCHANGE_NAME = "exchange_topic";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//聲明隊列,排他隊列String queue = channel.queueDeclare().getQueue();//隊列和交換機綁定綁定keychannel.queueBind(queue,EXCHANGE_NAME,"*.orange.*");channel.queueBind(queue,EXCHANGE_NAME,"lazy.#");//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(queue,true,deliverCallback,consumerTag->{});} }啟動消費者
運行結果:
消費者01只會匹配routingKey1和routingKey2的消息
4.5、事務:
使用事務會大幅度降低性能 (一般不會使用) 開啟事務會知道生產者是否將消息成功提交到列隊里
4.5.1創(chuàng)建事務列隊:
4.5.1.1:創(chuàng)建事務列隊-生產者:
/*** 事務隊列-生產者* @author haoruijie*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "tx";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接Connection connection = null;//創(chuàng)建信道Channel channel = null;try {connection = connectionFactory.newConnection();channel = connection.createChannel();//開啟事務channel.txSelect();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//準備消息String message = "Hello world";//發(fā)送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));//制造異常(遇到異常事務回滾)int a = 1/0;//提交事務channel.txCommit();}catch (Exception e){//事務回滾channel.txRollback();e.printStackTrace();}finally {//關閉連接if (channel!=null){channel.close();}if (connection!=null){connection.close();}}} }4.5.1.2:創(chuàng)建事務列隊-消費者:
/*** 事務隊列消費者* @author haoruijie*/ public class Recv {//定義隊列名稱private static final String QUEUE_NAME = "tx";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("yeb");connectionFactory.setPassword("yeb");connectionFactory.setVirtualHost("/yeb");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定隊列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{});} }啟動消費者:
啟動生產者: 發(fā)現(xiàn)異常事務回調 消費者沒有消息消費
4.6、確認模式:
(確認生產者是否把消息發(fā)送到了服務器)
4.6.1:同步確認:
(同步確認會影響性能一般不會使用)
4.6.1.1:創(chuàng)建同步-確認-生產者:
/*** 確認-同步-生產者 (會影響性能)* @author haoruijie*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "sync";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接Connection connection = null;//創(chuàng)建信道Channel channel = null;try {connection = connectionFactory.newConnection();channel = connection.createChannel();//啟動確認模式channel.confirmSelect();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//準備消息String message = "Hello world";//發(fā)送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));//普通確認,只能單條確認if(channel.waitForConfirms()){System.out.println("確認成功!");}//普通批量確認 ,如果有一條不成功就會拋異常,全部成功不會拋異常//channel.waitForConfirmsOrDie();}catch (Exception e){e.printStackTrace();}finally {if (channel!=null){channel.close();}if (connection!=null){connection.close();}}} }啟動生產者:
發(fā)送消息成功:
4.6.2:異步確認:
(異步確認效率是最高的)
4.6.2.1:創(chuàng)建異步-確認-生產者:
/*** 確認-異步-生產者 (效率最高)*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "async";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("yeb");connectionFactory.setPassword("yeb");connectionFactory.setVirtualHost("/yeb");//創(chuàng)建連接Connection connection = null;//創(chuàng)建信道Channel channel = null;try {final SortedSet<Long> set = Collections.synchronizedSortedSet(new TreeSet<Long>());connection = connectionFactory.newConnection();channel = connection.createChannel();//啟動確認模式channel.confirmSelect();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//天際channel監(jiān)聽channel.addConfirmListener(new ConfirmListener() {//已確認@Overridepublic void handleAck(long l, boolean b) throws IOException {//b為true確認多條成功 為false確認單條成功if (b){System.out.println("確認多條成功");set.headSet(l+1L).clear();}else {System.out.println("確認單條成功"+l);set.remove(l);}}//未確認@Overridepublic void handleNack(long l, boolean b) throws IOException {//b為true多條未確認 為false單條未確認if (b){System.out.println("多條未確認");set.headSet(l+1L).clear();}else {System.out.println("單體未確認"+l);set.remove(l);}}});int i =0;while (i<20){i++;//準備消息String message = "Hello world"+i;Long seqNo = channel.getNextPublishSeqNo();//發(fā)送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));set.add(seqNo);System.out.println("[x] Sent'"+message+"'");}}catch (Exception e){e.printStackTrace();}finally {if (channel!=null){channel.close();}if (connection!=null){connection.close();}}} }啟動生產者:
確認發(fā)送消息成功:
消息發(fā)送成功:
4.7、使用springBoot 簡單的實現(xiàn)AMQP:
4.7.1:創(chuàng)建springboot多模塊項目:
4.7.1.1:創(chuàng)建消費者模塊、并繼承父模塊:
4.7.1.2:編寫消費者模塊配置文件:
#配置端口號 server:port: 8002 spring:#Rabbitmq生產出配置rabbitmq:#iphost: 127.0.0.1#用戶名username: guest#密碼password: guest#端口port: 56724.7.1.3:編寫消費者測試類:
@Component public class Test {@RabbitListener(queues = "hello")public void demo(String hello){System.out.println(hello);} }4.7.1.4:編寫queue配置文件:
@Configuration public class RabbitmqConfig {@Beanpublic Queue queue(){return new Queue("hello");} }4.7.1.4:啟動消費者服務:
啟動成功hello列隊已存在
4.7.2.1:創(chuàng)建生產者模塊、繼承父模塊:
4.7.2.2:編寫生產者配置文件:
#配置端口號 server:port: 8001spring:#Rabbitmq生產出配置rabbitmq:#iphost: 127.0.0.1#用戶名username: web#密碼password: web#端口port: 56724.7.2.3:編寫生產者測試類:
@Component public class Test {@Autowiredprivate RabbitTemplate template;public void demo(){template.convertAndSend("hello","hello world");} }4.7.2.4:發(fā)送消息:
發(fā)送被消費者立即消費
總結
以上是生活随笔為你收集整理的RabbitMQ初步学习(Mac)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: html设置tr无上边框,【html】设
- 下一篇: html5的交互式微课,内嵌交互式微课的