日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

cerely异步分布式

發(fā)布時(shí)間:2023/12/18 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 cerely异步分布式 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1、釋義:

  Celery 是一個(gè) 基于python開發(fā)的分布式異步消息任務(wù)隊(duì)列,通過它可以輕松的實(shí)現(xiàn)任務(wù)的異步處理, 如果你的業(yè)務(wù)場景中需要用到異步任務(wù),就可以考慮使用celery。

舉幾個(gè)實(shí)例場景中可用的例子:

  • 你想對100臺機(jī)器執(zhí)行一條批量命令,可能會花很長時(shí)間 ,但你不想讓你的程序等著結(jié)果返回,而是給你返回 一個(gè)任務(wù)ID,你過一段時(shí)間只需要拿著這個(gè)任務(wù)id就可以拿到任務(wù)執(zhí)行結(jié)果, 在任務(wù)執(zhí)行ing進(jìn)行時(shí),你可以繼續(xù)做其它的事情。?
  • 你想做一個(gè)定時(shí)任務(wù),比如每天檢測一下你們所有客戶的資料,如果發(fā)現(xiàn)今天 是客戶的生日,就給他發(fā)個(gè)短信祝福

Celery 本身并不提供消息服務(wù),在執(zhí)行任務(wù)時(shí)需要通過一個(gè)消息中間件來接收和發(fā)送任務(wù)消息,以及存儲任務(wù)結(jié)果, 一般使用rabbitMQ or Redis

2、Celery的優(yōu)點(diǎn):

  • 簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的
  • 高可用:當(dāng)任務(wù)執(zhí)行失敗或執(zhí)行過程中發(fā)生連接中斷,celery 會自動(dòng)嘗試重新執(zhí)行任務(wù)
  • 快速:一個(gè)單進(jìn)程的celery每分鐘可處理上百萬個(gè)任務(wù)
  • 靈活: 幾乎celery的各個(gè)組件都可以被擴(kuò)展及自定制

3、Celery基本工作流程圖

4、示例

這里我們使用redis
連接url的格式為:
redis://:password@hostname:port/db_number
例如:
BROKER_URL = 'redis://localhost:6379/0'

安裝celery和redis

  • pip install celery
  • pip install redis

使用celery包含三個(gè)方面:

  • 定義任務(wù)函數(shù)
  • 運(yùn)行celery服務(wù)
  • 客戶應(yīng)用程序的調(diào)用

先創(chuàng)建一個(gè)腳本 tasks.py

from celery import Celery #導(dǎo)入了celery

broker = 'redis://172.16.94.85:6379/1'
backend = 'redis://172.16.94.85:6379/2'
app = Celery('tasks', broker=broker, backend=backend) #創(chuàng)建了celery實(shí)例app,實(shí)例化的過程中指定任務(wù)名tasks(和文件名一致),傳入了broker和backend

@app.task #裝飾器
def add(x, y): #創(chuàng)建任務(wù)函數(shù)add
print("running...", x, y)
return x + y

在當(dāng)前命令行終端運(yùn)行(啟動(dòng)worker,worker名要和腳本名一致):

celery -A tasks worker --loglevel=info

此時(shí)會看見一對輸出,包括注冊的任務(wù)

新建 test.py并執(zhí)行:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time : 2018/5/26 8:17
# @Author : JWQ
# @File : demo1.py

from tasks import add #導(dǎo)入tasks模塊

re = add.delay(10, 20)
print(re.result) #獲取結(jié)果
print(re.ready) #是否處理
print(re.get(timeout=1)) #獲取結(jié)果
print(re.status) #是否處理

?執(zhí)行test.py后在celery行能看到相關(guān)的操作日志:

[2018-05-25 11:31:28,373: WARNING/ForkPoolWorker-1] ('running...', 4, 4)
[2018-05-25 11:31:28,394: INFO/ForkPoolWorker-1] Task tasks.add[30b145f9-14f7-4cd8-aa5e-7b6105c52325] succeeded in 0.0216804221272s: 8
[2018-05-25 11:31:58,991: INFO/MainProcess] Received task: tasks.add[7f8207cb-d561-4567-8ae7-7c035af02762] ?
[2018-05-25 11:31:58,995: WARNING/ForkPoolWorker-1] ('running...', 4, 4)
[2018-05-25 11:31:58,998: INFO/ForkPoolWorker-1] Task tasks.add[7f8207cb-d561-4567-8ae7-7c035af02762] succeeded in 0.00274921953678s: 8

打開 backend的redis,也可以看見celery執(zhí)行的信息。

