任务调度之Elastic-Job1
目標
1、了解Elastic-Job 的基本特性
2、掌握Elastic-Job 開發(fā)與配置方式(包括Java 開發(fā)和Spring Boot 開發(fā)),掌握任務(wù)類型和任務(wù)分片策略
3、了解Elastic-Job 運維平臺的使用
4、掌握Elastic-Job 運行原理
內(nèi)容定位
適合了解了Quartz 的調(diào)度模型之后,想要知道如何基于ZK 配置Quartz 和如何實現(xiàn)任務(wù)分片的同學(xué)
Quartz-Misfire
什么情況下錯過觸發(fā)?錯過觸發(fā)怎么辦?
線程池只有5 個線程,當有5 個任務(wù)都在執(zhí)行的時候,第六個任務(wù)即將觸發(fā),這個時候任務(wù)就不能得到執(zhí)行。在quartz.properties 有一個屬性misfireThreshold,用來定義觸發(fā)器超時的"臨界值",也就是超過了這個時間,就算錯過觸發(fā)了。
例如,如果misfireThreshold 是60000(60 秒),9 點整應(yīng)該執(zhí)行的任務(wù),9 點零1 分還沒有可用線程執(zhí)行它,就會超時(misfires)。
下面這些原因可能造成misfired job:
1、沒有可用線程
2、Trigger 被暫停
3、系統(tǒng)重啟
4、禁止并發(fā)執(zhí)行的任務(wù)在到達觸發(fā)時間時,上次執(zhí)行還沒有結(jié)束。
錯過觸發(fā)怎么辦?Misfire 策略設(shè)置
每一種Trigger 都定義了自己的Misfire 策略,不同的策略通過不同的方法來設(shè)置。
standalone 工程MisfireTest
大體上來說有3 種:
1、忽略
2、立即跑一次
3、下次跑
詳細內(nèi)容參考:
https://gper.club/articles/7e7e7f7ff7g59gc5g69
怎么避免任務(wù)錯過觸發(fā)?
合理地設(shè)置線程池數(shù)量,以及任務(wù)觸發(fā)間隔。
認識E-Job
任務(wù)調(diào)度高級需求
Quartz 的不足:
1、作業(yè)只能通過DB 搶占隨機負載,無法協(xié)調(diào)
2、任務(wù)不能分片——單個任務(wù)數(shù)據(jù)太多了跑不完,消耗線程,負載不均
3、作業(yè)日志可視化監(jiān)控、統(tǒng)計
發(fā)展歷史
E-Job 是怎么來的?
在當當?shù)膁dframe 框架中,需要一個任務(wù)調(diào)度系統(tǒng)(作業(yè)系統(tǒng))
實現(xiàn)的話有兩種思路,一個是修改開源產(chǎn)品,一種是基于開源產(chǎn)品搭建(封裝),當當選擇了后者,最開始這個調(diào)度系統(tǒng)叫做dd-job。它是一個無中心化的分布式調(diào)度框架。因為數(shù)據(jù)庫缺少分布式協(xié)調(diào)功能(比如選主),替換為Zookeeper 后,增加了彈性
擴容和數(shù)據(jù)分片的功能。
Elastic-Job 是ddframe 中的dd-job 作業(yè)模塊分離出來的作業(yè)框架,基于Quartz和Curator 開發(fā),在2015 年開源。
輕量級,無中心化解決方案。
為什么說是去中心化呢?因為沒有統(tǒng)一的調(diào)度中心。集群的每個節(jié)點都是對等的,節(jié)點之間通過注冊中心進行分布式協(xié)調(diào)。E-Job 存在主節(jié)點的概念,但是主節(jié)點沒有調(diào)度的功能,而是用于處理一些集中式任務(wù),如分片,清理運行時信息等。
思考:如果ZK 掛了怎么辦?
每個任務(wù)有獨立的線程池。
從官網(wǎng)開始
http://elasticjob.io/docs/elastic-job-lite/00-overview/
https://github.com/elasticjob
Elastic-Job 最開始只有一個elastic-job-core 的項目,在2.X 版本以后主要分為Elastic-Job-Lite 和Elastic-Job-Cloud 兩個子項目。其中,Elastic-Job-Lite 定位為輕量級無中心化解決方案, 使用jar 包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)。而Elastic-Job-Cloud 使用Mesos + Docker 的解決方案,額外提供資源治理、應(yīng)用分發(fā)以及進程隔離等服務(wù)(跟Lite 的區(qū)別只是部署方式不同,他們使用相同的API,只要開發(fā)一次)。
功能特性
分布式調(diào)度協(xié)調(diào):用ZK 實現(xiàn)注冊中心
錯過執(zhí)行作業(yè)重觸發(fā)(Misfire)
支持并行調(diào)度(任務(wù)分片)
作業(yè)分片一致性,保證同一分片在分布式環(huán)境中僅一個執(zhí)行實例
彈性擴容縮容:將任務(wù)拆分為n 個任務(wù)項后,各個服務(wù)器分別執(zhí)行各自分配到的任務(wù)項。一旦有新的服務(wù)器加入集群,或現(xiàn)有服務(wù)器下線,elastic-job 將在保留本次任務(wù)執(zhí)行不變的情況下,下次任務(wù)開始前觸發(fā)任務(wù)重分片。
失效轉(zhuǎn)移failover:彈性擴容縮容在下次作業(yè)運行前重分片,但本次作業(yè)執(zhí)行的過程中,下線的服務(wù)器所分配的作業(yè)將不會重新被分配。失效轉(zhuǎn)移功能可以在本次作業(yè)運行中用空閑服務(wù)器抓取孤兒作業(yè)分片執(zhí)行。同樣失效轉(zhuǎn)移功能也會犧牲部分性能。
支持作業(yè)生命周期操作(Listener)
豐富的作業(yè)類型(Simple、DataFlow、Script)
Spring 整合以及命名空間提供
運維平臺
項目架構(gòu)
應(yīng)用在各自的節(jié)點執(zhí)行任務(wù),通過ZK 注冊中心協(xié)調(diào)。節(jié)點注冊、節(jié)點選舉、任務(wù)分片、監(jiān)聽都在E-Job 的代碼中完成。
Java 開發(fā)
工程:ejob-standalone
pom 依賴
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version> </dependency>任務(wù)類型
standalone 工程
任務(wù)類型有三種:
SimpleJob
SimpleJob: 簡單實現(xiàn),未經(jīng)任何封裝的類型。需實現(xiàn)SimpleJob 接口。
ejob-standalone MySimpleJob.java
public class MyElasticJob implements SimpleJob {public void execute(ShardingContext context) {System.out.println(String.format("Item: %s | Time: %s | Thread: %s ",context.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()),Thread.currentThread().getId()));} }DataFlowJob
DataFlowJob:Dataflow 類型用于處理數(shù)據(jù)流,必須實現(xiàn)fetchData()和processData()的方法,一個用來獲取數(shù)據(jù),一個用來處理獲取到的數(shù)據(jù)。
ejob-standalone MyDataFlowJob.java
public class MyDataFlowJob implements DataflowJob<String> {@Overridepublic List<String> fetchData(ShardingContext shardingContext) {// 獲取到了數(shù)據(jù)return Arrays.asList("leon","jack","seven");}@Overridepublic void processData(ShardingContext shardingContext, List<String> data) {data.forEach(x-> System.out.println("開始處理數(shù)據(jù):"+x));} }ScriptJob
Script:Script 類型作業(yè)意為腳本類型作業(yè),支持shell,python,perl 等所有類型腳本。D 盤下新建1.bat,內(nèi)容:
@echo ------【腳本任務(wù)】Sharding Context: %*ejob-standalone script.ScriptJobTest
package script;import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration; import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import dataflow.MyDataFlowJob;public class ScriptJobTest {// 如果修改了代碼,跑之前清空ZKpublic static void main(String[] args) {// ZK注冊中心CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "ejob-standalone"));regCenter.init();// 定義作業(yè)核心配置JobCoreConfiguration scriptJobCoreConfig = JobCoreConfiguration.newBuilder("MyScriptJob", "0/4 * * * * ?", 2).build();// 定義SCRIPT類型配置ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptJobCoreConfig,"D:/1.bat");// 作業(yè)分片策略// 基于平均分配算法的分片策略String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();// 定義Lite作業(yè)根配置// LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).build();// 構(gòu)建Jobnew JobScheduler(regCenter, scriptJobRootConfig).init();// new JobScheduler(regCenter, scriptJobRootConfig, jobEventConfig).init();}}只要指定腳本的內(nèi)容或者位置
E-Job 配置
配置步驟
配置手冊:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/
1、ZK 注冊中心配置(后面繼續(xù)分析)
2、作業(yè)配置(從底層往上層:Core——Type——Lite)
| Core | JobCoreConfiguration | 用于提供作業(yè)核心配置信息,如:作業(yè)名稱、CRON 表達式、分片總數(shù)等。 |
| Type | JobTypeConfiguration | 有3 個子類分別對應(yīng)SIMPLE, DATAFLOW 和SCRIPT 類型作業(yè),提供3 種作 業(yè)需要的不同配置,如:DATAFLOW 類型是否流式處理或SCRIPT 類型的命 令行等。Simple 和DataFlow 需要指定任務(wù)類的路徑。 |
| Root | JobRootConfiguration | 有2 個子類分別對應(yīng)Lite 和Cloud 部署類型,提供不同部署類型所需的配 置,如:Lite 類型的是否需要覆蓋本地配置或Cloud 占用CPU 或Memory 數(shù)量等。 可以定義分片策略。 http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/ |
作業(yè)配置分為3 級,分別是JobCoreConfiguration,JobTypeConfiguration 和LiteJobConfiguration 。LiteJobConfiguration 使用JobTypeConfiguration ,JobTypeConfiguration 使用JobCoreConfiguration,層層嵌套。
JobTypeConfiguration 根據(jù)不同實現(xiàn)類型分為SimpleJobConfiguration ,DataflowJobConfiguration 和ScriptJobConfiguration。
E-Job 使用ZK 來做分布式協(xié)調(diào),所有的配置都會寫入到ZK 節(jié)點。
ZK 注冊中心數(shù)據(jù)結(jié)構(gòu)
一個任務(wù)一個二級節(jié)點。
這里面有些節(jié)點是臨時節(jié)點,只有任務(wù)運行的時候才能看到。
注意:修改了任務(wù)重新運行任務(wù)不生效,是因為ZK 的信息不會更新, 除非把overwrite 修改成true。
config 節(jié)點
JSON 格式存儲。
存儲任務(wù)的配置信息,包含執(zhí)行類,cron 表達式,分片算法類,分片數(shù)量,分片參數(shù)等等。
{"jobName": "MySimpleJob","jobClass": "job.MySimpleJob","jobType": "SIMPLE","cron": "0/2 * * * * ?","shardingTotalCount": 1,"shardingItemParameters": "","jobParameter": "","failover": false,"misfire": true,"description": "","jobProperties": {"job_exception_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution": true,"maxTimeDiffSeconds": -1,"monitorPort": -1,"jobShardingStrategyClass": "","reconcileIntervalMinutes": 10,"disabled": false,"overwrite": false }config 節(jié)點的數(shù)據(jù)是通過ConfigService 持久化到zookeeper 中去的。默認狀態(tài)下,如果你修改了Job 的配置比如cron 表達式、分片數(shù)量等是不會更新到zookeeper 上去的,除非你在Lite 級別的配置把參數(shù)overwrite 修改成true。
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();instances 節(jié)點
同一個Job 下的elastic-job 的部署實例。一臺機器上可以啟動多個Job 實例,也就是Jar 包。instances 的命名是IP+@-@+PID。只有在運行的時候能看到。
leader 節(jié)點
任務(wù)實例的主節(jié)點信息,通過zookeeper 的主節(jié)點選舉,選出來的主節(jié)點信息。在elastic job 中,任務(wù)的執(zhí)行可以分布在不同的實例(節(jié)點)中,但任務(wù)分片等核心控制,需要由主節(jié)點完成。因此,任務(wù)執(zhí)行前,需要選舉出主節(jié)點。
下面有三個子節(jié)點:
election:主節(jié)點選舉
sharding:分片
failover:失效轉(zhuǎn)移
election 下面的instance 節(jié)點顯示了當前主節(jié)點的實例ID:jobInstanceId。
election 下面的latch 節(jié)點也是一個永久節(jié)點用于選舉時候的實現(xiàn)分布式鎖。
sharding 節(jié)點下面有一個臨時節(jié)點,necessary,是否需要重新分片的標記。如果分片總數(shù)變化,或任務(wù)實例節(jié)點上下線或啟用/禁用,以及主節(jié)點選舉,都會觸發(fā)設(shè)置重分片標記,主節(jié)點會進行分片計算。
servers 節(jié)點
任務(wù)實例的信息,主要是IP 地址,任務(wù)實例的IP 地址。跟instances 不同,如果多個任務(wù)實例在同一臺機器上運行則只會出現(xiàn)一個IP 子節(jié)點。可在IP 地址節(jié)點寫入DISABLED 表示該任務(wù)實例禁用。
sharding 節(jié)點
任務(wù)的分片信息,子節(jié)點是分片項序號,從0 開始。分片個數(shù)是在任務(wù)配置中設(shè)置的。分片項序號的子節(jié)點存儲詳細信息。每個分片項下的子節(jié)點用于控制和記錄分片運行狀態(tài)。最主要的子節(jié)點就是instance。
| instance | 否 | 執(zhí)行該分片項的作業(yè)運行實例主鍵 |
| running | 是 | 分片項正在運行的狀態(tài) 僅配置monitorExecution 時有效 |
| failover | 是 | 如果該分片項被失效轉(zhuǎn)移分配給其他作業(yè)服務(wù)器,則此節(jié)點值記錄執(zhí)行此分 片的作業(yè)服務(wù)器IP |
| misfire | 否 | 是否開啟錯過任務(wù)重新執(zhí)行 |
| disabled | 否 | 是否禁用此分片項 |
?
總結(jié)
以上是生活随笔為你收集整理的任务调度之Elastic-Job1的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 任务调度之Quartz2
- 下一篇: 任务调度之Elastic-Job2