简单消息模型
五種消息模型
RabbitMQ提供了6種消息模型,但是第6種其實是RPC,并不是MQ,因此不予學習。那么也就剩下5種。
但是其實3、4、5這三種都屬于訂閱模型,只不過進行路由的方式不同。
?
我們通過一個demo工程來了解下RabbitMQ的工作方式:
導入工程:
依賴:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.learn.rabbitmq</groupId><artifactId>learn-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.2.RELEASE</version></parent><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies> </project>我們抽取一個建立RabbitMQ連接的工具類,方便其他程序獲取連接:
public class ConnectionUtil {/*** 建立與RabbitMQ的連接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定義連接工廠ConnectionFactory factory = new ConnectionFactory();//設置服務地址factory.setHost("192.168.56.101");//端口factory.setPort(5672);//設置賬號信息,用戶名、密碼、vhostfactory.setVirtualHost("/learn");factory.setUsername("learn");factory.setPassword("learn");// 通過工程獲取連接Connection connection = factory.newConnection();return connection;} }基本消息模型
官方介紹:
RabbitMQ是一個消息代理:它接受和轉發消息。 你可以把它想象成一個郵局:當你把郵件放在郵箱里時,你可以確定郵差先生最終會把郵件發送給你的收件人。 在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ與郵局的主要區別是它不處理紙張,而是接受,存儲和轉發數據消息的二進制數據塊。
P(producer/ publisher):生產者,一個發送消息的用戶應用程序。
C(consumer):消費者,消費和接收有類似的意思,消費者是一個主要用來等待接收消息的用戶應用程序
隊列(紅色區域):rabbitmq內部類似于郵箱的一個概念。雖然消息流經rabbitmq和你的應用程序,但是它們只能存儲在隊列中。隊列只受主機的內存和磁盤限制,實質上是一個大的消息緩沖區。許多生產者可以發送消息到一個隊列,許多消費者可以嘗試從一個隊列接收數據。
總之:
生產者將消息發送到隊列,消費者從隊列中獲取消息,隊列是存儲消息的緩沖區。
?
我們將用Java編寫兩個程序;發送單個消息的生產者,以及接收消息并將其打印出來的消費者。我們將詳細介紹Java API中的一些細節,這是一個消息傳遞的“Hello World”。
我們將調用我們的消息發布者(發送者)Send和我們的消息消費者(接收者)Recv。發布者將連接到RabbitMQ,發送一條消息,然后退出。
生產者發送消息
public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();// 從連接中創建通道,這是完成大部分API的地方。Channel channel = connection.createChannel();// 聲明(創建)隊列,必須聲明隊列才能夠發送消息,我們可以把消息發送到隊列中。// 聲明一個隊列是冪等的 - 只有當它不存在時才會被創建channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息內容String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//關閉通道和連接channel.close();connection.close();} }控制臺:
管理工具中查看消息
進入隊列頁面,可以看到新建了一個隊列:simple_queue
點擊隊列名稱,進入詳情頁,可以查看消息:
在控制臺查看消息并不會將消息消費,所以消息還在。
消費者獲取消息
public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 創建通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [x] received : " + msg + "!");}};// 監聽隊列,第二個參數:是否自動進行消息確認。channel.basicConsume(QUEUE_NAME, true, consumer);} }控制臺:
這個時候,隊列中的消息就沒了:
我們發現,消費者已經獲取了消息,但是程序沒有停止,一直在監聽隊列中是否有新的消息。一旦有新的消息進入隊列,就會立即打印.
總結