在python環(huán)境中調(diào)用的add函數(shù),實(shí)際上是在應(yīng)用程序中調(diào)用這個(gè)方法。需要注意,如果把返回值賦值給一個(gè)變量,那么原來的應(yīng)用程序也會被阻塞,需要等待異步任務(wù)返回的結(jié)果。因此,實(shí)際使用中,不需要把結(jié)果賦值。

5、Celery模塊調(diào)用

既然celery是一個(gè)分布式的任務(wù)調(diào)度模塊,那么celery是如何和分布式掛鉤呢,celery可以支持多臺不通的計(jì)算機(jī)執(zhí)行不同的任務(wù)或者相同的任務(wù)。

如果要說celery的分布式應(yīng)用的話,我覺得就要提到celery的消息路由機(jī)制,就要提一下AMQP協(xié)議。具體的可以查看AMQP的文檔。簡單地說就是可以有多個(gè)消息隊(duì)列(Message Queue),不同的消息可以指定發(fā)送給不同的Message Queue,而這是通過Exchange來實(shí)現(xiàn)的。發(fā)送消息到Message Queue中時(shí),可以指定routiing_key,Exchange通過routing_key來把消息路由(routes)到不同的Message Queue中去,如圖:

6、多worker,多隊(duì)列

先寫腳本task.py

[root@localhost celery]# cat tasks.py #!/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery

app = Celery()
app.config_from_object("celeryconfig")

@app.task
def taskA(x,y):
return x + y

@app.task
def taskB(x,y,z):
return x + y + z
上面的代碼中,首先定義了一個(gè)Celery的對象,然后通過celeryconfig.py對celery對象進(jìn)行設(shè)置。之后又分別定義了三個(gè)task,分別是taskA, taskB和add。 接下來寫celeryconfig.py,需要注意代碼的縮進(jìn)格式: [root@localhost celery]# cat celeryconfig.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-

from kombu import Exchange,Queue

BROKER_URL = "redis://192.168.48.131:6379/1"
CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)

CELERY_ROUTES = {
'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
}
配置文件一般單獨(dú)寫在一個(gè)文件中 啟動(dòng)一個(gè)worker來指定taskA celery -A tasks worker -l info -n workerA.%h -Q for_task_A
celery -A tasks worker -l info -n workerB.%h -Q for_task_B 腳本測試: from tasks import *
re1 = taskA.delay(100, 200)
print(re1.result)
re2 = taskB.delay(1, 2, 3)
print(re2.result)
re3 = add.delay(1, 2, 3)
print(re3.status)???? #PENDING
我們看到add的狀態(tài)是PENDING,表示沒有執(zhí)行,這個(gè)是因?yàn)闆]有celeryconfig.py文件中指定改route到哪一個(gè)Queue中,所以會被發(fā)動(dòng)到默認(rèn)的名字celery的Queue中,但是我們還沒有啟動(dòng)worker執(zhí)行celery中的任務(wù)。下面,我們來啟動(dòng)一個(gè)worker來執(zhí)行celery隊(duì)列中的任務(wù)。 celery -A tasks worker -l info -n worker.%h -Q celery print(re3.status)??? #SUCCESS 7、Celery與定時(shí)任務(wù) 在celery中執(zhí)行定時(shí)任務(wù)非常簡單,只需要設(shè)置celery對象中的CELERYBEAT_SCHEDULE屬性即可。 下面我們接著在celeryconfig.py中添加CELERYBEAT_SCHEDULE變量:
CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
??? 'taskA_schedule' : {
??????? 'task':'tasks.taskA',
??????? 'schedule':20,
??????? 'args':(5,6)
??? },
??? 'taskB_scheduler' : {
??????? 'task':"tasks.taskB",
??????? "schedule":200,
??????? "args":(10,20,30)
??? },
??? 'add_schedule': {
??????? "task":"tasks.add",
??????? "schedule":10,
??????? "args":(1,2)
??? }
注意格式,否則會有問題
Celery啟動(dòng)定時(shí)任務(wù): celery –A tasks beat

Celery啟動(dòng)定時(shí)任務(wù):

這樣taskA每20秒執(zhí)行一次taskA.delay(5, 6)
taskB每200秒執(zhí)行一次taskB.delay(10, 20, 30)
Celery每10秒執(zhí)行一次add.delay(1, 2)

轉(zhuǎn)載于:https://www.cnblogs.com/Jweiqing/p/9096427.html

總結(jié)

以上是生活随笔為你收集整理的cerely异步分布式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。