日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ开发详解

發布時間:2024/4/13 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ开发详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

?

目錄

?

開發步驟

引入client

生產者

消費者

應用場景

簡單隊列

工作隊列

發布/訂閱

路由模式

topic模式

rpc模式

發布確認


開發步驟

引入client

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.2.0</version> </dependency>

生產者

1、引入類

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;

2、創建Connection

ConnectionFactory factory = new ConnectionFactory();// 設置服務地址factory.setHost("127.0.0.1");// 端口factory.setPort(5672);// vhostfactory.setVirtualHost("/vhost_test");// 用戶名factory.setUsername("admin");// 密碼factory.setPassword("123456");Connection connection = factory.newConnection();

3、創建Channel

Channel channel = connection.createChannel()

channel設置:

?

4、聲明交換器、隊列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

exchange聲明:

DeclareOk exchangeDeclare?(String exchange, String type) throws IOException; DeclareOk exchangeDeclare?(String exchange, BuiltinExchangeType type) throws IOException; DeclareOk exchangeDeclare?(String exchange, String type, boolean durable) throws IOException; DeclareOk exchangeDeclare?(String exchange, BuiltinExchangeType type, boolean durable) throws IOException; DeclareOk exchangeDeclare?(String exchange, String type, boolean durable, boolean autoDelete, Map<String,?Object> arguments) throws IOException; DeclareOk exchangeDeclare?(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String,?Object> arguments) throws IOException; DeclareOk exchangeDeclare?(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String,?Object> arguments) throws IOException; DeclareOk exchangeDeclare?(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,?Object> arguments) throws IOException;

?參數說明:

  • exchange:exchange名稱

  • type:exchange類型,BuiltinExchangeType 枚舉類型包括:fanout,direct,topic,header。

  • durable:是否持久化

  • internal:是否內部exchange,生產者不能直接發布到內部exchange。

  • arguments:其他參數,用于構造exchange。

隊列聲明:

DeclareOk queueDeclare() throws IOException; DeclareOk queueDeclare?(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,?Object> arguments) throws IOException;

參數說明:

  • queue:隊列名稱
  • durable:是否持久化
  • exclusive:是否私有,僅當前程序可訪問。
  • autoDelete:當最后一個消費者取消訂閱后,自動刪除。
  • arguments:其他參數,創建queue時使用。

?

5、發送消息

String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

?

