日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python任务调度框架_python任务调度框架apscheduler【转】

發布時間:2025/3/21 python 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python任务调度框架_python任务调度框架apscheduler【转】 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

簡介

APScheduler(以下簡稱APS)框架可以讓用戶定時執行或者周期性執行Python任務。既可以添加任務也可以刪除任務,還可以將任務存儲在數據庫中。當APS重啟之后,還會繼續執行之前設置的任務。

APS是跨平臺的,注意APS既不是守護進程也不是服務,更不是命令行程序。APS是進程內的調度器,也就是說它的實現原理是在進程內產生內置的阻塞來創建定時服務,以便在預定的時間內執行某個任務。

APS支持以下三種定時任務:

crontab類型任務

固定時間間隔任務

基于日期時間的一次性任務

你可以使用上面任意一種定時任務或者組合任務。

APS支持以下面幾種方式保存:

內存

SQLAlchemy

MongoDB

Redis

APS也可以整合進其他幾個Python框架:

asyncio

gevent

Tornado

Twisted

Qt

安裝

使用pip進行安裝:

$ pip install apscheduler

如果你沒有安裝pip,可以通過下面鏈接進行安裝get-pip.py

如果不能通過pip下載,也可以直接下載APS安裝包,解壓再進行安裝

$ python setup.py install

基礎概念

APS由以下幾部分組成:

觸發器(triggers)

任務倉庫(job stores)

執行器(executors)

調度器(schedulers)

觸發器包含了所有定時任務邏輯,每個任務都有一個對應的觸發器,觸發器決定任何的何時執行,初始配置情況下,觸發器是無狀態的。

任務倉庫保存要執行的任務,其中一個默認的任務倉庫將任務保存在內存中,而另外幾個任務倉庫將任務保存在數據庫中。在將任務保存到任務倉庫前,會對任務執行序列化操作,當重新讀取任務時,再執行反序列化操作。除了默認的任務倉庫,其他任務倉庫都不會在內存中保存任務,而是作為任務保存、加載、更新以及搜索的一個中間件。任務倉庫在定時器之間不能共享。

執行器用來執行定時任務,它只是將要執行的任務放在新的線程或者線程池中運行。執行完畢之后,再通知定時器。

調度器將其它幾個組件聯系在一起,一般在應用中只有一個調度器,程序開發者不會直接操作觸發器、任務倉庫或執行器,相反,調度器提供了這個接口。任務倉庫以及執行器的配置都是通過調度器來實現的。

選擇合適的調度器、執行器以及任務倉庫

選擇調度器是根據我們的開發環境與實際應用來決定的,下面是一些常用調度器:

BlockingScheduler:適合于只在進程中運行單個任務的情況

BackgroundScheduler: 適合于要求任何在程序后臺運行的情況

AsyncIOScheduler:適合于使用asyncio框架的情況

GeventScheduler: 適合于使用gevent框架的情況

TornadoScheduler: 適合于使用Tornado框架的應用

TwistedScheduler: 適合使用Twisted框架的應用

QtScheduler: 適合使用QT的情況

而任務倉庫,如果是非持久任務,使用MemoryStore就可以了,如果是持久性任務,那么久需要根據編程環境進行選擇了。

大多數情況下,執行器選擇ThreadPoolExecutor就可以了,但是如果涉及到比較耗CPU的任務,就可以選擇ProcessPoolExecutor,以充分利用多核CPU。,當然也可以同時使用兩個執行器。

可以在相應的API中找到對應的任務倉庫以及執行器。

配置任務調度器

APS提供了多種調度器配置方法,既可以使用配置字典,也可以直接傳遞配置參數給調度器,還可以先初始化調度器,添加完任務之后,最后再來配置調度器。

相關的配置參數可以參考API文檔。

舉個簡單例子:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

# Initialize the rest of the application here, or before the scheduler initialization

上面的代碼生成一個后臺調度器,使用默認名為default的MemoryJobStore,以及默認名為default的ThreadPoolExecutor,最大線程數為10。

現在假設你想使用兩個任務倉庫以及兩個執行器,并且還想調整下任務的默認參數。可以使用下面三種方法,所完成的功能是一樣的。

方法1:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.mongodb import MongoDBJobStore

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {

'mongo': MongoDBJobStore(),

'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

}

executors = {

'default': ThreadPoolExecutor(20),

'processpool': ProcessPoolExecutor(5)

}

job_defaults = {

'coalesce': False,

'max_instances': 3

}

scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法2:

from apscheduler.schedulers.background import BackgroundScheduler

# The "apscheduler." prefix is hard coded

scheduler = BackgroundScheduler({

'apscheduler.jobstores.mongo': {

'type': 'mongodb'

},

'apscheduler.jobstores.default': {

'type': 'sqlalchemy',

'url': 'sqlite:///jobs.sqlite'

},

'apscheduler.executors.default': {

'class': 'apscheduler.executors.pool:ThreadPoolExecutor',

'max_workers': '20'

},

'apscheduler.executors.processpool': {

'type': 'processpool',

'max_workers': '5'

},

'apscheduler.job_defaults.coalesce': 'false',

'apscheduler.job_defaults.max_instances': '3',

'apscheduler.timezone': 'UTC',

})

