日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python延时队列_如何通过Python实现RabbitMQ延迟队列

發布時間:2024/8/23 python 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python延时队列_如何通过Python实现RabbitMQ延迟队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近在做一任務時,遇到需要延遲處理的數據,最開始的做法是現將數據存儲在數據庫,然后寫個腳本,隔五分鐘掃描數據表再處理數據,實際效果并不好。因為系統本身一直在用rabbitmq做異步處理任務的中間件,所以想到是否可以利用rabbitmq實現延遲隊列。功夫不負有心人,rabbitmq雖然沒有現成可用的延遲隊列,但是可以利用其兩個重要特性來實現之:1、time to live(ttl)消息超時機制;2、dead letter exchanges(dlx)死信隊列。下面將具體描述實現原理以及實現代

延遲隊列的基礎原理time to live(ttl)

rabbitmq可以針對queue設置x-expires 或者 針對message設置 x-message-ttl,來控制消息的生存時間,如果超時(兩者同時設置以最先到期的時間為準),則消息變為dead letter(死信)

rabbitmq消息的過期時間有兩種方法設置。

通過隊列(queue)的屬性設置,隊列中所有的消息都有相同的過期時間。(本次延遲隊列采用的方案)對消息單獨設置,每條消息ttl可以不同。

如果同時使用,則消息的過期時間以兩者之間ttl較小的那個數值為準。消息在隊列的生存時間一旦超過設置的ttl值,就成為死信(dead letter)

dead letter exchanges(dlx)

rabbitmq的queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由轉發到指定的隊列。

x-dead-letter-exchange:出現死信(dead letter)之后將dead letter重新發送到指定exchange

x-dead-letter-routing-key:出現死信(dead letter)之后將dead letter重新按照指定的routing-key發送

隊列中出現死信(dead letter)的情況有:

消息或者隊列的ttl過期。(延遲隊列利用的特性)

隊列達到最大長度

消息被消費端拒絕(basic.reject or basic.nack)并且requeue=false

綜合上面兩個特性,將隊列設置ttl規則,隊列ttl過期后消息會變成死信,然后利用dlx特性將其轉發到另外的交換機和隊列就可以被重新消費,達到延遲消費效果。

延遲隊列設計及實現(python)

從上面描述,延遲隊列的實現大致分為兩步:

產生死信,有兩種方式per-message ttl和 queue ttl,因為我的需求中是所有的消息延遲處理時間相同,所以本實現中采用 queue ttl設置隊列的ttl,如果需要將隊列中的消息設置不同的延遲處理時間,則設置per-message ttl()

設置死信的轉發規則,dead letter exchanges設置方法()

完整代碼如下:

"""

created on fri aug 3 17:00:44 2018

@author: bge

"""

import pika,json,logging

class rabbitmqclient:

def __init__(self, conn_str='amqp://user:pwd@host:port/%2f'):

self.exchange_type = "direct"

self.connection_string = conn_str

self.connection = pika.blockingconnection(pika.urlparameters(self.connection_string))

self.channel = self.connection.channel()

self._declare_retry_queue() #retryqueue and retryexchange

logging.debug("connection established")

def close_connection(self):

self.connection.close()

logging.debug("connection closed")

def declare_exchange(self, exchange):

self.channel.exchange_declare(exchange=exchange,

exchange_type=self.exchange_type,

durable=true)

def declare_queue(self, queue):

self.channel.queue_declare(queue=queue,

durable=true,)

def declare_delay_queue(self, queue,dlx='retryexchange',ttl=60000):

"""

創建延遲隊列

:param ttl: ttl的單位是us,ttl=60000 表示 60s

:param queue:

:param dlx:死信轉發的exchange

:return:

"""

arguments={}

if dlx:

#設置死信轉發的exchange

arguments[ 'x-dead-letter-exchange']=dlx

if ttl:

arguments['x-message-ttl']=ttl

print(arguments)

self.channel.queue_declare(queue=queue,

durable=true,

arguments=arguments)

def _declare_retry_queue(self):

"""

創建異常交換器和隊列,用于存放沒有正常處理的消息。

:return:

"""

self.channel.exchange_declare(exchange='retryexchange',

exchange_type='fanout',

durable=true)

self.channel.queue_declare(queue='retryqueue',

durable=true)

self.channel.queue_bind('retryqueue', 'retryexchange','retryqueue')

def publish_message(self,routing_key, msg,exchange='',delay=0,ttl=none):

"""

發送消息到指定的交換器

:param exchange: rabbitmq交換器

:param msg: 消息實體,是一個序列化的json字符串

:return:

"""

if delay==0:

self.declare_queue(routing_key)

else:

self.declare_delay_queue(routing_key,ttl=ttl)

if exchange!='':

self.declare_exchange(exchange)

self.channel.basic_publish(exchange=exchange,

routing_key=routing_key,

body=msg,

properties=pika.basicproperties(

delivery_mode=2,

type=exchange

))

self.close_connection()

print("message send out to %s" % exchange)

logging.debug("message send out to %s" % exchange)

def start_consume(self,callback,queue='#',delay=1):

"""

啟動消費者,開始消費rabbitmq中的消息

:return:

"""

if delay==1:

queue='retryqueue'

else:

self.declare_queue(queue)

self.channel.basic_qos(prefetch_count=1)

try:

self.channel.basic_consume( # 消費消息

callback, # 如果收到消息,就調用callback函數來處理消息

queue=queue, # 你要從那個隊列里收消息

)

self.channel.start_consuming()

except keyboardinterrupt:

self.stop_consuming()

def stop_consuming(self):

self.channel.stop_consuming()

self.close_connection()

def message_handle_successfully(channel, method):

"""

如果消息處理正常完成,必須調用此方法,

否則rabbitmq會認為消息處理不成功,重新將消息放回待執行隊列中

:param channel: 回調函數的channel參數

:param method: 回調函數的method參數

:return:

"""

channel.basic_ack(delivery_tag=method.delivery_tag)

def message_handle_failed(channel, method):

"""

如果消息處理失敗,應該調用此方法,會自動將消息放入異常隊列

:param channel: 回調函數的channel參數

:param method: 回調函數的method參數

:return:

"""

channel.basic_reject(delivery_tag=method.delivery_tag, requeue=false)

發布消息代碼如下:

from mq.rabbitmq import rabbitmqclient

print("start program")

client = rabbitmqclient()

msg1 = '{"key":"value"}'

client.publish_message('test-delay',msg1,delay=1,ttl=10000)

print("message send out")

消費者代碼如下:

from mq.rabbitmq import rabbitmqclient

import json

print("start program")

client = rabbitmqclient()

def callback(ch, method, properties, body):

msg = body.decode()

print(msg)

# 如果處理成功,則調用此消息回復ack,表示消息成功處理完成。

rabbitmqclient.message_handle_successfully(ch, method)

queue_name = "retryqueue"

client.start_consume(callback,queue_name,delay=0)

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持萬仟網。

希望與廣大網友互動??

點此進行留言吧!

總結

以上是生活随笔為你收集整理的python延时队列_如何通过Python实现RabbitMQ延迟队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。