python拓展7(Celery消息队列配置定时任务)
介紹
celery 定時(shí)器是一個(gè)調(diào)度器(scheduler);它會(huì)定時(shí)地開(kāi)啟(kicks off)任務(wù),然后由集群中可用的工人(worker)來(lái)執(zhí)行。
定時(shí)任務(wù)記錄(entries)默認(rèn) 從 beat_schedule 設(shè)置中獲取,但自定義存儲(chǔ)也可以使用,如把記錄存儲(chǔ)到SQL數(shù)據(jù)庫(kù)中。
要確保同一時(shí)間一份時(shí)間表上只有一個(gè)調(diào)度器在運(yùn)行,否則會(huì)因?yàn)橹貜?fù)發(fā)送任務(wù)而結(jié)束。使用集中途徑意味著定時(shí)任務(wù)不用必須同步,并且服務(wù)無(wú)需用鎖操控。
- user:用戶(hù)程序,用于告知celery去執(zhí)行一個(gè)任務(wù)。
- broker: 存放任務(wù)(依賴(lài)RabbitMQ或Redis,進(jìn)行存儲(chǔ))
- worker:執(zhí)行任務(wù)
celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(測(cè)試中) 充當(dāng)broker來(lái)進(jìn)行消息的接收,并且也支持多個(gè)broker和worker來(lái)實(shí)現(xiàn)高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
Celery version 4.0 runs onPython ?2.7, 3.4, 3.5?PyPy ?5.4, 5.5?This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.If you’re running an older version of Python, you need to be running an older version of Celery:Python 2.6: Celery series 3.1 or earlier.Python 2.5: Celery series 3.0 or earlier.Python 2.4 was Celery series 2.2 or earlier.Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform. 版本和要求環(huán)境準(zhǔn)備:
- 安裝rabbitMQ或Redis
https://www.cnblogs.com/L5251/articles/9146825.html
? ?https://www.cnblogs.com/L5251/articles/9325586.html
- 安裝celery
? ? ?pip3 install celery
快速上手
import time from celery import Celeryapp = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')@app.task def xxxxxx(x, y):time.sleep(10)return x + y s1.py s2.py from celery.result import AsyncResult from s1 import appasync = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)if async.successful():result = async.get()print(result)# result.forget() # 將結(jié)果刪除 elif async.failed():print('執(zhí)行失敗') elif async.status == 'PENDING':print('任務(wù)等待中被執(zhí)行') elif async.status == 'RETRY':print('任務(wù)異常后正在重試') elif async.status == 'STARTED':print('任務(wù)已經(jīng)開(kāi)始被執(zhí)行') s3.py # 執(zhí)行 s1.py 創(chuàng)建worker(終端執(zhí)行命令): celery worker -A s1 -l info # PS:Windows系統(tǒng)上執(zhí)行命令時(shí)出錯(cuò)解決方法 pip3 install eventlet # 后期運(yùn)行修改為:celery worker -A s1 -l info -P eventlet # 執(zhí)行 s2.py ,創(chuàng)建一個(gè)任務(wù)并獲取任務(wù)ID: python3 s2.py# 執(zhí)行 s3.py ,檢查任務(wù)狀態(tài)并獲取結(jié)果:python3 s3.py多任務(wù)結(jié)構(gòu)
pro_cel├── celery_tasks# celery相關(guān)文件夾│ ├── celery.py # celery連接和配置相關(guān)文件│ └── tasks.py # 所有任務(wù)函數(shù)├── check_result.py # 檢查結(jié)果└── send_task.py # 觸發(fā)任務(wù) #!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celerycelery = Celery('xxxxxx',broker='redis://192.168.0.111:6379',backend='redis://192.168.0.111:6379',include=['celery_tasks.tasks'])# 時(shí)區(qū) celery.conf.timezone = 'Asia/Shanghai' # 是否使用UTC celery.conf.enable_utc = False pro_cel/celery_tasks/celery #!/usr/bin/env python # -*- coding:utf-8 -*-import time from .celery import celery@celery.task def xxxxx(*args, **kwargs):time.sleep(5)return "任務(wù)結(jié)果"@celery.task def hhhhhh(*args, **kwargs):time.sleep(5)return "任務(wù)結(jié)果" pro_cel/celery_tasks/tasks.py #!/usr/bin/env python # -*- coding:utf-8 -*-from celery.result import AsyncResult from celery_tasks.celery import celeryasync = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)if async.successful():result = async.get()print(result)# result.forget() # 將結(jié)果刪除 elif async.failed():print('執(zhí)行失敗') elif async.status == 'PENDING':print('任務(wù)等待中被執(zhí)行') elif async.status == 'RETRY':print('任務(wù)異常后正在重試') elif async.status == 'STARTED':print('任務(wù)已經(jīng)開(kāi)始被執(zhí)行') pro_cel/check_result.py #!/usr/bin/env python # -*- coding:utf-8 -*- import celery_tasks.tasks# 立即告知celery去執(zhí)行xxxxxx任務(wù),并傳入兩個(gè)參數(shù) result = celery_tasks.tasks.xxxxx.delay(4, 4)print(result.id) pro_cel/send_task.py更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html
定時(shí)任務(wù)
1. 設(shè)定時(shí)間讓celery執(zhí)行一個(gè)任務(wù)
import datetime from celery_tasks.tasks import xxxxx """ from datetime import datetimev1 = datetime(2017, 4, 11, 3, 0, 0) print(v1)v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2)""" ctime = datetime.datetime.now() utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())s10 = datetime.timedelta(seconds=10) ctime_x = utc_ctime + s10# 使用apply_async并設(shè)定時(shí)間 result = xxxxx.apply_async(args=[1, 3], eta=ctime_x) print(result.id)2. 類(lèi)似于contab的定時(shí)任務(wù)
""" celery beat -A proj celery worker -A proj -l info""" from celery import Celery from celery.schedules import crontabapp = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ]) app.conf.timezone = 'Asia/Shanghai' app.conf.enable_utc = Falseapp.conf.beat_schedule = {# 'add-every-10-seconds': {# 'task': 'proj.s1.add1',# 'schedule': 10.0,# 'args': (16, 16)# },'add-every-12-seconds': {'task': 'proj.s1.add1','schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),'args': (16, 16)}, }注:如果想要定時(shí)執(zhí)行類(lèi)似于crontab的任務(wù),需要定制 Scheduler來(lái)完成。
Flask中應(yīng)用Celery
pro_flask_celery/ ├── app.py ├── celery_tasks├── celery.py└── tasks.py #!/usr/bin/env python # -*- coding:utf-8 -*-from flask import Flask from celery.result import AsyncResultfrom celery_tasks import tasks from celery_tasks.celery import celeryapp = Flask(__name__)TASK_ID = None@app.route('/') def index():global TASK_IDresult = tasks.xxxxx.delay()# result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))TASK_ID = result.idreturn "任務(wù)已經(jīng)提交"@app.route('/result') def result():global TASK_IDresult = AsyncResult(id=TASK_ID, app=celery)if result.ready():return result.get()return "xxxx"if __name__ == '__main__':app.run() app.py #!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery from celery.schedules import crontabcelery = Celery('xxxxxx',broker='redis://192.168.10.48:6379',backend='redis://192.168.10.48:6379',include=['celery_tasks.tasks'])# 時(shí)區(qū) celery.conf.timezone = 'Asia/Shanghai' # 是否使用UTC celery.conf.enable_utc = False celery_tasks/celery.py #!/usr/bin/env python # -*- coding:utf-8 -*-import time from .celery import celery@celery.task def hello(*args, **kwargs):print('執(zhí)行hello')return "hello"@celery.task def xxxxx(*args, **kwargs):print('執(zhí)行xxxxx')return "xxxxx"@celery.task def hhhhhh(*args, **kwargs):time.sleep(5)return "任務(wù)結(jié)果" celery_task/tasks.py記錄
為了定時(shí)調(diào)用任務(wù),你必須添加記錄到打點(diǎn)列表中:
from celery import Celery from celery.schedules import crontabapp = Celery()@app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):# 每10秒調(diào)用 test('hello') .sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')# 每30秒調(diào)用 test('world') sender.add_periodic_task(30.0, test.s('world'), expires=10)# 每周一上午7:30執(zhí)行 sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s('Happy Mondays!'),)@app.task def test(arg):print(arg)用on_after_configure處理器進(jìn)行這些設(shè)置意味著當(dāng)使用test.s()時(shí)我們不會(huì)在模塊層面運(yùn)行app 。
add_periodic_task() 函數(shù)在幕后會(huì)添加記錄到beat_schedule設(shè)定,同樣的設(shè)定可以用來(lái)手動(dòng)設(shè)置定時(shí)任務(wù):
例子: 每30秒運(yùn)行 tasks.add .
app.conf.beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)}, } app.conf.timezone = 'UTC'一般會(huì)使用配置文件進(jìn)行配置,如下?
celeryconfig.py:
程序里使用:
app.config_from_object('celeryconfig')注意 如果你的參數(shù)元組里只有一個(gè)項(xiàng)目,只用一個(gè)逗號(hào)就可以了,不要圓括號(hào)。時(shí)間表使用時(shí)間差意味著每30秒間隔會(huì)發(fā)送任務(wù)(第一個(gè)任務(wù)在celery定時(shí)器開(kāi)啟后30秒發(fā)送,然后上每次距一次運(yùn)行后30秒發(fā)送一次)
可使用的屬性
-
task
要執(zhí)行的任務(wù)名字
-
schedule
執(zhí)行的頻率
可以是整數(shù)秒數(shù),時(shí)間差,或者一個(gè)周期( crontab)。你也可以自 定義你的時(shí)間表類(lèi)型,通過(guò)擴(kuò)展schedule接口。
-
args
位置參數(shù) (list 或 tuple).
-
kwargs
鍵值參數(shù) (dict).
-
options
執(zhí)行選項(xiàng) (dict).
這可以是任何被apply_async()支持的參數(shù)與—-exchange, routing_key, expires,等。
-
relative
如果 relative 是 true ,時(shí)間表“由時(shí)鐘時(shí)間”安排,意味著 頻率近似到最近的秒,分鐘,小時(shí)或天,這取決于時(shí)間差中的時(shí)間間隔。?
默認(rèn)relative是false,頻率不會(huì)近似,會(huì)相對(duì)于celery的啟動(dòng)時(shí)間。Crontab 表達(dá)式語(yǔ)法非常靈活。
-
| crontab() | 每分鐘執(zhí)行 |
| crontab(minute=0, hour=0) | 每天午夜執(zhí)行 |
| crontab(minute=0, hour=’*/3’) | 每三個(gè)小時(shí)執(zhí)行: 午夜, 3am, 6am, 9am, 正午, 3pm, 6pm, 9pm. |
| crontab(minute=0,hour=’0,3,6,9,12,15,18,21’) | 同上 |
| crontab(minute=’*/15’) | 每15分鐘執(zhí)行 |
| crontab(day_of_week=’sunday’) | 星期日每分鐘 |
| crontab(minute=’‘,hour=’‘, day_of_week=’sun’) | 同上 |
| crontab(minute=’*/10’,hour=’3,17,22’, day_of_week=’thu,fri’) | 每10分鐘執(zhí)行,僅限于周六日3-4 am, 5-6 pm, and 10-11 pm |
| crontab(minute=0, hour=’/2,/3’) | 偶數(shù)小時(shí)或者能被3整除的小時(shí)數(shù)執(zhí)行 |
| crontab(minute=0, hour=’*/5’) | 被5整除的小時(shí)數(shù),如3pm |
| crontab(minute=0, hour=’*/3,8-17’) | 8am-5pm能被3整除的 |
| crontab(0, 0, day_of_month=’2’) | 每月第2天 |
| crontab(0, 0,day_of_month=’2-30/3’) | 每偶數(shù)天 |
| crontab(0, 0,day_of_month=’1-7,15-21’) | 每月1和3周 |
| crontab(0, 0, day_of_month=’11’,month_of_year=’5’) | 每年5月11日 |
| crontab(0, 0,month_of_year=’*/3’) | 每個(gè)季度第1月 |
開(kāi)啟調(diào)度
開(kāi)啟celery定時(shí)服務(wù):
$ celery -A proj beat可以把定時(shí)器嵌入到工人(worker)中,通過(guò)啟用workers -B選項(xiàng),如果你永遠(yuǎn)不會(huì)運(yùn)行超過(guò)一個(gè)工人節(jié)點(diǎn)這就會(huì)很方便。但這不太常見(jiàn),不推薦在生產(chǎn)環(huán)境這樣使用:
$ celery -A proj worker -B定時(shí)器需要在本地?cái)?shù)據(jù)庫(kù)文件(默認(rèn)名為 celerybeat-schedule )存儲(chǔ)任務(wù)上次運(yùn)行時(shí)間,所以它需要在當(dāng)前目錄中寫(xiě)權(quán)限。或者你也可以給這個(gè)文件指定一個(gè)位置:
$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule?
轉(zhuǎn)載于:https://www.cnblogs.com/L5251/articles/9332304.html
總結(jié)
以上是生活随笔為你收集整理的python拓展7(Celery消息队列配置定时任务)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: C#调用Web Service时的身份验
- 下一篇: Python第一天学习---基础语法