任务调度之Quartz2
Quartz 集成到Spring
Spring-quartz 工程
Spring 在spring-context-support.jar 中直接提供了對Quartz 的支持。
可以在配置文件中把JobDetail、Trigger、Scheduler 定義成Bean。
定義Job
<bean name="myJob1" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"><property name="name" value="my_job_1"/><property name="group" value="my_group"/><property name="jobClass" value="com.leon.quartz.MyJob1"/><property name="durability" value="true"/> </bean>定義Trigger
<bean name="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean"><property name="name" value="my_trigger_1"/><property name="group" value="my_group"/><property name="jobDetail" ref="myJob1"/><property name="startDelay" value="1000"/><property name="repeatInterval" value="5000"/><property name="repeatCount" value="2"/> </bean>定義Scheduler
<bean name="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"><property name="triggers"><list><ref bean="simpleTrigger"/><ref bean="cronTrigger"/></list></property> </bean>既然可以在配置文件配置,當然也可以用@Bean 注解配置。在配置類上加上@Configuration 讓Spring 讀取到。
public class QuartzConfig {@Beanpublic JobDetail printTimeJobDetail(){return JobBuilder.newJob(MyJob1.class).withIdentity("leonJob").usingJobData("leon", "職位更好的你").storeDurably().build();}@Beanpublic Trigger printTimeJobTrigger() {CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?");return TriggerBuilder.newTrigger().forJob(printTimeJobDetail()).withIdentity("quartzTaskService").withSchedule(cronScheduleBuilder).build();} }運行spring-quartz 工程的com.leon.quartz.QuartzTest
package com.leon.quartz;import org.quartz.*; import org.quartz.impl.StdScheduler; import org.quartz.impl.StdSchedulerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;/*** 單元測試類*/ public class QuartzTest {private static Scheduler scheduler;public static void main(String[] args) throws SchedulerException {// 獲取容器ApplicationContext ac = new ClassPathXmlApplicationContext("spring_quartz.xml");// 從容器中獲取調度器scheduler = (StdScheduler) ac.getBean("scheduler");// 啟動調度器scheduler.start();}}動態調度的實現
springboot-quartz 工程
傳統的Spring 方式集成,由于任務信息全部配置在xml 文件中,如果需要操作任務或者修改任務運行頻率,只能重新編譯、打包、部署、重啟,如果有緊急問題需要處理,會浪費很多的時間。
有沒有可以動態調度任務的方法?比如停止一個Job?啟動一個Job?修改Job 的觸發頻率?
讀取配置文件、寫入配置文件、重啟Scheduler 或重啟應用明顯是不可取的。
對于這種頻繁變更并且需要實時生效的配置信息,我們可以放到哪里?
ZK、Redis、DB tables。
并且,我們可以提供一個界面,實現對數據表的輕松操作。
配置管理
這里我們用最簡單的數據庫的實現。
問題1:建一張什么樣的表?參考JobDetail 的屬性。
CREATE TABLE `sys_job` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',`job_name` varchar(512) NOT NULL COMMENT '任務名稱',`job_group` varchar(512) NOT NULL COMMENT '任務組名',`job_cron` varchar(512) NOT NULL COMMENT '時間表達式',`job_class_path` varchar(1024) NOT NULL COMMENT '類路徑,全類型',`job_data_map` varchar(1024) DEFAULT NULL COMMENT '傳遞map 參數',`job_status` int(2) NOT NULL COMMENT '狀態:1 啟用0 停用',`job_describe` varchar(1024) DEFAULT NULL COMMENT '任務功能描述',PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;數據操作與任務調度
操作數據表非常簡單,SSM 增刪改查。
但是在修改了表的數據之后,怎么讓調度器知道呢?
調度器的接口:Scheduler
在我們的需求中,我們需要做的事情:
1、新增一個任務
2、刪除一個任務
3、啟動、停止一個任務
4、修改任務的信息(包括調度規律)
因此可以把相關的操作封裝到一個工具類中。com.leon.demo.util.SchedulerUtil
package com.leon.demo.util;import com.alibaba.fastjson.JSONObject; import com.leon.demo.config.MyJobFactory; import org.apache.commons.lang3.StringUtils; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired;import java.util.Map;/*** Quartz工具類**/ public class SchedulerUtil {private static Logger logger = LoggerFactory.getLogger(SchedulerUtil.class);/*** 新增定時任務* @param jobClassName 類路徑* @param jobName 任務名稱* @param jobGroupName 組別* @param cronExpression Cron表達式* @param jobDataMap 需要傳遞的參數* @throws Exception*/public static void addJob(String jobClassName,String jobName, String jobGroupName, String cronExpression,String jobDataMap) throws Exception {// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();// 啟動調度器scheduler.start();// 構建job信息JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass()).withIdentity(jobName, jobGroupName).build();// JobDataMap用于傳遞任務運行時的參數,比如定時發送郵件,可以用json形式存儲收件人等等信息if (StringUtils.isNotEmpty(jobDataMap)) {JSONObject jb = JSONObject.parseObject(jobDataMap);Map<String, Object> dataMap =(Map<String, Object>) jb.get("data");for (Map.Entry<String, Object> m:dataMap.entrySet()) {jobDetail.getJobDataMap().put(m.getKey(),m.getValue());}}// 表達式調度構建器(即任務執行的時間)CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);// 按新的cronExpression表達式構建一個新的triggerCronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName).withSchedule(scheduleBuilder).startNow().build();try {scheduler.scheduleJob(jobDetail, trigger);} catch (SchedulerException e) {logger.info("創建定時任務失敗" + e);throw new Exception("創建定時任務失敗");}}/*** 停用一個定時任務* @param jobName 任務名稱* @param jobGroupName 組別* @throws Exception*/public static void jobPause(String jobName, String jobGroupName) throws Exception {// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));}/*** 啟用一個定時任務* @param jobName 任務名稱* @param jobGroupName 組別* @throws Exception*/public static void jobresume(String jobName, String jobGroupName) throws Exception {// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));}/*** 刪除一個定時任務* @param jobName 任務名稱* @param jobGroupName 組別* @throws Exception*/public static void jobdelete(String jobName, String jobGroupName) throws Exception {// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));}/*** 更新定時任務表達式* @param jobName 任務名稱* @param jobGroupName 組別* @param cronExpression Cron表達式* @throws Exception*/public static void jobReschedule(String jobName, String jobGroupName, String cronExpression) throws Exception {try {SchedulerFactory schedulerFactory = new StdSchedulerFactory();Scheduler scheduler = schedulerFactory.getScheduler();TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);// 表達式調度構建器CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);// 按新的cronExpression表達式重新構建triggertrigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).startNow().build();// 按新的trigger重新設置job執行scheduler.rescheduleJob(triggerKey, trigger);} catch (SchedulerException e) {System.out.println("更新定時任務失敗" + e);throw new Exception("更新定時任務失敗");}}/*** 檢查Job是否存在* @throws Exception*/public static Boolean isResume(String jobName, String jobGroupName) throws Exception {SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);Boolean state = scheduler.checkExists(triggerKey);return state;}/*** 暫停所有任務* @throws Exception*/public static void pauseAlljob() throws Exception {SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();scheduler.pauseAll();}/*** 喚醒所有任務* @throws Exception*/public static void resumeAlljob() throws Exception {SchedulerFactory sf = new StdSchedulerFactory();Scheduler sched = sf.getScheduler();sched.resumeAll();}/*** 獲取Job實例* @param classname* @return* @throws Exception*/public static BaseJob getClass(String classname) throws Exception {try {Class<?> c = Class.forName(classname);return (BaseJob) c.newInstance();} catch (Exception e) {throw new Exception("類["+classname+"]不存在!");}}}容器啟動與Service 注入
容器啟動
因為任務沒有定義在ApplicationContext.xml 中,而是放到了數據庫中,SpringBoot 啟動時,怎么讀取任務信息?
或者,怎么在Spring 啟動完成的時候做一些事情?
創建一個類,實現CommandLineRunner 接口,實現run 方法。
從表中查出狀態是1 的任務,然后構建。
Service 類注入到Job 中
Spring Bean 如何注入到實現了Job 接口的類中?
例如在TestTask3 中,需要注入ISysJobService,查詢數據庫發送郵件。
如果沒有任何配置,注入會報空指針異常。
原因:
因為定時任務Job 對象的實例化過程是在Quartz 中進行的,而Service Bean 是由Spring 容器管理的,Quartz 察覺不到Service Bean 的存在,所以無法將Service Bean裝配到Job 對象中。
分析:
Quartz 集成到Spring 中,用到SchedulerFactoryBean,其實現了InitializingBean方法,在唯一的方法afterPropertiesSet()在Bean 的屬性初始化后調用。
調度器用AdaptableJobFactory 對Job 對象進行實例化。所以,如果我們可以把這個JobFactory 指定為我們自定義的工廠的話,就可以在Job 實例化完成之后,把Job納入到Spring 容器中管理。
解決這個問題的步驟:
1、定義一個AdaptableJobFactory,實現JobFactory 接口,實現接口定義的newJob 方法,在這里面返回Job 實例
2、定義一個MyJobFactory,繼承AdaptableJobFactory。使用Spring 的AutowireCapableBeanFactory,把Job 實例注入到容器中。
@Component public class MyJobFactory extends AdaptableJobFactory {@Autowiredprivate AutowireCapableBeanFactory capableBeanFactory;protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {Object jobInstance = super.createJobInstance(bundle);capableBeanFactory.autowireBean(jobInstance);return jobInstance;} }3、指定Scheduler 的JobFactory 為自定義的JobFactory。com.leon.demo.config.InitStartSchedule 中:
scheduler.setJobFactory(myJobFactory); package com.leon.demo.config;import com.alibaba.fastjson.JSONObject; import com.leon.demo.entity.SysJob; import com.leon.demo.service.ISysJobService; import com.leon.demo.util.BaseJob; import org.apache.commons.lang3.StringUtils; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component;import java.util.HashMap; import java.util.List; import java.util.Map;/*** 這個類用于啟動SpringBoot時,加載作業。run方法會自動執行。** 另外可以使用 ApplicationRunner**/ @Component public class InitStartSchedule implements CommandLineRunner {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate ISysJobService sysJobService;@Autowiredprivate MyJobFactory myJobFactory;@Overridepublic void run(String... args) throws Exception {/*** 用于程序啟動時加載定時任務,并執行已啟動的定時任務(只會執行一次,在程序啟動完執行)*///查詢job狀態為啟用的HashMap<String,String> map = new HashMap<String,String>();map.put("jobStatus", "1");List<SysJob> jobList= sysJobService.querySysJobList(map);if( null == jobList || jobList.size() ==0){logger.info("系統啟動,沒有需要執行的任務... ...");}// 通過SchedulerFactory獲取一個調度器實例SchedulerFactory sf = new StdSchedulerFactory();Scheduler scheduler = sf.getScheduler();// 如果不設置JobFactory,Service注入到Job會報空指針scheduler.setJobFactory(myJobFactory);// 啟動調度器scheduler.start();for (SysJob sysJob:jobList) {String jobClassName=sysJob.getJobName();String jobGroupName=sysJob.getJobGroup();//構建job信息JobDetail jobDetail = JobBuilder.newJob(getClass(sysJob.getJobClassPath()).getClass()).withIdentity(jobClassName, jobGroupName).build();if (StringUtils.isNotEmpty(sysJob.getJobDataMap())) {JSONObject jb = JSONObject.parseObject(sysJob.getJobDataMap());Map<String, Object> dataMap = (Map<String, Object>)jb.get("data");for (Map.Entry<String, Object> m:dataMap.entrySet()) {jobDetail.getJobDataMap().put(m.getKey(),m.getValue());}}//表達式調度構建器(即任務執行的時間)CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(sysJob.getJobCron());//按新的cronExpression表達式構建一個新的triggerCronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName).withSchedule(scheduleBuilder).startNow().build();// 任務不存在的時候才添加if( !scheduler.checkExists(jobDetail.getKey()) ){try {scheduler.scheduleJob(jobDetail, trigger);} catch (SchedulerException e) {logger.info("\n創建定時任務失敗"+e);throw new Exception("創建定時任務失敗");}}}}public static BaseJob getClass(String classname) throws Exception{Class<?> c= Class.forName(classname);return (BaseJob)c.newInstance();} }考慮這么一種情況:
正在運行的Quartz 節點掛了,而所有人完全不知情……
Quartz 集群部署
springboot-quartz 工程
為什么需要集群?
1、防止單點故障,減少對業務的影響
2、減少節點的壓力,例如在10 點要觸發1000 個任務,如果有10 個節點,則每個節點之需要執行100 個任務
集群需要解決的問題?
1、任務重跑,因為節點部署的內容是一樣的,到10 點的時候,每個節點都會執行相同的操作,引起數據混亂。比如跑批,絕對不能執行多次。
2、任務漏跑,假如任務是平均分配的,本來應該在某個節點上執行的任務,因為節點故障,一直沒有得到執行。
3、水平集群需要注意時間同步問題
4、Quartz 使用的是隨機的負載均衡算法,不能指定節點執行
所以必須要有一種共享數據或者通信的機制。在分布式系統的不同節點中,我們可以采用什么樣的方式,實現數據共享?
兩兩通信,或者基于分布式的服務,實現數據共享。
例如:ZK、Redis、DB。
在Quartz 中,提供了一種簡單的方式,基于數據庫共享任務執行信息。也就是說,一個節點執行任務的時候,會操作數據庫,其他的節點查詢數據庫,便可以感知到了。
同樣的問題:建什么表?哪些字段?依舊使用系統自帶的11 張表。
集群配置與驗證
quartz.properties 配置。
四個配置:集群實例ID、集群開關、數據庫持久化、數據源信息
注意先清空quartz 所有表、改端口、兩個任務頻率改成一樣
驗證1:先后啟動2 個節點,任務是否重跑
驗證2:停掉一個節點,任務是否漏跑
Quartz 調度原理
問題:
1、Job 沒有繼承Thread 和實現Runnable,是怎么被調用的?通過反射還是什么?
2、任務是什么時候被調度的?是誰在監視任務還是監視Trigger?
3、任務是怎么被調用的?誰執行了任務?
4、任務本身有狀態嗎?還是觸發器有狀態?
看源碼的入口
Scheduler scheduler = factory.getScheduler(); scheduler.scheduleJob(jobDetail, trigger); scheduler.start();獲取調度器實例
讀取配置文件
public Scheduler getScheduler() throws SchedulerException {if (cfg == null) {// 讀取quartz.properties 配置文件initialize();}// 這個類是一個HashMap,用來基于調度器的名稱保證調度器的唯一性SchedulerRepository schedRep = SchedulerRepository.getInstance();Scheduler sched = schedRep.lookup(getSchedulerName());// 如果調度器已經存在了if (sched != null) {// 調度器關閉了,移除if (sched.isShutdown()) {schedRep.remove(getSchedulerName());} else {// 返回調度器return sched;}}// 調度器不存在,初始化sched = instantiate();return sched; }instantiate()方法中做了初始化的所有工作:
// 存儲任務信息的JobStore JobStore js = null; // 創建線程池,默認是SimpleThreadPool ThreadPool tp = null; // 創建調度器 QuartzScheduler qs = null; // 連接數據庫的連接管理器 DBConnectionManager dbMgr = null; // 自動生成ID // 創建線程執行器,默認為DefaultThreadExecutor ThreadExecutor threadExecutor;創建線程池(包工頭)
830 行和839 行,創建了一個線程池,默認是配置文件中指定的SimpleThreadPool。
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName()); tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();SimpleThreadPool 里面維護了三個list,分別存放所有的工作線程、空閑的工作線程和忙碌的工作線程。我們可以把SimpleThreadPool 理解為包工頭。
private List<WorkerThread> workers; private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();tp 的runInThread()方法是線程池運行線程的接口方法。參數Runnable 是執行的任務內容。
取出WorkerThread 去執行參數里面的runnable(JobRunShell)。
WorkerThread wt = (WorkerThread)availWorkers.removeFirst(); busyWorkers.add(wt); wt.run(runnable);WorkerThread(工人)
WorkerThread 是SimpleThreadPool 的內部類, 用來執行任務。我們把WorkerThread 理解為工人。在WorkerThread 的run 方法中,執行傳入的參數runnable任務:
runnable.run();創建調度線程(項目經理)
1321 行,創建了調度器QuartzScheduler:
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);在QuartzScheduler 的構造函數中,創建了QuartzSchedulerThread,我們把它理解為項目經理,它會調用包工頭的工人資源,給他們安排任務。
并且創建了線程執行器schedThreadExecutor , 執行了這個QuartzSchedulerThread,也就是調用了它的run 方法。
// 創建一個線程,resouces 里面有線程名稱 this.schedThread = new QuartzSchedulerThread(this, resources); // 線程執行器 ThreadExecutor schedThreadExecutor = resources.getThreadExecutor(); //執行這個線程,也就是調用了線程的run 方法 schedThreadExecutor.execute(this.schedThread);點開QuartzSchedulerThread 類,找到run 方法,這個是Quartz 任務調度的核心方法:
public void run() {int acquiresFailed = 0;// 檢查scheuler 是否為停止狀態while (!halted.get()) {try {// check if we're supposed to pause...synchronized (sigLock) {// 檢查是否為暫停狀態while (paused && !halted.get()) {try {// wait until togglePause(false) is called...// 暫停的話會嘗試去獲得信號鎖,并wait 一會sigLock.wait(1000L);} catch (InterruptedException ignore) {}// reset failure counter when paused, so that we don't// wait again after unpausingacquiresFailed = 0;}if (halted.get()) {break;}}// wait a bit, if reading from job store is consistently// failing (e.g. DB is down or restarting)..// 從JobStore 獲取Job 持續失敗,sleep 一下if (acquiresFailed > 1) {try {long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);Thread.sleep(delay);} catch (Exception ignore) {}}// 從線程池獲取可用的線程int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...List<OperableTrigger> triggers;long now = System.currentTimeMillis();clearSignaledSchedulingChange();try {// 獲取需要下次執行的triggers// idleWaitTime: 默認30s// availThreadCount:獲取可用(空閑)的工作線程數量,總會大于1,因為該方法會一直阻塞,直到有工作線程空閑下來。// maxBatchSize:一次拉取trigger 的最大數量,默認是1// batchTimeWindow:時間窗口調節參數,默認是0// misfireThreshold: 超過這個時間還未觸發的trigger,被認為發生了misfire,默認60s// 調度線程一次會拉取NEXT_FIRETIME 小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)個triggers,默認情況下,會拉取未來30s、過去60s 之間還未fire 的1 個triggertriggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());// 省略…………// set triggers to 'executing'List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();boolean goAhead = true;synchronized(sigLock) {goAhead = !halted.get();}if(goAhead) {try {// 觸發Trigger,把ACQUIRED 狀態改成EXECUTING// 如果這個trigger 的NEXTFIRETIME 為空,也就是未來不再觸發,就將其狀態改為COMPLETE// 如果trigger 不允許并發執行(即Job 的實現類標注了@DisallowConcurrentExecution),則將狀態變為BLOCKED,否則就將狀態改為WAITINGList<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);// 省略…………continue;}}// 循環處理Triggerfor (int i = 0; i < bndles.size(); i++) {TriggerFiredResult result = bndles.get(i);TriggerFiredBundle bndle = result.getTriggerFiredBundle();Exception exception = result.getException();// 省略…………JobRunShell shell = null;try {// 根據trigger 信息實例化JobRunShell(implements Runnable),同時依據JOB_CLASS_NAME 實例化Job,隨后我們將JobRunShell 實例丟入工作線。shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);shell.initialize(qs);} catch (SchedulerException se) {qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);continue;}// 執行JobRunShell 的run 方法if (qsRsrcs.getThreadPool().runInThread(shell) == false) {// 省略…………JobRunShell 的作用
JobRunShell instances are responsible for providing the 'safe' environment for Job s to run in, and for performing all of
the work of executing the Job, catching ANY thrown exceptions, updating the Trigger with the Job's completion code, etc.
A JobRunShell instance is created by a JobRunShellFactory on behalf of the QuartzSchedulerThread which then runs the
shell in a thread from the configured ThreadPool when the scheduler determines that a Job has been triggered.
JobRunShell 用來為Job 提供安全的運行環境的,執行Job 中所有的作業,捕獲運行中的異常,在任務執行完畢的時候更新Trigger 狀態,等等
JobRunShell 實例是用JobRunShellFactory 為QuartzSchedulerThread 創建的,在調度器決定一個Job 被觸發的時候,它從線程池中取出一個線程來執行任務。
線程模型總結
SimpleThreadPool:包工頭,管理所有WorkerThread
WorkerThread:工人,把Job 包裝成JobRunShell,執行
QuartSchedulerThread:項目經理,獲取即將觸發的Trigger,從包工頭出拿到worker,執行Trigger 綁定的任務
綁定JobDetail 和Trigger
// 存儲JobDetail 和Trigger resources.getJobStore().storeJobAndTrigger(jobDetail, trig); // 通知相關的Listener notifySchedulerListenersJobAdded(jobDetail); notifySchedulerThread(trigger.getNextFireTime().getTime()); notifySchedulerListenersSchduled(trigger);啟動調度器
// 通知監聽器 notifySchedulerListenersStarting(); if (initialStart == null) {initialStart = new Date();this.resources.getJobStore().schedulerStarted();startPlugins(); } else {resources.getJobStore().schedulerResumed(); } // 通知QuartzSchedulerThread 不再等待,開始干活 schedThread.togglePause(false); // 通知監聽器 notifySchedulerListenersStarted();源碼總結
getScheduler 方法創建線程池ThreadPool,創建調度器QuartzScheduler,創建調度線程QuartzSchedulerThread,調度線程初始處于暫停狀態。
scheduleJob 將任務添加到JobStore 中。
scheduler.start()方法激活調度器,QuartzSchedulerThread 從timeTrriger 取出待觸發的任務, 并包裝成TriggerFiredBundle , 然后由JobRunShellFactory 創建TriggerFiredBundle 的執行線程JobRunShell , 調度執行通過線程池SimpleThreadPool 去執行JobRunShell,而JobRunShell 執行的就是任務類的execute方法:job.execute(JobExecutionContext context)。
集群原理
基于數據庫,如何實現任務的不重跑不漏跑?
問題1:如果任務執行中的資源是“下一個即將觸發的任務”,怎么基于數據庫實現這個資源的競爭?
問題2:怎么對數據的行加鎖?
QuartzSchedulerThread 第287 行,獲取下一個即將觸發的Trigger
triggers = qsRsrcs.getJobStore().acquireNextTriggers(調用JobStoreSupport 的acquireNextTriggers()方法,2793 行
調用JobStoreSupport.executeInNonManagedTXLock()方法,3829 行:
return executeInNonManagedTXLock(lockName,嘗試獲取鎖,3843 行:
transOwner = getLockHandler().obtainLock(conn, lockName);下面有回滾和釋放鎖的語句,即使發生異常,鎖同樣能釋放。
調用DBSemaphore 的obtainLock()方法,103 行
public boolean obtainLock(Connection conn, String lockName)throws LockException {if (!isLockOwner(lockName)) {executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);調用StdRowLockSemaphore 的executeSQL()方法,78 行。
最終用JDBC 執行SQL,語句內容是expandedSQL 和expandedInsertSQL
ps = conn.prepareStatement(expandedSQL);問題:expandedSQL 和expandedInsertSQL 是一條什么SQL 語句?似乎我們沒有賦值?
在StdRowLockSemaphore 的構造函數中,把定義的兩條SQL 傳進去
public StdRowLockSemaphore() {super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK, INSERT_LOCK); } public static final String SELECT_FOR_LOCK = "SELECT * FROM "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE"; public static final String INSERT_LOCK = "INSERT INTO "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("+ SCHED_NAME_SUBST + ", ?)";它調用了父類DBSemaphore 的構造函數:
public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {this.tablePrefix = tablePrefix;this.schedName = schedName;setSQL(defaultSQL);setInsertSQL(defaultInsertSQL); }在setSQL()和setInsertSQL()中為expandedSQL 和expandedInsertSQL 賦值。
執行的SQL 語句:
select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update在我們執行官方的建表腳本的時候,QRTZ_LOCKS 表,它會為每個調度器創建兩行數據,獲取Trigger 和觸發Trigger 是兩把鎖:
任務為什么重復執行
在我們的演示過程中,有多個調度器,任務沒有重復執行,也就是默認會加鎖,什么情況下不會上鎖呢?
JobStoreSupport 的executeInNonManagedTXLock()方法
如果lockName 為空,則不上鎖
if (lockName != null) {// If we aren't using db locks, then delay getting DB connection// until after acquiring the lock since it isn't needed.if (getLockHandler().requiresConnection()) {conn = getNonManagedTXConnection();}transOwner = getLockHandler().obtainLock(conn, lockName); } if (conn == null) {conn = getNonManagedTXConnection(); }而上一步JobStoreSupport 的acquireNextTriggers()方法,
1 ) 如果acquireTriggersWithinLock=true 或者batchTriggerAcquisitionMaxCount>1 時, locaName 賦值為LOCK_TRIGGER_ACCESS,此時獲取Trigger 會加鎖。
2)否則,如果isAcquireTriggersWithinLock()值是false 并且maxCount=1 的話,lockName 賦值為null,這種情況獲取Trigger 下不加鎖。
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)throws JobPersistenceException {String lockName;if(isAcquireTriggersWithinLock() || maxCount > 1) {lockName = LOCK_TRIGGER_ACCESS;} else {lockName = null;}acquireTriggersWithinLock 變量默認是false:
private boolean acquireTriggersWithinLock = false;maxCount 來自QuartzSchedulerThread:
triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());getMaxBatchSize()來自QuartzSchedulerResources,代表Scheduler 一次拉取trigger 的最大數量,默認是1:
private int maxBatchSize = 1;這個值可以通過參數修改,代表允許調度程序節點一次獲取(用于觸發)的觸發器的最大數量,默認值是1。
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1根據以上兩個默認值,理論上在獲取Trigger 的時候不會上鎖,但是實際上為什么沒有出現頻繁的重復執行問題?因為每個調度器的線程持有鎖的時間太短了,單機的測試無法體現,而在高并發的情況下,有可能會出現這個問題。
QuartzSchedulerThread 的triggersFired()方法:
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);調用了JobStoreSupport 的triggersFired()方法,接著又調用了一個triggerFiredtriggerFired(Connection conn, OperableTrigger trigger)方法:
如果Trigger 的狀態不是ACQUIRED,也就是說被其他的線程fire 了,返回空。但是這種樂觀鎖的檢查在高并發下難免會出現ABA 的問題,比如線程A 拿到的時候還是ACQUIRED 狀態,但是剛準備執行的時候已經變成了EXECUTING 狀態,這個時候就會出現重復執行的問題。
if (!state.equals(STATE_ACQUIRED)) {return null; }總結,如果:
如果設置的數量為1(默認值),并且使用JDBC JobStore(RAMJobStore 不支持分布式, 只有一個調度器實例, 所以不加鎖) , 則屬性org.quartz.jobStore.acquireTriggersWithinLock 應設置為true。否則不加鎖可能會導致任務重復執行。
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1 org.quartz.jobStore.acquireTriggersWithinLock=true?
總結
以上是生活随笔為你收集整理的任务调度之Quartz2的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 任务调度之Quartz1
- 下一篇: 任务调度之Elastic-Job1