日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > python >内容正文

python

python拓展7(Celery消息队列配置定时任务)

發(fā)布時(shí)間:2023/12/9 python 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python拓展7(Celery消息队列配置定时任务) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

介紹

celery 定時(shí)器是一個(gè)調(diào)度器(scheduler);它會(huì)定時(shí)地開(kāi)啟(kicks off)任務(wù),然后由集群中可用的工人(worker)來(lái)執(zhí)行。

定時(shí)任務(wù)記錄(entries)默認(rèn) 從 beat_schedule 設(shè)置中獲取,但自定義存儲(chǔ)也可以使用,如把記錄存儲(chǔ)到SQL數(shù)據(jù)庫(kù)中。

要確保同一時(shí)間一份時(shí)間表上只有一個(gè)調(diào)度器在運(yùn)行,否則會(huì)因?yàn)橹貜?fù)發(fā)送任務(wù)而結(jié)束。使用集中途徑意味著定時(shí)任務(wù)不用必須同步,并且服務(wù)無(wú)需用鎖操控。

  • user:用戶(hù)程序,用于告知celery去執(zhí)行一個(gè)任務(wù)。
  • broker: 存放任務(wù)(依賴(lài)RabbitMQ或Redis,進(jìn)行存儲(chǔ))
  • worker:執(zhí)行任務(wù)

celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(測(cè)試中) 充當(dāng)broker來(lái)進(jìn)行消息的接收,并且也支持多個(gè)broker和worker來(lái)實(shí)現(xiàn)高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

Celery version 4.0 runs onPython ?2.7, 3.4, 3.5?PyPy ?5.4, 5.5?This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.If you’re running an older version of Python, you need to be running an older version of Celery:Python 2.6: Celery series 3.1 or earlier.Python 2.5: Celery series 3.0 or earlier.Python 2.4 was Celery series 2.2 or earlier.Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform. 版本和要求

環(huán)境準(zhǔn)備:

  • 安裝rabbitMQ或Redis
    https://www.cnblogs.com/L5251/articles/9146825.html

  ? ?https://www.cnblogs.com/L5251/articles/9325586.html

  • 安裝celery
    ? ? ?pip3 install celery

快速上手

import time from celery import Celeryapp = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')@app.task def xxxxxx(x, y):time.sleep(10)return x + y s1.py s2.py from celery.result import AsyncResult from s1 import appasync = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)if async.successful():result = async.get()print(result)# result.forget() # 將結(jié)果刪除 elif async.failed():print('執(zhí)行失敗') elif async.status == 'PENDING':print('任務(wù)等待中被執(zhí)行') elif async.status == 'RETRY':print('任務(wù)異常后正在重試') elif async.status == 'STARTED':print('任務(wù)已經(jīng)開(kāi)始被執(zhí)行') s3.py # 執(zhí)行 s1.py 創(chuàng)建worker(終端執(zhí)行命令): celery worker -A s1 -l info # PS:Windows系統(tǒng)上執(zhí)行命令時(shí)出錯(cuò)解決方法 pip3 install eventlet # 后期運(yùn)行修改為:celery worker -A s1 -l info -P eventlet # 執(zhí)行 s2.py ,創(chuàng)建一個(gè)任務(wù)并獲取任務(wù)ID: python3 s2.py# 執(zhí)行 s3.py ,檢查任務(wù)狀態(tài)并獲取結(jié)果:python3 s3.py

多任務(wù)結(jié)構(gòu)

pro_cel├── celery_tasks# celery相關(guān)文件夾│ ├── celery.py # celery連接和配置相關(guān)文件│ └── tasks.py # 所有任務(wù)函數(shù)├── check_result.py # 檢查結(jié)果└── send_task.py # 觸發(fā)任務(wù) #!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celerycelery = Celery('xxxxxx',broker='redis://192.168.0.111:6379',backend='redis://192.168.0.111:6379',include=['celery_tasks.tasks'])# 時(shí)區(qū) celery.conf.timezone = 'Asia/Shanghai' # 是否使用UTC celery.conf.enable_utc = False pro_cel/celery_tasks/celery #!/usr/bin/env python # -*- coding:utf-8 -*-import time from .celery import celery@celery.task def xxxxx(*args, **kwargs):time.sleep(5)return "任務(wù)結(jié)果"@celery.task def hhhhhh(*args, **kwargs):time.sleep(5)return "任務(wù)結(jié)果" pro_cel/celery_tasks/tasks.py #!/usr/bin/env python # -*- coding:utf-8 -*-from celery.result import AsyncResult from celery_tasks.celery import celeryasync = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)if async.successful():result = async.get()print(result)# result.forget() # 將結(jié)果刪除 elif async.failed():print('執(zhí)行失敗') elif async.status == 'PENDING':print('任務(wù)等待中被執(zhí)行') elif async.status == 'RETRY':print('任務(wù)異常后正在重試') elif async.status == 'STARTED':print('任務(wù)已經(jīng)開(kāi)始被執(zhí)行') pro_cel/check_result.py #!/usr/bin/env python # -*- coding:utf-8 -*- import celery_tasks.tasks# 立即告知celery去執(zhí)行xxxxxx任務(wù),并傳入兩個(gè)參數(shù) result = celery_tasks.tasks.xxxxx.delay(4, 4)print(result.id) pro_cel/send_task.py

