python消息队列中间件_python-RabbtiMQ消息队列
1.RabbitMQ簡介
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。
2.RabbitMQ能為你做些什么?
消息系統(tǒng)允許軟件、應(yīng)用相互連接和擴展.這些應(yīng)用可以相互鏈接起來組成一個更大的應(yīng)用,或者將用戶設(shè)備和數(shù)據(jù)進行連接.消息系統(tǒng)通過將消息的發(fā)送和接收分離來實現(xiàn)應(yīng)用程序的異步和解偶.
或許你正在考慮進行數(shù)據(jù)投遞,非阻塞操作或推送通知。或許你想要實現(xiàn)發(fā)布/訂閱,異步處理,或者工作隊列。所有這些都可以通過消息系統(tǒng)實現(xiàn)。
RabbitMQ是一個消息代理- 一個消息系統(tǒng)的媒介。它可以為你的應(yīng)用提供一個通用的消息發(fā)送和接收平臺,并且保證消息在傳輸過程中的安全。
3.RabbitMQ 安裝使用
4.Python應(yīng)用RabbitMQ
python操作RabbitMQ的模塊有三種:pika,Celery,Haigha。
本文使用的是pika。
"""RabbitMQ-生產(chǎn)者。"""
importpika"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識"""channel.queue_declare(queue='hello')"""定義queue中的消息內(nèi)容"""channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')print("[x] Sent 'Hello World!'")
"""RabbitMQ-消費者。"""
importpika"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識,與生產(chǎn)者隊列中對應(yīng)"""channel.queue_declare(queue='hello')defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息
queue='hello', #queue_declare(queue='hello') 對應(yīng)
no_ack=True
)"""消費者會一直監(jiān)聽這queue,如果隊列中沒有消息,則會卡在這里,等待消息隊列中生成消息。"""
print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
5.RabbitMQ消息持久化
importpika
queue_name= 'xiaoxi_'
"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識
queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內(nèi)容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=input_value,
properties=pika.BasicProperties( #消息持久化.....
delivery_mode=2,
)
)continue
producer.py
importpika,time
queue_name= 'xiaoxi_'
"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)#time.sleep(5) # 模擬消費者丟失生產(chǎn)者發(fā)送的消息,生產(chǎn)者消息隊列中的這一條消息則不會刪除。
print('rev messages-->',body)"""手動向生產(chǎn)者確認收到消息"""
#ch.basic_ack(delivery_tag=method.delivery_tag)
"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息
queue=queue_name,#no_ack=True #接收到消息,主動向生產(chǎn)者確認已經(jīng)接收到消息。
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
6.RabbitMQ消息公平分發(fā)
importpika
queue_name= 'xiaoxi_1'
"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識
queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內(nèi)容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=input_value,
)continue
producer.py
importpika,time
queue_name= 'xiaoxi_1'
"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標(biāo)識
queue,durable 持久化"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""模擬處理消息快慢速度"""time.sleep(1)
ch.basic_ack(delivery_tag=method.delivery_tag)"""根據(jù)消費者處理消息的快慢公平分發(fā)消息"""channel.basic_qos(prefetch_count=1)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息
queue=queue_name,#no_ack=True #接收到消息,主動向生產(chǎn)者確認已經(jīng)接收到消息。
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
7.RabbitMQ-廣播模式。
消息的發(fā)送模式類型1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當(dāng)于使用fanout
4.headers: 通過headers 來決定把消息發(fā)給哪些queue (少用)
7.1 topic 廣播模式。
importpika"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'
"""定義exchage模式 direct廣播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""消息的發(fā)送模式類型
1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當(dāng)于使用fanout
4.headers: 通過headers 來決定把消息發(fā)給哪些queue (少用)"""
whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內(nèi)容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body=input_value,
)continue
producer.py
importpika,time"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除"""res= channel.queue_declare(exclusive=True)
queue_name=res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)
ch.basic_ack(delivery_tag=method.delivery_tag)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息
queue=queue_name,
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
7.2?direct 廣播模式
importpika
connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel=connection.channel()"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'direct_messages'routing_key= 'my_direct'
"""定義exchage模式 direct廣播模式
消息的發(fā)送模式類型
1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當(dāng)于使用fanout
4.headers: 通過headers 來決定把消息發(fā)給哪些queue (少用)"""channel.exchange_declare(exchange=exchange_name,exchange_type='direct')
channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body='hello word!',
)#while True:#input_value = input(">>:").strip()#if input_value:#"""定義queue中的消息內(nèi)容"""#print('producer messages:{0}'.format(input_value))#channel.basic_publish(#exchange=exchange_name,#routing_key=routing_key,#body=input_value,#)#continue
producer.py
importpika,time
connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel=connection.channel()
exchange_name= 'direct_messages'routing_key= 'my_direct'channel.exchange_declare(exchange=exchange_name,exchange_type='direct')"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除"""res= channel.queue_declare(exclusive=True)
queue_name=res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)
ch.basic_ack(delivery_tag=method.delivery_tag)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息
queue=queue_name,
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
7.3?fanout 廣播模式
importpika"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()
exchange_name= 'messages'
"""定義exchage模式 fanout廣播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""消息的發(fā)送模式類型
1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當(dāng)于使用fanout
4.headers: 通過headers 來決定把消息發(fā)給哪些queue (少用)"""
whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內(nèi)容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange=exchange_name,
routing_key='',
body=input_value,
)continue
producer.py
importpika,time"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""
"""exchange_name= 'messages'channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除"""res= channel.queue_declare(exclusive=True)
queue_name=res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name)"""每一個消費者隨機一個唯一的queue_name"""
print('queue_name:{0}',format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)
ch.basic_ack(delivery_tag=method.delivery_tag)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調(diào)這個函數(shù)處理消息
queue=queue_name,#no_ack=True #接收到消息,主動向生產(chǎn)者確認已經(jīng)接收到消息。
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
8?RabbitMQ 實現(xiàn) RPC
"""RabbitMQ-生產(chǎn)者。
利用rabbitMQ 實現(xiàn)一個能收能發(fā)的RPC小程序。
重點需要注意的是:queue的綁定。接收的一端必選預(yù)先綁定queue生成隊列,發(fā)送端才能根據(jù)queue發(fā)送。"""
importpika,uuid,timeclassrabbitmqClient(object):def __init__(self,rpc_queue):
self.rpc_queue=rpc_queue
self.app_id=str(uuid.uuid4())
self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel=self.connection.channel()"""生成一個自動queue,傳過去server,server再往這個自動queue回復(fù)數(shù)據(jù)"""autoqueue= self.channel.queue_declare(exclusive=True)
self.callback_queue=autoqueue.method.queue"""先定義一個接收回復(fù)的動作"""self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)defon_response(self,ch,method,properties,body):if self.app_id ==properties.app_id:
self.response=bodydefsend(self,msg):
self.response=None
self.channel.basic_publish(
exchange='',
routing_key=self.rpc_queue,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
app_id=self.app_id,
),
body=str(msg)
)#發(fā)送完消息,進入接收模式。
while self.response isNone:#print('callback_queue:{0} app_id:{1} wait...'.format(self.callback_queue,self.app_id))
self.connection.process_data_events()#time.sleep(0.5)
returnself.response
rpc_request_queue= 'rpc_request_queue'rb=rabbitmqClient(rpc_request_queue)whileTrue:
msg= input('input >> :').strip()ifmsg:print('rpc_queue:{0} app_id:{1}'.format(rb.rpc_queue,rb.app_id))print('send msg:{}'.format(msg))
reponses=rb.send(msg)print('reponses msg:{}'.format(reponses.decode('utf-8')))continue
client.py
"""RabbitMQ-消費者。"""
importpikaclassrabbitmqServer(object):def __init__(self,rpc_queue):
self.rpc_queue=rpc_queue
self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel=self.connection.channel()
self.channel.queue_declare(queue=self.rpc_queue)defon_reponses(self,ch,method,properties,body):ifbody:#reponser ...
ch.basic_publish(exchange='',
routing_key=properties.reply_to,
properties=pika.BasicProperties(
reply_to=properties.reply_to,
app_id=properties.app_id,
),
body='reponses ok! msg is:{}'.format(body.decode('utf-8')))defstart_consuming(self):
self.channel.basic_consume(consumer_callback=self.on_reponses,queue=self.rpc_queue,no_ack=True)print('waiting for meassages, to exit press CTRL+C')
self.channel.start_consuming()
rpc_request_queue= 'rpc_request_queue'rd_server=rabbitmqServer(rpc_request_queue)
rd_server.start_consuming()
server.py
總結(jié)
以上是生活随笔為你收集整理的python消息队列中间件_python-RabbtiMQ消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 陇南看输卵管粘连最好的医院推荐
- 下一篇: python中变量名后的逗号_深入浅析p