日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

rabbitmq direct reply-to 在springAMQP和python之间的使用

發(fā)布時間:2023/12/3 python 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq direct reply-to 在springAMQP和python之间的使用 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

背景

公司的一個項目使用rabbitmq作為broker進行交互,并且數(shù)據(jù)的查詢方法使用RPC模式,RPC Client端使用java編寫并使用springAMQP包與rabbitmq交互,在RPC Server端使用python的 pika包與rabbitmq交互。兩端都使用標準官方例程,發(fā)現(xiàn)在Client端發(fā)送的消息可以被Server端接收并處理然后返回結(jié)果,但是Client端只會會收到一個null值。

問題排查

1 理解傳統(tǒng)的RPC模式運行流程

傳統(tǒng)模式下 Client端向一個指定的隊列里推送消息,并聲明一個一次性排他隊列,然后將發(fā)送消息頭部的reply-to屬性的值設(shè)置為隊列的名字,correlation_id屬性設(shè)置為一個隨機生成的值用于消息鑒定然后發(fā)送消息。在發(fā)送后Client端監(jiān)聽聲明的排他隊列,當收到消息后比對correaltiion_id,正確則處理消息斷開監(jiān)聽連接,然后此隊列被系統(tǒng)自動回收。 在Server端收到消息后處理消息然后將消息返回,返回的消息的routing-key設(shè)置為reply-to的值,properties中設(shè)置correlation_id為收到的correlation_id值。這樣就完成一次RPC交互模式。
要解決今天這個問題我們還要知道幾個知識點:

  • 1當消息發(fā)送到exchange后如果沒有隊列接收此消息,那么此消息就會丟失。
  • 2 一次性的排他隊列在Client不在監(jiān)聽此隊列就會自動被rabbitmq刪除。

排查1 Client端收到的Null值從哪里來?

因為我是使用python寫RPC Server端并且我也不怎么會java代碼。……
所以這個null值從那里來我就無法從Client端下手。那我們只能從Server端進行排查。(最后我認為是在java代碼編寫錯誤(是自己的代碼)的情況下 springAMQP返回的一個默認值)

排查2 Server端收到消息后是否正確的將消息返回

在Server端打印收到的message并打印此消息的header信息和body信息,看到在reply-to中就是Client端設(shè)置的隊列。并且通過rabbitmq也看到了這條消息的返回。

排查3 觀察消息有沒有被推送回reply-to隊列

然后我在Server端收到消息后的callback函數(shù)的頭部大了斷點,接收到消息后Server端程序掛起。此時我去查看reply-to中的隊列,發(fā)現(xiàn)其已經(jīng)不存在于rabbitmq中了。 由上面的傳統(tǒng)RPC模式我推斷出 可能是Client端發(fā)送代碼后沒有監(jiān)聽reply-to隊列造成隊列消失,然后Server端發(fā)送的消息因為沒有接收隊列而被丟棄。此時我們基本已經(jīng)將問題鎖定在Client端了。但是Client端的代碼是按照rabbitmq官方給的例程書寫,應該是沒有問題的。此時似乎陷入了僵局。

定位問題:Google大發(fā)加官方文檔

這時候我Google一下SpringAMQP框架的是如何寫RPC代碼?在一些帖子中我發(fā)現(xiàn)有的代碼會添加一個Listener的類,但有的又不添加。我們假設(shè)他們都是可以運行的。那么是什么原因會造成這種情況呢?我第一個就是想到了版本問題。隨著版本的改變可能代碼也會發(fā)生變化。之后我就在SpringAMQP的官方文檔里面進行查找。果然被我找到了,官方文檔里面有這樣一段描述:

Starting with?version 3.4.0, the RabbitMQ server now supports?Direct reply-to; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request). Starting with?Spring AMQP version 1.4.1?Direct reply-to will be used by default (if supported by the server) instead of creating temporary reply queues. When no?replyQueue ?is provided (or it is set with the name?amq.rabbitmq.reply-to), the?RabbitTemplate?will automatically detect whether Direct reply-to is supported and either use it or fall back to using a temporary reply queue. When using Direct reply-to, a?reply-listener is not required and should not be configured.