更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html

定時(shí)任務(wù)

1. 設(shè)定時(shí)間讓celery執(zhí)行一個(gè)任務(wù)

import datetime from celery_tasks.tasks import xxxxx """ from datetime import datetimev1 = datetime(2017, 4, 11, 3, 0, 0) print(v1)v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2)""" ctime = datetime.datetime.now() utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())s10 = datetime.timedelta(seconds=10) ctime_x = utc_ctime + s10# 使用apply_async并設(shè)定時(shí)間 result = xxxxx.apply_async(args=[1, 3], eta=ctime_x) print(result.id)

2. 類(lèi)似于contab的定時(shí)任務(wù)

""" celery beat -A proj celery worker -A proj -l info""" from celery import Celery from celery.schedules import crontabapp = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ]) app.conf.timezone = 'Asia/Shanghai' app.conf.enable_utc = Falseapp.conf.beat_schedule = {# 'add-every-10-seconds': {# 'task': 'proj.s1.add1',# 'schedule': 10.0,# 'args': (16, 16)# },'add-every-12-seconds': {'task': 'proj.s1.add1','schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),'args': (16, 16)}, }

注:如果想要定時(shí)執(zhí)行類(lèi)似于crontab的任務(wù),需要定制 Scheduler來(lái)完成。

Flask中應(yīng)用Celery

pro_flask_celery/ ├── app.py ├── celery_tasks├── celery.py└── tasks.py #!/usr/bin/env python # -*- coding:utf-8 -*-from flask import Flask from celery.result import AsyncResultfrom celery_tasks import tasks from celery_tasks.celery import celeryapp = Flask(__name__)TASK_ID = None@app.route('/') def index():global TASK_IDresult = tasks.xxxxx.delay()# result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))TASK_ID = result.idreturn "任務(wù)已經(jīng)提交"@app.route('/result') def result():global TASK_IDresult = AsyncResult(id=TASK_ID, app=celery)if result.ready():return result.get()return "xxxx"if __name__ == '__main__':app.run() app.py #!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery from celery.schedules import crontabcelery = Celery('xxxxxx',broker='redis://192.168.10.48:6379',backend='redis://192.168.10.48:6379',include=['celery_tasks.tasks'])# 時(shí)區(qū) celery.conf.timezone = 'Asia/Shanghai' # 是否使用UTC celery.conf.enable_utc = False celery_tasks/celery.py #!/usr/bin/env python # -*- coding:utf-8 -*-import time from .celery import celery@celery.task def hello(*args, **kwargs):print('執(zhí)行hello')return "hello"@celery.task def xxxxx(*args, **kwargs):print('執(zhí)行xxxxx')return "xxxxx"@celery.task def hhhhhh(*args, **kwargs):time.sleep(5)return "任務(wù)結(jié)果" celery_task/tasks.py

記錄

為了定時(shí)調(diào)用任務(wù),你必須添加記錄到打點(diǎn)列表中:

from celery import Celery from celery.schedules import crontabapp = Celery()@app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):# 每10秒調(diào)用 test('hello') .sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')# 每30秒調(diào)用 test('world') sender.add_periodic_task(30.0, test.s('world'), expires=10)# 每周一上午7:30執(zhí)行 sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s('Happy Mondays!'),)@app.task def test(arg):print(arg)

用on_after_configure處理器進(jìn)行這些設(shè)置意味著當(dāng)使用test.s()時(shí)我們不會(huì)在模塊層面運(yùn)行app 。

add_periodic_task() 函數(shù)在幕后會(huì)添加記錄到beat_schedule設(shè)定,同樣的設(shè)定可以用來(lái)手動(dòng)設(shè)置定時(shí)任務(wù):

