celery 学习笔记 01-介绍
celery 學(xué)習(xí)筆記 01-介紹
celery 是 python 中的常用的任務(wù)隊(duì)列框架,經(jīng)常用于異步調(diào)用、后臺(tái)任務(wù)等工作。celery 本身以 python 寫(xiě),但協(xié)議可在不同的語(yǔ)言中實(shí)現(xiàn),其它語(yǔ)言也可以用 celery 執(zhí)行相應(yīng)的任務(wù)。在 web 應(yīng)用,為提高系統(tǒng)響應(yīng)速度,發(fā)送郵件、數(shù)據(jù)整理等需要長(zhǎng)時(shí)間執(zhí)行的任務(wù),通常以異步任務(wù)的方式執(zhí)行,這時(shí)就需要用到像 celery 類的框架。另一種常見(jiàn)的場(chǎng)景是大型系統(tǒng)的分布式處理,為了提升系統(tǒng)性能,各個(gè)組件通常以多個(gè)實(shí)例運(yùn)行不同主機(jī)上,而組件之間的調(diào)用就需要用到 celery 這樣的框架。使用 celery (或消息隊(duì)列),有助于降低系統(tǒng)組件之間的耦合,有助于實(shí)現(xiàn)灰度發(fā)布、實(shí)現(xiàn)服務(wù)的分布式、實(shí)現(xiàn)水平擴(kuò)展,最終提升系統(tǒng)健壯性和處理性能。
celery (和類似框架)的核心是任務(wù)隊(duì)列。用戶發(fā)起任務(wù),celery 負(fù)責(zé)把任務(wù)排隊(duì)和整理,然后交到任務(wù)執(zhí)行器 worker 中。 worker 監(jiān)視任務(wù)隊(duì)列,獲取新任務(wù)并執(zhí)行。在 celery 內(nèi)部,以消息機(jī)制協(xié)調(diào)各個(gè)組件工作,消息需要借助一個(gè)中間人 broker 進(jìn)行,如下 ::
client → celery task → broker → celery worker↑ ↓← ← ← ← result backendclient 發(fā)起任務(wù)時(shí),一般是以異步方式(除非必要的同步 rpc ),獲得一個(gè)任務(wù)的 id 并保存下來(lái),后續(xù)可通過(guò) id 到 result backend 中查詢?nèi)蝿?wù)執(zhí)行結(jié)果。broker 是第三方組件,可使用消息隊(duì)列( rabbitmq 等)、redis、數(shù)據(jù)庫(kù)等,只要能實(shí)現(xiàn)消息的存儲(chǔ)和分發(fā)理論上都能使用。 worker 以線程或進(jìn)程的形式運(yùn)行,從 broker 中取任務(wù)執(zhí)行,然后把結(jié)果保存到 result backend 。
目前 rabbitmq 的 broker 實(shí)現(xiàn)的功能最完備,在開(kāi)發(fā)環(huán)境中也可以使用 sqlite 等比較方便的方式,但性能會(huì)很差,不能用在生產(chǎn)環(huán)境上。
另外需要注意的是,由于不同操作系統(tǒng)的進(jìn)程模型的差異,celery 會(huì)在 windows 上產(chǎn)生一些配置方面的怪異問(wèn)題。
celery 可直接通過(guò) pip 安裝,在 virtualenv 下,直接運(yùn)行 ::
pip install celery再安裝 broker 所需要的驅(qū)動(dòng),例如使用 rabbitmq ,則安裝 ::
pip install amqp同時(shí)安裝好 rabbitmq (建議通過(guò) docker 安裝,使用 rabbitmq:management 鏡像,可在 15672 端口查看管理控制臺(tái))。
然后使用下面的代碼示例(摘錄來(lái)自: Ask Solem. “Celery Manual, Version 3.1“) ::
# hello.py from celery import Celeryapp = Celery('hello', broker='amqp://guest:guest@localhost//')@app.task def hello():return 'hello world'if __name__ == '__main__':r = hello.delay()然后,啟動(dòng) worker ::
celery -A hello worker --loglevel=infoclient 執(zhí)行任務(wù) ::
python hello.pyapp.task 裝飾器標(biāo)記一個(gè)函數(shù)為 celery 任務(wù),client 用 delay 方法執(zhí)行時(shí)。 delay 調(diào)用 apply_async() 進(jìn)行異步執(zhí)行, apply_async 還可配置如隊(duì)列、countdown 等執(zhí)行選項(xiàng)。 celery 返回一個(gè) AsyncResult 對(duì)象,如果 result backend 配置正確,client 可暫時(shí)把對(duì)象中的任務(wù) id 保存到數(shù)據(jù)庫(kù),后面再通過(guò)這個(gè) id 獲取異步執(zhí)行的結(jié)果。
上面的簡(jiǎn)單例子是沒(méi)有參數(shù)的,如果增加參數(shù),如下 ::
# add.py from celery import Celeryapp = Celery('add', broker='amqp://guest:guest@localhost//',backend='db+sqlite:///celery_result.db')@app.task def add(x, y):return x+yif __name__ == '__main__':r = add.delay(1, 2)print(r.wait())啟動(dòng) worker ::
celery -A add worker --l info調(diào)用 ::
python add.py當(dāng)任務(wù)結(jié)果用 amqp 保存時(shí),結(jié)果只能取一次, 因此無(wú)法在后續(xù)調(diào)用中查詢?nèi)蝿?wù)結(jié)果。這個(gè)例子用 sqlite 保存了任務(wù)執(zhí)行結(jié)果,因此 client 可在 r.wait() 查詢?nèi)蝿?wù)的結(jié)果、任務(wù)的狀態(tài)等等很多信息,可把 r.id 保存到數(shù)據(jù)庫(kù),然后未來(lái)查詢?nèi)蝿?wù)的 AsyncResult ::
r2 = app.AsyncResult(r.id) print(r2.wait()) print(r2.successful())add.py 中使用了兩個(gè)參數(shù) x y ,而 celery 需要通過(guò) broker 傳遞這兩個(gè)參數(shù),這時(shí)需要對(duì)數(shù)據(jù)進(jìn)行序列化,將 x y 對(duì)象轉(zhuǎn)換為無(wú)結(jié)構(gòu)的數(shù)據(jù),然后 worker 接收到后再把數(shù)據(jù)還原為 x y 對(duì)象。 celery 內(nèi)置的序列化方法包括 pickle 、 json 等等,如果對(duì)象比較復(fù)雜,需要自己定義序列化方法。
如果不想立即執(zhí)行任務(wù),而是把任務(wù)傳遞到其它地方,通過(guò) celery 的 subtask 支持。 subtask 是對(duì) task 的調(diào)用參數(shù)和執(zhí)行選項(xiàng)的一個(gè)封裝,如 ::
add.subtask((2,2), countdown=10) add.s(2,2)subtask 或 s 返回的是一個(gè) task 的簽名(celery.canvas.Signature),它可實(shí)現(xiàn)工作流、偏函數(shù)等效果。subtask 支持和 task 同樣的調(diào)用方法,如 ::
s = add.s(2) # subtask ,partial s.delay(2) # 發(fā)送消息開(kāi)始異步執(zhí)行在 celery 工作流中組織 subtask 的方式有 group / chain / chord 等等, group 中任務(wù)并發(fā)執(zhí)行,chain 中任務(wù)順序執(zhí)行,chord 中進(jìn)行回調(diào)。而這些組織方式本身也是 subtask ,可嵌套使用 ::
# workflow.py from celery import Celery, group, chainapp = Celery('add', broker='amqp://guest:guest@localhost//',backend='db+sqlite:///celery_result.db')@app.task def add(x, y):return x+yif __name__ == '__main__':g = group((add.s(i, i) for i in range(10)))r = g.delay()print(r.get())c = chain(add.s(1, 2) | add.s(3))r2 = c.delay()print(r2.get())celery 的任務(wù)調(diào)用通過(guò)網(wǎng)絡(luò)發(fā)送任務(wù)的名字和參數(shù),不發(fā)送任務(wù)代碼, worker 收到任務(wù)后根據(jù)任務(wù)名和參數(shù)執(zhí)行相應(yīng)的代碼。因此不同 worker 中的代碼版本不一樣時(shí),會(huì)有不同的處理結(jié)果。如果 worker 中不能處理相應(yīng)的任務(wù)名,就會(huì)報(bào)錯(cuò)。
轉(zhuǎn)載于:https://www.cnblogs.com/fengyc/p/5655287.html
總結(jié)
以上是生活随笔為你收集整理的celery 学习笔记 01-介绍的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 正在或即将被使用的Go依赖包管理方法:G
- 下一篇: HTML5实现屏幕手势解锁(转载)