日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flask + celery实现定时任务和异步

發(fā)布時(shí)間:2024/2/28 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flask + celery实现定时任务和异步 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

參考資料:?

Celery 官網(wǎng):http://www.celeryproject.org/
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文檔中文版:http://docs.jinkan.org/docs/celery/

Celery簡(jiǎn)介
除Celery是一個(gè)異步任務(wù)的調(diào)度工具。 Celery 是 Distributed Task Queue,分布式任務(wù)隊(duì)列,分布式?jīng)Q定了可以有多個(gè) worker 的存在,隊(duì)列表示其是異步操作,即存在一個(gè)產(chǎn)生任務(wù)提出需求的工頭,和一群等著被分配工作的碼農(nóng)。
Broker
在 Python 中定義 Celery 的時(shí)候,我們要引入 Broker(消息中間件),中文翻譯過來就是“中間人”的意思,在這里 Broker 起到一個(gè)中間人的角色。在工頭提出任務(wù)的時(shí)候,把所有的任務(wù)放到 Broker 里面,在 Broker 的另外一頭,一群碼農(nóng)等著取出一個(gè)個(gè)任務(wù)準(zhǔn)備著手做。
Backend
這種模式注定了整個(gè)系統(tǒng)會(huì)是個(gè)開環(huán)系統(tǒng),工頭對(duì)于碼農(nóng)們把任務(wù)做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務(wù)的結(jié)果。這個(gè) Backend 有點(diǎn)像我們的 Broker,也是存儲(chǔ)任務(wù)的信息用的,只不過這里存的是那些任務(wù)的返回結(jié)果。我們可以選擇只讓錯(cuò)誤執(zhí)行的任務(wù)返回結(jié)果到 Backend,這樣我們?nèi)』亟Y(jié)果,便可以知道有多少任務(wù)執(zhí)行失敗了。
Celery應(yīng)用場(chǎng)景
1.你想對(duì)100臺(tái)機(jī)器執(zhí)行一條批量命令,可能會(huì)花很長(zhǎng)時(shí)間 ,但你不想讓你的程序等著結(jié)果返回,而是給你返回 一個(gè)任務(wù)ID,你過一段時(shí)間只需要拿著這個(gè)任務(wù)id就可以拿到任務(wù)執(zhí)行結(jié)果, 在任務(wù)執(zhí)行ing進(jìn)行時(shí),你可以繼續(xù)做其它的事情。
2.你想做一個(gè)定時(shí)任務(wù),比如每天檢測(cè)一下你們所有客戶的資料,如果發(fā)現(xiàn)今天 是客戶的生日,就給他發(fā)個(gè)短信祝福

Celery的特點(diǎn)
1.簡(jiǎn)單:一單熟悉了celery的工作流程后,配置和使用還是比較簡(jiǎn)單的
2.高可用:當(dāng)任務(wù)執(zhí)行失敗或執(zhí)行過程中發(fā)生連接中斷,celery 會(huì)自動(dòng)嘗試重新執(zhí)行任務(wù)
3.快速:一個(gè)單進(jìn)程的celery每分鐘可處理上百萬個(gè)任務(wù)
3.靈活: 幾乎celery的各個(gè)組件都可以被擴(kuò)展及自定制

Celery工作基本流程

?

我們的項(xiàng)目

項(xiàng)目目錄:

proj/celery.py

from __future__ import absolute_import, unicode_literals from celery import Celeryapp = Celery('proj',broker = 'amqp://',backend = 'amqp://',include = ['proj.tasks'])app.conf.update(result_expires = 3600 )if __name__ == '__main__':app.start()

在這個(gè)模塊中創(chuàng)建了Celery實(shí)例(通常稱為app)

要在項(xiàng)目中使用Celery只需要通過import導(dǎo)入該實(shí)例就行了

  • broker參數(shù)指定要使用的中間件的URL
  • backend參數(shù)指定使用的result backend

    用來跟蹤任務(wù)狀態(tài)和結(jié)果,雖然默認(rèn)狀態(tài)下結(jié)果不可用。以上例子中使用RPC result backend。當(dāng)然,不同的result backend都有自己的好處和壞處,根據(jù)自己實(shí)際情況進(jìn)行選擇,如果不需要最好禁用。通過設(shè)置@task(ignore_result=True)選項(xiàng)來禁用耽擱任務(wù))

  • include參數(shù)是當(dāng)worker啟動(dòng)時(shí)導(dǎo)入的模塊列表需要在這里添加自己的任務(wù)莫夸這樣worker就可以找到任務(wù)