springAMQP官方地址
翻譯一下大體意思就是在RabbitMQ3.4.0版本以后官方提供一種叫做Direct reply-to的方式來實現(xiàn)RPC(這種方式可以極大的提高RPC的性能,因為他不需要每次請求Client端都要新申請一個隊列,之后我會再寫一篇來詳細介紹(翻譯 o(∩_∩)o 哈哈 )這個特性。并且在SpringAMQP version 1.4.1版本之后默認使用特性,看了一下服務器上的rabbitmq版本3.3.0 這個真的老果然不支持,SpringAMQP的版本果然也是高于這個版本,問題找到。開心 , 但是怎么解決呢?
Direct reply-to 官方介紹

解決方案

一: 提升rabbitmq版本,并使兩端代碼適配direct reply-to 方式

  • 難點1 python的官網(wǎng)沒有給例程 ,不過給了介紹也告訴了如何來實現(xiàn)
  • 難點2 服務器提升版本,已經(jīng)有業(yè)務跑在上面了,我這種對rabbitmq的萌新對rabbitmq各版本升級后的改變并不是很了解,估計是難說動領(lǐng)導換了。

針對難點2 我就不想了 不過難點1的我已經(jīng)寫出來python如何適配direct reply-to的代碼。
更改都是在Client端,Server端還是可以保持不變。主要主機這幾個方面

  • 1 reply-to的名字更改為‘a(chǎn)mq.rabbitmq.reply-to’這條虛擬隊列,你在rabbitmq的控制臺上是看不到這條隊列的。
  • 2 然后Client監(jiān)聽這條隊列的時候要設(shè)為為no-ack模式。

下面是根據(jù)官方python RPC代碼更改的 適配 Direct reply-to的python代碼
Client端 python代碼

# -*- coding:utf-8 -*- #!/usr/bin/env python import pika import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))self.channel = self.connection.channel()# result = self.channel.queue_declare(exclusive=True)# self.callback_queue = result.method.queue# self.channel.basic_consume(self.on_response, no_ack=True,# queue=self.callback_queue)# 監(jiān)聽隊列為 amp.rabbitmq.reply-to 啟動no_ack 模式self.channel.basic_consume(self.on_response,queue='amq.rabbitmq.reply-to',no_ack=True)def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(# reply_to = self.callback_queue,# 更改了隊列名字reply_to='amq.rabbitmq.reply-to',correlation_id=self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events()return int(self.response)fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)

Server端代碼 沒有改動

#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n-1) + fib(n-2)def on_request(ch, method, props, body):n = int(body)print(" [.] fib(%s)" % n)response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = \props.correlation_id),body=str(response))# ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests") channel.start_consuming()

解決辦法2 java代碼不使用默認的direct reply-to模式

這個辦法因為我不是寫java的所以我只能寫一些我在官方文檔里面理解的東西了。就是當你不使用SpringAMQP的默認RPC模式的化需要增加Listener對象來監(jiān)聽自己的隊列。

RabbitTemplate rabbitTempete=new RabbitTemplate(connectionFactory); rabbitTempete.setExchange(exchangeName); rabbitTempete.setRoutingKey(topic); //比官方文檔多的Queue replyqQueue=replyQueue(); admin.declareQueue(replyqQueue); rabbitTempete.setReplyQueue(replyqQueue); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueues(replyqQueue); container.setMessageListener(rabbitTempete); container.start(); //比官方文檔多的停止Object response=rabbitTempete.convertSendAndReceive(t);

SpringAMQP書寫官方文檔
相比較要自己申請隊列自己監(jiān)聽。不過我也沒試過這段代碼就不知道能不能用了。

總結(jié)

這個問題基本得到很好的解決了。解決一個問題首先你要明白一個東西正常情況下是一種什么狀況,然后出了問題就從前往后,從后往前,從中往兩邊等等等。然后Google,或者官方文檔,官方論壇。我個人認為官方文檔真的是好東西。無數(shù)的淺坑的解決辦法都在官方文檔。當然深坑就不說了那就是論壇加能力加運氣才能排查出來的了。不過官方大多都是英文。真是愁人,我 加強英語能力吧。

總結(jié)

以上是生活随笔為你收集整理的rabbitmq direct reply-to 在springAMQP和python之间的使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。