日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

容器中apscheduler不执行_APScheduler:定时任务框架

發(fā)布時間:2025/3/8 编程问答 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 容器中apscheduler不执行_APScheduler:定时任务框架 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

APScheduler:定時任務(wù)框架

安裝

文檔: https://apscheduler.readthedocs.io/en/stable/userguide.html

安裝

$?pip?install?apscheduler
>>>?import?apscheduler
>>>?apscheduler.version
'3.6.3'

組件

APScheduler由一下四部分組成

  • triggers:觸發(fā)器,指定定時任務(wù)執(zhí)行的時機,每個任務(wù)都有自己的觸發(fā)器.
  • job stores:存儲器,持久存儲,默認(rèn)存儲在內(nèi)存中.
  • executors:執(zhí)行器,在定時任務(wù)執(zhí)行時,以進(jìn)程或線程方式執(zhí)行
  • scheduler:調(diào)度器,包含BackgroundScheduler(后臺運行)和BlockingScheduler(阻塞運行).他會合理安排作業(yè)存儲器,執(zhí)行器,觸發(fā)器進(jìn)行工作.并進(jìn)行添加和刪除任務(wù)等.調(diào)度器通常是只有一個的,開發(fā)人員很少直接操作觸發(fā)器,存儲器,執(zhí)行器等.因為這些都由調(diào)度器自動來實現(xiàn)了.
10334

觸發(fā)器(triggers)

1.date在特定時間執(zhí)行

示例:

from?datetime?import?date,?datetime

from?apscheduler.schedulers.blocking?import?BlockingScheduler


sched?=?BlockingScheduler()

def?my_job(text):
???print(text)

#?run_date?接受?date,?datetime?數(shù)據(jù)類型
sched.add_job(my_job,?'date',?run_date=datetime(2020,?11,?27,?19,?33,?30),?args=['text'])

sched.start()

更多:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html

2.interval間隔執(zhí)行

在固定的時間間隔后觸發(fā)事件.參數(shù)如下:

weeks周,整形
days一個月中的第幾天,整形
hours時,整形
minutes分,整形
seconds秒,整形
start_date起始時間
end_date結(jié)束時間
jitter觸發(fā)的時間誤差
from?apscheduler.schedulers.blocking?import?BlockingScheduler
from?datetime?import?datetime


sched?=?BlockingScheduler()

def?job_funciton():
???print('hello?world')


sched.add_job(job_funciton,?trigger='interval',seconds=5)
#?指定小時
#?sched.add_job(job_funciton,?trigger='interval',?hours=2)
#?指定開始,結(jié)束時間
#?sched.add_job(job_funciton,?trigger='interval',?start_date='2020-11-27?20:30:00',?end_date='2010-11-27?21:30:00)
sched.start()?

3.crontab

在某個確切的時間周期性觸發(fā)事件.

year年month1-12月day1-31日
week1-53周day_of_week一周中的第幾天(0/Monday)hour0-23
minute0-59second0-59start_datedatetime數(shù)據(jù)類型,或者字符串類型
end_date結(jié)束時間timezone時區(qū)jitter觸發(fā)的誤差時間

也可以用表達(dá)式類型,可以用以下方式:

表達(dá)式字段描述
*任何在每個值都觸發(fā)
*/a任何每隔 a觸發(fā)一次
a-b任何在 a-b區(qū)間內(nèi)任何一個時間觸發(fā)( a必須小于 b)
a-b/c任何在 a-b區(qū)間內(nèi)每隔 c觸發(fā)一次
xth yday第 x個星期 y觸發(fā)
lastxday最后一個星期 x觸發(fā)
lastday一個月中的最后一天觸發(fā)
x,y,z任何可以把上面的表達(dá)式進(jìn)行組合
from?apscheduler.schedulers.blocking?import?BlockingScheduler


def?job_function():
???print?"Hello?World"

sched?=?BlockingScheduler()

#?Schedules?job_function?to?be?run?on?the?third?Friday
#?of?June,?July,?August,?November?and?December?at?00:00,?01:00,?02:00?and?03:00
sched.add_job(job_function,?'cron',?month='6-8,11-12',?day='3rd?fri',?hour='0-3')

