Python 定时任务的几种实现方式
一、循環(huán) sleep
from datetime import datetime import time # 每n秒執(zhí)行一次 def timer(n):while True:# TodoSomethingprint("定時任務啟動了。。。。")print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))time.sleep(n) # 5s timer(5)執(zhí)行結果如下:
定時任務啟動了。。。。 2022-07-16 15:31:16 定時任務啟動了。。。。 2022-07-16 15:31:21 定時任務啟動了。。。。 2022-07-16 15:31:26 定時任務啟動了。。。。 2022-07-16 15:31:31 定時任務啟動了。。。。 2022-07-16 15:31:36 。 。 。這個方法的缺點是,只能執(zhí)行固定間隔時間的任務,并且是一直執(zhí)行中,如果有定時任務就無法完成,比如早上七點鐘喊我起床。并且 sleep 是一個阻塞函數(shù),也就是說 sleep 這一段時間,啥都不能做。
二、threading模塊中的Timer
from datetime import datetime from threading import Timer # 打印時間函數(shù) def printTime(inc):print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))t = Timer(inc, printTime, (inc,))t.start() # 5s printTime(5)運行結果哦如下:
2022-07-16 15:37:30 2022-07-16 15:37:35 2022-07-16 15:37:40 2022-07-16 15:37:45 2022-07-16 15:37:50Timer 函數(shù)第一個參數(shù)是時間間隔(單位是秒),第二個參數(shù)是要調(diào)用的函數(shù)名,第三個參數(shù)是調(diào)用函數(shù)的參數(shù)(tuple)
三、使用sched模塊
sched 模塊是 Python 內(nèi)置的模塊,它是一個調(diào)度(延時處理機制),每次想要定時執(zhí)行某任務都必須寫入一個調(diào)度。
import sched import time from datetime import datetime# 初始化sched模塊的 scheduler 類 # 第一個參數(shù)是一個可以返回時間戳的函數(shù),第二個參數(shù)可以在定時未到達之前阻塞。 schedule = sched.scheduler(time.time, time.sleep)# 被周期性調(diào)度觸發(fā)的函數(shù) def printTime(inc):print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))schedule.enter(inc, 0, printTime, (inc,))# 默認參數(shù)60s def main(inc=60):# enter四個參數(shù)分別為:間隔事件、優(yōu)先級(用于同時間到達的兩個事件同時執(zhí)行時定序)、被調(diào)用觸發(fā)的函數(shù),# 給該觸發(fā)函數(shù)的參數(shù)(tuple形式)schedule.enter(0, 0, printTime, (inc,))schedule.run()# 10s 輸出一次 main(10)執(zhí)行結果如下:
2022-07-16 15:44:20 2022-07-16 15:44:30 2022-07-16 15:44:40 2022-07-16 15:44:50 2022-07-16 15:45:00sched 使用步驟如下:
1.生成調(diào)度器:
2.加入調(diào)度事件
其實有 enter、enterabs 等等,我們以 enter 為例子。
四個參數(shù)分別為:間隔事件、優(yōu)先級(用于同時間到達的兩個事件同時執(zhí)行時定序)、被調(diào)用觸發(fā)的函數(shù),給觸發(fā)函數(shù)的參數(shù)(注意:一定要以 tuple 給,如果只有一個參數(shù)就(xx,))
3.運行
s.run()注意 sched 模塊不是循環(huán)的,一次調(diào)度被執(zhí)行后就 Over 了,如果想再執(zhí)行,請再次 enter
四、APScheduler定時框架
終于找到了可以每天定時喊我起床的方式了
APScheduler是一個 Python 定時任務框架,使用起來十分方便。提供了基于日期、固定時間間隔以及 crontab 類型的任務,并且可以持久化任務、并以 daemon 方式運行應用。
使用 APScheduler 需要安裝
$ pip install apscheduler首先來看一個周一到周五每天早上6點半喊我起床的例子
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 輸出時間 def job():print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # BlockingScheduler scheduler = BlockingScheduler() scheduler.add_job(job, 'cron', day_of_week='1-5', hour=6, minute=30) scheduler.start()代碼中的 BlockingScheduler 是什么呢?
BlockingScheduler是APScheduler中的調(diào)度器,APScheduler 中有兩種常用的調(diào)度器,BlockingScheduler 和 BackgroundScheduler,當調(diào)度器是應用中唯一要運行的任務時,使用 BlockingSchedule,如果希望調(diào)度器在后臺執(zhí)行,使用 BackgroundScheduler。
APScheduler四個組件
APScheduler 四個組件分別為:觸發(fā)器(trigger),作業(yè)存儲(job store),執(zhí)行器(executor),調(diào)度器(scheduler)。
(1)觸發(fā)器(trigger)
包含調(diào)度邏輯,每一個作業(yè)有它自己的觸發(fā)器,用于決定接下來哪一個作業(yè)會運行。除了他們自己初始配置意外,觸發(fā)器完全是無狀態(tài)的
APScheduler 有三種內(nèi)建的 trigger:
(2)作業(yè)存儲(job store)
存儲被調(diào)度的作業(yè),默認的作業(yè)存儲是簡單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲是將作業(yè)保存在數(shù)據(jù)庫中。一個作業(yè)的數(shù)據(jù)講在保存在持久化作業(yè)存儲時被序列化,并在加載時被反序列化。調(diào)度器不能分享同一個作業(yè)存儲。
APScheduler 默認使用 MemoryJobStore,可以修改使用 DB 存儲方案
(3)執(zhí)行器(executor)
處理作業(yè)的運行,他們通常通過在作業(yè)中提交制定的可調(diào)用對象到一個線程或者進城池來進行。當作業(yè)完成時,執(zhí)行器將會通知調(diào)度器。
最常用的 executor 有兩種:
(4)調(diào)度器(scheduler)
通常在應用中只有一個調(diào)度器,應用的開發(fā)者通常不會直接處理作業(yè)存儲、調(diào)度器和觸發(fā)器,相反,調(diào)度器提供了處理這些的合適的接口。配置作業(yè)存儲和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè)。
配置調(diào)度器
APScheduler提供了許多不同的方式來配置調(diào)度器,你可以使用一個配置字典或者作為參數(shù)關鍵字的方式傳入。你也可以先創(chuàng)建調(diào)度器,再配置和添加作業(yè),這樣你可以在不同的環(huán)境中得到更大的靈活性。
下面來看一個簡單的 BlockingScheduler 例子:
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetimedef job():print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 定義BlockingScheduler sched = BlockingScheduler() sched.add_job(job, 'interval', seconds=5) sched.start()上述代碼創(chuàng)建了一個 BlockingScheduler,并使用默認內(nèi)存存儲和默認執(zhí)行器。(默認選項分別是 MemoryJobStore 和 ThreadPoolExecutor,其中線程池的最大線程數(shù)為10)。配置完成后使用 start() 方法來啟動。
如果想要顯式設置 job store(使用mongo存儲)和 executor 可以這樣寫:
from datetime import datetime from pymongo import MongoClient from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor # MongoDB 參數(shù) host = '127.0.0.1' port = 27017 client = MongoClient(host, port) # 輸出時間 def job():print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 存儲方式 jobstores = {'mongo': MongoDBJobStore(collection='job', database='test', client=client),'default': MemoryJobStore() } executors = {'default': ThreadPoolExecutor(10),'processpool': ProcessPoolExecutor(3) } job_defaults = {'coalesce': False,'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo') scheduler.start()對 job 的操作
添加 job
添加job有兩種方式:
add_job() scheduled_job()第二種方法只適用于應用運行期間不會改變的 job,而第一種方法返回一個apscheduler.job.Job 的實例,可以用來改變或者移除 job。
from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() # 裝飾器 @sched.scheduled_job('interval', id='my_job_id', seconds=5) def job_function():print("Hello World") # 開始 sched.start()@sched.scheduled_job() 是 Python 的裝飾器。
移除 job
移除 job 也有兩種方法:
remove_job() job.remove()remove_job 使用 jobID 移除
job.remove() 使用 add_job() 返回的實例
暫停和恢復 job
暫停一個 job:
apscheduler.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_job()恢復一個 job:
apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()希望你還記得 apscheduler.job.Job 是 add_job() 返回的實例
獲取 job 列表
獲得可調(diào)度 job 列表,可以使用get_jobs() 來完成,它會返回所有的 job 實例。
也可以使用print_jobs() 來輸出所有格式化的 job 列表。
修改 job
除了 jobID 之外 job 的所有屬性都可以修改,使用 apscheduler.job.Job.modify() 或者 modify_job() 修改一個 job 的屬性
job.modify(max_instances=6, name='Alternate name') modify_job('my_job_id', trigger='cron', minute='*/5')關閉 job
默認情況下調(diào)度器會等待所有的 job 完成后,關閉所有的調(diào)度器和作業(yè)存儲。將 wait 選項設置為 False 可以立即關閉。
scheduler.shutdown() scheduler.shutdown(wait=False) scheduler 事件scheduler 可以添加事件監(jiān)聽器,并在特殊的時間觸發(fā)。
def my_listener(event):if event.exception:print('The job crashed :(')else:print('The job worked :)') # 添加監(jiān)聽器 scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)trigger 規(guī)則
date
最基本的一種調(diào)度,作業(yè)只會執(zhí)行一次。它的參數(shù)如下:
run_date (datetime|str) – the date/time to run the job at
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
cron
year (int|str) – 4-digit year month (int|str) – month (1-12) day (int|str) – day of the (1-31) week (int|str) – ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) – hour (0-23) minute (int|str) – minute (0-59) second (int|str) – second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)表達式:
interval
參數(shù):
- weeks (int) – number of weeks to wait
- days (int) – number of days to wait
- hours (int) – number of hours to wait
- minutes (int) – number of minutes to wait
- seconds (int) – number of seconds to wait
- start_date (datetime|str) – starting point for the interval
calculation - end_date (datetime|str) – latest possible date/time to trigger on
- timezone (datetime.tzinfo|str) – time zone to use for the date/time
calculations
總結
以上是生活随笔為你收集整理的Python 定时任务的几种实现方式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 再说鸿蒙
- 下一篇: websocket python爬虫_p