例子: 每30秒運(yùn)行 tasks.add .

app.conf.beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)}, } app.conf.timezone = 'UTC'

一般會(huì)使用配置文件進(jìn)行配置,如下?
celeryconfig.py:

broker_url = 'pyamqp://' result_backend = 'rpc://'task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)}, }

程序里使用:

app.config_from_object('celeryconfig')注意 如果你的參數(shù)元組里只有一個(gè)項(xiàng)目,只用一個(gè)逗號(hào)就可以了,不要圓括號(hào)。

時(shí)間表使用時(shí)間差意味著每30秒間隔會(huì)發(fā)送任務(wù)(第一個(gè)任務(wù)在celery定時(shí)器開(kāi)啟后30秒發(fā)送,然后上每次距一次運(yùn)行后30秒發(fā)送一次)

可使用的屬性

    • task

      要執(zhí)行的任務(wù)名字

    • schedule

      執(zhí)行的頻率

      可以是整數(shù)秒數(shù),時(shí)間差,或者一個(gè)周期( crontab)。你也可以自 定義你的時(shí)間表類(lèi)型,通過(guò)擴(kuò)展schedule接口。

    • args

      位置參數(shù) (list 或 tuple).

    • kwargs

      鍵值參數(shù) (dict).

    • options

      執(zhí)行選項(xiàng) (dict).

      這可以是任何被apply_async()支持的參數(shù)與—-exchange, routing_key, expires,等。

    • relative

      如果 relative 是 true ,時(shí)間表“由時(shí)鐘時(shí)間”安排,意味著 頻率近似到最近的秒,分鐘,小時(shí)或天,這取決于時(shí)間差中的時(shí)間間隔。?
      默認(rèn)relative是false,頻率不會(huì)近似,會(huì)相對(duì)于celery的啟動(dòng)時(shí)間。

      Crontab 表達(dá)式語(yǔ)法非常靈活。

例子含義
crontab()每分鐘執(zhí)行
crontab(minute=0, hour=0)每天午夜執(zhí)行
crontab(minute=0, hour=’*/3’)每三個(gè)小時(shí)執(zhí)行: 午夜, 3am, 6am, 9am, 正午, 3pm, 6pm, 9pm.
crontab(minute=0,hour=’0,3,6,9,12,15,18,21’)同上
crontab(minute=’*/15’)每15分鐘執(zhí)行
crontab(day_of_week=’sunday’)星期日每分鐘
crontab(minute=’‘,hour=’‘, day_of_week=’sun’)同上
crontab(minute=’*/10’,hour=’3,17,22’, day_of_week=’thu,fri’)每10分鐘執(zhí)行,僅限于周六日3-4 am, 5-6 pm, and 10-11 pm
crontab(minute=0, hour=’/2,/3’)偶數(shù)小時(shí)或者能被3整除的小時(shí)數(shù)執(zhí)行
crontab(minute=0, hour=’*/5’)被5整除的小時(shí)數(shù),如3pm
crontab(minute=0, hour=’*/3,8-17’)8am-5pm能被3整除的
crontab(0, 0, day_of_month=’2’)每月第2天
crontab(0, 0,day_of_month=’2-30/3’)每偶數(shù)天
crontab(0, 0,day_of_month=’1-7,15-21’)每月1和3周
crontab(0, 0, day_of_month=’11’,month_of_year=’5’)每年5月11日
crontab(0, 0,month_of_year=’*/3’)每個(gè)季度第1月

開(kāi)啟調(diào)度

開(kāi)啟celery定時(shí)服務(wù):

$ celery -A proj beat

可以把定時(shí)器嵌入到工人(worker)中,通過(guò)啟用workers -B選項(xiàng),如果你永遠(yuǎn)不會(huì)運(yùn)行超過(guò)一個(gè)工人節(jié)點(diǎn)這就會(huì)很方便。但這不太常見(jiàn),不推薦在生產(chǎn)環(huán)境這樣使用:

$ celery -A proj worker -B

定時(shí)器需要在本地?cái)?shù)據(jù)庫(kù)文件(默認(rèn)名為 celerybeat-schedule )存儲(chǔ)任務(wù)上次運(yùn)行時(shí)間,所以它需要在當(dāng)前目錄中寫(xiě)權(quán)限。或者你也可以給這個(gè)文件指定一個(gè)位置:

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

?

轉(zhuǎn)載于:https://www.cnblogs.com/L5251/articles/9332304.html

總結(jié)

以上是生活随笔為你收集整理的python拓展7(Celery消息队列配置定时任务)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。