日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ初步学习(Mac)

發(fā)布時間:2024/3/26 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ初步学习(Mac) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1.RabbitMQ學習:

1.簡介
2.安裝
3.使用

  • 3.1.創(chuàng)建簡單列隊
  • 3.2.創(chuàng)建工作列隊
  • 3.3.創(chuàng)建訂閱列隊
  • 3.4.創(chuàng)建路由列隊
  • 3.5.創(chuàng)建主題列隊
  • 3.6.事務
  • 3.7.確認模式
    • 3.7.1.同步確認
    • 3.7.2.異步確認
  • 使用springBoot 簡單的實現(xiàn)AMQP
    使用springBoot實現(xiàn)AMQP更多模式



2.MQ簡介:

? 在計算機科學中,消息隊列(英語:Message queue)是一種進程間通信或同一進程的不同線程間的通信方式,軟件的序列用來處理一系列的輸入,通常是來自用戶的。消息隊列提供了異步的通信協(xié)議,每一個序列中的記錄包含了詳細說明的數據,包含發(fā)生的時間,輸入設備的種類,以及特定的輸入參數,也就是說:消息的發(fā)送者和接收者不需要同時與消息隊列交互。消息保存在隊列中,直到接收者取回它。

2.1、實現(xiàn):

消息隊列常常保存在鏈表結構中。擁有權限的進程才可以向消息隊列中寫入或讀取消息

目前,有很多消息隊列有很多的實現(xiàn),包括 JBoss Messing、JORAM、Apache、ActiveMQ、SunPoen Message Queue、IBM MQ、Apache Qpid和HTTPSQS

當前使用較多的消息隊列有:RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等而部分數據庫如:Redis,Mysql,以及phxsql也可以實現(xiàn)消息隊列的功能。

2.2、特點:

MQ是消費者-生產者模型中的一個典型代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取或者訂閱隊列中的消息。MQ和JMS類似,但不同的是JMS是SUN JAVA消息中間件服務的一個標準和API定義,而MQ則是遵循了AMQP協(xié)議的具體實現(xiàn)和產品。

注意:

1.AMQP,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。基于此協(xié)議的客戶端與中間件可傳遞消息,并不受客戶端/中間件影響,不同的開發(fā)語言等條件的限制。

2.AMS,即java消息服務(java Message Service)應用程序接口,是一個java平臺中關于面向消息中間件的API,用于在兩個應用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供服務。常見的消息列隊,大部分都實現(xiàn)了JMI API 如:ActiveMQ,Redis以及RabbitMQ 等

2.3、優(yōu)缺點:

優(yōu)點:

應用耦合、異步處理、流量削峰

  • 解耦:

    傳統(tǒng)模式

傳統(tǒng)模式缺點:

系統(tǒng)間耦合性太強,如上圖所示,系統(tǒng)A在代碼中直接調用系統(tǒng)B和系統(tǒng)C的代碼,如果將D系統(tǒng)接入,系統(tǒng)A還需要修改代碼,太麻煩

中間件模式:

中間件模式優(yōu)點:

將消息寫入列隊,需要消息的系統(tǒng)自己從消息列隊中訂閱,從而系統(tǒng)A不需要做如何修改

  • 異步

    傳統(tǒng)模式:

    傳統(tǒng)模式缺點:

    ? 一些非必要的業(yè)務邏輯以同步的方式運行,太耗費時間

    中間件模式:

中間件模式優(yōu)點:

? 使用消息隊列發(fā)送消息,減少耗時

  • 削峰

    傳統(tǒng)模式:

傳統(tǒng)模式缺點:

并發(fā)量大的時候,所有的請求直接懟到數據庫,造成數據庫連接異常

中間件模式:

中間件模式的優(yōu)點:

系統(tǒng)A慢慢的按照數據庫能處理的并發(fā)量,從消息隊列中慢慢拉取消息。在生產中,這個短暫的最高峰期積壓是允許的。

缺點:

系統(tǒng)可用性低、系統(tǒng)復雜性增加

2.4、使用場景:

