日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式异步任务框架之Celery定义、异步任务框架特点、架构、使用场景、安装配置、基本使用、多任务结构使用、延时任务、定时任务及django中使用celery

發布時間:2023/12/18 编程问答 30 豆豆

文章目錄

      • 1、定義
      • 2、Celery異步任務框架特點
      • 3、Celery架構
      • 4、使用場景
      • 5、Celery的安裝配置
      • 6、基本使用
      • 7、celery多任務結構
      • 8、延時任務
        • 8.1、方式一
        • 8.2、方式二
      • 9、定時任務
      • 10、django中使用celery()

1、定義

python中的一個分布式異步任務框架 Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統 專注于實時處理的異步任務隊列 同時也支持任務調度 (1) 執行異步任務(對立: 同步任務),解決耗時任務,將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等 (2) 執行延時任務(比如5分鐘后干一件事): 解決延遲任務 (3) 執行定時任務: 每天隔幾分鐘干什么事,解決周期(周期)任務,比如每天數據統計Celery 官網: http://www.celeryproject.org/ Celery 官方文檔英文版: http://docs.celeryproject.org/en/latest/index.html Celery 官方文檔中文版: http://docs.jinkan.org/docs/celery/

2、Celery異步任務框架特點

(1) 可以不依賴任何服務器,通過自身命令,啟動服務(內部支持socket) (2) celery服務為為其他項目服務提供異步解決任務需求的注: 會有兩個服務同時運行,一個是項目服務,一個是celery服務,項目服務將需要異步處理的任務交給celery服務,celery就會在需要時異步完成項目的需求 人是一個獨立運行的服務 | 醫院也是一個獨立運行的服務 正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與,但當人生病時,就會被醫院接收,解決人生病問題 人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立運行,人生病時,醫院就來解決人生病的需求

3、Celery架構

Celery的架構由三部分組成,消息中間件(message broker)、任務執行單元(worker)和任務執行結果存儲(task result store)組成

1、消息中間件 Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括RabbitMQ、Redis等2、任務執行單元 Worker是Celery提供的任務執行的單元,worker并發的運行在分布式的系統節點中3、任務結果存儲 Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等

4、使用場景

異步執行: 解決耗時任務,將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等 延遲執行: 解決延遲任務 定時執行: 解決周期(周期)任務,比如每天數據統計關于秒殺系統可以使用celery不能秒超,使用鎖機制(mysql悲觀鎖,樂觀鎖),redis鎖提高并發量 ---> 把同步做成異步 ---> 使用celery前端點擊秒殺按鈕,向后端發送秒殺請求 ---> 同步操作同步操作請求來到后端,判斷數量是否夠,如果夠要生成訂單(mysql),訂單狀態是待支付狀態 請求返回,告訴前端秒殺成功異步操作請求來到后端,提交一個celery任務 ---> celery任務異步的執行判斷數量是否夠,如果夠,要生成訂單(mysql)秒殺是否成功的結果還沒有,直接返回了(返回任務id)前端啟動一個定時任務,每隔5s,向后臺發送一個查詢請求,查詢秒殺任務是否執行完成(帶著任務id)如果是未執行狀態或者執行中 ---> 返回給前端,前端不處理,定時任務繼續執行又隔了5s,發送查詢,查詢到秒殺成功的結果,返回給前端,秒殺成功

5、Celery的安裝配置

pip install celery 消息中間件: RabbitMQ/Redis app=Celery(‘任務名’, broker=’xxx’, backend=’xxx’)

6、基本使用

1、定義一個py文件(t_celery.py) import celery # 消息中間件(redis) broker='redis://127.0.0.1:6379/1' # 1 表示使用redis 1 這個db # 結果存儲(redis) backend='redis://127.0.0.1:6379/2' # 2 表示使用redis 2 這個db# 實例化得到對象,指定中間件和結果存儲 app=celery.Celery('test',broker=broker,backend=backend)# 定義任務(可以有多個) @app.task def add(a,b):return a+b@app.task def mul(a,b):return a*b2、提交任務(在其它文件中,task.py) from t_celery import add res=add.delay(100,4) print(res) # 任務id號3、啟動worker 非windows平臺: celery worker -A t_celery -l info windows需要裝eventlet模塊: celery worker -A t_celery -l info -P eventlet4、查看執行結果 from t_celery import app from celery.result import AsyncResult # 關鍵字,變量不能定義為關鍵字 id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00' # 定義任務的id號,第二步中res的值 if __name__ == '__main__':res = AsyncResult(id=id, app=app)if res.successful():result = res.get()print(result)elif res.failed():print('任務失敗')elif res.status == 'PENDING':print('任務等待中被執行')elif res.status == 'RETRY':print('任務異常后正在重試')elif res.status == 'STARTED':print('任務已經開始被執行')

app參數