方法3:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

from apscheduler.executors.pool import ProcessPoolExecutor

jobstores = {

'mongo': {'type': 'mongodb'},

'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

}

executors = {

'default': {'type': 'threadpool', 'max_workers': 20},

'processpool': ProcessPoolExecutor(max_workers=5)

}

job_defaults = {

'coalesce': False,

'max_instances': 3

}

scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

啟動調度器

使用start()方法啟動調度器,BlockingScheduler需要在初始化之后才能執行start(),對于其他的Scheduler,調用start()方法都會直接返回,然后可以繼續執行后面的初始化操作。

調度器啟動之后,就不能更改它的配置了。

添加任務

有兩種添加任務的辦法:

調用add_job()

使用scheduled_job()修飾器

第一個方法是使用最多的,因為調用它會返回一個apscheduler.job.Job實例,后續可以對它進行修改或者刪除,而使用修飾器添加的任務添加之后就不能進行修改。

在調度器中設置定時任務,如果任務添加的時候,調度器還沒有啟動,那么任務只是暫時放到調度器中,當調度器啟動之后重新計算第一次執行時間。

需要注意的是,可以使用執行器或者任務倉庫來序列化任務,但是這個任務必須滿足兩個條件:

調用對象必須全局可訪問

調用對象的參數必須也可以序列化。

在所有內置的任務倉庫中,只有MemoryJobStore不能序列化對象;在所有內置的執行器中,只有ProcessPoolExecutors會序列化任務。

如果你需要在應用中使用持久性任務,那就必須給任務定義一個ID,并設置replace_existing=True,否則每次重啟應用都會返回一個新的任務。

如果要立即執行任務,只需要在添加任務的時候省略trigger參數。

刪除任務

當從調度器中刪除任務的時候,就會從相關聯的任務倉庫中刪除任務,后面就不會再執行了,有兩種方法來刪除任務。

調用remove_job(),參數為job ID以及任務倉庫名

調用remove()

第二種方法用起來更加方便,但是需要先保存添加任務時返回的實例對象;而通過scheduled_job()添加的任務,只能使用第一種方法進行刪除。

如果任務執行完畢,它會自動被刪除。

例如:

job = scheduler.add_job(myfunc, 'interval', minutes=2)

job.remove()

同樣的,如果顯式使用任務ID,則使用下面的方法:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')

scheduler.remove_job('my_job_id')

暫停與恢復任務

暫停與恢復任務也很簡單,可以直接操作任務實例或者調度器來實現,當任務暫停時,它的運行時間會被重置,暫停期間不會計算進去,重啟之后又算進去。

暫停任務可以使用以下兩種方法:

apscheduler.job.Job.pause()

apscheduler.schedulers.base.BaseScheduler.pause_job()

恢復任務可以使用以下兩種方法:

apscheduler.job.Job.resume()

apscheduler.schedulers.BaseScheduler.resume_job()

獲取任務列表

可以使用get_jobs()方法來獲取當前正在處理的任務列表,如果只是想獲取某個任務倉庫中的任務列表,可以使用任務倉庫名作為參數傳入。

APS還提供了一個print_jobs()方法來打印格式化的任務列表。

修改任務

使用apscheduler.job.Job.modify()或者modify_job()方法可以修改任務的屬性,你可以修改任務的任意屬性,除了id。

例如:

job.modify(max_instances=6, name='Alternate name')

如果你想重新調度某個任務,例如改變它的觸發器,則可以使用apscheduler.job.Job.reschedule()或者reschedule_job()方法,這個兩個方法都可以為任務重新創建一個觸發器,并重新計算任務的運行時間。

例如:

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

關閉調度器

使用下面的方法關閉調度器:

scheduler.shutdown()

默認情況下,scheduler會關閉它的任務倉庫以及執行器,并等待所有正在執行的任務執行完。如果你不想等待,可以這樣操作:

scheduler.shutdown(wait=False)

限制任務實例并發執行的個數

默認情況下,只允許同時執行一個任務實例,通過max_instances參數來設置允許并發執行任務的個數。

錯失任務的合并

有時候調度器可能無法按時執行某個任務,最常見的情況就是當某個持久性任務保存在任務倉庫中時,調度器關閉之后再重啟,但是任務需要在重啟之前被執行,這時這個任務就被錯過了。調度器會根據任務misfire_grace_time參數的配置來決定錯過的任務還要不要繼續執行。這樣做的話可能導致重啟調度器之后會連續執行好幾個任務。

調度器事件

可以給調度器添加事件監聽器,調度器事件只有在某些情況下才會被觸發,并且可以攜帶某些有用的信息。通過給add_listener()傳遞合適的mask參數,可以只監聽幾種特定的事件類型。

例如:

def my_listener(event):

if event.exception:

print('The job crashed :(')

else:

print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

總結

以上是生活随笔為你收集整理的python任务调度框架_python任务调度框架apscheduler【转】的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。