void basicPublish?(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException; void basicPublish?(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException; void basicPublish?(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;

參數說明:

  • ?exchange:
  • routingKey:
  • props:消息屬性
  • body:消息體的byte[]格式。
  • mandatory:當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返回給生產者(Basic.Return + Content-Header + Content-Body);當mandatory設置為false時,出現上述情形broker會直接將消息扔掉。
  • immediate:當immediate標志位設置為true時,如果exchange在將消息路由到queue(s)時發現對于的queue上么有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。

mandatory標志告訴服務器至少將該消息route到一個隊列中,否則將消息返還給生產者;immediate標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。

在RabbitMQ3.0以后的版本里,去掉了immediate參數的支持,發送帶immediate標記的publish會返回如下錯誤:
“{amqp_error,not_implemented,“immediate=true”,‘basic.publish’}”,immediate標記會影響鏡像隊列性能,增加代碼復雜性,并建議采用“TTL”和“DLX”等方式替代。

?

消費者

1、引入類

同生產者

2、創建Connection

同生產者

3、創建Channel

同生產者

4、聲明交換器,隊列

同生產者

5、構造Consumer

// 創建消費者Consumer consumer = new DefaultConsumer(channel) {// 獲取消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {String msg = new String(body, "utf-8");System.out.println("接收到消息——" + msg);}};

6、接收消息并處理

// 監聽隊列channel.basicConsume(QUEUE, true, consumer); String basicConsume?(String queue, Consumer callback) throws IOException String basicConsume?(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, Consumer callback) throws IOException String basicConsume?(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, Map<String,?Object> arguments, Consumer callback) throws IOException String basicConsume?(String queue, boolean autoAck, Map<String,?Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, boolean autoAck, Map<String,?Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, Map<String,?Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,?Object> arguments, Consumer callback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,?Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,?Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException String basicConsume?(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,?Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException

參數說明:

  • queue:隊列名稱。
  • autoAck:服務器收到消息后是否自動應答。
  • consumerTag:消費者標簽,用來區分多個消費者
  • noLocal:設置為true,表示 不能將同一個Conenction中生產者發送的消息傳遞給這個Connection中 的消費者
  • exclusive:是否排他
  • arguments:消費者的參數
  • callback:消費者 DefaultConsumer,用于消費消息,需要重寫其中的方法
  • 其他callback
public interface Consumer {//Called when the consumer is cancelled for reasons other than by a call to Channel.basicCancel(java.lang.String).void handleCancel?(String consumerTag)// Called when the consumer is cancelled by a call to Channel.basicCancel(java.lang.String). void handleCancelOk?(String consumerTag)//Called when the consumer is registered by a call to any of the Channel.basicConsume(java.lang.String, com.rabbitmq.client.Consumer) methods.void handleConsumeOk?(String consumerTag) //Called when a basic.deliver is received for this consumer.void handleDelivery?(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) //Called when a basic.recover-ok is received in reply to a basic.recover.void handleRecoverOk?(String consumerTag) // Called when either the channel or the underlying connection has been shut down.void handleShutdownSignal?(String consumerTag, ShutdownSignalException sig) }
  • handleCancel:除了調用basicCancel的其他原因導致消息被取消時調用。
  • handleCancelOk:basicCancel調用導致的訂閱取消時被調用。
  • handleConsumeOk:任意basicComsume調用導致消費者被注冊時調用。
  • handleDelivery:消息接收時被調用。
  • handleRecoverOk:basic.recover-ok被接收時調用
  • handleShutdownSignal:當Channel與Conenction關閉的時候會調用。

應用場景

各場景比較

應用場景exchange隊列生產者端消費者端
簡單隊列單個隊列發送到指定隊列自動應答
工作隊列多個隊列發送到指定隊列自動應答/? ? 手動應答,公平分發
發布/訂閱fanout多個隊列發送到指定exchange,不設置routing key消費指定隊列。
路由模式direct多個隊列發送到指定exchange,設置routing key消費指定隊列。指定1個或者多個binging key
topic模式topic多個隊列發送到指定exchange,設置routing key。key中包含點號(.)。消費指定隊列。指定1個或者多個binging key,key中包含點號(.)。
rpc????
發布確認????

簡單隊列

生產者直接發送消息到隊列,消費者直接消費隊列消息。

/*生產者代碼 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //直接發送消息到隊列。exchange參數為"" channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); /* 消費者代碼 */ //聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定義消息消費 DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'"); }; //開始消費 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

?

工作隊列

一般在實際應用中,生產者發送消息耗時較少,反應較快,反而是消費者因為要處理業務邏輯,處理時間可能會很慢,這樣隊列中會積壓很多消息,所以需要多個消費者分攤壓力,這個時候可以使用工作隊列。

生產者消費者代碼與簡單隊列一樣,差別為運行多個消費者代碼實例。

消費者默認采用Round-robin方式輪詢分發,每個消費者接收到的消息數基本一樣。如果消費者處理消息速度不一致,會導致一個空閑,一個繁忙。

可以采用公平模式,如果消費者未處理完消息,則隊列不會再發送消息給此消費者,只到上一條消息處理完。

?修改:

  • 添加channel.basicQos(1):保證一次只分發一條消息。
int prefetchCount = 1; //設置每次僅接收1條消息。 channel.basicQos(prefetchCount);
  • channel.basicAck(envelope.getDeliveryTag(), false):手動確認消息。false表示確認接收消息,true表示拒絕接收消息。
try {doWork(message);} finally {System.out.println(" [x] Done");//手動確認消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
  • channel.basicConsume(QUEUE, false, consumer):設置自動應答為false。
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });

?

發布/訂閱

生產者沒有把消費發送給隊列,而是發送給exchange,由exchange進行路由到綁定的隊列,消費者僅消費對應隊列,例如事件通知通過郵件和短信進行通知。

exchange的類型為fanout

private static final String EXCHANGE_NAME = "logs";//聲明一個exchange:logs channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//發送消息到exchange:logs channel.basicPublish( "logs", "", null, message.getBytes());

?

private static final String EXCHANGE_NAME = "logs";//聲明exchange:logschannel.exchangeDeclare(EXCHANGE_NAME, "fanout");//聲明一個隨機名稱的臨時隊列String queueName = channel.queueDeclare().getQueue();//綁定隊列與exchangechannel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//開始消費臨時隊列:channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

?

路由模式

exchange類型為direct

private static final String EXCHANGE_NAME = "direct_logs";//指定exchange為direct channel.exchangeDeclare(EXCHANGE_NAME, "direct");String severity = getSeverity(argv); String message = getMessage(argv); //指定routing key:severity channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); //聲明exchange:類型為:directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");String queueName = channel.queueDeclare().getQueue();//綁定隊列與exchange,routing key:severityfor (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};//消費消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}

?

topic模式

exchange類型為topic

topic模式與direct模式代碼一樣,區別為binding key與routing key 必須包含點號(.)。

?

channel.exchangeDeclare(EXCHANGE_NAME, "topic");String routingKey = getRouting(argv); String message = getMessage(argv);channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

?

channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

?

rpc模式

發布確認

?

?

總結

以上是生活随笔為你收集整理的RabbitMQ开发详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。