日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python消息队列中间件_python-RabbtiMQ消息队列

發(fā)布時間:2023/12/1 python 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python消息队列中间件_python-RabbtiMQ消息队列 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1.RabbitMQ簡介

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。

RabbitMQ是一個開源的AMQP實現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。

2.RabbitMQ能為你做些什么?

消息系統(tǒng)允許軟件、應(yīng)用相互連接和擴展.這些應(yīng)用可以相互鏈接起來組成一個更大的應(yīng)用,或者將用戶設(shè)備和數(shù)據(jù)進行連接.消息系統(tǒng)通過將消息的發(fā)送和接收分離來實現(xiàn)應(yīng)用程序的異步和解偶.

或許你正在考慮進行數(shù)據(jù)投遞,非阻塞操作或推送通知。或許你想要實現(xiàn)發(fā)布/訂閱,異步處理,或者工作隊列。所有這些都可以通過消息系統(tǒng)實現(xiàn)。

RabbitMQ是一個消息代理- 一個消息系統(tǒng)的媒介。它可以為你的應(yīng)用提供一個通用的消息發(fā)送和接收平臺,并且保證消息在傳輸過程中的安全。

3.RabbitMQ 安裝使用

4.Python應(yīng)用RabbitMQ

python操作RabbitMQ的模塊有三種:pika,Celery,Haigha。

本文使用的是pika。

"""RabbitMQ-生產(chǎn)者。"""

importpika"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識"""channel.queue_declare(queue='hello')"""定義queue中的消息內(nèi)容"""channel.basic_publish(exchange='',

routing_key='hello',

body='Hello World!')print("[x] Sent 'Hello World!'")

"""RabbitMQ-消費者。"""

importpika"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識,與生產(chǎn)者隊列中對應(yīng)"""channel.queue_declare(queue='hello')defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""消費,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息

queue='hello', #queue_declare(queue='hello') 對應(yīng)

no_ack=True

)"""消費者會一直監(jiān)聽這queue,如果隊列中沒有消息,則會卡在這里,等待消息隊列中生成消息。"""

print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

5.RabbitMQ消息持久化

importpika

queue_name= 'xiaoxi_'

"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識

queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:

input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內(nèi)容"""

print('producer messages:{0}'.format(input_value))

channel.basic_publish(

exchange='',

routing_key=queue_name,

body=input_value,

properties=pika.BasicProperties( #消息持久化.....

delivery_mode=2,

)

)continue

producer.py

importpika,time

queue_name= 'xiaoxi_'

"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)#time.sleep(5) # 模擬消費者丟失生產(chǎn)者發(fā)送的消息,生產(chǎn)者消息隊列中的這一條消息則不會刪除。

print('rev messages-->',body)"""手動向生產(chǎn)者確認收到消息"""

#ch.basic_ack(delivery_tag=method.delivery_tag)

"""消費,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息

queue=queue_name,#no_ack=True #接收到消息,主動向生產(chǎn)者確認已經(jīng)接收到消息。

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

6.RabbitMQ消息公平分發(fā)

importpika

queue_name= 'xiaoxi_1'

"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識

queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:

input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內(nèi)容"""

print('producer messages:{0}'.format(input_value))

channel.basic_publish(

exchange='',

routing_key=queue_name,

body=input_value,

)continue

producer.py

importpika,time

queue_name= 'xiaoxi_1'

"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識

queue,durable 持久化"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""模擬處理消息快慢速度"""time.sleep(1)

ch.basic_ack(delivery_tag=method.delivery_tag)"""根據(jù)消費者處理消息的快慢公平分發(fā)消息"""channel.basic_qos(prefetch_count=1)"""消費,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息

queue=queue_name,#no_ack=True #接收到消息,主動向生產(chǎn)者確認已經(jīng)接收到消息。

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

7.RabbitMQ-廣播模式。

消息的發(fā)送模式類型1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。

表達式符號說明:#代表一個或多個字符,*代表任何字符

例:#.a會匹配a.a,aa.a,aaa.a等

*.a會匹配a.a,b.a,c.a等

注:使用RoutingKey為#,Exchange Type為topic的時候相當(dāng)于使用fanout

4.headers: 通過headers 來決定把消息發(fā)給哪些queue (少用)

7.1 topic 廣播模式。

importpika"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'

"""定義exchage模式 direct廣播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""消息的發(fā)送模式類型

1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。

2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。

3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。

表達式符號說明:#代表一個或多個字符,*代表任何字符

例:#.a會匹配a.a,aa.a,aaa.a等

*.a會匹配a.a,b.a,c.a等

注:使用RoutingKey為#,Exchange Type為topic的時候相當(dāng)于使用fanout

4.headers: 通過headers 來決定把消息發(fā)給哪些queue (少用)"""

whileTrue:

input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內(nèi)容"""

print('producer messages:{0}'.format(input_value))

channel.basic_publish(

exchange=exchange_name,

routing_key=routing_key,

body=input_value,

)continue

producer.py

importpika,time"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除"""res= channel.queue_declare(exclusive=True)

queue_name=res.method.queue

channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)

ch.basic_ack(delivery_tag=method.delivery_tag)"""消費,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息

queue=queue_name,

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

7.2?direct 廣播模式

importpika

connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)

channel=connection.channel()"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'direct_messages'routing_key= 'my_direct'

"""定義exchage模式 direct廣播模式

消息的發(fā)送模式類型

1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。

2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。

3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。

表達式符號說明:#代表一個或多個字符,*代表任何字符

