日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

任务调度之Quartz2

發布時間:2024/4/13 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 任务调度之Quartz2 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Quartz 集成到Spring

Spring-quartz 工程

Spring 在spring-context-support.jar 中直接提供了對Quartz 的支持。

可以在配置文件中把JobDetail、Trigger、Scheduler 定義成Bean。

定義Job

<bean name="myJob1" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"><property name="name" value="my_job_1"/><property name="group" value="my_group"/><property name="jobClass" value="com.leon.quartz.MyJob1"/><property name="durability" value="true"/> </bean>

定義Trigger

<bean name="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean"><property name="name" value="my_trigger_1"/><property name="group" value="my_group"/><property name="jobDetail" ref="myJob1"/><property name="startDelay" value="1000"/><property name="repeatInterval" value="5000"/><property name="repeatCount" value="2"/> </bean>

定義Scheduler

<bean name="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"><property name="triggers"><list><ref bean="simpleTrigger"/><ref bean="cronTrigger"/></list></property> </bean>

既然可以在配置文件配置,當然也可以用@Bean 注解配置。在配置類上加上@Configuration 讓Spring 讀取到。

public class QuartzConfig {@Beanpublic JobDetail printTimeJobDetail(){return JobBuilder.newJob(MyJob1.class).withIdentity("leonJob").usingJobData("leon", "職位更好的你").storeDurably().build();}@Beanpublic Trigger printTimeJobTrigger() {CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?");return TriggerBuilder.newTrigger().forJob(printTimeJobDetail()).withIdentity("quartzTaskService").withSchedule(cronScheduleBuilder).build();} }

運行spring-quartz 工程的com.leon.quartz.QuartzTest

package com.leon.quartz;import org.quartz.*; import org.quartz.impl.StdScheduler; import org.quartz.impl.StdSchedulerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;/*** 單元測試類*/ public class QuartzTest {private static Scheduler scheduler;public static void main(String[] args) throws SchedulerException {// 獲取容器ApplicationContext ac = new ClassPathXmlApplicationContext("spring_quartz.xml");// 從容器中獲取調度器scheduler = (StdScheduler) ac.getBean("scheduler");// 啟動調度器scheduler.start();}}

動態調度的實現

springboot-quartz 工程

傳統的Spring 方式集成,由于任務信息全部配置在xml 文件中,如果需要操作任務或者修改任務運行頻率,只能重新編譯、打包、部署、重啟,如果有緊急問題需要處理,會浪費很多的時間。

有沒有可以動態調度任務的方法?比如停止一個Job?啟動一個Job?修改Job 的觸發頻率?

讀取配置文件、寫入配置文件、重啟Scheduler 或重啟應用明顯是不可取的。

對于這種頻繁變更并且需要實時生效的配置信息,我們可以放到哪里?

ZK、Redis、DB tables。

并且,我們可以提供一個界面,實現對數據表的輕松操作。

配置管理

這里我們用最簡單的數據庫的實現。

問題1:建一張什么樣的表?參考JobDetail 的屬性。

CREATE TABLE `sys_job` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',`job_name` varchar(512) NOT NULL COMMENT '任務名稱',`job_group` varchar(512) NOT NULL COMMENT '任務組名',`job_cron` varchar(512) NOT NULL COMMENT '時間表達式',`job_class_path` varchar(1024) NOT NULL COMMENT '類路徑,全類型',`job_data_map` varchar(1024) DEFAULT NULL COMMENT '傳遞map 參數',`job_status` int(2) NOT NULL COMMENT '狀態:1 啟用0 停用',`job_describe` varchar(1024) DEFAULT NULL COMMENT '任務功能描述',PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;

數據操作與任務調度

操作數據表非常簡單,SSM 增刪改查。

但是在修改了表的數據之后,怎么讓調度器知道呢?

調度器的接口:Scheduler

在我們的需求中,我們需要做的事情:

1、新增一個任務

2、刪除一個任務

3、啟動、停止一個任務

4、修改任務的信息(包括調度規律)

因此可以把相關的操作封裝到一個工具類中。com.leon.demo.util.SchedulerUtil

package com.leon.demo.util;import com.alibaba.fastjson.JSONObject; import com.leon.demo.config.MyJobFactory; import org.apache.commons.lang3.StringUtils; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired;import java.util.Map;/*** Quartz工具類**/ public class SchedulerUtil {private static Logger logger = LoggerFactory.getLogger(SchedulerUtil.class);/*** 新增定時任務* @param jobClassName 類路徑* @param jobName 任務名稱* @param jobGroupName 組別* @param cronExpression Cron表達式* @param jobDataMap 需要傳遞的參數* @throws Exception*/public static void addJob(String jobClassName,String jobName, String jobGroupName, String cronExpression,String jobDataMap) throws Exception {// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();// 啟動調度器scheduler.start();// 構建job信息JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass()).withIdentity(jobName, jobGroupName).build();// JobDataMap用于傳遞任務運行時的參數,比如定時發送郵件,可以用json形式存儲收件人等等信息if (StringUtils.isNotEmpty(jobDataMap)) {JSONObject jb = JSONObject.parseObject(jobDataMap);Map<String, Object> dataMap =(Map<String, Object>) jb.get("data");for (Map.Entry<String, Object> m:dataMap.entrySet()) {jobDetail.getJobDataMap().put(m.getKey(),m.getValue());}}// 表達式調度構建器(即任務執行的時間)CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);// 按新的cronExpression表達式構建一個新的triggerCronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName).withSchedule(scheduleBuilder).startNow().build();try {scheduler.scheduleJob(jobDetail, trigger);} catch (SchedulerException e) {logger.info("創建定時任務失敗" + e);throw new Exception("創建定時任務失敗");}}/*** 停用一個定時任務* @param jobName 任務名稱* @param jobGroupName 組別* @throws Exception*/public static void jobPause(String jobName, String jobGroupName) throws Exception {// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));}/*** 啟用一個定時任務* @param jobName 任務名稱* @param jobGroupName 組別* @throws Exception*/public static void jobresume(String jobName, String jobGroupName) throws Exception {// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));}/*** 刪除一個定時任務* @param jobName 任務名稱* @param jobGroupName 組別* @throws Exception*/public static void jobdelete(String jobName, String jobGroupName) throws Exception {// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));}/*** 更新定時任務表達式* @param jobName 任務名稱* @param jobGroupName 組別* @param cronExpression Cron表達式* @throws Exception*/public static void jobReschedule(String jobName, String jobGroupName, String cronExpression) throws Exception {try {SchedulerFactory schedulerFactory = new StdSchedulerFactory();Scheduler scheduler = schedulerFactory.getScheduler();TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);// 表達式調度構建器CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);// 按新的cronExpression表達式重新構建triggertrigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).startNow().build();// 按新的trigger重新設置job執行scheduler.rescheduleJob(triggerKey, trigger);} catch (SchedulerException e) {System.out.println("更新定時任務失敗" + e);throw new Exception("更新定時任務失敗");}}/*** 檢查Job是否存在* @throws Exception*/public static Boolean isResume(String jobName, String jobGroupName) throws Exception {SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);Boolean state = scheduler.checkExists(triggerKey);return state;}/*** 暫停所有任務* @throws Exception*/public static void pauseAlljob() throws Exception {SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.pauseAll();}/*** 喚醒所有任務* @throws Exception*/public static void resumeAlljob() throws Exception {SchedulerFactory sf = new StdSchedulerFactory();Scheduler sched = sf.getScheduler();sched.resumeAll();}/*** 獲取Job實例* @param classname* @return* @throws Exception*/public static BaseJob getClass(String classname) throws Exception {try {Class<?> c = Class.forName(classname);return (BaseJob) c.newInstance();} catch (Exception e) {throw new Exception("類["+classname+"]不存在!");}}}

容器啟動與Service 注入

容器啟動

因為任務沒有定義在ApplicationContext.xml 中,而是放到了數據庫中,SpringBoot 啟動時,怎么讀取任務信息?

或者,怎么在Spring 啟動完成的時候做一些事情?

創建一個類,實現CommandLineRunner 接口,實現run 方法。

從表中查出狀態是1 的任務,然后構建。

Service 類注入到Job 中

Spring Bean 如何注入到實現了Job 接口的類中?

例如在TestTask3 中,需要注入ISysJobService,查詢數據庫發送郵件。

如果沒有任何配置,注入會報空指針異常。

原因:

因為定時任務Job 對象的實例化過程是在Quartz 中進行的,而Service Bean 是由Spring 容器管理的,Quartz 察覺不到Service Bean 的存在,所以無法將Service Bean裝配到Job 對象中。

分析:

Quartz 集成到Spring 中,用到SchedulerFactoryBean,其實現了InitializingBean方法,在唯一的方法afterPropertiesSet()在Bean 的屬性初始化后調用。

調度器用AdaptableJobFactory 對Job 對象進行實例化。所以,如果我們可以把這個JobFactory 指定為我們自定義的工廠的話,就可以在Job 實例化完成之后,把Job納入到Spring 容器中管理。

解決這個問題的步驟:

1、定義一個AdaptableJobFactory,實現JobFactory 接口,實現接口定義的newJob 方法,在這里面返回Job 實例

2、定義一個MyJobFactory,繼承AdaptableJobFactory。使用Spring 的AutowireCapableBeanFactory,把Job 實例注入到容器中。

@Component public class MyJobFactory extends AdaptableJobFactory {@Autowiredprivate AutowireCapableBeanFactory capableBeanFactory;protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {Object jobInstance = super.createJobInstance(bundle);capableBeanFactory.autowireBean(jobInstance);return jobInstance;} }

3、指定Scheduler 的JobFactory 為自定義的JobFactory。com.leon.demo.config.InitStartSchedule 中:

scheduler.setJobFactory(myJobFactory); package com.leon.demo.config;import com.alibaba.fastjson.JSONObject; import com.leon.demo.entity.SysJob; import com.leon.demo.service.ISysJobService; import com.leon.demo.util.BaseJob; import org.apache.commons.lang3.StringUtils; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component;import java.util.HashMap; import java.util.List; import java.util.Map;/*** 這個類用于啟動SpringBoot時,加載作業。run方法會自動執行。** 另外可以使用 ApplicationRunner**/ @Component public class InitStartSchedule implements CommandLineRunner {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate ISysJobService sysJobService;@Autowiredprivate MyJobFactory myJobFactory;@Overridepublic void run(String... args) throws Exception {/*** 用于程序啟動時加載定時任務,并執行已啟動的定時任務(只會執行一次,在程序啟動完執行)*///查詢job狀態為啟用的HashMap<String,String> map = new HashMap<String,String>();map.put("jobStatus", "1");List<SysJob> jobList= sysJobService.querySysJobList(map);if( null == jobList || jobList.size() ==0){logger.info("系統啟動,沒有需要執行的任務... ...");}// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();// 如果不設置JobFactory,Service注入到Job會報空指針scheduler.setJobFactory(myJobFactory);// 啟動調度器scheduler.start();for (SysJob sysJob:jobList) {String jobClassName=sysJob.getJobName();String jobGroupName=sysJob.getJobGroup();//構建job信息JobDetail jobDetail = JobBuilder.newJob(getClass(sysJob.getJobClassPath()).getClass()).withIdentity(jobClassName, jobGroupName).build();if (StringUtils.isNotEmpty(sysJob.getJobDataMap())) {JSONObject jb = JSONObject.parseObject(sysJob.getJobDataMap());Map<String, Object> dataMap = (Map<String, Object>)jb.get("data");for (Map.Entry<String, Object> m:dataMap.entrySet()) {jobDetail.getJobDataMap().put(m.getKey(),m.getValue());}}//表達式調度構建器(即任務執行的時間)CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(sysJob.getJobCron());//按新的cronExpression表達式構建一個新的triggerCronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName).withSchedule(scheduleBuilder).startNow().build();// 任務不存在的時候才添加if( !scheduler.checkExists(jobDetail.getKey()) ){try {scheduler.scheduleJob(jobDetail, trigger);} catch (SchedulerException e) {logger.info("\n創建定時任務失敗"+e);throw new Exception("創建定時任務失敗");}}}}public static BaseJob getClass(String classname) throws Exception{Class<?> c= Class.forName(classname);return (BaseJob)c.newInstance();} }

考慮這么一種情況:

正在運行的Quartz 節點掛了,而所有人完全不知情……

Quartz 集群部署

springboot-quartz 工程

為什么需要集群?

1、防止單點故障,減少對業務的影響

2、減少節點的壓力,例如在10 點要觸發1000 個任務,如果有10 個節點,則每個節點之需要執行100 個任務

集群需要解決的問題?

1、任務重跑,因為節點部署的內容是一樣的,到10 點的時候,每個節點都會執行相同的操作,引起數據混亂。比如跑批,絕對不能執行多次。

2、任務漏跑,假如任務是平均分配的,本來應該在某個節點上執行的任務,因為節點故障,一直沒有得到執行。

3、水平集群需要注意時間同步問題

4、Quartz 使用的是隨機的負載均衡算法,不能指定節點執行

所以必須要有一種共享數據或者通信的機制。在分布式系統的不同節點中,我們可以采用什么樣的方式,實現數據共享?

兩兩通信,或者基于分布式的服務,實現數據共享。

例如:ZK、Redis、DB。

在Quartz 中,提供了一種簡單的方式,基于數據庫共享任務執行信息。也就是說,一個節點執行任務的時候,會操作數據庫,其他的節點查詢數據庫,便可以感知到了。

同樣的問題:建什么表?哪些字段?依舊使用系統自帶的11 張表。

集群配置與驗證

quartz.properties 配置。

四個配置:集群實例ID、集群開關、數據庫持久化、數據源信息

注意先清空quartz 所有表、改端口、兩個任務頻率改成一樣

驗證1:先后啟動2 個節點,任務是否重跑

驗證2:停掉一個節點,任務是否漏跑

Quartz 調度原理

問題:

1、Job 沒有繼承Thread 和實現Runnable,是怎么被調用的?通過反射還是什么?

2、任務是什么時候被調度的?是誰在監視任務還是監視Trigger?

3、任務是怎么被調用的?誰執行了任務?

4、任務本身有狀態嗎?還是觸發器有狀態?

看源碼的入口

Scheduler scheduler = factory.getScheduler(); scheduler.scheduleJob(jobDetail, trigger); scheduler.start();

獲取調度器實例

讀取配置文件

public Scheduler getScheduler() throws SchedulerException {if (cfg == null) {// 讀取quartz.properties 配置文件initialize();}// 這個類是一個HashMap,用來基于調度器的名稱保證調度器的唯一性SchedulerRepository schedRep = SchedulerRepository.getInstance();Scheduler sched = schedRep.lookup(getSchedulerName());// 如果調度器已經存在了if (sched != null) {// 調度器關閉了,移除if (sched.isShutdown()) {schedRep.remove(getSchedulerName());} else {// 返回調度器return sched;}}// 調度器不存在,初始化sched = instantiate();return sched; }

instantiate()方法中做了初始化的所有工作:

// 存儲任務信息的JobStore JobStore js = null; // 創建線程池,默認是SimpleThreadPool ThreadPool tp = null; // 創建調度器 QuartzScheduler qs = null; // 連接數據庫的連接管理器 DBConnectionManager dbMgr = null; // 自動生成ID // 創建線程執行器,默認為DefaultThreadExecutor ThreadExecutor threadExecutor;

創建線程池(包工頭)

830 行和839 行,創建了一個線程池,默認是配置文件中指定的SimpleThreadPool。

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName()); tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();

SimpleThreadPool 里面維護了三個list,分別存放所有的工作線程、空閑的工作線程和忙碌的工作線程。我們可以把SimpleThreadPool 理解為包工頭。

private List<WorkerThread> workers; private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

tp 的runInThread()方法是線程池運行線程的接口方法。參數Runnable 是執行的任務內容。

取出WorkerThread 去執行參數里面的runnable(JobRunShell)。

WorkerThread wt = (WorkerThread)availWorkers.removeFirst(); busyWorkers.add(wt); wt.run(runnable);

WorkerThread(工人)

WorkerThread 是SimpleThreadPool 的內部類, 用來執行任務。我們把WorkerThread 理解為工人。在WorkerThread 的run 方法中,執行傳入的參數runnable任務:

runnable.run();

創建調度線程(項目經理)

1321 行,創建了調度器QuartzScheduler:

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

在QuartzScheduler 的構造函數中,創建了QuartzSchedulerThread,我們把它理解為項目經理,它會調用包工頭的工人資源,給他們安排任務。

并且創建了線程執行器schedThreadExecutor , 執行了這個QuartzSchedulerThread,也就是調用了它的run 方法。

// 創建一個線程,resouces 里面有線程名稱 this.schedThread = new QuartzSchedulerThread(this, resources); // 線程執行器 ThreadExecutor schedThreadExecutor = resources.getThreadExecutor(); //執行這個線程,也就是調用了線程的run 方法 schedThreadExecutor.execute(this.schedThread);

點開QuartzSchedulerThread 類,找到run 方法,這個是Quartz 任務調度的核心方法:

public void run() {int acquiresFailed = 0;// 檢查scheuler 是否為停止狀態while (!halted.get()) {try {// check if we're supposed to pause...synchronized (sigLock) {// 檢查是否為暫停狀態while (paused && !halted.get()) {try {// wait until togglePause(false) is called...// 暫停的話會嘗試去獲得信號鎖,并wait 一會sigLock.wait(1000L);} catch (InterruptedException ignore) {}// reset failure counter when paused, so that we don't// wait again after unpausingacquiresFailed = 0;}if (halted.get()) {break;}}// wait a bit, if reading from job store is consistently// failing (e.g. DB is down or restarting)..// 從JobStore 獲取Job 持續失敗,sleep 一下if (acquiresFailed > 1) {try {long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);Thread.sleep(delay);} catch (Exception ignore) {}}// 從線程池獲取可用的線程int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...List<OperableTrigger> triggers;long now = System.currentTimeMillis();clearSignaledSchedulingChange();try {// 獲取需要下次執行的triggers// idleWaitTime: 默認30s// availThreadCount:獲取可用(空閑)的工作線程數量,總會大于1,因為該方法會一直阻塞,直到有工作線程空閑下來。// maxBatchSize:一次拉取trigger 的最大數量,默認是1// batchTimeWindow:時間窗口調節參數,默認是0// misfireThreshold: 超過這個時間還未觸發的trigger,被認為發生了misfire,默認60s// 調度線程一次會拉取NEXT_FIRETIME 小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)個triggers,默認情況下,會拉取未來30s、過去60s 之間還未fire 的1 個triggertriggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());// 省略…………// set triggers to 'executing'List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();boolean goAhead = true;synchronized(sigLock) {goAhead = !halted.get();}if(goAhead) {try {// 觸發Trigger,把ACQUIRED 狀態改成EXECUTING// 如果這個trigger 的NEXTFIRETIME 為空,也就是未來不再觸發,就將其狀態改為COMPLETE// 如果trigger 不允許并發執行(即Job 的實現類標注了@DisallowConcurrentExecution),則將狀態變為BLOCKED,否則就將狀態改為WAITINGList<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);// 省略…………continue;}}// 循環處理Triggerfor (int i = 0; i < bndles.size(); i++) {TriggerFiredResult result = bndles.get(i);TriggerFiredBundle bndle = result.getTriggerFiredBundle();Exception exception = result.getException();// 省略…………JobRunShell shell = null;try {// 根據trigger 信息實例化JobRunShell(implements Runnable),同時依據JOB_CLASS_NAME 實例化Job,隨后我們將JobRunShell 實例丟入工作線。shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);continue;}// 執行JobRunShell 的run 方法if (qsRsrcs.getThreadPool().runInThread(shell) == false) {// 省略…………

JobRunShell 的作用

JobRunShell instances are responsible for providing the 'safe' environment for Job s to run in, and for performing all of
the work of executing the Job, catching ANY thrown exceptions, updating the Trigger with the Job's completion code, etc.

A JobRunShell instance is created by a JobRunShellFactory on behalf of the QuartzSchedulerThread which then runs the
shell in a thread from the configured ThreadPool when the scheduler determines that a Job has been triggered.

JobRunShell 用來為Job 提供安全的運行環境的,執行Job 中所有的作業,捕獲運行中的異常,在任務執行完畢的時候更新Trigger 狀態,等等

JobRunShell 實例是用JobRunShellFactory 為QuartzSchedulerThread 創建的,在調度器決定一個Job 被觸發的時候,它從線程池中取出一個線程來執行任務。

線程模型總結

SimpleThreadPool:包工頭,管理所有WorkerThread

WorkerThread:工人,把Job 包裝成JobRunShell,執行

QuartSchedulerThread:項目經理,獲取即將觸發的Trigger,從包工頭出拿到worker,執行Trigger 綁定的任務

綁定JobDetail 和Trigger

// 存儲JobDetail 和Trigger resources.getJobStore().storeJobAndTrigger(jobDetail, trig); // 通知相關的Listener notifySchedulerListenersJobAdded(jobDetail); notifySchedulerThread(trigger.getNextFireTime().getTime()); notifySchedulerListenersSchduled(trigger);

啟動調度器

// 通知監聽器 notifySchedulerListenersStarting(); if (initialStart == null) {initialStart = new Date();this.resources.getJobStore().schedulerStarted();startPlugins(); } else {resources.getJobStore().schedulerResumed(); } // 通知QuartzSchedulerThread 不再等待,開始干活 schedThread.togglePause(false); // 通知監聽器 notifySchedulerListenersStarted();

源碼總結

getScheduler 方法創建線程池ThreadPool,創建調度器QuartzScheduler,創建調度線程QuartzSchedulerThread,調度線程初始處于暫停狀態。

scheduleJob 將任務添加到JobStore 中。

scheduler.start()方法激活調度器,QuartzSchedulerThread 從timeTrriger 取出待觸發的任務, 并包裝成TriggerFiredBundle , 然后由JobRunShellFactory 創建TriggerFiredBundle 的執行線程JobRunShell , 調度執行通過線程池SimpleThreadPool 去執行JobRunShell,而JobRunShell 執行的就是任務類的execute方法:job.execute(JobExecutionContext context)。

集群原理

基于數據庫,如何實現任務的不重跑不漏跑?

問題1:如果任務執行中的資源是“下一個即將觸發的任務”,怎么基于數據庫實現這個資源的競爭?

問題2:怎么對數據的行加鎖?

QuartzSchedulerThread 第287 行,獲取下一個即將觸發的Trigger

triggers = qsRsrcs.getJobStore().acquireNextTriggers(

調用JobStoreSupport 的acquireNextTriggers()方法,2793 行

調用JobStoreSupport.executeInNonManagedTXLock()方法,3829 行:

return executeInNonManagedTXLock(lockName,

嘗試獲取鎖,3843 行:

transOwner = getLockHandler().obtainLock(conn, lockName);

下面有回滾和釋放鎖的語句,即使發生異常,鎖同樣能釋放。

調用DBSemaphore 的obtainLock()方法,103 行

public boolean obtainLock(Connection conn, String lockName)throws LockException {if (!isLockOwner(lockName)) {executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);

調用StdRowLockSemaphore 的executeSQL()方法,78 行。

最終用JDBC 執行SQL,語句內容是expandedSQL 和expandedInsertSQL

ps = conn.prepareStatement(expandedSQL);

問題:expandedSQL 和expandedInsertSQL 是一條什么SQL 語句?似乎我們沒有賦值?

在StdRowLockSemaphore 的構造函數中,把定義的兩條SQL 傳進去

public StdRowLockSemaphore() {super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK, INSERT_LOCK); } public static final String SELECT_FOR_LOCK = "SELECT * FROM "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE"; public static final String INSERT_LOCK = "INSERT INTO "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("+ SCHED_NAME_SUBST + ", ?)";

它調用了父類DBSemaphore 的構造函數:

public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {this.tablePrefix = tablePrefix;this.schedName = schedName;setSQL(defaultSQL);setInsertSQL(defaultInsertSQL); }

在setSQL()和setInsertSQL()中為expandedSQL 和expandedInsertSQL 賦值。

執行的SQL 語句:

select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update

在我們執行官方的建表腳本的時候,QRTZ_LOCKS 表,它會為每個調度器創建兩行數據,獲取Trigger 和觸發Trigger 是兩把鎖:

任務為什么重復執行

在我們的演示過程中,有多個調度器,任務沒有重復執行,也就是默認會加鎖,什么情況下不會上鎖呢?

JobStoreSupport 的executeInNonManagedTXLock()方法

如果lockName 為空,則不上鎖

if (lockName != null) {// If we aren't using db locks, then delay getting DB connection// until after acquiring the lock since it isn't needed.if (getLockHandler().requiresConnection()) {conn = getNonManagedTXConnection();}transOwner = getLockHandler().obtainLock(conn, lockName); } if (conn == null) {conn = getNonManagedTXConnection(); }

而上一步JobStoreSupport 的acquireNextTriggers()方法,

1 ) 如果acquireTriggersWithinLock=true 或者batchTriggerAcquisitionMaxCount>1 時, locaName 賦值為LOCK_TRIGGER_ACCESS,此時獲取Trigger 會加鎖。

2)否則,如果isAcquireTriggersWithinLock()值是false 并且maxCount=1 的話,lockName 賦值為null,這種情況獲取Trigger 下不加鎖。

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)throws JobPersistenceException {String lockName;if(isAcquireTriggersWithinLock() || maxCount > 1) {lockName = LOCK_TRIGGER_ACCESS;} else {lockName = null;}

acquireTriggersWithinLock 變量默認是false:

private boolean acquireTriggersWithinLock = false;

maxCount 來自QuartzSchedulerThread:

triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());

getMaxBatchSize()來自QuartzSchedulerResources,代表Scheduler 一次拉取trigger 的最大數量,默認是1:

private int maxBatchSize = 1;

這個值可以通過參數修改,代表允許調度程序節點一次獲取(用于觸發)的觸發器的最大數量,默認值是1。

org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1

根據以上兩個默認值,理論上在獲取Trigger 的時候不會上鎖,但是實際上為什么沒有出現頻繁的重復執行問題?因為每個調度器的線程持有鎖的時間太短了,單機的測試無法體現,而在高并發的情況下,有可能會出現這個問題。

QuartzSchedulerThread 的triggersFired()方法:

List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

調用了JobStoreSupport 的triggersFired()方法,接著又調用了一個triggerFiredtriggerFired(Connection conn, OperableTrigger trigger)方法:

如果Trigger 的狀態不是ACQUIRED,也就是說被其他的線程fire 了,返回空。但是這種樂觀鎖的檢查在高并發下難免會出現ABA 的問題,比如線程A 拿到的時候還是ACQUIRED 狀態,但是剛準備執行的時候已經變成了EXECUTING 狀態,這個時候就會出現重復執行的問題。

if (!state.equals(STATE_ACQUIRED)) {return null; }

總結,如果:

如果設置的數量為1(默認值),并且使用JDBC JobStore(RAMJobStore 不支持分布式, 只有一個調度器實例, 所以不加鎖) , 則屬性org.quartz.jobStore.acquireTriggersWithinLock 應設置為true。否則不加鎖可能會導致任務重復執行。

org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1 org.quartz.jobStore.acquireTriggersWithinLock=true

?

總結

以上是生活随笔為你收集整理的任务调度之Quartz2的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。