sched.start()

調(diào)度器(schedulers)

  • BlockingScheduler:適用于調(diào)度程序是進(jìn)程中唯一運行的進(jìn)程,調(diào)用 start函數(shù)會阻塞當(dāng)前線程,不能立即返回。
  • BackgroundScheduler:適用于調(diào)度程序在應(yīng)用程序的后臺運行,調(diào)用 start后主線程不會阻塞。
  • AsyncIOScheduler:適用于使用了 asyncio模塊的應(yīng)用程序。
  • GeventScheduler:適用于使用 gevent模塊的應(yīng)用程序。
  • TwistedScheduler:適用于構(gòu)建 Twisted的應(yīng)用程序。
  • QtScheduler:適用于構(gòu)建 Qt的應(yīng)用程序。
  • TornadoScheduler: tornado
  • 任務(wù)存儲器(job stores)

    有2中方式,一種是加載在內(nèi)存中(默認(rèn)配置),一種是使用數(shù)據(jù)庫.使用內(nèi)存簡單高效,但是程序出現(xiàn)問題,從新運行,會把以前的任務(wù)從新再執(zhí)行一次.數(shù)據(jù)庫則可以在中斷的地方恢復(fù)正常使用.

    • MemoryJobStore:使用內(nèi)存
    • MongoDBJobStore:使用mongodb
    • RedisJobStore:使用redis
    • SQLAlchemy:使用SQLAlchemy框架

    1.RedisJobStore

    RedisJobStore(db=0,?jobs_key='apscheduler.jobs',?run_times_key='apscheduler.run_times',?pickle_protocol=pickle.HIGHEST_PROTOCOL,?**connect_args)

    有2種創(chuàng)建的方法:

    • add_jobstore:需要指定redis的相關(guān)參數(shù).
    from?datetime?import?datetime,?timedelta
    import?sys
    import?os

    from?apscheduler.schedulers.blocking?import?BlockingScheduler
    from?apscheduler.jobstores.redis?import?RedisJobStore



    def?alarm(time):
    ????print('Alarm!?This?alarm?was?scheduled?at?%s.'?%?time)


    if?__name__?==?'__main__':
    ????scheduler?=?BlockingScheduler()
    ????scheduler.add_jobstore('redis',?jobs_key='example.jobs',?run_times_key='example.run_times',?host='192.168.0.101',?port=6379,?db=0)
    ????if?len(sys.argv)?>?1?and?sys.argv[1]?==?'--clear':
    ????????scheduler.remove_all_jobs()

    ????alarm_time?=?datetime.now()?+?timedelta(seconds=300)
    ????scheduler.add_job(alarm,?'date',?run_date=alarm_time,?args=[datetime.now()])
    ????print('To?clear?the?alarms,?run?this?example?with?the?--clear?argument.')
    ????print('Press?Ctrl+{0}?to?exit'.format('Break'?if?os.name?==?'nt'?else?'C'))

    ????try:
    ????????scheduler.start()
    ????except?(KeyboardInterrupt,?SystemExit):
    ????????pass
    • RedisJobStore
    from?apscheduler.schedulers.blocking?import?BlockingScheduler
    from?apscheduler.jobstores.redis?import?RedisJobStore
    from?datetime?import?datetime,?timedelta

    jobstore?=?{
    ????'default'?:?RedisJobStore(db=0,?jobs_key='myfunc',?run_times_key='myfunc_time',?host='192.168.0.101',?port=6379)
    }

    def?my_func(t):
    ????print('hello?world,?%s'?%t)


    if?__name__?==?'__main__':
    ????sched?=?BlockingScheduler(jobstores=jobstore)
    ????alarm_time?=?datetime.now()?+?timedelta(seconds=300)
    ????sched.add_job(my_func,run_date=alarm_time,?args=['ning'])
    ????sched.start()

    均可在redis中查詢到數(shù)據(jù).

    2.SQLAlchemy

    使用ORM框架,演示使用MySql

    from?apscheduler.schedulers.blocking?import?BlockingScheduler
    from?datetime?import?datetime,?timedelta

    def?my_func(t):
    ???print('hello?%s'?%t)

    if?__name__?==?"__main__":
    ???sched?=?BlockingScheduler()
    ???url?=?'mysql+pymysql://root:2008.Cn123@192.168.0.101:3306/test'
    ???sched.add_jobstore('sqlalchemy',?url=url,tablename='api_job')

    ???alarm_time?=?datetime.now()?+?timedelta(seconds=300)
    ???sched.add_job(my_func,run_date=alarm_time,?args=['ning'])
    ???sched.start()

    如果表不存在,會自動創(chuàng)建表.tablename用于指定表的名稱.

    在數(shù)據(jù)庫中可以查看到表

    select?*?from?api_job;
    10336

    執(zhí)行器executors

    執(zhí)行器取決于應(yīng)用場景,默認(rèn)是ThreadPoolExecutor,它可以滿足大部分需求.如果是CPU密集型計算,可以選擇ProcessPoolExecutor

    class?apscheduler.executors.pool.ThreadPoolExecutor(max_workers=10)class?apscheduler.executors.pool.ProcessPoolExecutor(max_workers=10)
    #?max_worker?指定最多使用線程/進(jìn)程from?apscheduler.schedulers.background?import?BackgroundScheduler
    from?apscheduler.executors.pool?import?ThreadPoolExecutor

    executors?=?{
    ???'default':?ThreadPoolExecutor(20),
    }
    conf?=?{?#?redis配置
    ???"host":127.0.0.1,
    ???"port":6379,
    ???"db":15,?#?連接15號數(shù)據(jù)庫
    ???"max_connections":10?#?redis最大支持300個連接數(shù)
    }
    scheduler?=?BackgroundScheduler(executors=executors)
    scheduler.add_jobstore(jobstore='redis',?**conf)?#?添加任務(wù)持久化存儲方式,如果未安裝redis可省略此步驟

    任務(wù)操作

    • add_job(func, id='xxx', args=None, kwargs=None)添加任務(wù)
    #?添加任務(wù)func,?func參數(shù)可以使用?'可導(dǎo)入模塊:可調(diào)用對象'的方式引入,即可用模塊來引入
    #??tree?-L?2
    #├──?func
    #│???├──?add_func.py
    #│???├──?__init__.py
    #└──?jobs.py
    #?add_func.py
    def?add(x,y):
    ????print(x+y)
    ????
    #?jobs.py
    from?apscheduler.schedulers.blocking?import?BlockingScheduler
    from?func.add_func?import?add

    shced?=?BlockingScheduler()


    if?__name__?==?"__main__":
    ????shced.add_job('func.add_func:add',?args=[1,2],?id='job1')
    ????shced.start()???

    除去使用add_job(),還可以使用裝飾器函數(shù)scheduled_job來添加任務(wù).

    • remove_job(job_id):刪除任務(wù),需要指定job_id
    • pause_job(job_id):暫停任務(wù)
    • resume_job(job_id):恢復(fù)任務(wù)
    • modify_job(job_id, **changes):修改任務(wù)屬性
    • print_jobs():作業(yè)信息
    #?方法1
    job?=?scheduler.add_job(myfunc,?'interval',?minutes=2)??#?添加任務(wù)
    job.remove()??#?刪除任務(wù)
    job.pause()?#?暫定任務(wù)
    job.resume()??#?恢復(fù)任務(wù)

    #?方法2
    scheduler.add_job(myfunc,?'interval',?minutes=2,?id='my_job_id')??#?添加任務(wù)????
    scheduler.remove_job('my_job_id')??#?刪除任務(wù)
    scheduler.pause_job('my_job_id')??#?暫定任務(wù)
    scheduler.resume_job('my_job_id')??#?恢復(fù)任務(wù)

    示例

    方法1

    from?pytz?import?utc
    from?datetime?import?datetime
    from?apscheduler.schedulers.background?import?BackgroundScheduler
    from?apscheduler.jobstores.mongodb?import?MongoDBJobStore
    from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore
    from?apscheduler.executors.pool?import?ThreadPoolExecutor,?ProcessPoolExecutor
    def?tick():
    ???print('Tick!?The?time?is:?%s'?%?datetime.now())
    #?選擇MongoDB作為任務(wù)存儲數(shù)據(jù)庫
    jobstores?=?{
    ???'mongo':?MongoDBJobStore(),
    ???'default':?SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
    #?默認(rèn)使用線程池
    executors?=?{
    ???'default':?ThreadPoolExecutor(20),
    ???'processpool':?ProcessPoolExecutor(5)
    }
    #?默認(rèn)參數(shù)配置
    job_defaults?=?{
    ???'coalesce':?False,??#?積攢的任務(wù)是否只跑一次,是否合并所有錯過的Job
    ???'max_instances':?3,??#?默認(rèn)同一時刻只能有一個實例運行,通過max_instances=3修改為3個。
    ???'misfire_grace_time':?30??#?30秒的任務(wù)超時容錯
    }
    scheduler?=?BackgroundScheduler(jobstores=jobstores,?executors=executors,?job_defaults=job_defaults,?timezone=utc)
    scheduler.add_job(tick,?'interval',?seconds=3)
    scheduler.start()

    方法2

    from?apscheduler.schedulers.background?import?BackgroundScheduler
    #?The?"apscheduler."?prefix?is?hard?coded
    scheduler?=?BackgroundScheduler({
    ???'apscheduler.jobstores.mongo':?{
    ????????'type':?'mongodb'
    ???},
    ???'apscheduler.jobstores.default':?{
    ???????'type':?'sqlalchemy',
    ???????'url':?'sqlite:///jobs.sqlite'
    ???},
    ???'apscheduler.executors.default':?{
    ???????'class':?'apscheduler.executors.pool:ThreadPoolExecutor',
    ???????'max_workers':?'20'
    ???},
    ???'apscheduler.executors.processpool':?{
    ???????'type':?'processpool',
    ???????'max_workers':?'5'
    ???},
    ???'apscheduler.job_defaults.coalesce':?'false',
    ???'apscheduler.job_defaults.max_instances':?'3',
    ???'apscheduler.timezone':?'UTC',
    })

    方法3

    from?pytz?import?utc
    from?apscheduler.schedulers.background?import?BackgroundScheduler
    from?apscheduler.jobstores.sqlalchemy?import?SQLAlchemyJobStore
    from?apscheduler.executors.pool?import?ProcessPoolExecutor
    jobstores?=?{
    ???'mongo':?{'type':?'mongodb'},
    ???'default':?SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
    executors?=?{
    ???'default':?{'type':?'threadpool',?'max_workers':?20},
    ???'processpool':?ProcessPoolExecutor(max_workers=5)
    }
    job_defaults?=?{
    ???'coalesce':?False,
    ???'max_instances':?3
    }
    scheduler?=?BackgroundScheduler()
    #?..這里可以添加任務(wù)
    scheduler.configure(jobstores=jobstores,?executors=executors,?job_defaults=job_defaults,?timezone=utc)

    misfire_grace_time:如果一個job本來14:00有一次執(zhí)行,但是由于某種原因沒有被調(diào)度上,現(xiàn)在14:01了,這個14:00的運行實例被提交時,會檢查它預(yù)訂運行的時間和當(dāng)下時間的差值(這里是1分鐘),大于我們設(shè)置的30秒限制,那么這個運行實例不會被執(zhí)行。合并:最常見的情形是scheduler被shutdown后重啟,某個任務(wù)會積攢了好幾次沒執(zhí)行如5次,下次這個job被submit給executor時,執(zhí)行5次。將coalesce=True后,只會執(zhí)行一次

    replace_existing: 如果在程序初始化時,是從數(shù)據(jù)庫讀取任務(wù)的,那么必須為每個任務(wù)定義一個明確的ID,并且使用replace_existing=True,否則每次重啟程序,你都會得到一份新的任務(wù)拷貝,也就意味著任務(wù)的狀態(tài)不會保存。

    總結(jié)

    以上是生活随笔為你收集整理的容器中apscheduler不执行_APScheduler:定时任务框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。