Elastic-Job | 由浅入深一篇理解分布式定时任务的基本用法及简单原理解析
目錄
一、定時(shí)任務(wù)的基礎(chǔ)實(shí)現(xiàn)
1.?利用Thread及Sleep實(shí)現(xiàn),通過while循環(huán)讓其不停運(yùn)行
2.使用jdk的Timer和TimerTask
3.ScheduledExecutorService
4.?Quartz實(shí)現(xiàn)
附:Cron表達(dá)式
5.?Spring Task實(shí)現(xiàn)
6. 分布式定時(shí)任務(wù)Elastic-Job
1.概述
2.調(diào)度模型
3.功能
4.適用場(chǎng)景
5.分片策略
6. ElasticJob 原理
7.?失效轉(zhuǎn)移
其次,定時(shí)任務(wù)大體分兩種:指定間隔時(shí)間執(zhí)行 和 指定某個(gè)時(shí)間執(zhí)行
實(shí)現(xiàn)定時(shí)任務(wù)的途徑有很多,比如你甚至可以自己實(shí)現(xiàn)簡(jiǎn)單的定時(shí)任務(wù)
一、定時(shí)任務(wù)的基礎(chǔ)實(shí)現(xiàn)
1.?利用Thread及Sleep實(shí)現(xiàn),通過while循環(huán)讓其不停運(yùn)行
public class ThreadTaskDemo {public static void main(String[] args) {Runnable runable=new Runnable() {@Overridepublic void run() {System.out.println("子線程執(zhí)行任務(wù),當(dāng)前時(shí)間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};try {System.out.println("主線程啟動(dòng)子線程時(shí)間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));scheduleThread(5L,3,runable);} catch (InterruptedException e) {e.printStackTrace();}}/*** @param duration 指定什么時(shí)間后運(yùn)行 單位:秒* @param timeInterval 每次運(yùn)行間隔時(shí)間 單位:秒* @param runnable 待運(yùn)行的Runable對(duì)象* @throws InterruptedException*/static void scheduleThread(Long duration,Integer timeInterval,Runnable runnable) throws InterruptedException{/*阻塞等待*/TimeUnit.SECONDS.sleep(duration);final Runnable interiorRun=runnable;final Integer interiorTimeInterval=timeInterval;/*運(yùn)行*/new Thread(new Runnable() {@Overridepublic void run() {while(true){/*執(zhí)行方法*/interiorRun.run();try {/*任務(wù)執(zhí)行間隔*/TimeUnit.SECONDS.sleep(interiorTimeInterval);} catch (InterruptedException e) {e.printStackTrace();}}}}).start();} }2.使用jdk的Timer和TimerTask
????使用jdk的Timer和TimerTask,可以實(shí)現(xiàn)簡(jiǎn)單的間隔執(zhí)行任務(wù),無法實(shí)現(xiàn)按日歷去調(diào)度執(zhí)行任務(wù)
public class TimerTaskDemo {/*** 其中Timer和TimerTask的區(qū)別和聯(lián)系:* * Timer是調(diào)度者,可以安排任務(wù)執(zhí)行計(jì)劃。* * TimerTask是任務(wù)。Timer類可以調(diào)度TimerTask任務(wù),TimerTask則通過在run()方法里實(shí)現(xiàn)具體任務(wù)。TimerTask也可停止自身任務(wù)。* * 一個(gè)Timer可以調(diào)度多個(gè)TimerTask。* * Timer是單線程的:Timer構(gòu)造函數(shù)調(diào)用時(shí)會(huì)創(chuàng)建了一個(gè)新線程,所有TimerTask都是依靠這個(gè)新的線程執(zhí)行。* @param args*/public static void main(String[] args) {TimerTask timerTask = new TimerTask() {@Overridepublic void run() {System.out.println("當(dāng)前時(shí)間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};Timer timer = new Timer();timer.schedule(timerTask,10,1000);}}3.ScheduledExecutorService
????ScheduledExecutorService是并發(fā)工具包中的類,是對(duì)比前面最理想的定時(shí)任務(wù)實(shí)現(xiàn)方式。
public class ScheduledDemo {public static void main(String[] args) {Runnable runnable1 = new Runnable() {@Overridepublic void run() {System.out.println("runnable1當(dāng)前時(shí)間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};Runnable runnable2 = new Runnable() {@Overridepublic void run() {System.out.println("runnable2當(dāng)前時(shí)間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}};//方式一 定義4個(gè)線程ScheduledExecutorService service = Executors.newScheduledThreadPool(4);ScheduledFuture<?> scheduledFuture= service.scheduleAtFixedRate(runnable1, 0,2, TimeUnit.SECONDS);//方式二ScheduledExecutorService service2 = Executors.newSingleThreadScheduledExecutor();service2.scheduleAtFixedRate(runnable2, 1, 2, TimeUnit.SECONDS);}}4.?Quartz實(shí)現(xiàn)
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.2</version></dependency><dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz-jobs</artifactId><version>2.3.2</version></dependency> public class QuartzDemo {public static void main(String[] args) throws SchedulerException {//創(chuàng)建Scheduler的工廠SchedulerFactory sf = new StdSchedulerFactory();//從工廠中獲取調(diào)度器實(shí)例Scheduler scheduler = sf.getScheduler();/*** 創(chuàng)建JobDetail* withDescription:job的描述* withIdentity:job的name和group*/JobDetail jb = JobBuilder.newJob(QuartzSchedueJob.class).withDescription("this is a job").withIdentity("myJob", "myJobGroup").build();//任務(wù)運(yùn)行的時(shí)間,5秒后啟動(dòng)任務(wù)long time = System.currentTimeMillis() + 5 * 1000L;Date statTime = new Date(time);//創(chuàng)建Trigger,使用SimpleScheduleBuilder或者CronScheduleBuilderTrigger t = TriggerBuilder.newTrigger().withDescription("this is a trigger").withIdentity("myTrigger", "myTriggerGroup")//.withSchedule(SimpleScheduleBuilder.simpleSchedule())//設(shè)置啟動(dòng)時(shí)間.startAt(statTime)//每隔3秒執(zhí)行一次.withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ? *")).build();//注冊(cè)任務(wù)和定時(shí)器scheduler.scheduleJob(jb, t);//啟動(dòng) 調(diào)度器scheduler.start();}}附:Cron表達(dá)式
????????在上面的demo中出現(xiàn)了"0/3 * * * * ? *",這是cron表達(dá)式,表示定時(shí)任務(wù)執(zhí)行的時(shí)間規(guī)則
????????cron?表達(dá)式是一個(gè)字符串,該字符串由?6?個(gè)空格分為?7?個(gè)域,每一個(gè)域代表一個(gè)時(shí)間含義。 格式: [秒] [分] [時(shí)] [日] [月] [周] [年],其中通常定義 “年” 的部分可以省略,實(shí)際常用的由前六部分組成。
????????關(guān)于?cron?的各個(gè)域的定義如下表格所示:
| 秒 | 是 | 0-59 | , - * / |
| 分 | 是 | 0-59 | , - * / |
| 時(shí) | 是 | 0-23 | , - * / |
| 日 | 是 | 1-31 | , - * ? / L W |
| 月 | 是 | 1-12 或 JAN-DEC | , - * / |
| 周 | 是 | 1-7 或 SUN-SAT | , - * ? / L # |
| 年 | 否 | 1970-2099 | , - * / |
這塊不作過多描述,有興趣的可以自行了解,也可以通過在線工具轉(zhuǎn)換:quartz/Cron/Crontab表達(dá)式在線生成工具-BeJSON.com
5.?Spring Task實(shí)現(xiàn)
1)SpringBoot:在Spring boot啟動(dòng)類上添加注解:@EnableScheduling
2)Spring:添加命名空間:xmlns:task="http://www.springframework.org/schema/task"
???????????????????添加約束:http://www.springframework.org/schema/task
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? http://www.springframework.org/schema/task/spring‐task.xsd
??????????????????開啟任務(wù)調(diào)度:<task:annotation‐driven></task:annotation‐driven>
?定時(shí)任務(wù)串行執(zhí)行:
@Component public class SpringTaskTest {private static final Logger LOGGER = LoggerFactory.getLogger(SpringTaskTest.class);/*** 每隔2秒執(zhí)行一次*/@Scheduled(cron = "0/2 * * * * *")public void task1() {LOGGER.info("--------------------task1開始--------------------");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}LOGGER.info("--------------------task1結(jié)束--------------------");} }定時(shí)任務(wù)并行執(zhí)行:
@Configuration //啟動(dòng)類或者此處配置@EnableScheduling public class TaskConfig implements SchedulingConfigurer, AsyncConfigurer {/*** 線程池線程數(shù)量*/private int poolSize = 5;@Beanpublic ThreadPoolTaskScheduler taskScheduler() {//創(chuàng)建定時(shí)任務(wù)線程池ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();///初始化線程池scheduler.initialize();//線程池容量scheduler.setPoolSize(poolSize);return scheduler;}@Overridepublic Executor getAsyncExecutor() {Executor executor = taskScheduler();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return null;}@Overridepublic void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {scheduledTaskRegistrar.setTaskScheduler(taskScheduler());} }6. 分布式定時(shí)任務(wù)Elastic-Job
1.概述
???????上文中提到的多種定時(shí)任務(wù)的實(shí)現(xiàn),而本篇的重點(diǎn)在于站在“巨人”肩膀上的ElasticJob分布式調(diào)度框架,巨人是指“Quartz”和“Zookeeper”,Elastic-Job最開始只有一個(gè) elastic-job-core 的項(xiàng)目,在 2.X 版本以后主要分為Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個(gè)子項(xiàng)目。其中,Elastic-Job-Lite 定位為輕量級(jí)無中心化解決方案 , 使 用 jar包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)。
????????應(yīng)用在各自的節(jié)點(diǎn)執(zhí)行任務(wù),通過 ZK 注冊(cè)中心協(xié)調(diào)。節(jié)點(diǎn)注冊(cè)、節(jié)點(diǎn)選舉、任務(wù)分片、監(jiān)聽都在 E-Job 的代碼中完成,以下是官網(wǎng)架構(gòu)圖:
2.調(diào)度模型
????????ElasticJob Lite是線程級(jí)別調(diào)度的進(jìn)程內(nèi)調(diào)度。
????????1.方便與Spring、Dubbo等Java框架配合使用,自由使用Spring注入Bean;
????????2.與業(yè)務(wù)應(yīng)用部署在一起,生命周期與業(yè)務(wù)應(yīng)用保持一致,是典型的嵌入式輕量級(jí)架構(gòu);
????????3.適用于資源使用穩(wěn)定、部署架構(gòu)簡(jiǎn)單的普通Java應(yīng)用;
????????4.分布式下的每個(gè)任務(wù)節(jié)點(diǎn)均是以自調(diào)度的方式適時(shí)的調(diào)度作業(yè),任務(wù)之間只需要一個(gè)注冊(cè)中心(注冊(cè)中心目前支持Zookeeper和ETCD兩種)對(duì)分布式場(chǎng)景下任務(wù)狀態(tài)進(jìn)行協(xié)調(diào)即可;
????????5.分布式作業(yè)節(jié)點(diǎn)通過選舉的方式獲取主節(jié)點(diǎn),主節(jié)點(diǎn)進(jìn)行分片,完畢后主節(jié)點(diǎn)和其他節(jié)點(diǎn)并無不同,都以自我調(diào)度的反射光hi執(zhí)行任務(wù)
????????ElasticJob Cloud調(diào)度方式是可以是進(jìn)程內(nèi)調(diào)度,作業(yè)類型屬于:常駐任務(wù),也可以是進(jìn)程級(jí)別調(diào)度,作業(yè)類型屬于:瞬時(shí)任務(wù)。
????????在ElasticJob Lite全部能力的基礎(chǔ)上,還擁有資源分配和任務(wù)分發(fā)的能力,將作業(yè)的開發(fā)、打包、分發(fā)、調(diào)度、治理、分片等一系列生命周期完全托管,是真正的作業(yè)云調(diào)度系統(tǒng)。
3.功能
? ? ? ? 1. 彈性調(diào)度:讓任務(wù)通過分片進(jìn)行水平擴(kuò)展的任務(wù)處理,每臺(tái)服務(wù)器只運(yùn)行分配給該服務(wù)器的分片;
? ? ? ? 2. 資源分配:由Mesos實(shí)現(xiàn),Mesos負(fù)責(zé)分配任務(wù)聲明所需要的資源(內(nèi)存和CPU),并將分配出去的資源進(jìn)行隔離
? ? ? ? 3. 作業(yè)治理:分布式場(chǎng)景下高可用、失效轉(zhuǎn)移、錯(cuò)過作業(yè)重新執(zhí)行等行為的治理協(xié)調(diào)
? ? ? ? 4. 可視化管理:包含作業(yè)增刪改查管控端、執(zhí)行歷史記錄查詢、配置中心管理等
4.適用場(chǎng)景
????????1.復(fù)雜任務(wù),如數(shù)據(jù)遷移,彈性分片能力大大減少海量數(shù)據(jù)遷移的時(shí)間
????????2.資源導(dǎo)向任務(wù),占用大量計(jì)算資源的報(bào)表作業(yè)適合采用瞬時(shí)作業(yè)實(shí)現(xiàn)
????????3.訂單拉取之類的,就是我們系統(tǒng)中最常用的那些場(chǎng)景
5.分片策略
1.?分片項(xiàng)與分片參數(shù)
任務(wù)分片,是為了實(shí)現(xiàn)把一個(gè)任務(wù)拆分成多個(gè)子任務(wù),在不同的 ejob 示例上執(zhí)行。例如 100W 條數(shù)據(jù),在配置文件中指定分成 10 個(gè)子任務(wù)(分片項(xiàng)),這 10 個(gè)子任務(wù)再按照一定的規(guī)則分配到 5 個(gè)實(shí)際運(yùn)行的服務(wù)器上執(zhí)行。除了直接用分片項(xiàng) ShardingItem獲取分片任務(wù)之外,還可以用 item 對(duì)應(yīng)的 parameter 獲取任務(wù)。
????????定義幾個(gè)分片項(xiàng),一個(gè)任務(wù)就會(huì)有幾個(gè)線程去運(yùn)行它。
注意:分片個(gè)數(shù)和分片參數(shù)要一一對(duì)應(yīng)。通常把分片項(xiàng)設(shè)置得比 E-Job 服務(wù)器個(gè)數(shù)大一些,比如 3 臺(tái)服務(wù)器,分成 9 片,這樣如果有服務(wù)器宕機(jī),分片還可以相對(duì)均勻。
2.?設(shè)置分片策略
// 作業(yè)分片策略 // 基于平均分配算法的分片策略 String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); // 定義Lite作業(yè)根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();? ? 3.分片方案
- 對(duì)業(yè)務(wù)主鍵進(jìn)行取模,獲取余數(shù)等于分片項(xiàng)的數(shù)據(jù),舉例:獲取到的 sharding item 是 0,1在 SQL 中加入過濾條件:where mod(id, 4) in (1, 2)。這種方式的缺點(diǎn):會(huì)導(dǎo)致索引失效,查詢數(shù)據(jù)時(shí)會(huì)全表掃描。解決方案:在查詢條件中在增加一個(gè)索引條件進(jìn)行過濾。
- 在表中增加一個(gè)字段,根據(jù)分片數(shù)生成一個(gè) mod 值。取模的基數(shù)要大于機(jī)器數(shù)。否則在增加機(jī)器后,會(huì)導(dǎo)致機(jī)器空閑。例如取模基數(shù)是 2,而服務(wù)器有 5 臺(tái),那么有三臺(tái)服務(wù)器永遠(yuǎn)空閑。而取模基數(shù)是 10,生成 10 個(gè) shardingItem,可以分配到 5 臺(tái)服務(wù)器。當(dāng)然,取模基數(shù)也可以調(diào)整。
- 如果從業(yè)務(wù)層面,可以用 ShardingParamter 進(jìn)行分片。例如 0=RDP, 1=CORE, 2=SIMS, 3=ECIF,List<users> = SELECT * FROM user WHERE status = 0 AND SYSTEM_ID ='RDP' limit 0, 100。
在 Spring Boot 中要 Elastic-Job 要配置的內(nèi)容太多了,有沒有更簡(jiǎn)單的添加任務(wù)的方法呢?比如在類上添加一個(gè)注解?這個(gè)時(shí)候我們就要用到 starter 了。
6. ElasticJob 原理
? ? ? ?1. new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init(); 進(jìn)入啟動(dòng)流程
/** * 初始化作業(yè). */ public void init() {LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);// 設(shè)置分片數(shù)JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());// 構(gòu)建任務(wù),創(chuàng)建調(diào)度器JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());// 在 ZK 上注冊(cè)任務(wù)JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);// 添加任務(wù)信息并進(jìn)行節(jié)點(diǎn)選舉schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());// 啟動(dòng)調(diào)度器jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); }? ? ? ? 2.registerStartUpInfo 方法
/** * 注冊(cè)作業(yè)啟動(dòng)信息. * * @param enabled 作業(yè)是否啟用 */ public void registerStartUpInfo(final boolean enabled) {// 啟動(dòng)所有的監(jiān)聽器、監(jiān)聽器用于監(jiān)聽 ZK 節(jié)點(diǎn)信息的變化。listenerManager.startAllListeners();// 節(jié)點(diǎn)選舉leaderService.electLeader();// 服務(wù)信息持久化(寫到 ZK)serverService.persistOnline(enabled);// 實(shí)例信息持久化(寫到 ZK)instanceService.persistOnline();// 重新分片shardingService.setReshardingFlag();// 監(jiān)控信息監(jiān)聽器monitorService.listen();// 自診斷修復(fù),使本地節(jié)點(diǎn)與 ZK 數(shù)據(jù)一致if (!reconcileService.isRunning()) {reconcileService.startAsync();} }? ? ? ? ?3. 啟動(dòng)的時(shí)候進(jìn)行主節(jié)點(diǎn)選舉
/** * 選舉主節(jié)點(diǎn). */ public void electLeader() {log.debug("Elect a new leader now.");jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());log.debug("Leader election completed."); }????????Latch 是一個(gè)分布式鎖,選舉成功后在 instance 寫入服務(wù)器信息
? ? ? ? 4. 啟動(dòng)調(diào)度任務(wù)則是
/** * 調(diào)度作業(yè). * * @param cron CRON表達(dá)式 */ public void scheduleJob(final String cron) {try {if (!scheduler.checkExists(jobDetail.getKey())) {scheduler.scheduleJob(jobDetail, createTrigger(cron));} //調(diào)用 Quartz 一樣的類進(jìn)行啟動(dòng)scheduler.start();} catch (final SchedulerException ex) {throw new JobSystemException(ex);} }7.?失效轉(zhuǎn)移
失效轉(zhuǎn)移,就是在執(zhí)行任務(wù)的過程中發(fā)生異常時(shí),這個(gè)分片任務(wù)可以在其他節(jié)點(diǎn)再次執(zhí)行。
FailoverListenerManager 監(jiān)聽的是 zk 的 instance 節(jié)點(diǎn)刪除事件。如果任務(wù)配置了 failover 等于 true,其中某個(gè) instance 與 zk 失去聯(lián)系或被刪除,并且失效的節(jié)點(diǎn)又不是本身,就會(huì)觸發(fā)失效轉(zhuǎn)移邏輯。Job 的失效轉(zhuǎn)移監(jiān)聽來源于 FailoverListenerManager 中內(nèi)部類JobCrashedJobListener 的 dataChanged 方法。當(dāng)節(jié)點(diǎn)任務(wù)失效時(shí)會(huì)調(diào)用 JobCrashedJobListener 監(jiān)聽器,此監(jiān)聽器會(huì)根據(jù)實(shí)例 id獲取所有的分片,然后調(diào)用 FailoverService 的 setCrashedFailoverFlag 方法,將每個(gè)分片 id 寫到/jobName/leader/failover/items 下,例如原來的實(shí)例負(fù)責(zé) 1、2 分片項(xiàng),那么 items 節(jié)點(diǎn)就會(huì)寫入 1、2,代表這兩個(gè)分片項(xiàng)需要失效轉(zhuǎn)移。
class JobCrashedJobListener extends AbstractJobListener {@Overrideprotected void dataChanged(final String path, final Type eventType, final String data) {if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {return;} // 獲取到失效的分片集合List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);if (!failoverItems.isEmpty()) {for (int each : failoverItems) { // 設(shè)置失效的分片項(xiàng)標(biāo)記failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}} else {for (int each : shardingService.getShardingItems(jobInstanceId)) {failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}}}}}總結(jié)
以上是生活随笔為你收集整理的Elastic-Job | 由浅入深一篇理解分布式定时任务的基本用法及简单原理解析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C语言数组初始化的问题
- 下一篇: 历史:古代:秦朝