rabbitmq-java生产者消费者
生活随笔
收集整理的這篇文章主要介紹了
rabbitmq-java生产者消费者
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
【README】
本文使用java 連接rabbitmq,模擬生產(chǎn)者,消費(fèi)者場(chǎng)景
?
【1】項(xiàng)目搭建
1)maven項(xiàng)目,依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.hello</groupId><artifactId>rabbitmqtest</artifactId><version>0.0.1-SNAPSHOT</version><dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.11.0</version></dependency></dependencies> </project>2)生產(chǎn)者
package com.hello.rabbitmqtest.simple;import com.hello.trong.rabbitmqtest.util.MyDateUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Producer {/*** 隊(duì)列名稱 */static final String QUEUE_NAME = "simple_queue2";public static void main(String[] args) throws Exception {// 創(chuàng)建連接工廠 ConnectionFactory connFactory = new ConnectionFactory();// 主機(jī)地址,默認(rèn)為 localhostconnFactory.setHost("192.168.163.201");;connFactory.setPort(5672);// 設(shè)置虛擬主機(jī) // connFactory.setVirtualHost("/hello");// 設(shè)置賬號(hào)密碼 connFactory.setUsername("guest");connFactory.setPassword("guest");// 創(chuàng)建連接Connection conn = connFactory.newConnection();// 創(chuàng)建頻道Channel channel = conn.createChannel();/*** 創(chuàng)建隊(duì)列* 隊(duì)列名稱, 是否持久化隊(duì)列,是否獨(dú)占本次連接,是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列, 隊(duì)列其他參數(shù); */channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 發(fā)送消息 String message = "【simple_queue2】 hello rabbitmq now is " + MyDateUtil.getNow();channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("生產(chǎn)者發(fā)送消息" + message);// 關(guān)閉通道和連接channel.close();conn.close(); } } // 日期工具類 public class MyDateUtil {public static void main(String[] args) {}public static String getNow() {return getNow("yyyy-MM-dd hh:mm:ss.SSS");}public static String getNow(String format) {SimpleDateFormat formater = new SimpleDateFormat(format);String dateFormated = formater.format(new Date());return dateFormated; } }3)消費(fèi)者
package com.hello.rabbitmqtest.simple;import java.io.IOException;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties;/*** rabbitmq 消費(fèi)者*/ public class MyConsumer {/*** 隊(duì)列名稱 */static final String QUEUE_NAME = "simple_queue2";public static void main(String[] args) throws Exception {// 創(chuàng)建連接工廠 ConnectionFactory connFactory = new ConnectionFactory();// 主機(jī)地址,默認(rèn)為 localhostconnFactory.setHost("192.168.163.201");;connFactory.setPort(5672);// 設(shè)置虛擬主機(jī)connFactory.setVirtualHost("/");// 設(shè)置賬號(hào)密碼 connFactory.setUsername("guest");connFactory.setPassword("guest");// 創(chuàng)建連接Connection conn = connFactory.newConnection();// 創(chuàng)建頻道Channel channel = conn.createChannel();// 聲明要關(guān)注的隊(duì)列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("customer wait to receive message");// 告訴服務(wù)器,我們需要哪個(gè)頻道的角色,如果頻道中有消息,就會(huì)執(zhí)行回調(diào)函數(shù) handleDeliveryConsumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費(fèi)者標(biāo)簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內(nèi)容,包括消息id,消息routingkey,交換機(jī),消息和重轉(zhuǎn)標(biāo)記(收到消息失敗后是否需要重新發(fā)送) * @param properties 基本屬性* @param body 消息字節(jié)數(shù)組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機(jī)=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費(fèi)者收到的消息【%s】", message)); System.out.println("============================================="); } };// 自動(dòng)恢復(fù)隊(duì)列應(yīng)答 -- rabbitmq中的消息確認(rèn)機(jī)制 channel.basicConsume(QUEUE_NAME, true, consumer); } }【2】 測(cè)試效果
// 生產(chǎn)者 生產(chǎn)者發(fā)送消息【simple_queue2】 hello rabbitmq now is 2021-02-28 12:01:23.926// 消費(fèi)者 路由key=simple_queue2 交換機(jī)= 消息id=3 消費(fèi)者收到的消息【【simple_queue2】 hello rabbitmq now is 2021-02-28 12:01:23.926】 ============================================= 創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的rabbitmq-java生产者消费者的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎么区分模板网站(怎么区分模板网站类型)
- 下一篇: rabbitmq-发布订阅模式