proj/tasks.py

  

from __future__ import absolute_import, unicode_literals from .celery import app@app.task def add(x, y):return x + y@app.task def mul(x, y):return x * y@app.task def xsum(numbers):return sum(numbers)

?

啟動(dòng)worker

Celery程序可以用來啟動(dòng)worker:

celery -A proj worker -l info -------------- celery@centos6 v4.1.0 (latentcall) ---- **** ----- --- * *** * -- Linux-2.6.32-696.el6.x86_64-x86_64-with-centos-6.9-Final 2018-03-26 12:27:49 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: task:0x7fe5cfbd20d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues].> celery exchange=celery(direct) key=celery[tasks][2018-03-26 12:27:49,921: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2018-03-26 12:27:49,926: INFO/MainProcess] mingle: searching for neighbors [2018-03-26 12:27:49,499: INFO/MainProcess] mingle: sync with 1 nodes [2018-03-26 12:27:50,950: INFO/MainProcess] mingle: sync complete [2018-03-26 12:27:50,957: INFO/MainProcess] celery@centos6 ready.
  • broker是在celery模塊中指定的中間件參數(shù)的url,也可以在命令行中通過-b選項(xiàng)指定不同的中間件
  • Concurrent是用于并行處理的任務(wù)的預(yù)創(chuàng)建worker進(jìn)程數(shù)量,當(dāng)所有的任務(wù)都在忙于工作時(shí),新的任務(wù)必須等待之前的執(zhí)行完成才能處理

默認(rèn)的并發(fā)數(shù)是機(jī)器上CPU的數(shù)量,可以通過celery worker -c選項(xiàng)指定自定義數(shù)量。沒有推薦值,最佳數(shù)量取決于很多因素,但是如果你的任務(wù)主要是I/O相關(guān)的,就可以增加這個(gè)數(shù)量。實(shí)驗(yàn)表明,增加超過兩倍CPU數(shù)量效果很差,而且可能會(huì)降低性能

除了prefork pool,Celery還支持Eventlet、Gevent并且還能在單線程上運(yùn)行

  • Event是一個(gè)可選項(xiàng),當(dāng)啟用的時(shí)候,Celery會(huì)發(fā)送監(jiān)控(消息)來反映worker的操作,也可以被用來監(jiān)視像celery、events和Flower(實(shí)時(shí)Celery監(jiān)控)這樣的程序。
  • Queues是worker將使用的任務(wù)的隊(duì)列的集合,worker可以一次接受幾個(gè)隊(duì)列,它用來將消息路由到特定的工作者以作為服務(wù)質(zhì)量、關(guān)注點(diǎn)分離、和優(yōu)化的一種方式

可以通過命令行獲取完整的列表————celery worker --help

停止worker

ctrl-c

后臺(tái)

生產(chǎn)環(huán)境中一般將worker放到后臺(tái),后臺(tái)腳本使用celery multi命令后臺(tái)啟動(dòng)一個(gè)或多個(gè)worker

celery multi start w1 -A proj -l info

控制臺(tái)打印

celery multi v4.1.0 (latentcall) > Starting nodes...> w1@centos6: OK Stale pidfile exists - Removing it.

也可以重啟:

celery multi restart w1 -A proj -l info celery multi v4.1.0 (latentcall) > Stopping nodes...> w1@centos6: TERM -> 23620 > Waiting for 1 node -> 23620.....> w1@centos6: OK > Restarting node w1@centos6: OK > Waiting for 1 node -> None...

停止:

celery multi stop w1 -A proj -l info

stop命令是異步的所以它不會(huì)等待worker關(guān)閉,可以使用stopwait命令來確保當(dāng)前執(zhí)行都任務(wù)在退出前都已執(zhí)行完畢

celery multi stopwait w1 -A proj -l info

celery multi不會(huì)存儲(chǔ)關(guān)于worker的信息,所以重啟的時(shí)候需要使用同樣的命令行參數(shù)。在停止時(shí),必須使用相同的pidfile和logfile參數(shù)

