日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python 定时任务的几种实现方式

發(fā)布時間:2023/12/20 python 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python 定时任务的几种实现方式 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、循環(huán) sleep

from datetime import datetime import time # 每n秒執(zhí)行一次 def timer(n):while True:# TodoSomethingprint("定時任務啟動了。。。。")print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))time.sleep(n) # 5s timer(5)

執(zhí)行結果如下:

定時任務啟動了。。。。 2022-07-16 15:31:16 定時任務啟動了。。。。 2022-07-16 15:31:21 定時任務啟動了。。。。 2022-07-16 15:31:26 定時任務啟動了。。。。 2022-07-16 15:31:31 定時任務啟動了。。。。 2022-07-16 15:31:36 。 。 。

這個方法的缺點是,只能執(zhí)行固定間隔時間的任務,并且是一直執(zhí)行中,如果有定時任務就無法完成,比如早上七點鐘喊我起床。并且 sleep 是一個阻塞函數(shù),也就是說 sleep 這一段時間,啥都不能做。

二、threading模塊中的Timer

from datetime import datetime from threading import Timer # 打印時間函數(shù) def printTime(inc):print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))t = Timer(inc, printTime, (inc,))t.start() # 5s printTime(5)

運行結果哦如下:

2022-07-16 15:37:30 2022-07-16 15:37:35 2022-07-16 15:37:40 2022-07-16 15:37:45 2022-07-16 15:37:50

Timer 函數(shù)第一個參數(shù)是時間間隔(單位是秒),第二個參數(shù)是要調(diào)用的函數(shù)名,第三個參數(shù)是調(diào)用函數(shù)的參數(shù)(tuple)

三、使用sched模塊

sched 模塊是 Python 內(nèi)置的模塊,它是一個調(diào)度(延時處理機制),每次想要定時執(zhí)行某任務都必須寫入一個調(diào)度。

import sched import time from datetime import datetime# 初始化sched模塊的 scheduler 類 # 第一個參數(shù)是一個可以返回時間戳的函數(shù),第二個參數(shù)可以在定時未到達之前阻塞。 schedule = sched.scheduler(time.time, time.sleep)# 被周期性調(diào)度觸發(fā)的函數(shù) def printTime(inc):print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))schedule.enter(inc, 0, printTime, (inc,))# 默認參數(shù)60s def main(inc=60):# enter四個參數(shù)分別為:間隔事件、優(yōu)先級(用于同時間到達的兩個事件同時執(zhí)行時定序)、被調(diào)用觸發(fā)的函數(shù),# 給該觸發(fā)函數(shù)的參數(shù)(tuple形式)schedule.enter(0, 0, printTime, (inc,))schedule.run()# 10s 輸出一次 main(10)

執(zhí)行結果如下:

2022-07-16 15:44:20 2022-07-16 15:44:30 2022-07-16 15:44:40 2022-07-16 15:44:50 2022-07-16 15:45:00

sched 使用步驟如下:
1.生成調(diào)度器:

s = sched.scheduler(time.time,time.sleep)

2.加入調(diào)度事件
其實有 enter、enterabs 等等,我們以 enter 為例子。

s.enter(x1,x2,x3,x4)

四個參數(shù)分別為:間隔事件、優(yōu)先級(用于同時間到達的兩個事件同時執(zhí)行時定序)、被調(diào)用觸發(fā)的函數(shù),給觸發(fā)函數(shù)的參數(shù)(注意:一定要以 tuple 給,如果只有一個參數(shù)就(xx,))

3.運行

s.run()

注意 sched 模塊不是循環(huán)的,一次調(diào)度被執(zhí)行后就 Over 了,如果想再執(zhí)行,請再次 enter

四、APScheduler定時框架

終于找到了可以每天定時喊我起床的方式了

APScheduler是一個 Python 定時任務框架,使用起來十分方便。提供了基于日期、固定時間間隔以及 crontab 類型的任務,并且可以持久化任務、并以 daemon 方式運行應用。

使用 APScheduler 需要安裝

$ pip install apscheduler

首先來看一個周一到周五每天早上6點半喊我起床的例子

from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 輸出時間 def job():print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # BlockingScheduler scheduler = BlockingScheduler() scheduler.add_job(job, 'cron', day_of_week='1-5', hour=6, minute=30) scheduler.start()

代碼中的 BlockingScheduler 是什么呢?

BlockingScheduler是APScheduler中的調(diào)度器,APScheduler 中有兩種常用的調(diào)度器,BlockingScheduler 和 BackgroundScheduler,當調(diào)度器是應用中唯一要運行的任務時,使用 BlockingSchedule,如果希望調(diào)度器在后臺執(zhí)行,使用 BackgroundScheduler。

APScheduler四個組件

APScheduler 四個組件分別為:觸發(fā)器(trigger),作業(yè)存儲(job store),執(zhí)行器(executor),調(diào)度器(scheduler)。

(1)觸發(fā)器(trigger)

包含調(diào)度邏輯,每一個作業(yè)有它自己的觸發(fā)器,用于決定接下來哪一個作業(yè)會運行。除了他們自己初始配置意外,觸發(fā)器完全是無狀態(tài)的
APScheduler 有三種內(nèi)建的 trigger:

date: 特定的時間點觸發(fā) interval: 固定時間間隔觸發(fā) cron: 在特定時間周期性地觸發(fā)

(2)作業(yè)存儲(job store)
存儲被調(diào)度的作業(yè),默認的作業(yè)存儲是簡單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲是將作業(yè)保存在數(shù)據(jù)庫中。一個作業(yè)的數(shù)據(jù)講在保存在持久化作業(yè)存儲時被序列化,并在加載時被反序列化。調(diào)度器不能分享同一個作業(yè)存儲。
APScheduler 默認使用 MemoryJobStore,可以修改使用 DB 存儲方案