? 消息列隊,是分布式系統(tǒng)中重要的組件,其通用的使用場景可以簡單的描述為:當不需要立即獲取結果,但是并發(fā)量又需要進行控制的時候,差不多就是需要使用消息列隊的時候。

? 在項目中,將一些無需及時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節(jié)省了服務器請求的響應時間,從而提高了系統(tǒng)的吞吐量。

2.5、為什么使用RabbitMQ:

? AMQP,即Advanced Meassage Queueing Protocol,高級消息列隊協(xié)議,是應用層的一個開發(fā)標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息的使用者的存在,反之亦然。

? AMQP的主要特征是面向消息、列隊、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。

? RabbitMQ是一個開源的AMQP實現(xiàn),服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于分布式系統(tǒng)中存儲轉發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。

? 總結如下

  • 基于AMQP協(xié)議
  • 高并發(fā)(服務器可以接受最大任務數量)
  • 高性能(單位時間內服務器可以處理的任務數)
  • 高可用(單位時間內的服務器可以正常工作的時間比例)
  • 強大的社區(qū)支持,以及很多公司都在使用
  • 支持插件
  • 支持多語言
  • 3.安裝:

    3.1、安裝erlang:(Mac系統(tǒng))

    brew install erlang

    erlang對應Rabbit版本:


    測試erlang是否安裝成功:

    3.2、安裝rabbitMQ:(Mac系統(tǒng))

    官網下載地址:https://www.rabbitmq.com/download.html

    開啟RabbitMQ圖形化管理界面插件:rabbitmq-plugins enable rabbitmq_management關閉RabbitMQ圖形化管理界面插件:rabbitmq-plugins disable rabbitmq_management

    • 使用rabbitmq-plugins list指令查看 rabbitmq 的插件啟動情況:

    開啟RabbitMQ服務rabbitmq-service、關閉RabbitMq服務rabbitmqctl stop

    在瀏覽器訪問localhost:15672進入rabbitmq圖形界面管理登陸系統(tǒng):

    默認用戶名:guest ,默認密碼: guest

    登陸之后進入rabbitmq圖形界面管理系統(tǒng):


    4.使用RabbitMQ:

    4.1、添加一個名稱為/web的虛擬主機:


    創(chuàng)建成功:

    每次創(chuàng)建虛擬主機guest用戶會默認加入虛擬主機

    4.2、添加一個名稱為web的用戶:

    添加成功:

    4.3、將web用戶添加到虛擬主機:

    添加成功:

    4.4、使用java代碼實現(xiàn)AMQP:

    導入依賴

    <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.4.3</version></dependency>

    4.4.1、創(chuàng)建簡單列隊:

    簡單列隊:生產者將消息發(fā)送到“hello”隊列。消費者從該隊列接收消息。

    4.4.1.1:創(chuàng)建簡單列隊生產者:
    /*** 簡單隊列生產者* @author haoruijie*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");//端口號connectionFactory.setPort(5672);//用戶名connectionFactory.setUsername("web");//用戶密碼connectionFactory.setPassword("web");//虛擬主機名connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){/*** 第一個參數(queue)綁定隊列* 第二個參數(durable)持久化* 第三個參數(Exclusive)排他隊列* 第四個參數(Auto-delete)自動刪除*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//準備消息String message = "Hello world";//發(fā)送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println(message);}} }

    啟動生產者服務:

    消息堵塞:

    4.4.1.2:創(chuàng)建簡單列隊消費者:
    /*** 普通隊列消費者* @author haoruijie*/ public class Recv {//定義隊列名稱private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");//端口號connectionFactory.setPort(5672);//用戶名connectionFactory.setUsername("web");//用戶密碼connectionFactory.setPassword("web");//虛擬主機名connectionFactory.setVirtualHost("/web");//創(chuàng)建信道Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定隊列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{});} }

    啟動消費者:

    消費者消費消息:

    4.4.2:創(chuàng)建工作列隊

    工作列隊:(一個生產者對應多個消費者,但是只能有一個消費者獲得消息!!!)

    4.4.2.1:創(chuàng)建工作列隊-生產者:
    /*** 工作隊列-生產者* @author haoruijie*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "work_fair";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){channel.queueDeclare(QUEUE_NAME,false,false,false,null);//準備消息String message = "Hello world";//發(fā)送消息for (int i = 0; i < 20; i++) {channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes(StandardCharsets.UTF_8));System.out.println(message+i);}}} }

    啟動生產者:

    消息堵塞:

    4.4.2.2:創(chuàng)建工作列隊-消費者01
    /*** 工作隊列-公平-消費者* @author haoruijie*/ public class Recv01 {//定義隊列名稱private static final String QUEUE_NAME = "work_fair";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定隊列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//限制消費1條消息,消費完在繼續(xù)消費下一天消息(限流)int prefetchCount = 1;channel.basicQos(prefetchCount);//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);//消費者01接收一條消息后休眠10毫秒try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag->{});} }

    啟動消費者:每個消費者每隔10 毫秒取一條消息

    消費者01:成功取得列隊消息->

    消費者02:成功取得列隊消息->

    (消費者02代碼和消費者01一樣)

    4.4.3:發(fā)布訂閱列隊

    一個生產者將消息首先發(fā)送到交換器,交換器綁定到多個隊列,然后被監(jiān)聽該隊列的消費者所接收并消費。

    4.4.3.1:創(chuàng)建發(fā)布列隊-生產者:
    /*** 發(fā)布/訂閱隊列-生產者*/ public class Send {//定義發(fā)布隊列名稱private static final String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){/*** 綁定交換機* 1.交換機名稱* 2.交換機的類型,fanout就是廣播 (只要)*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//準備消息String message = "Hello world";//發(fā)送消息channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));System.out.println(message);}} }

    啟動成功:成功將消息綁定到交換機上

    4.4.3.2:創(chuàng)建發(fā)布列隊-消費者:
    /*** 發(fā)布/訂閱隊列-消費者* @author haoruijie*/ public class Recv01 {//定義訂閱隊列名稱private static final String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//聲明隊列,排他隊列String queue = channel.queueDeclare().getQueue();//隊列和交換機綁定channel.queueBind(queue,EXCHANGE_NAME,"");//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(queue,true,deliverCallback,consumerTag->{});} }

    啟動訂閱消費者:所有訂閱消費者都可以獲得消息。

    4.4.4:路由列隊:

    生產者將消息發(fā)送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產者發(fā)送的消息會指定一個路由key,那么消息只會發(fā)送到相應key相同的隊列,接著監(jiān)聽該隊列的消費者消費消息。

    也就是讓消費者有選擇性的接收消息。

    4.4.4.1:創(chuàng)建路由列隊-生產者:
    /*** 路由隊列-生產者*/ public class Send {//定義隊列名稱private static final String EXCHANGE_NAME = "exchange_direct";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){/*** 綁定交換機* 1.交換機名稱* 2.交換機的類型,dorect是路由*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//準備消息String message = "Hello world";/*** 發(fā)送消息* 交換機名* 交換機key*/channel.basicPublish(EXCHANGE_NAME,"orange",null,message.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,"green",null,message.getBytes(StandardCharsets.UTF_8));}} }

    啟動路由生產者:

    會將消息發(fā)送到名為EXCHANGE_NAME的交換機中,分別將消息key設置為orange和green

    4.4.4.2:創(chuàng)建路由列隊-消費者:
    /*** 路由隊列-消費者*/ public class Secv01 {//定義隊列名稱private static final String EXCHANGE_NAME = "exchange_direct";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明隊列,排他隊列String queue = channel.queueDeclare().getQueue();//隊列和交換機綁定channel.queueBind(queue,EXCHANGE_NAME,"black");channel.queueBind(queue,EXCHANGE_NAME,"green");//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(queue,true,deliverCallback,consumerTag->{});} }

    啟動消費者:

    [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-HXBlxAVB-1634903973672)(/Users/haoruijie/Library/Application Support/typora-user-images/image-20211022135436669.png)]

    消費者01會取到交換機名為EXCHANGE_NAME,key值為green和orange 而消費者02只會收到key值為orange的消息


    4.4.5:主題路由列隊:(使用最多的模式,通過模糊匹配,使得操作更加自如)

    通過通配符模式來判斷路由key通俗的來講就是模糊匹配

    *.匹配一個字符 #.匹配所有字符

    4.4.5.1:創(chuàng)建主題路由列隊-生產者:
    /*** 主題-路由隊列-生產者* @author haoruijie*/ public class Send {//定義隊列名稱private static final String EXCHANGE_NAME = "exchange_topic";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接try (Connection connection = connectionFactory.newConnection();//創(chuàng)建信道Channel channel = connection.createChannel()){/*** 綁定交換機* 1.交換機名稱* 2.交換機的類型,TOPIC主題路由列隊*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//準備消息String message1 = "Hello world1";String message2 = "Hello world2";String message3 = "Hello world3";//設置交換機key值String routingKey1 = "quick.orange.rabbit";String routingKey2 = "lazy.pink.rabbit";String routingKey3 = "quick.hello.male";//發(fā)送消息channel.basicPublish(EXCHANGE_NAME,routingKey1,null,message1.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,routingKey2,null,message2.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME,routingKey3,null,message3.getBytes(StandardCharsets.UTF_8));}} }

    啟動路由生產者:

    會將消息發(fā)送到名為EXCHANGE_NAME的交換機中,分別將消息key設置為routingKey1、routingKey2和routingKey3

    4.4.5.2:創(chuàng)建主題路由列隊-消費者:
    /*** 主題-路由隊列-消費者* @author haoruijie*/ public class Recv01 {//定義隊列名稱private static final String EXCHANGE_NAME = "exchange_topic";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//聲明隊列,排他隊列String queue = channel.queueDeclare().getQueue();//隊列和交換機綁定綁定keychannel.queueBind(queue,EXCHANGE_NAME,"*.orange.*");channel.queueBind(queue,EXCHANGE_NAME,"lazy.#");//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(queue,true,deliverCallback,consumerTag->{});} }

    啟動消費者

    運行結果:

    消費者01只會匹配routingKey1和routingKey2的消息

    4.5、事務:

    使用事務會大幅度降低性能 (一般不會使用) 開啟事務會知道生產者是否將消息成功提交到列隊里

    4.5.1創(chuàng)建事務列隊:

    4.5.1.1:創(chuàng)建事務列隊-生產者:
    /*** 事務隊列-生產者* @author haoruijie*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "tx";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接Connection connection = null;//創(chuàng)建信道Channel channel = null;try {connection = connectionFactory.newConnection();channel = connection.createChannel();//開啟事務channel.txSelect();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//準備消息String message = "Hello world";//發(fā)送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));//制造異常(遇到異常事務回滾)int a = 1/0;//提交事務channel.txCommit();}catch (Exception e){//事務回滾channel.txRollback();e.printStackTrace();}finally {//關閉連接if (channel!=null){channel.close();}if (connection!=null){connection.close();}}} }
    4.5.1.2:創(chuàng)建事務列隊-消費者:
    /*** 事務隊列消費者* @author haoruijie*/ public class Recv {//定義隊列名稱private static final String QUEUE_NAME = "tx";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("yeb");connectionFactory.setPassword("yeb");connectionFactory.setVirtualHost("/yeb");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//綁定隊列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//打印消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody(),"UTF-8");System.out.println(message);};/*** 消費消息* 列隊名稱* 消費確認*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{});} }

    啟動消費者:


    啟動生產者: 發(fā)現(xiàn)異常事務回調 消費者沒有消息消費

    4.6、確認模式:

    (確認生產者是否把消息發(fā)送到了服務器)

    4.6.1:同步確認:

    (同步確認會影響性能一般不會使用)

    4.6.1.1:創(chuàng)建同步-確認-生產者:
    /*** 確認-同步-生產者 (會影響性能)* @author haoruijie*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "sync";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("web");connectionFactory.setPassword("web");connectionFactory.setVirtualHost("/web");//創(chuàng)建連接Connection connection = null;//創(chuàng)建信道Channel channel = null;try {connection = connectionFactory.newConnection();channel = connection.createChannel();//啟動確認模式channel.confirmSelect();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//準備消息String message = "Hello world";//發(fā)送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));//普通確認,只能單條確認if(channel.waitForConfirms()){System.out.println("確認成功!");}//普通批量確認 ,如果有一條不成功就會拋異常,全部成功不會拋異常//channel.waitForConfirmsOrDie();}catch (Exception e){e.printStackTrace();}finally {if (channel!=null){channel.close();}if (connection!=null){connection.close();}}} }

    啟動生產者:

    發(fā)送消息成功:

    4.6.2:異步確認:

    (異步確認效率是最高的)

    4.6.2.1:創(chuàng)建異步-確認-生產者:
    /*** 確認-異步-生產者 (效率最高)*/ public class Send {//定義隊列名稱private static final String QUEUE_NAME = "async";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//連接工廠配置connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("yeb");connectionFactory.setPassword("yeb");connectionFactory.setVirtualHost("/yeb");//創(chuàng)建連接Connection connection = null;//創(chuàng)建信道Channel channel = null;try {final SortedSet<Long> set = Collections.synchronizedSortedSet(new TreeSet<Long>());connection = connectionFactory.newConnection();channel = connection.createChannel();//啟動確認模式channel.confirmSelect();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//天際channel監(jiān)聽channel.addConfirmListener(new ConfirmListener() {//已確認@Overridepublic void handleAck(long l, boolean b) throws IOException {//b為true確認多條成功 為false確認單條成功if (b){System.out.println("確認多條成功");set.headSet(l+1L).clear();}else {System.out.println("確認單條成功"+l);set.remove(l);}}//未確認@Overridepublic void handleNack(long l, boolean b) throws IOException {//b為true多條未確認 為false單條未確認if (b){System.out.println("多條未確認");set.headSet(l+1L).clear();}else {System.out.println("單體未確認"+l);set.remove(l);}}});int i =0;while (i<20){i++;//準備消息String message = "Hello world"+i;Long seqNo = channel.getNextPublishSeqNo();//發(fā)送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));set.add(seqNo);System.out.println("[x] Sent'"+message+"'");}}catch (Exception e){e.printStackTrace();}finally {if (channel!=null){channel.close();}if (connection!=null){connection.close();}}} }

    啟動生產者:

    確認發(fā)送消息成功:

    消息發(fā)送成功:

    4.7、使用springBoot 簡單的實現(xiàn)AMQP:

    4.7.1:創(chuàng)建springboot多模塊項目:

    4.7.1.1:創(chuàng)建消費者模塊、并繼承父模塊:
    4.7.1.2:編寫消費者模塊配置文件:
    #配置端口號 server:port: 8002 spring:#Rabbitmq生產出配置rabbitmq:#iphost: 127.0.0.1#用戶名username: guest#密碼password: guest#端口port: 5672
    4.7.1.3:編寫消費者測試類:
    @Component public class Test {@RabbitListener(queues = "hello")public void demo(String hello){System.out.println(hello);} }
    4.7.1.4:編寫queue配置文件:
    @Configuration public class RabbitmqConfig {@Beanpublic Queue queue(){return new Queue("hello");} }
    4.7.1.4:啟動消費者服務:

    啟動成功hello列隊已存在

    4.7.2.1:創(chuàng)建生產者模塊、繼承父模塊:
    4.7.2.2:編寫生產者配置文件:
    #配置端口號 server:port: 8001spring:#Rabbitmq生產出配置rabbitmq:#iphost: 127.0.0.1#用戶名username: web#密碼password: web#端口port: 5672
    4.7.2.3:編寫生產者測試類:
    @Component public class Test {@Autowiredprivate RabbitTemplate template;public void demo(){template.convertAndSend("hello","hello world");} }
    4.7.2.4:發(fā)送消息:

    發(fā)送被消費者立即消費

    總結

    以上是生活随笔為你收集整理的RabbitMQ初步学习(Mac)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。