flask + celery实现定时任务和异步
參考資料:?
Celery 官網(wǎng):http://www.celeryproject.org/
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文檔中文版:http://docs.jinkan.org/docs/celery/
Celery簡(jiǎn)介
除Celery是一個(gè)異步任務(wù)的調(diào)度工具。 Celery 是 Distributed Task Queue,分布式任務(wù)隊(duì)列,分布式?jīng)Q定了可以有多個(gè) worker 的存在,隊(duì)列表示其是異步操作,即存在一個(gè)產(chǎn)生任務(wù)提出需求的工頭,和一群等著被分配工作的碼農(nóng)。
Broker
在 Python 中定義 Celery 的時(shí)候,我們要引入 Broker(消息中間件),中文翻譯過來就是“中間人”的意思,在這里 Broker 起到一個(gè)中間人的角色。在工頭提出任務(wù)的時(shí)候,把所有的任務(wù)放到 Broker 里面,在 Broker 的另外一頭,一群碼農(nóng)等著取出一個(gè)個(gè)任務(wù)準(zhǔn)備著手做。
Backend
這種模式注定了整個(gè)系統(tǒng)會(huì)是個(gè)開環(huán)系統(tǒng),工頭對(duì)于碼農(nóng)們把任務(wù)做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務(wù)的結(jié)果。這個(gè) Backend 有點(diǎn)像我們的 Broker,也是存儲(chǔ)任務(wù)的信息用的,只不過這里存的是那些任務(wù)的返回結(jié)果。我們可以選擇只讓錯(cuò)誤執(zhí)行的任務(wù)返回結(jié)果到 Backend,這樣我們?nèi)』亟Y(jié)果,便可以知道有多少任務(wù)執(zhí)行失敗了。
Celery應(yīng)用場(chǎng)景
1.你想對(duì)100臺(tái)機(jī)器執(zhí)行一條批量命令,可能會(huì)花很長(zhǎng)時(shí)間 ,但你不想讓你的程序等著結(jié)果返回,而是給你返回 一個(gè)任務(wù)ID,你過一段時(shí)間只需要拿著這個(gè)任務(wù)id就可以拿到任務(wù)執(zhí)行結(jié)果, 在任務(wù)執(zhí)行ing進(jìn)行時(shí),你可以繼續(xù)做其它的事情。
2.你想做一個(gè)定時(shí)任務(wù),比如每天檢測(cè)一下你們所有客戶的資料,如果發(fā)現(xiàn)今天 是客戶的生日,就給他發(fā)個(gè)短信祝福
Celery的特點(diǎn)
1.簡(jiǎn)單:一單熟悉了celery的工作流程后,配置和使用還是比較簡(jiǎn)單的
2.高可用:當(dāng)任務(wù)執(zhí)行失敗或執(zhí)行過程中發(fā)生連接中斷,celery 會(huì)自動(dòng)嘗試重新執(zhí)行任務(wù)
3.快速:一個(gè)單進(jìn)程的celery每分鐘可處理上百萬個(gè)任務(wù)
3.靈活: 幾乎celery的各個(gè)組件都可以被擴(kuò)展及自定制
Celery工作基本流程
?
我們的項(xiàng)目
項(xiàng)目目錄:
proj/celery.py
from __future__ import absolute_import, unicode_literals from celery import Celeryapp = Celery('proj',broker = 'amqp://',backend = 'amqp://',include = ['proj.tasks'])app.conf.update(result_expires = 3600 )if __name__ == '__main__':app.start()在這個(gè)模塊中創(chuàng)建了Celery實(shí)例(通常稱為app)
要在項(xiàng)目中使用Celery只需要通過import導(dǎo)入該實(shí)例就行了
- broker參數(shù)指定要使用的中間件的URL
- backend參數(shù)指定使用的result backend
用來跟蹤任務(wù)狀態(tài)和結(jié)果,雖然默認(rèn)狀態(tài)下結(jié)果不可用。以上例子中使用RPC result backend。當(dāng)然,不同的result backend都有自己的好處和壞處,根據(jù)自己實(shí)際情況進(jìn)行選擇,如果不需要最好禁用。通過設(shè)置@task(ignore_result=True)選項(xiàng)來禁用耽擱任務(wù))
- include參數(shù)是當(dāng)worker啟動(dòng)時(shí)導(dǎo)入的模塊列表需要在這里添加自己的任務(wù)莫夸這樣worker就可以找到任務(wù)
proj/tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app@app.task def add(x, y):return x + y@app.task def mul(x, y):return x * y@app.task def xsum(numbers):return sum(numbers)
?
啟動(dòng)worker
Celery程序可以用來啟動(dòng)worker:
celery -A proj worker -l info -------------- celery@centos6 v4.1.0 (latentcall) ---- **** ----- --- * *** * -- Linux-2.6.32-696.el6.x86_64-x86_64-with-centos-6.9-Final 2018-03-26 12:27:49 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: task:0x7fe5cfbd20d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues].> celery exchange=celery(direct) key=celery[tasks][2018-03-26 12:27:49,921: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2018-03-26 12:27:49,926: INFO/MainProcess] mingle: searching for neighbors [2018-03-26 12:27:49,499: INFO/MainProcess] mingle: sync with 1 nodes [2018-03-26 12:27:50,950: INFO/MainProcess] mingle: sync complete [2018-03-26 12:27:50,957: INFO/MainProcess] celery@centos6 ready.- broker是在celery模塊中指定的中間件參數(shù)的url,也可以在命令行中通過-b選項(xiàng)指定不同的中間件
- Concurrent是用于并行處理的任務(wù)的預(yù)創(chuàng)建worker進(jìn)程數(shù)量,當(dāng)所有的任務(wù)都在忙于工作時(shí),新的任務(wù)必須等待之前的執(zhí)行完成才能處理
默認(rèn)的并發(fā)數(shù)是機(jī)器上CPU的數(shù)量,可以通過celery worker -c選項(xiàng)指定自定義數(shù)量。沒有推薦值,最佳數(shù)量取決于很多因素,但是如果你的任務(wù)主要是I/O相關(guān)的,就可以增加這個(gè)數(shù)量。實(shí)驗(yàn)表明,增加超過兩倍CPU數(shù)量效果很差,而且可能會(huì)降低性能
除了prefork pool,Celery還支持Eventlet、Gevent并且還能在單線程上運(yùn)行
- Event是一個(gè)可選項(xiàng),當(dāng)啟用的時(shí)候,Celery會(huì)發(fā)送監(jiān)控(消息)來反映worker的操作,也可以被用來監(jiān)視像celery、events和Flower(實(shí)時(shí)Celery監(jiān)控)這樣的程序。
- Queues是worker將使用的任務(wù)的隊(duì)列的集合,worker可以一次接受幾個(gè)隊(duì)列,它用來將消息路由到特定的工作者以作為服務(wù)質(zhì)量、關(guān)注點(diǎn)分離、和優(yōu)化的一種方式
可以通過命令行獲取完整的列表————celery worker --help
停止worker
ctrl-c
后臺(tái)
生產(chǎn)環(huán)境中一般將worker放到后臺(tái),后臺(tái)腳本使用celery multi命令后臺(tái)啟動(dòng)一個(gè)或多個(gè)worker
celery multi start w1 -A proj -l info控制臺(tái)打印
celery multi v4.1.0 (latentcall) > Starting nodes...> w1@centos6: OK Stale pidfile exists - Removing it.也可以重啟:
celery multi restart w1 -A proj -l info celery multi v4.1.0 (latentcall) > Stopping nodes...> w1@centos6: TERM -> 23620 > Waiting for 1 node -> 23620.....> w1@centos6: OK > Restarting node w1@centos6: OK > Waiting for 1 node -> None...停止:
celery multi stop w1 -A proj -l infostop命令是異步的所以它不會(huì)等待worker關(guān)閉,可以使用stopwait命令來確保當(dāng)前執(zhí)行都任務(wù)在退出前都已執(zhí)行完畢
celery multi stopwait w1 -A proj -l infocelery multi不會(huì)存儲(chǔ)關(guān)于worker的信息,所以重啟的時(shí)候需要使用同樣的命令行參數(shù)。在停止時(shí),必須使用相同的pidfile和logfile參數(shù)
默認(rèn)情況下,程序?qū)⒃诋?dāng)期目錄創(chuàng)建pid和log文件,為了防止多個(gè)worker運(yùn)行出錯(cuò),推薦將這些文件放在專門的目錄:
mkdir -p /var/run/celery mkdir -p /var/log/celery celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log使用multi指令可以啟動(dòng)多個(gè)worker,并且有一個(gè)強(qiáng)大的命令行語法來為不同的worker指定參數(shù):
celery multi start 10 -A proj -l info -Q:1-3 images, video -Q:4, 5 data -Q default -L:4,5 debug~Detail about?multi?temp
--app參數(shù)
--app參數(shù)指定使用的Celery應(yīng)用實(shí)例,必須以module.path:attribute的形式出現(xiàn)
但也支持快捷方式,只要包名指定了,就會(huì)嘗試在應(yīng)用實(shí)例中搜索
使用--app=proj:
任務(wù)調(diào)用
- 可以通過使用delay()方法來調(diào)用一個(gè)任務(wù)
這個(gè)方法實(shí)際上是另一種叫做apply_async()方法的快捷方式
add.applay_async((3, 3))后者(applay_async())能夠指定執(zhí)行選項(xiàng),比如運(yùn)行時(shí)間(倒計(jì)時(shí))、應(yīng)該發(fā)送的隊(duì)列等等:
add.apply_async((2, 2), queue='lopri', countdown=10)上述案例中,任務(wù)會(huì)被發(fā)送給一個(gè)名為lopri的隊(duì)列,該任務(wù)會(huì)在信息發(fā)送后十秒執(zhí)行
直接應(yīng)用該任務(wù)會(huì)在當(dāng)前進(jìn)程中執(zhí)行任務(wù),不會(huì)發(fā)送消息
add(3, 3)result:6
三種方法delay()、apply_async()和應(yīng)用__call__,代表了Celery調(diào)用API,也同樣用于簽名
-
每一個(gè)任務(wù)調(diào)用都有一個(gè)唯一的標(biāo)識(shí)符(UUID),這個(gè)就是任務(wù)的id
-
delay()和apply_async方法會(huì)返回一個(gè)AsyncResult實(shí)例,可以被用來跟蹤任務(wù)執(zhí)行狀態(tài),但是需要開啟result backend這樣狀態(tài)才能被存儲(chǔ)在某處
-
Results默認(rèn)是禁用的,因?yàn)閷?shí)際上沒有一個(gè)result backend適用于每個(gè)應(yīng)用程序,所以要考慮到每個(gè)獨(dú)立backend的缺點(diǎn)來選擇一個(gè)使用。對(duì)于許多保持返回值的任務(wù)來說都不是很有用,所以這個(gè)默認(rèn)的禁用是很明智的。還需要注意的是,result backend并不用來監(jiān)控任務(wù)和worker,對(duì)于Celery有專門的事件消息
如果配置了result backend就可以接收到任務(wù)的返回值
result = add.delay(2, 2) res.get(timeout=1)retult:4
- 可以通過查看id屬性找到任務(wù)的id
result:073c568d-ca88-4198-b735-0f98f861218b
-
如果任務(wù)拋出異常也可以檢查到異常,默認(rèn)result.get()可以傳播任何錯(cuò)誤
-
如果不希望錯(cuò)誤傳播,可以通過propagete屬性禁用
在這種情況下,它會(huì)返回所提出的異常實(shí)例,以便檢查任務(wù)是否成功或失敗,您將不得不在結(jié)果實(shí)例上使用相應(yīng)的方法
res.failed() res.successful()也可以通過state找到任務(wù)的狀態(tài)
res.stateresult:FAILUTE
- 一個(gè)任務(wù)只能有一個(gè)狀態(tài),但是可以在幾個(gè)狀態(tài)中發(fā)展,典型任務(wù)階段可能是這樣
STARTED狀態(tài)是一個(gè)特殊的狀態(tài),只有在task_track_started設(shè)置啟用或者@task(track_started=True)選項(xiàng)設(shè)置的時(shí)候才會(huì)被記錄下來
PENDING狀態(tài)實(shí)際上不是記錄狀態(tài),而是未知任務(wù)id的默認(rèn)狀態(tài)
from proj.celery import app res = app.AsyncResult('this-id-does-not-exist') res.stateresult:PENDING
- 如果重新嘗試這個(gè)任務(wù)可能會(huì)變得更復(fù)雜,對(duì)于一個(gè)嘗試過兩遍的任務(wù)來說階段可能是這樣:
Canvas:設(shè)計(jì)任務(wù)流
前面學(xué)習(xí)了通過delay方法調(diào)用任務(wù),通常這樣就夠了,但是有時(shí)可能需要將任務(wù)調(diào)用的簽名傳遞給另一個(gè)進(jìn)程或者另一個(gè)函數(shù)的參數(shù),對(duì)Celery來說叫做signatures
簽名以某種方式包裝了單一任務(wù)調(diào)用的參數(shù)和執(zhí)行選項(xiàng),以便將其傳遞給函數(shù),甚至序列化后發(fā)送。
可以使用參數(shù)(2, 2)和十秒的計(jì)時(shí)器來為add任務(wù)創(chuàng)建一個(gè)簽名
add.signature((2, 2), countdown=10)也可以簡(jiǎn)寫:
add.s(2, 2)調(diào)用API
簽名的實(shí)例也支持調(diào)用API,意味著也可以有delay和apply_async方法
但是有一個(gè)區(qū)別,那就是簽名可能已經(jīng)指定了一個(gè)參數(shù)簽名,add任務(wù)接受兩個(gè)參數(shù),所以一個(gè)制定了兩個(gè)參數(shù)的簽名將會(huì)形成一個(gè)完整的簽名
s1 = add.s(2, 2) res = s1.delay() res.get()也可以使用不完成的簽名,叫做partials:
s1 = add.s(2)s2現(xiàn)在是部分簽名,需要另一個(gè)參數(shù)才完整,則可以在調(diào)用signature的時(shí)候處理
# resolves the partial: add(8, 2) res = s2.delay(8) res.get()在這里,添加了參數(shù)8,對(duì)已存在的參數(shù)2組成了一個(gè)完整的簽名add(8, 2)
關(guān)鍵字參數(shù)也可以延遲添加,會(huì)和已存在的關(guān)鍵字參數(shù)合并,新參數(shù)優(yōu)先(新參數(shù)覆蓋舊參數(shù))
s3 = add.s(2, 2, debug=True) s3.delay(debug=False)已聲明的簽名支持調(diào)用API:
- sig.apply_async(arg=(), kwargs={}, **options
使用可選部分參數(shù)和部分關(guān)鍵字參數(shù)調(diào)用簽名,也支持部分可執(zhí)行選項(xiàng) - sig.delay(*args, **kwargs)
apply_async的星參版本,任何參數(shù)都會(huì)被預(yù)先記錄在簽名的參數(shù)你,關(guān)鍵字參數(shù)會(huì)和現(xiàn)有的keys合并
基本體
- group
- chain
- chord
- map
- starmap
- chunks
這些基本體本身就是簽名對(duì)象,因此,它們可以以任何多種方式組合起來組成復(fù)雜的工作流
Group
一個(gè)group同時(shí)調(diào)用任務(wù)列表,返回一個(gè)特殊結(jié)果實(shí)例,這樣可以以組的形式檢查結(jié)果,并按順序檢索返回值
from celery import group from proj.tasks import addgroup(add.s(i, i) for in in range(10))().get()result:[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
- Partial group
result:[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains
任務(wù)可以被相互連接起來,這樣在一個(gè)任務(wù)返回后另一個(gè)任務(wù)被調(diào)用
from celery import chain form proj.tasks import add, mul// 用法1 chian(add.s(4, 4) | mul.s(8))().get()// 用法2 g = chain(add.s(4) | mul.s(8)) g(4).get()// 用法3 (add.s(4, 4) | mul.s(8))().get()Chords
chord是一個(gè)有返回值的group
from celery import chord from proj.tasks import add, xsum// 用法1 (group(add.s(i, i) for i in range(10)) | xsum.s())().get()// 用法2 upload_document.s(file) | group(apply_filter.s() for filter in filters)路由
Celery支持AMQP提供的所有路由設(shè)施,但是它也支持簡(jiǎn)單路由,將消息發(fā)送到指定的隊(duì)列
task_routes設(shè)置可以是用戶按名稱對(duì)任務(wù)進(jìn)行路由,并將一切集中在一個(gè)位置
app.conf.update{task_routes = {'proj.tasks.add': {'queue': 'hipri'},} }可以在運(yùn)行時(shí)通過queue參數(shù)指定隊(duì)列到apply_async:
from proj.tasks import add add.apply_async((2,2), queue='hipri')然后可以通過指定celery worker -Q選項(xiàng)使worker從隊(duì)列中消費(fèi)
celery -A proj worker -Q hipri也可以通過使用逗號(hào)分隔符(,)來指定多個(gè)隊(duì)列
celery -A proj worker -Q hipri, celery默認(rèn)隊(duì)列因?yàn)闅v史原因命名為:celery
隊(duì)列的順序無關(guān)緊要,因?yàn)閣orker會(huì)給隊(duì)列相同的權(quán)重
遠(yuǎn)程控制
如果使用RabbitMQ(AMQP)、Redis或者Qpid作為中間件就可以在運(yùn)行時(shí)監(jiān)視worker
- 查看worker當(dāng)前執(zhí)行的任務(wù)
這是通過使用廣播消息實(shí)現(xiàn)的,因此,急群眾的每一個(gè)工作人員都能接收到所有遠(yuǎn)程控制命令
- 也可以指令一個(gè)或多個(gè)worker使用--destination選項(xiàng)請(qǐng)求行動(dòng),這是一個(gè)逗號(hào)分隔的worker主機(jī)名列表
如果沒有提供目標(biāo),那么每個(gè)worker都會(huì)對(duì)請(qǐng)求做出反應(yīng)并回復(fù)
- celery inspect命令包含的命令不會(huì)改變worker的任何東西,它只會(huì)回復(fù)關(guān)于worker內(nèi)部發(fā)生的事情的信息和統(tǒng)計(jì)信息,可以執(zhí)行命令檢查列表:
- celery control命令,包含在運(yùn)行時(shí)實(shí)際改變worker操作的命令
- 強(qiáng)制worker啟用事件消息(用于監(jiān)視任務(wù)和工作人員)
當(dāng)事件激活,可以啟動(dòng)event dumper查看worker正在做什么
celery -A proj events --dump或者
celery -A proj events當(dāng)完成監(jiān)控可以再次禁用events
celery -A proj control disable_eventscelery status命令還能使用遠(yuǎn)程控制命令,并顯示集群中的在線worker列表
celery -A proj status時(shí)區(qū)
所有的時(shí)間和日期、內(nèi)部和消息多使用UTC時(shí)間區(qū)域
當(dāng)worker收到消息,例如使用倒計(jì)時(shí)設(shè)置,它將UTC時(shí)間轉(zhuǎn)換為本地時(shí)間。如果希望使用與系統(tǒng)時(shí)區(qū)不同的地區(qū),那么必須要使用時(shí)區(qū)設(shè)置來配置該時(shí)區(qū):
app.conf.timezone = 'Asia/Shanghai'?
最優(yōu)化
默認(rèn)的配置并沒有針對(duì)吞吐量進(jìn)行優(yōu)化,它試圖在許多短任務(wù)和更少的長(zhǎng)任務(wù)之間走中間路線,這是吞吐量和公平調(diào)度之間的折中
?
總結(jié)
以上是生活随笔為你收集整理的flask + celery实现定时任务和异步的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: celery定时任务简单使用
- 下一篇: 线程,进程,协程详细解释