日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python之rabbitMQ

發(fā)布時間:2025/3/17 python 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python之rabbitMQ 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

? ? ? ?

一、簡單的rabbitMQ隊列通信

由上圖可知,數(shù)據(jù)是先發(fā)給exchange交換器,exchage再發(fā)給相應(yīng)隊列。pika模塊是python對rabbitMQ的API接口。接收端有一個回調(diào)函數(shù)一接收到數(shù)據(jù)就調(diào)用該函數(shù)。一條消息被一個消費(fèi)者接收后,該消息就從隊列刪除。OK,了解上面的知識后,先來看看一個簡單的rabbitMQ列隊通信。

send端:

?1?import?pika?2?#連上rabbitMQ?3?connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))?4?channel=connection.channel()???????#生成管道,在管道里跑不同的隊列?5??6?#聲明queue?7?channel.queue_declare(queue='hello1')?8??9?#n?RabbitMQ?a?message?can?never?be?sent?directly?to?the?queue,it?always?needs?to?go?through?an?exchange.10?#向隊列里發(fā)數(shù)據(jù)11?channel.basic_publish(exchange='',      #先把數(shù)據(jù)發(fā)給exchange交換器,exchage再發(fā)給相應(yīng)隊列12???????????????????????routing_key='hello1', #向"hello'隊列發(fā)數(shù)據(jù)13???????????????????????body='HelloWorld!!')  #發(fā)的消息14?print("[x]Sent'HelloWorld!'")15?connection.close()

receive端:

?1?import?pika?2??3?connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))?4?channel=connection.channel()?5??6?#?You?may?ask?why?we?declare?the?queue?again??we?have?already?declared?it?in?our?previous?code.?7?#?We?could?avoid?that?if?we?were?sure?that?the?queue?already?exists.?For?example?if?send.py?program?8?#?was?run?before.?But?we're?not?yet?sure?which?program?to?run?first.?In?such?cases?it's?a?good?9?#?practice?to?repeat?declaring?the?queue?in?both?programs.10?channel.queue_declare(queue='hello1')#聲明隊列,保證程序不出錯11?12?13?def?callback(ch,method,properties,body):14?????print("-->ch",ch)15?????print("-->method",method)16?????print("-->properties",properties)17?????print("[x]?Received?%r"?%?body)?????????#一條消息被一個消費(fèi)者接收后,該消息就從隊列刪除18?19?20?channel.basic_consume(callback,??????????????#回調(diào)函數(shù),一接收到消息就調(diào)用回調(diào)函數(shù)21???????????????????????queue='hello1',22???????????????????????no_ack=False)????#消費(fèi)完畢后向服務(wù)端發(fā)送一個確認(rèn),默認(rèn)為False23?24?print('[*]?Waiting?for?messages.To?exit?press?CTRL+C')25?channel.start_consuming()

運(yùn)行結(jié)果:(上面的代碼對應(yīng)我寫的注釋相信是看得懂的~)

?View Code

