学成在线--11.RabbitMQ快速入门
文章目錄
- 一.RabbitMQ簡(jiǎn)介
- 二.相關(guān)知識(shí)
- 1.AMQP
- 2.JMS是什么 ?
- 三.RabbitMQ的工作原理
- 四.Hello World
- 1.創(chuàng)建Maven工程
- 2.生產(chǎn)者
- 3.消費(fèi)者
- 五.總結(jié)
一.RabbitMQ簡(jiǎn)介
MQ全稱(chēng)為Message Queue,即消息隊(duì)列, RabbitMQ是由erlang語(yǔ)言開(kāi)發(fā),基于AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列,它是一種應(yīng)用程序之間的通信方法,消息隊(duì)列在分布式系統(tǒng)開(kāi)發(fā)中應(yīng)用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com/
開(kāi)發(fā)中消息隊(duì)列通常有如下應(yīng)用場(chǎng)景:
1、任務(wù)異步處理。
將不需要同步處理的并且耗時(shí)長(zhǎng)的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理。提高了應(yīng)用程序的響應(yīng)時(shí)間。
2、應(yīng)用程序解耦合
MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過(guò)MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合。
市場(chǎng)上還有哪些消息隊(duì)列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
為什么使用RabbitMQ呢?
1、簡(jiǎn)單,功能強(qiáng)大。
2、基于AMQP協(xié)議。
3、社區(qū)活躍,文檔完善。
4、高并發(fā)性能好,這主要得益于Erlang語(yǔ)言。
5、Spring Boot默認(rèn)已集成RabbitMQ
二.相關(guān)知識(shí)
1.AMQP
總結(jié):AMQP是一套公開(kāi)的消息隊(duì)列協(xié)議,最早在2003年被提出,它旨在從協(xié)議層定義消息通信數(shù)據(jù)的標(biāo)準(zhǔn)格式,為的就是解決MQ市場(chǎng)上協(xié)議不統(tǒng)一的問(wèn)題。RabbitMQ就是遵循AMQP標(biāo)準(zhǔn)協(xié)議開(kāi)發(fā)的MQ服務(wù)。
百度鏈接:AMQP
官方:官網(wǎng)
2.JMS是什么 ?
總結(jié):JMS是java提供的一套消息服務(wù)API標(biāo)準(zhǔn),其目的是為所有的java應(yīng)用程序提供統(tǒng)一的消息通信的標(biāo)準(zhǔn),類(lèi)似java的jdbc,只要遵循jms標(biāo)準(zhǔn)的應(yīng)用程序之間都可以進(jìn)行消息通信。
JMS和AMQP有什么 不同?
jms是java語(yǔ)言專(zhuān)屬的消息服務(wù)標(biāo)準(zhǔn),它是在api層定義標(biāo)準(zhǔn),并且只能用于java應(yīng)用;而AMQP是在協(xié)議層定義的標(biāo)準(zhǔn),是跨語(yǔ)言的。
三.RabbitMQ的工作原理
下圖是RabbitMQ的基本結(jié)構(gòu):
組成部分說(shuō)明如下:
Producer:消息生產(chǎn)者,即生產(chǎn)方客戶(hù)端,生產(chǎn)方客戶(hù)端將消息發(fā)送到MQ。
Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括兩個(gè)部分:Exchange和Queue。
Exchange:消息隊(duì)列交換機(jī),按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對(duì)消息進(jìn)行過(guò)慮。
Queue:消息隊(duì)列,存儲(chǔ)消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的消費(fèi)方。
Consumer:消息消費(fèi)者,即消費(fèi)方客戶(hù)端,接收MQ轉(zhuǎn)發(fā)的消息。
消息發(fā)布和接收流程:
-----發(fā)送消息-----
1、生產(chǎn)者和Broker建立TCP連接。
2、生產(chǎn)者和Broker建立通道。
3、生產(chǎn)者通過(guò)通道消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。
4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊(duì)列)
----接收消息-----
1、消費(fèi)者和Broker建立TCP連接
2、消費(fèi)者和Broker建立通道
3、消費(fèi)者監(jiān)聽(tīng)指定的Queue(隊(duì)列)
4、當(dāng)有消息到達(dá)Queue時(shí)Broker默認(rèn)將消息推送給消費(fèi)者。
5、消費(fèi)者接收到消息
四.Hello World
1.創(chuàng)建Maven工程
創(chuàng)建生產(chǎn)者工程和消費(fèi)者工程,分別加入RabbitMQ java client的依賴(lài)。
test-rabbitmq-producer:生產(chǎn)者工程
test-rabbitmq-consumer:消費(fèi)者工程
2.生產(chǎn)者
在生產(chǎn)者工程下的test中創(chuàng)建測(cè)試類(lèi)如下:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Producer01 {//隊(duì)列private static final String QUEUE = "helloworld";public static void main(String[] args) {//通過(guò)連接工廠(chǎng)創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話(huà)通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊(duì)列,如果隊(duì)列在mq 中沒(méi)有則要?jiǎng)?chuàng)建//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱(chēng)* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪(fǎng)問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE,true,false,false,null);//發(fā)送消息//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數(shù)明細(xì):* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來(lái)將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱(chēng)* 3、props,消息的屬性* 4、body,消息內(nèi)容*///消息內(nèi)容String message = "hello world 黑馬程序員";channel.basicPublish("",QUEUE,null,message.getBytes());System.out.println("send to mq "+message);} catch (Exception e) {e.printStackTrace();} finally {//關(guān)閉連接//先關(guān)閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}} }3.消費(fèi)者
在消費(fèi)者工程下的test中創(chuàng)建測(cè)試類(lèi)如下:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** 入門(mén)程序消費(fèi)者* @author Administrator* @version 1.0* @create 2018-06-17 9:25**/ public class Consumer01 {//隊(duì)列private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {//通過(guò)連接工廠(chǎng)創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話(huà)通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成Channel channel = connection.createChannel();//監(jiān)聽(tīng)隊(duì)列//聲明隊(duì)列,如果隊(duì)列在mq 中沒(méi)有則要?jiǎng)?chuàng)建//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱(chēng)* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪(fǎng)問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE,true,false,false,null);//實(shí)現(xiàn)消費(fèi)方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當(dāng)接收到消息后此方法將被調(diào)用* @param consumerTag 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume* @param envelope 信封,通過(guò)envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機(jī)String exchange = envelope.getExchange();//消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監(jiān)聽(tīng)隊(duì)列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱(chēng)* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE,true,defaultConsumer);} }五.總結(jié)
1、發(fā)送端操作流程
1)創(chuàng)建連接connection
2)創(chuàng)建通道channel
3)聲明隊(duì)列channel.queueDeclare
4)發(fā)送消息channel.basicPublish
2、接收端
1)創(chuàng)建連接
2)創(chuàng)建通道
3)聲明隊(duì)列
4)監(jiān)聽(tīng)隊(duì)列
5)接收消息
6)ack回復(fù)
總結(jié)
以上是生活随笔為你收集整理的学成在线--11.RabbitMQ快速入门的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 2018双一流排名 计算机,2018中国
- 下一篇: 如何避免下重复订单