7.RabbitMQ RFC同步调用
生活随笔
收集整理的這篇文章主要介紹了
7.RabbitMQ RFC同步调用
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
RabbitMQ RFC同步調(diào)用是使用了兩個(gè)異步調(diào)用完成的,生產(chǎn)者調(diào)用消費(fèi)者的同時(shí),自己也作為消費(fèi)者等待某一隊(duì)列的返回消息,消費(fèi)者接受到生產(chǎn)者的消息同時(shí),也作為消息發(fā)送者發(fā)送一消息給生產(chǎn)者。參考下圖:
調(diào)用流程如下:
其他的消息服務(wù)器實(shí)現(xiàn)同步調(diào)用也是類似的原理,比如ActiveMQ。 下面編寫消費(fèi)者類Server
生產(chǎn)者Client代碼
啟動(dòng)一命令行,將當(dāng)前目錄轉(zhuǎn)移到項(xiàng)目所在的目錄
在Eclipse中運(yùn)行Client
代碼: package com.test.rfc; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Server { public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.142"); //使用默認(rèn)端口5672 Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); String queueName = "queue_rpc"; channel.queueDeclare(queueName, false, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException? { System.out.println("test1" + new String(body)); AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() .correlationId(properties.getCorrelationId()) .build(); String response = "hello client,I'm rfc server"; channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queueName, false, consumer); } catch (IOException e) { e.printStackTrace(); } } } package com.test.rfc; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import com.rabbitmq.client.*; public class Client { public static void main(String[] argv) { try { //發(fā)送消息的隊(duì)列,Server在這個(gè)隊(duì)列上接受消息 String queueName = "queue_rpc"; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.142"); //使用默認(rèn)端口5672 Connection connection = null; connection = factory.newConnection(); Channel channel = connection.createChannel(); //生成臨時(shí)的隊(duì)列,Client在這隊(duì)列上等待Server返回信息,Server向這個(gè)隊(duì)列發(fā)消息 String replyQueueName = channel.queueDeclare().getQueue(); //生成唯一ID final String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .correlationId(corrId).replyTo(replyQueueName).build(); //客戶端發(fā)送RFC請求 channel.basicPublish("", queueName, props, "GetUserInfo".getBytes()); //Server返回消息 final BlockingQueue response = new ArrayBlockingQueue(1); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(properties.getCorrelationId()); if (properties.getCorrelationId().equals(corrId)) { response.offer(body); } } }); byte[] b = response.take(); System.out.println(new String(b)); channel.close(); connection.close(); } catch(Exception e) { e.printStackTrace(); } } }
轉(zhuǎn)載于:https://www.cnblogs.com/zzpblogs/p/8168805.html
總結(jié)
以上是生活随笔為你收集整理的7.RabbitMQ RFC同步调用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 韵达快递寄对鞋子要多少钱
- 下一篇: ES6中export及export de