經(jīng)過深入的測試,有以下兩個發(fā)現(xiàn):

  • 先運(yùn)行rabbitMQ_1_send.py發(fā)送數(shù)據(jù),rabbitMQ_2_receive.py未運(yùn)行。發(fā)現(xiàn)當(dāng)receive運(yùn)行時仍能接收數(shù)據(jù)。

  • 運(yùn)行多個(eg:3個)接收數(shù)據(jù)的客戶端,再運(yùn)行發(fā)送端,客戶端1收到數(shù)據(jù),再運(yùn)行發(fā)送端,客戶端2收到數(shù)據(jù),再運(yùn)行發(fā)送端,客戶端3收到數(shù)據(jù)。

  • RabbitMQ會默認(rèn)把p發(fā)的消息依次分發(fā)給各個消費(fèi)者(c),跟負(fù)載均衡差不多。

    ?

    二、全英文ack

    在看上面的例子,你會發(fā)現(xiàn)有一句代碼no_ack=False(消費(fèi)完畢后向服務(wù)端發(fā)送一個確認(rèn),默認(rèn)為False),以我英語四級飄過的水平,看完下面關(guān)于ack的講解感覺寫得很牛啊!!于是分享一下:

    Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

    But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

    In order to make sure a message is never lost, RabbitMQ supports message?acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.

    If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

    There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.

    Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the?no_ack=True?flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

    Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.

    我把發(fā)送端和接收端分別比作生產(chǎn)者與消費(fèi)者。生產(chǎn)者發(fā)送任務(wù)A,消費(fèi)者接收任務(wù)A并處理,處理完后生產(chǎn)者將消息隊列中的任務(wù)A刪除。現(xiàn)在我們遇到了一個問題:如果消費(fèi)者接收任務(wù)A,但在處理的過程中突然宕機(jī)了。而此時生產(chǎn)者將消息隊列中的任務(wù)A刪除。實(shí)際上任務(wù)A并未成功處理完,相當(dāng)于丟失了任務(wù)/消息。為解決這個問題,應(yīng)使消費(fèi)者接收任務(wù)并成功處理完后發(fā)送一個ack到生產(chǎn)者!生產(chǎn)者收到ack后就明白任務(wù)A已被成功處理,這時才從消息隊列中將任務(wù)A刪除,如果沒有收到ack,就需要把任務(wù)A發(fā)送給下一個消費(fèi)者,直到任務(wù)A被成功處理。

    ?

    三、消息持久化

    前面已經(jīng)知道,生產(chǎn)者生產(chǎn)數(shù)據(jù),消費(fèi)者再啟動是可以接收數(shù)據(jù)的。

    但是,生產(chǎn)者生產(chǎn)數(shù)據(jù),然后重啟rabbitMQ,消費(fèi)者是無法接收數(shù)據(jù)

    eg:消息在傳輸過程中rabbitMQ服務(wù)器宕機(jī)了,會發(fā)現(xiàn)之前的消息隊列就不存在了,這時我們就要用到消息持久化,消息持久化會讓隊列不隨著服務(wù)器宕機(jī)而消失,會永久的保存下去。下面看下關(guān)于消息持久化的英文講解:

    We have learned how to make sure that even if the consumer dies, the task isn't lost(by default, if wanna disable ?use no_ack=True). But our tasks will still be lost if RabbitMQ server stops.

    When?RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

    First,?we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as?durable:

          1?channel.queue_declare(queue='hello', durable=True)

    Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called?hello?which is not durable.RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error(會曝錯) to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for exampletask_queue:

          1 channel.queue_declare(queue='task_queue', durable=True)

    This?queue_declare?change needs to be applied to both the producer and consumer code.

    At that point we're sure that the?task_queue?queue won't be lost even if RabbitMQ restarts. Now we need to?mark our messages as persistent - by supplying a?delivery_mode?property with a value?2.

          1 channel.basic_publish(exchange='',
          2 ??????????????????????routing_key="task_queue",
          3 ??????????????????????body=message,
          4 ??????????????????????properties=pika.BasicProperties(
          5 ?????????????????????????delivery_mode?=?2,? ? ? # make message persistent
          6 ??????????????????????))

    上面的英文對消息持久化講得很好。消息持久化分為兩步:

    • 持久化隊列。通過代碼實(shí)現(xiàn)持久化hello隊列:channel.queue_declare(queue='hello',?durable=True)

    • 持久化隊列中的消息。通過代碼實(shí)現(xiàn):properties=pika.BasicProperties(?delivery_mode?=?2, )

    這里有個點(diǎn)要注意下:

    如果你在代碼中已實(shí)現(xiàn)持久化hello隊列與隊列中的消息。那么你重啟rabbitMQ后再次運(yùn)行代碼可能會爆錯!

    因?yàn)? RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error.

    為了解決這個問題,可以聲明一個與重啟rabbitMQ之前不同的隊列名(queue_name).

    ?

    四、消息公平分發(fā)

    如果Rabbit只管按順序把消息發(fā)到各個消費(fèi)者身上,不考慮消費(fèi)者負(fù)載的話,很可能出現(xiàn),一個機(jī)器配置不高的消費(fèi)者那里堆積了很多消息處理不完,同時配置高的消費(fèi)者卻一直很輕松。為解決此問題,可以在各個消費(fèi)者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費(fèi)者當(dāng)前消息還沒處理完的時候就不要再給我發(fā)新消息了

    ?

    帶消息持久化+公平分發(fā)的完整代碼

    生產(chǎn)者端:

    ?View Code

    消費(fèi)者端:

    ?View Code

    我在運(yùn)行上面程序時對消費(fèi)者端里回調(diào)函數(shù)的一句代碼(ch.basic_ack(delivery_tag =method.delivery_tag))十分困惑。這句代碼去掉消費(fèi)者端也能照樣收到消息啊。這句代碼有毛線用處??

    生產(chǎn)者端消息持久后,需要在消費(fèi)者端加上(ch.basic_ack(delivery_tag =method.delivery_tag)):?保證消息被消費(fèi)后,消費(fèi)端發(fā)送一個ack,然后服務(wù)端從隊列刪除該消息.

    ?

    五、消息發(fā)布與訂閱

    之前的例子都基本都是1對1的消息發(fā)送和接收,即消息只能發(fā)送到指定的queue里,但有些時候你想讓你的消息被所有的queue收到,類似廣播的效果,這時候就要用到exchange了。PS:有興趣的了解redis的發(fā)布與訂閱,可以看看我寫的博客python之redis。

    An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded(丟棄). The rules for that are defined by the?exchange type.

    Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息

    ?

    fanout:?所有bind到此exchange的queue都可以接收消息

    direct:?通過routingKey和exchange決定的那個唯一的queue可以接收消息

    topic:所有符合routingKey(此時可以是一個表達(dá)式)的routingKey所bind的queue可以接收消息

    ?

    表達(dá)式符號說明: #代表一個或多個字符,*代表任何字符
    ? ? ?     例:#.a會匹配a.a,aa.a,aaa.a等
    ? ? ? ?      ?? *.a會匹配a.a,b.a,c.a等
    ? ?     ?? ?注:使用RoutingKey為#,Exchange Type為topic的時候相當(dāng)于使用fanout

    ?

    下面我分別講下fanout,direct,topic:

    1、fanout

    fanout:?所有bind到此exchange的queue都可以接收消息

    send端:

    ?View Code

    receive端:

    ?View Code

    有兩個點(diǎn)要注意下:

    • fanout-廣播,send端的routing_key='', #fanout的話為空(默認(rèn))

    • receive端有一句代碼:result=channel.queue_declare(exclusive=True),作用:不指定queue名字(為了收廣播),rabbitMQ會隨機(jī)分配一個queue名字,exclusive=True會在使用此queue的消費(fèi)者斷開后,自動將queue刪除。

    ?

    2、有選擇的接收消息(exchange type=direct)

    RabbitMQ還支持根據(jù)關(guān)鍵字發(fā)送,即:隊列綁定關(guān)鍵字,發(fā)送者將數(shù)據(jù)根據(jù)關(guān)鍵字發(fā)送到消息exchange,exchange根據(jù) 關(guān)鍵字 判定應(yīng)該將數(shù)據(jù)發(fā)送至指定隊列。

    send端:

    ?View Code

    receive端:

    ?View Code

    其實(shí)最開始我看代碼是一臉懵逼的~ 下面是我在cmd進(jìn)行測試的截圖(配合著截圖看會容易理解些),一個send端,兩個receive端(先起receive端,再起receive端):

    send端:

    receive端-1:

    receive端-2:

    ?

    3、更細(xì)致的消息過濾topic(供參考)

    Although using the?direct?exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

    In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the?syslog?unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).

    That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

    感覺我英文水平不高啊~,我對照著垃圾有道翻譯,加上自己的理解,大概知道上面在講什么。

    舉例: 如果是系統(tǒng)的錯誤,就把信息發(fā)送到A,如果是MySQL的錯誤,就把信息發(fā)送到B。但是對B來說,想實(shí)現(xiàn)接收MySQL的錯誤信息,可以用有選擇的接收消息(exchange type=direct),讓關(guān)鍵字為error就實(shí)現(xiàn)了啊!現(xiàn)在B有個需求:不是所有的錯誤信息都接收,只接收指定的錯誤。在某種信息再進(jìn)行過濾,這就是更細(xì)致的消息過濾topic。

    ?

    send端:

    ?View Code

    receive端:

    ?View Code

    ?

    ?

    六、RPC(Remote Procedure Call)

    RPC的概念可看我百度的(其實(shí)就類似我之前做的FTP,我從客戶端發(fā)一個指令,服務(wù)端返回相關(guān)信息):

    ?View Code

    下面重點(diǎn)講下RPC通信,我剛開始學(xué)挺難的,學(xué)完之后感覺RPC通信的思想很有啟發(fā)性,代碼的例子寫得也很牛!!

    client端發(fā)的消息被server端接收后,server端會調(diào)用callback函數(shù),執(zhí)行任務(wù)后,還需要把相應(yīng)的信息發(fā)送到client,但是server如何將信息發(fā)還給client?如果有多個client連接server,server又怎么知道是要發(fā)給哪個client??

    RPC-server默認(rèn)監(jiān)聽rpc_queue.肯定不能把要發(fā)給client端的信息發(fā)到rpc_queue吧(rpc_queue是監(jiān)聽client端發(fā)到server端的數(shù)據(jù))。

    合理的方案是server端另起一個queue,通過queue將信息返回給對應(yīng)client。但問題又來了,queue是server端起的,故client端肯定不知道queue_name,連queue_name都不知道,client端接收毛線的數(shù)據(jù)??

    解決方法:

    客戶端在發(fā)送指令的同時告訴服務(wù)端:任務(wù)執(zhí)行完后,數(shù)據(jù)通過某隊列返回結(jié)果。客戶端監(jiān)聽該隊列就OK了。

    client端:

    ?1?import?pika?2?import?uuid?3??4??5?class?FibonacciRpcClient(object):?6?????def?__init__(self):?7?????????self.connection?=?pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))?8??9?????????self.channel?=?self.connection.channel()10?????????#隨機(jī)建立一個queue,為了監(jiān)聽返回的結(jié)果11?????????result?=?self.channel.queue_declare(exclusive=True)12?????????self.callback_queue?=?result.method.queue???##隊列名13?14?????????self.channel.basic_consume(self.on_response,??#一接收客戶端發(fā)來的指令就調(diào)用回調(diào)函數(shù)on_response15????????????????????????????????????no_ack=True,16????????????????????????????????????queue=self.callback_queue)17?18?????def?on_response(self,?ch,?method,?props,?body):??#回調(diào)19?????????#每條指令執(zhí)行的速度可能不一樣,指令1比指令2先發(fā)送,但可能指令2的執(zhí)行結(jié)果比指令1先返回到客戶端,20?????????#此時如果沒有下面的判斷,客戶端就會把指令2的結(jié)果誤認(rèn)為指令1執(zhí)行的結(jié)果21?????????if?self.corr_id?==?props.correlation_id:22?????????????self.response?=?body23?24?????def?call(self,?n):25?????????self.response?=?None????##指令執(zhí)行后返回的消息26?????????self.corr_id?=?str(uuid.uuid4())???##可用來標(biāo)識指令(順序)27?????????self.channel.basic_publish(exchange='',28????????????????????????????????????routing_key='rpc_queue',?#client發(fā)送指令,發(fā)到rpc_queue29????????????????????????????????????properties=pika.BasicProperties(30????????????????????????????????????????reply_to=self.callback_queue,?#將指令執(zhí)行結(jié)果返回到reply_to隊列31????????????????????????????????????????correlation_id=self.corr_id,32????????????????????????????????????),33????????????????????????????????????body=str(n))34?????????while?self.response?is?None:35?????????????self.connection.process_data_events()?#去queue接收數(shù)據(jù)(不阻塞)36?????????return?int(self.response)37?38?39?fibonacci_rpc?=?FibonacciRpcClient()40?41?print("?[x]?Requesting?fib(30)")42?response?=?fibonacci_rpc.call(30)43?print("?[.]?Got?%r"?%?response)

    server端:

    ?1?import?pika?2?import?time?3??4?connection?=?pika.BlockingConnection(pika.ConnectionParameters(?5?????host='localhost'))?6??7?channel?=?connection.channel()?8??9?channel.queue_declare(queue='rpc_queue')10?11?12?def?fib(n):13?????if?n?==?0:14?????????return?015?????elif?n?==?1:16?????????return?117?????else:18?????????return?fib(n?-?1)?+?fib(n?-?2)19?20?21?def?on_request(ch,?method,?props,?body):22?????n?=?int(body)23?24?????print("?[.]?fib(%s)"?%?n)25?????response?=?fib(n)??#從客戶端收到的消息26?27?????ch.basic_publish(exchange='',???##服務(wù)端發(fā)送返回的數(shù)據(jù)到props.reply_to隊列(客戶端發(fā)送指令時聲明)28??????????????????????routing_key=props.reply_to,??#correlation_id?(隨機(jī)數(shù))每條指令都有隨機(jī)獨(dú)立的標(biāo)識符29??????????????????????properties=pika.BasicProperties(correlation_id=?\30??????????????????????????????????????????????????????????props.correlation_id),31??????????????????????body=str(response))32?????ch.basic_ack(delivery_tag=method.delivery_tag)??#客戶端持久化33?34?35?channel.basic_qos(prefetch_count=1)??#公平分發(fā)36?channel.basic_consume(on_request,????#一接收到消息就調(diào)用on_request37???????????????????????queue='rpc_queue')38?39?print("?[x]?Awaiting?RPC?requests")40?channel.start_consuming()

    本文轉(zhuǎn)自帥氣的頭頭博客51CTO博客,原文鏈接http://blog.51cto.com/12902932/1924608如需轉(zhuǎn)載請自行聯(lián)系原作者


    sshpp

    總結(jié)

    以上是生活随笔為你收集整理的python之rabbitMQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。