(3)執(zhí)行器(executor)
處理作業(yè)的運行,他們通常通過在作業(yè)中提交制定的可調(diào)用對象到一個線程或者進城池來進行。當作業(yè)完成時,執(zhí)行器將會通知調(diào)度器。
最常用的 executor 有兩種:

ProcessPoolExecutor ThreadPoolExecutor

(4)調(diào)度器(scheduler)

通常在應用中只有一個調(diào)度器,應用的開發(fā)者通常不會直接處理作業(yè)存儲、調(diào)度器和觸發(fā)器,相反,調(diào)度器提供了處理這些的合適的接口。配置作業(yè)存儲和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè)。

配置調(diào)度器
APScheduler提供了許多不同的方式來配置調(diào)度器,你可以使用一個配置字典或者作為參數(shù)關鍵字的方式傳入。你也可以先創(chuàng)建調(diào)度器,再配置和添加作業(yè),這樣你可以在不同的環(huán)境中得到更大的靈活性。

下面來看一個簡單的 BlockingScheduler 例子:

from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetimedef job():print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 定義BlockingScheduler sched = BlockingScheduler() sched.add_job(job, 'interval', seconds=5) sched.start()

上述代碼創(chuàng)建了一個 BlockingScheduler,并使用默認內(nèi)存存儲和默認執(zhí)行器。(默認選項分別是 MemoryJobStore 和 ThreadPoolExecutor,其中線程池的最大線程數(shù)為10)。配置完成后使用 start() 方法來啟動。

如果想要顯式設置 job store(使用mongo存儲)和 executor 可以這樣寫:

from datetime import datetime from pymongo import MongoClient from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor # MongoDB 參數(shù) host = '127.0.0.1' port = 27017 client = MongoClient(host, port) # 輸出時間 def job():print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 存儲方式 jobstores = {'mongo': MongoDBJobStore(collection='job', database='test', client=client),'default': MemoryJobStore() } executors = {'default': ThreadPoolExecutor(10),'processpool': ProcessPoolExecutor(3) } job_defaults = {'coalesce': False,'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo') scheduler.start()

對 job 的操作

添加 job

添加job有兩種方式:

add_job() scheduled_job()

第二種方法只適用于應用運行期間不會改變的 job,而第一種方法返回一個apscheduler.job.Job 的實例,可以用來改變或者移除 job。

from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() # 裝飾器 @sched.scheduled_job('interval', id='my_job_id', seconds=5) def job_function():print("Hello World") # 開始 sched.start()

@sched.scheduled_job() 是 Python 的裝飾器。

移除 job

移除 job 也有兩種方法:

remove_job() job.remove()

remove_job 使用 jobID 移除
job.remove() 使用 add_job() 返回的實例

job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove() # id scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id')

暫停和恢復 job

暫停一個 job:

apscheduler.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_job()

恢復一個 job:

apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()

希望你還記得 apscheduler.job.Job 是 add_job() 返回的實例

獲取 job 列表

獲得可調(diào)度 job 列表,可以使用get_jobs() 來完成,它會返回所有的 job 實例。

也可以使用print_jobs() 來輸出所有格式化的 job 列表。

修改 job

除了 jobID 之外 job 的所有屬性都可以修改,使用 apscheduler.job.Job.modify() 或者 modify_job() 修改一個 job 的屬性

job.modify(max_instances=6, name='Alternate name') modify_job('my_job_id', trigger='cron', minute='*/5')

關閉 job

默認情況下調(diào)度器會等待所有的 job 完成后,關閉所有的調(diào)度器和作業(yè)存儲。將 wait 選項設置為 False 可以立即關閉。

scheduler.shutdown() scheduler.shutdown(wait=False) scheduler 事件

scheduler 可以添加事件監(jiān)聽器,并在特殊的時間觸發(fā)。

def my_listener(event):if event.exception:print('The job crashed :(')else:print('The job worked :)') # 添加監(jiān)聽器 scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

trigger 規(guī)則

date

最基本的一種調(diào)度,作業(yè)只會執(zhí)行一次。它的參數(shù)如下:

run_date (datetime|str) – the date/time to run the job at
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already

from datetime import date from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() def my_job(text):print(text) # The job will be executed on November 6th, 2009 sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text']) sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text']) # The 'date' trigger and datetime.now() as run_date are implicit sched.add_job(my_job, args=['text']) sched.start()

cron

year (int|str)4-digit year month (int|str)month (1-12) day (int|str) – day of the (1-31) week (int|str)ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str)hour (0-23) minute (int|str)minute (0-59) second (int|str)second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)

表達式:

from apscheduler.schedulers.blocking import BlockingSchedulerdef job_function():print("Hello World") # BlockingScheduler sched = BlockingScheduler() # Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30') sched.start()

interval

參數(shù):

  • weeks (int) – number of weeks to wait
  • days (int) – number of days to wait
  • hours (int) – number of hours to wait
  • minutes (int) – number of minutes to wait
  • seconds (int) – number of seconds to wait
  • start_date (datetime|str) – starting point for the interval
    calculation
  • end_date (datetime|str) – latest possible date/time to trigger on
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time
    calculations
from datetime import datetime from apscheduler.schedulers.blocking import BlockingSchedulerdef job_function():print("Hello World") # BlockingScheduler sched = BlockingScheduler() # Schedule job_function to be called every two hours sched.add_job(job_function, 'interval', hours=2) # The same as before, but starts on 2010-10-10 at 9:30 and stops on 2014-06-15 at 11:00 sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00') sched.start()

總結

以上是生活随笔為你收集整理的Python 定时任务的几种实现方式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。