日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Celery的实践指南

發布時間:2023/12/13 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Celery的实践指南 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Celery的實踐指南 celery原理: celery實際上是實現了一個典型的生產者-消費者模型的消息處理/任務調度統,消費者(worker)和生產者(client)都可以有任意個,他們通過消息系統(broker)來通信。 典型的場景為:
  • 客戶端啟動一個進程(生產者),當用戶的某些操作耗時較長或者比較頻繁時,考慮接入本消息系統,發送一個task任務給broker。
  • 后臺啟動一個worker進程(消費者),當發現broker中保存有某個任務到了該執行的時間,他就會拿過來,根據task類型和參數執行。
  • 實踐中的典型場景:
  • 簡單的定時任務:
  • 替換crontab的celery寫法:

  • from?celery?import?Celery
    from?celery.schedules?import?crontab

    app?=?Celery("tasks",?backend="redis://localhost",?broker="redis://localhost")

    app.conf.update(CELERYBEAT_SCHEDULE?=?{
    ? ? "add":?{
    ? ? ? ? "task":?"celery_demo.add",
    ? ? ? ? "schedule":?crontab(minute="*"),
    ? ? ? ? "args":?(16,?16)
    ? ? },
    })

    @app.task
    def?add(x,?y):
    ? ? return?x?+?y

  • 運行celery的worker,讓他作為consumer運行,自動從broker上獲得任務并執行。
  • `celery?-A?celery_demo?worker`
  • 運行celery的client,讓其根據schedule,自動生產出task msg,并發布到broker上。
  • `celery?-A?celery_demo?beat`
  • 安裝并運行flower,方便監控task的運行狀態
  • `celery?flower?-A?celery_demo`
  • 或者設置登錄密碼 ` celery?flower?-A celery_demo --basic_auth=user1:password1,user2:password2
  • 多同步任務-鏈式任務-
  • 失敗自動重試的task
  • 失敗重試方法: 將task代碼函數參數增加self,同時綁定bind。
  • demo代碼:
  • @app.task(bind=True,?default_retry_delay=300,?max_retries=5)
    def?my_task_A(self):
    ? ? try:
    ? ? ? ? print("doing?stuff?here...")
    ? ? except?SomeNetworkException?as?e:
    ? ? ? ? print("maybe?do?some?clenup?here....")
    ? ? ? ? self.retry(e)
  • 自動重試后,是否將任務重新入queue后排隊,還是等待指定的時間?可以通過self.retry()參數來指定。
  • 派發到不同Queue隊列的task
  • 一個task自動映射到多個queue中的方法, 通過配置task和queue的routing_key命名模式。
  • 比如:把queue的exchange和routing_key配置成通用模式:
  • 再定義task的routing_key的名稱:
  • 可用的不同exchange策略:
  • direct:直接根據定義routing_key
  • topic:exchange會根據通配符來將一個消息推送到多個queue。
  • fanout:將消息拆分,分別推送到不同queue,通常用于超大任務,耗時任務。
  • 參考:http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
  • 高級配置
  • result是否保存
  • 失敗郵件通知:
  • 關閉rate limit:
  • auto_reload方法(*nix系統):
  • celery通過監控源代碼目錄的改動,自動地進行reload
  • 使用方法:1.依賴inotify(Linux) 2. kqueue(OS X / BSD)
  • 安裝依賴: $?pip?install?pyinotify
  • (可選) 指定fsNotify的依賴: $?env?CELERYD_FSNOTIFY=stat?celery?worker?-l?info?--autoreload
  • 啟動: celery -A appname worker --autoreload
  • auto-scale方法:
  • 啟用auto-scale
  • 臨時增加worker進程數量(增加consumer): $?celery?-A?proj?control?add_consumer?foo?-d?worker1.local
  • 臨時減少worker進程數量(減少consumer):
  • 將scheduled task的配置從app.conf變成DB的方法:
  • 需要在啟動時指定custom schedule 類名,比如默認的是: celery.beat.PersistentScheduler 。
  • celery?-A?proj?beat?-S?djcelery.schedulers.DatabaseScheduler
  • 啟動停止worker的方法:
  • 啟動 as daemon :?http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing
  • root用戶可以使用celeryd
  • 非特權用戶:celery multi start worker1 -A appName ?—autoreload ?--pidfile="$HOME/run/celery/%n.pid"? --logfile="$HOME/log/celery/%n.log"
  • 或者 celery worker —detach
  • 停止
  • ps?auxww?|?grep?'celery?worker'?|?awk?'{print?$2}'?|?xargs?kill?-9
  • 與Flask集成的方法
  • 集成后flask將充當producer來創建并發送task給broker,在celery啟動的獨立worker進程將從broker中獲得task并執行,同時將結果返回。
  • flask中異步地獲得task結果的方法:add.delay(x,y),有時需要對參數進行命名后傳遞 或者 add.apply_async(args=(x,y), countdown=30)
  • flask獲得
  • 與flask集成后的啟動問題
  • 由于celery的默認routing_key是根據生產者在代碼中的import級別來設定的,所以worker端在啟動時應該注意其啟動目錄應該在項目頂級目錄上,否者會出現KeyError。
  • 性能提升: eventlet 和 greenlet
  • 官方參考:http://docs.celeryproject.org/en/latest/userguide/index.html

    轉載于:https://www.cnblogs.com/ToDoToTry/p/5453149.html

    總結

    以上是生活随笔為你收集整理的Celery的实践指南的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。