例:#.a會匹配a.a,aa.a,aaa.a等

*.a會匹配a.a,b.a,c.a等

注:使用RoutingKey為#,Exchange Type為topic的時候相當(dāng)于使用fanout

4.headers: 通過headers 來決定把消息發(fā)給哪些queue (少用)"""channel.exchange_declare(exchange=exchange_name,exchange_type='direct')

channel.basic_publish(

exchange=exchange_name,

routing_key=routing_key,

body='hello word!',

)#while True:#input_value = input(">>:").strip()#if input_value:#"""定義queue中的消息內(nèi)容"""#print('producer messages:{0}'.format(input_value))#channel.basic_publish(#exchange=exchange_name,#routing_key=routing_key,#body=input_value,#)#continue

producer.py

importpika,time

connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)

channel=connection.channel()

exchange_name= 'direct_messages'routing_key= 'my_direct'channel.exchange_declare(exchange=exchange_name,exchange_type='direct')"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除"""res= channel.queue_declare(exclusive=True)

queue_name=res.method.queue

channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)

ch.basic_ack(delivery_tag=method.delivery_tag)"""消費,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息

queue=queue_name,

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

7.3?fanout 廣播模式

importpika"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()

exchange_name= 'messages'

"""定義exchage模式 fanout廣播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""消息的發(fā)送模式類型

1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。

2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。

3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。

表達式符號說明:#代表一個或多個字符,*代表任何字符

例:#.a會匹配a.a,aa.a,aaa.a等

*.a會匹配a.a,b.a,c.a等

注:使用RoutingKey為#,Exchange Type為topic的時候相當(dāng)于使用fanout

4.headers: 通過headers 來決定把消息發(fā)給哪些queue (少用)"""

whileTrue:

input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內(nèi)容"""

print('producer messages:{0}'.format(input_value))

channel.basic_publish(

exchange=exchange_name,

routing_key='',

body=input_value,

)continue

producer.py

importpika,time"""聲明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""聲明一個管道"""channel=connection.channel()"""

"""exchange_name= 'messages'channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除"""res= channel.queue_declare(exclusive=True)

queue_name=res.method.queue

channel.queue_bind(exchange=exchange_name,queue=queue_name)"""每一個消費者隨機一個唯一的queue_name"""

print('queue_name:{0}',format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)

ch.basic_ack(delivery_tag=method.delivery_tag)"""消費,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息

queue=queue_name,#no_ack=True #接收到消息,主動向生產(chǎn)者確認已經(jīng)接收到消息。

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

8?RabbitMQ 實現(xiàn) RPC

"""RabbitMQ-生產(chǎn)者。

利用rabbitMQ 實現(xiàn)一個能收能發(fā)的RPC小程序。

重點需要注意的是:queue的綁定。接收的一端必選預(yù)先綁定queue生成隊列,發(fā)送端才能根據(jù)queue發(fā)送。"""

importpika,uuid,timeclassrabbitmqClient(object):def __init__(self,rpc_queue):

self.rpc_queue=rpc_queue

self.app_id=str(uuid.uuid4())

self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))

self.channel=self.connection.channel()"""生成一個自動queue,傳過去server,server再往這個自動queue回復(fù)數(shù)據(jù)"""autoqueue= self.channel.queue_declare(exclusive=True)

self.callback_queue=autoqueue.method.queue"""先定義一個接收回復(fù)的動作"""self.channel.basic_consume(self.on_response, no_ack=True,

queue=self.callback_queue)defon_response(self,ch,method,properties,body):if self.app_id ==properties.app_id:

self.response=bodydefsend(self,msg):

self.response=None

self.channel.basic_publish(

exchange='',

routing_key=self.rpc_queue,

properties=pika.BasicProperties(

reply_to=self.callback_queue,

app_id=self.app_id,

),

body=str(msg)

)#發(fā)送完消息,進入接收模式。

while self.response isNone:#print('callback_queue:{0} app_id:{1} wait...'.format(self.callback_queue,self.app_id))

self.connection.process_data_events()#time.sleep(0.5)

returnself.response

rpc_request_queue= 'rpc_request_queue'rb=rabbitmqClient(rpc_request_queue)whileTrue:

msg= input('input >> :').strip()ifmsg:print('rpc_queue:{0} app_id:{1}'.format(rb.rpc_queue,rb.app_id))print('send msg:{}'.format(msg))

reponses=rb.send(msg)print('reponses msg:{}'.format(reponses.decode('utf-8')))continue

client.py

"""RabbitMQ-消費者。"""

importpikaclassrabbitmqServer(object):def __init__(self,rpc_queue):

self.rpc_queue=rpc_queue

self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))

self.channel=self.connection.channel()

self.channel.queue_declare(queue=self.rpc_queue)defon_reponses(self,ch,method,properties,body):ifbody:#reponser ...

ch.basic_publish(exchange='',

routing_key=properties.reply_to,

properties=pika.BasicProperties(

reply_to=properties.reply_to,

app_id=properties.app_id,

),

body='reponses ok! msg is:{}'.format(body.decode('utf-8')))defstart_consuming(self):

self.channel.basic_consume(consumer_callback=self.on_reponses,queue=self.rpc_queue,no_ack=True)print('waiting for meassages, to exit press CTRL+C')

self.channel.start_consuming()

rpc_request_queue= 'rpc_request_queue'rd_server=rabbitmqServer(rpc_request_queue)

rd_server.start_consuming()

server.py

總結(jié)

以上是生活随笔為你收集整理的python消息队列中间件_python-RabbtiMQ消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。