默認(rèn)情況下,程序?qū)⒃诋?dāng)期目錄創(chuàng)建pid和log文件,為了防止多個(gè)worker運(yùn)行出錯(cuò),推薦將這些文件放在專門的目錄:

mkdir -p /var/run/celery mkdir -p /var/log/celery celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log

使用multi指令可以啟動(dòng)多個(gè)worker,并且有一個(gè)強(qiáng)大的命令行語法來為不同的worker指定參數(shù):

celery multi start 10 -A proj -l info -Q:1-3 images, video -Q:4, 5 data -Q default -L:4,5 debug

~Detail about?multi?temp

--app參數(shù)

--app參數(shù)指定使用的Celery應(yīng)用實(shí)例,必須以module.path:attribute的形式出現(xiàn)

但也支持快捷方式,只要包名指定了,就會(huì)嘗試在應(yīng)用實(shí)例中搜索

使用--app=proj:

  • 名為proj.app的屬性
  • 名為proj.app的屬性
  • 模塊proj中的任何屬性都是一個(gè)Celery應(yīng)用程序,如果都沒有發(fā)現(xiàn),它就會(huì)嘗試一個(gè)名為proj.celery的子模塊
  • 名為proj.celery.app的屬性
  • 名為proj.celery.celery的屬性
  • 模塊proj.celery中的任何屬性都是一個(gè)Celery應(yīng)用程序
  • 任務(wù)調(diào)用

    • 可以通過使用delay()方法來調(diào)用一個(gè)任務(wù)
    add.delay(3, 3)

    這個(gè)方法實(shí)際上是另一種叫做apply_async()方法的快捷方式

    add.applay_async((3, 3))

    后者(applay_async())能夠指定執(zhí)行選項(xiàng),比如運(yùn)行時(shí)間(倒計(jì)時(shí))、應(yīng)該發(fā)送的隊(duì)列等等:

    add.apply_async((2, 2), queue='lopri', countdown=10)

    上述案例中,任務(wù)會(huì)被發(fā)送給一個(gè)名為lopri的隊(duì)列,該任務(wù)會(huì)在信息發(fā)送后十秒執(zhí)行

    直接應(yīng)用該任務(wù)會(huì)在當(dāng)前進(jìn)程中執(zhí)行任務(wù),不會(huì)發(fā)送消息

    add(3, 3)

    result:6

    三種方法delay()、apply_async()和應(yīng)用__call__,代表了Celery調(diào)用API,也同樣用于簽名

    • 每一個(gè)任務(wù)調(diào)用都有一個(gè)唯一的標(biāo)識(shí)符(UUID),這個(gè)就是任務(wù)的id

    • delay()和apply_async方法會(huì)返回一個(gè)AsyncResult實(shí)例,可以被用來跟蹤任務(wù)執(zhí)行狀態(tài),但是需要開啟result backend這樣狀態(tài)才能被存儲(chǔ)在某處

    • Results默認(rèn)是禁用的,因?yàn)閷?shí)際上沒有一個(gè)result backend適用于每個(gè)應(yīng)用程序,所以要考慮到每個(gè)獨(dú)立backend的缺點(diǎn)來選擇一個(gè)使用。對(duì)于許多保持返回值的任務(wù)來說都不是很有用,所以這個(gè)默認(rèn)的禁用是很明智的。還需要注意的是,result backend并不用來監(jiān)控任務(wù)和worker,對(duì)于Celery有專門的事件消息

    如果配置了result backend就可以接收到任務(wù)的返回值

    result = add.delay(2, 2) res.get(timeout=1)

    retult:4

    • 可以通過查看id屬性找到任務(wù)的id
    res.id

    result:073c568d-ca88-4198-b735-0f98f861218b

    • 如果任務(wù)拋出異常也可以檢查到異常,默認(rèn)result.get()可以傳播任何錯(cuò)誤

    • 如果不希望錯(cuò)誤傳播,可以通過propagete屬性禁用

    res.get(propagate=False)

    在這種情況下,它會(huì)返回所提出的異常實(shí)例,以便檢查任務(wù)是否成功或失敗,您將不得不在結(jié)果實(shí)例上使用相應(yīng)的方法

    res.failed() res.successful()

    也可以通過state找到任務(wù)的狀態(tài)

    res.state

    result:FAILUTE

    • 一個(gè)任務(wù)只能有一個(gè)狀態(tài),但是可以在幾個(gè)狀態(tài)中發(fā)展,典型任務(wù)階段可能是這樣
    PENDING -> STARTED -> SUCCESS

    STARTED狀態(tài)是一個(gè)特殊的狀態(tài),只有在task_track_started設(shè)置啟用或者@task(track_started=True)選項(xiàng)設(shè)置的時(shí)候才會(huì)被記錄下來

    PENDING狀態(tài)實(shí)際上不是記錄狀態(tài),而是未知任務(wù)id的默認(rèn)狀態(tài)

    from proj.celery import app res = app.AsyncResult('this-id-does-not-exist') res.state

    result:PENDING

    • 如果重新嘗試這個(gè)任務(wù)可能會(huì)變得更復(fù)雜,對(duì)于一個(gè)嘗試過兩遍的任務(wù)來說階段可能是這樣:
    PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

    Canvas:設(shè)計(jì)任務(wù)流

    前面學(xué)習(xí)了通過delay方法調(diào)用任務(wù),通常這樣就夠了,但是有時(shí)可能需要將任務(wù)調(diào)用的簽名傳遞給另一個(gè)進(jìn)程或者另一個(gè)函數(shù)的參數(shù),對(duì)Celery來說叫做signatures

    簽名以某種方式包裝了單一任務(wù)調(diào)用的參數(shù)和執(zhí)行選項(xiàng),以便將其傳遞給函數(shù),甚至序列化后發(fā)送。

    可以使用參數(shù)(2, 2)和十秒的計(jì)時(shí)器來為add任務(wù)創(chuàng)建一個(gè)簽名

    add.signature((2, 2), countdown=10)

    也可以簡(jiǎn)寫:

    add.s(2, 2)

    調(diào)用API

    簽名的實(shí)例也支持調(diào)用API,意味著也可以有delay和apply_async方法

    但是有一個(gè)區(qū)別,那就是簽名可能已經(jīng)指定了一個(gè)參數(shù)簽名,add任務(wù)接受兩個(gè)參數(shù),所以一個(gè)制定了兩個(gè)參數(shù)的簽名將會(huì)形成一個(gè)完整的簽名

    s1 = add.s(2, 2) res = s1.delay() res.get()

    也可以使用不完成的簽名,叫做partials:

    s1 = add.s(2)

    s2現(xiàn)在是部分簽名,需要另一個(gè)參數(shù)才完整,則可以在調(diào)用signature的時(shí)候處理

    # resolves the partial: add(8, 2) res = s2.delay(8) res.get()

    在這里,添加了參數(shù)8,對(duì)已存在的參數(shù)2組成了一個(gè)完整的簽名add(8, 2)

    關(guān)鍵字參數(shù)也可以延遲添加,會(huì)和已存在的關(guān)鍵字參數(shù)合并,新參數(shù)優(yōu)先(新參數(shù)覆蓋舊參數(shù))

    s3 = add.s(2, 2, debug=True) s3.delay(debug=False)

    已聲明的簽名支持調(diào)用API:

    • sig.apply_async(arg=(), kwargs={}, **options
      使用可選部分參數(shù)和部分關(guān)鍵字參數(shù)調(diào)用簽名,也支持部分可執(zhí)行選項(xiàng)
    • sig.delay(*args, **kwargs)
      apply_async的星參版本,任何參數(shù)都會(huì)被預(yù)先記錄在簽名的參數(shù)你,關(guān)鍵字參數(shù)會(huì)和現(xiàn)有的keys合并

    基本體

    • group
    • chain
    • chord
    • map
    • starmap
    • chunks

    這些基本體本身就是簽名對(duì)象,因此,它們可以以任何多種方式組合起來組成復(fù)雜的工作流

    Group

    一個(gè)group同時(shí)調(diào)用任務(wù)列表,返回一個(gè)特殊結(jié)果實(shí)例,這樣可以以組的形式檢查結(jié)果,并按順序檢索返回值

    from celery import group from proj.tasks import addgroup(add.s(i, i) for in in range(10))().get()

    result:[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

    • Partial group
    g = group(add.s(i, i) for i in range(10)) g(10).get()

    result:[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

    Chains

    任務(wù)可以被相互連接起來,這樣在一個(gè)任務(wù)返回后另一個(gè)任務(wù)被調(diào)用

    from celery import chain form proj.tasks import add, mul// 用法1 chian(add.s(4, 4) | mul.s(8))().get()// 用法2 g = chain(add.s(4) | mul.s(8)) g(4).get()// 用法3 (add.s(4, 4) | mul.s(8))().get()

    Chords

    chord是一個(gè)有返回值的group

    from celery import chord from proj.tasks import add, xsum// 用法1 (group(add.s(i, i) for i in range(10)) | xsum.s())().get()// 用法2 upload_document.s(file) | group(apply_filter.s() for filter in filters)

    路由

    Celery支持AMQP提供的所有路由設(shè)施,但是它也支持簡(jiǎn)單路由,將消息發(fā)送到指定的隊(duì)列

    task_routes設(shè)置可以是用戶按名稱對(duì)任務(wù)進(jìn)行路由,并將一切集中在一個(gè)位置

    app.conf.update{task_routes = {'proj.tasks.add': {'queue': 'hipri'},} }

    可以在運(yùn)行時(shí)通過queue參數(shù)指定隊(duì)列到apply_async:

    from proj.tasks import add add.apply_async((2,2), queue='hipri')

    然后可以通過指定celery worker -Q選項(xiàng)使worker從隊(duì)列中消費(fèi)

    celery -A proj worker -Q hipri

    也可以通過使用逗號(hào)分隔符(,)來指定多個(gè)隊(duì)列

    celery -A proj worker -Q hipri, celery

    默認(rèn)隊(duì)列因?yàn)闅v史原因命名為:celery

    隊(duì)列的順序無關(guān)緊要,因?yàn)閣orker會(huì)給隊(duì)列相同的權(quán)重

    遠(yuǎn)程控制

    如果使用RabbitMQ(AMQP)、Redis或者Qpid作為中間件就可以在運(yùn)行時(shí)監(jiān)視worker

    • 查看worker當(dāng)前執(zhí)行的任務(wù)
    celery -A proj inspect active

    這是通過使用廣播消息實(shí)現(xiàn)的,因此,急群眾的每一個(gè)工作人員都能接收到所有遠(yuǎn)程控制命令

    • 也可以指令一個(gè)或多個(gè)worker使用--destination選項(xiàng)請(qǐng)求行動(dòng),這是一個(gè)逗號(hào)分隔的worker主機(jī)名列表
    celery -A proj inspect active --destination=celery@example.com

    如果沒有提供目標(biāo),那么每個(gè)worker都會(huì)對(duì)請(qǐng)求做出反應(yīng)并回復(fù)

    • celery inspect命令包含的命令不會(huì)改變worker的任何東西,它只會(huì)回復(fù)關(guān)于worker內(nèi)部發(fā)生的事情的信息和統(tǒng)計(jì)信息,可以執(zhí)行命令檢查列表:
    celery -A proj inspect --help
    • celery control命令,包含在運(yùn)行時(shí)實(shí)際改變worker操作的命令
    celery -A proj control --help
    • 強(qiáng)制worker啟用事件消息(用于監(jiān)視任務(wù)和工作人員)
    celery -A proj control enable_events

    當(dāng)事件激活,可以啟動(dòng)event dumper查看worker正在做什么

    celery -A proj events --dump

    或者

    celery -A proj events

    當(dāng)完成監(jiān)控可以再次禁用events

    celery -A proj control disable_events

    celery status命令還能使用遠(yuǎn)程控制命令,并顯示集群中的在線worker列表

    celery -A proj status

    時(shí)區(qū)

    所有的時(shí)間和日期、內(nèi)部和消息多使用UTC時(shí)間區(qū)域

    當(dāng)worker收到消息,例如使用倒計(jì)時(shí)設(shè)置,它將UTC時(shí)間轉(zhuǎn)換為本地時(shí)間。如果希望使用與系統(tǒng)時(shí)區(qū)不同的地區(qū),那么必須要使用時(shí)區(qū)設(shè)置來配置該時(shí)區(qū):

    app.conf.timezone = 'Asia/Shanghai'

    ?

    最優(yōu)化

    默認(rèn)的配置并沒有針對(duì)吞吐量進(jìn)行優(yōu)化,它試圖在許多短任務(wù)和更少的長(zhǎng)任務(wù)之間走中間路線,這是吞吐量和公平調(diào)度之間的折中

    ?

    總結(jié)

    以上是生活随笔為你收集整理的flask + celery实现定时任务和异步的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。