celery配置文件參數 # 有些情況可以防止死鎖 CELERYD_FORCE_EXECV=True # 設置并發worker數量 CELERYD_CONCURRENCY=4 # 允許重試 CELERY_ACKS_LATE=True # 每個worker最多執行100個任務被銷毀,可以防止內存泄漏 CELERYD_MAX_TASKS_PER_CHILD=100 # 超時時間 CELERYD_TASK_TIME_LIMIT=12*30

7、celery多任務結構

目錄結構:package_celery: # 項目名celery_task # celery包名__init__.py celery.py # celery 的app,必須叫celeryorder_task.py # 任務user_task.py # 任務result.py # 結果查詢submit_tast.py # 提交任務

celery.py

import celerybroker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2'app = celery.Celery(broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task' # 一定要記得注冊一下 ])

order_task.py

from .celery import app@app.task def cannel_order(name):return '用戶{}取消訂單'.format(name)

user_task.py

from .celery import app@app.task def send_msg(phone):return '{}發送短信成功'.format(phone)

result.py

from celery_task.celery import appfrom celery.result import AsyncResult# 關鍵字,變量不能定義為關鍵字 # 發短信任務: 39744a3f-02ec-4b8b-b855-111025e4abe4 # 取消訂單任務:6d1e5e91-236a-449c-ad32-eac093b240bdid = '6d1e5e91-236a-449c-ad32-eac093b240bd' if __name__ == '__main__':res = AsyncResult(id=id, app=app)if res.successful():result = res.get()print(result)elif res.failed():print('任務失敗')elif res.status == 'PENDING':print('任務等待中被執行')elif res.status == 'RETRY':print('任務異常后正在重試')elif res.status == 'STARTED':print('任務已經開始被執行')

submit_tast.py

from celery_task import order_task, user_taskres = order_task.cannel_order.delay('allen') print(res) # 返回任務id res = user_task.send_msg.delay('13666666666') print(res) # 返回任務id

運行

# 運行worker(在package_celery目錄下執行) celery worker -A celery_task -l info -P eventlet

8、延時任務

8.1、方式一

# 發送短信為例: 2021年1月7日21點58分55秒發送短信 from datetime import datetime from celery_task import order_task, user_task# eta: 延遲多長時間執行,eta需要傳時間對象,并且是utc時間 v1 = datetime(2021, 1, 7, 21, 58, 55) v2 = datetime.utcfromtimestamp(v1.timestamp()) # 轉成utc時間,比正常時間慢8個小時 print(v2) res = user_task.send_msg.apply_async(args=['13666666666', ], eta=v2)

8.2、方式二

# 發送短信為例: 以當前時間為基準,過10秒后發送短信(隔幾秒后執行) from datetime import datetime, timedeltactime = datetime.now() utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay res = user_task.send_msg.apply_async(args=['13666666666', ], eta=task_time) print(res)

9、定時任務

# 在celery.py中配置 # 時區 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False# 任務的定時配置 from datetime import timedelta from celery.schedules import crontabapp.conf.beat_schedule = {'send-msg':{'task': 'celery_task.user_task.send_msg','schedule': timedelta(seconds=5), # 這里是timedelta# 'schedule': timedelta(hours=24*10), # # 這里是timedelta# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點,這里是crontab'schedule': crontab(hour=8, day_of_month=1), # 每月一號早八點,這里是crontab'args': ('13666666666',),} }# 添加任務: 自動添加任務,所以要啟動一個添加任務的服務 # 啟動beat,負責每隔3s提交一個任務 celery beat -A celery_task -l info# 啟動worker celery worker -A celery_task -l info -P eventlet

10、django中使用celery()

1、celery是獨立的,跟框架沒有關系 2、django-celery第三方模塊,兼容性不好不采用,使用通用方式 3、目錄結構celery_task__init__.pycelery.pyhome_task.pyorder_task.pyuser_task.py celery框架django項目工作流程 (1) 加載django配置環境 (2) 創建Celery框架對象app,配置broker和backend,得到的app就是worker (3) 給worker對應的app添加可處理的任務函數,用include配置給worker的app (4) 完成提供的任務的定時配置app.conf.beat_schedule (5) 啟動celery服務,運行worker,執行任務 (6) 啟動beat服務,運行beat,添加任務重點: 由于采用了django的反射機制,使用celery.py所在的celery_task包必須放置項目的根目錄下

路由

path('test_celery', views.test_celery),

視圖函數

def test_celery(request):from celery_task.celery import appfrom celery_task.user_task import send_msgfrom celery.result import AsyncResultid = request.GET.get('id')if id:res = AsyncResult(id=id, app=app)if res.successful():result = res.get()return HttpResponse(result)id = send_msg.delay('13666666666')print(id)return HttpResponse('發送短信成功')

總結

以上是生活随笔為你收集整理的分布式异步任务框架之Celery定义、异步任务框架特点、架构、使用场景、安装配置、基本使用、多任务结构使用、延时任务、定时任务及django中使用celery的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。