E-Job
一,Quartz-Misfire
什么情況下錯過觸發?錯過觸發怎么辦?
線程池只有 5 個線程,當有 5 個任務都在執行的時候,第六個任務即將觸發,這個時候任務就不能得到執行。在 quartz.properties 有一個屬性 misfireThreshold,用來定義觸發器超時的"臨界值",也就是超過了這個時間,就算錯過觸發了。
例如,如果 misfireThreshold 是 60000(60 秒),9 點整應該執行的任務,9 點零1 分還沒有可用線程執行它,就會超時(misfires)。
下面這些原因可能造成 misfired job:
1、 沒有可用線程
2、 Trigger 被暫停
3、 系統重啟
4、 禁止并發執行的任務在到達觸發時間時,上次執行還沒有結束。
錯過觸發怎么辦?Misfire 策略設置
每一種 Trigger 都定義了自己的 Misfire 策略,不同的策略通過不同的方法來設置。
1、 忽略
2、 立即跑一次
3、 下次跑
怎么避免任務錯過觸發?
合理地設置線程池數量,以及任務觸發間隔。
二,E-Job概述
1.Quartz的不足
- 作業只能通過 DB 搶占隨機負載,無法協調
- 任務不能分片——單個任務數據太多了跑不完,消耗線程,負載不均
- 作業日志可視化監控、統計
2,發展歷史
Elastic-Job 是 ddframe 中的 dd-job 作業模塊分離出來的作業框架,基于 Quartz和 Curator 開發,在 2015 年開源。
輕量級,無中心化解決方案。
為什么說是去中心化呢?因為沒有統一的調度中心。集群的每個節點都是對等的,節點之間通過注冊中心進行分布式協調。E-Job 存在主節點的概念,但是主節點沒有調度的功能,而是用于處理一些集中式任務,如分片,清理運行時信息等。
如果 ZK 掛了怎么辦?
每個任務有獨立的線程池。
官網地址 github地址
Elastic-Job 最開始只有一個 elastic-job-core 的項目,在 2.X 版本以后主要分為Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個子項目。其中,Elastic-Job-Lite 定位為輕量級 無 中 心 化 解 決 方 案 , 使 用 jar 包 的 形 式 提 供 分 布 式 任 務 的 協 調 服 務 。 而Elastic-Job-Cloud 使用 Mesos + Docker 的解決方案,額外提供資源治理、應用分發以及進程隔離等服務(跟 Lite 的區別只是部署方式不同,他們使用相同的 API,只要開發一次)。
3,功能特性
- 分布式調度協調:用 ZK 實現注冊中心
- 錯過執行作業重觸發(Misfire)
- 支持并行調度(任務分片)
- 作業分片一致性,保證同一分片在分布式環境中僅一個執行實例
- 彈性擴容縮容:將任務拆分為 n 個任務項后,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集群,或現有服務器下線,elastic-job 將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。
- 失效轉移 failover:彈性擴容縮容在下次作業運行前重分片,但本次作業執行的過程中,下線的服務器所分配的作業將不會重新被分配。失效轉移功能可以在本次作業運行中用空閑服務器抓取孤兒作業分片執行。同樣失效轉移功能也會犧牲部分性能。
- 支持作業生命周期操作(Listener)
- 豐富的作業類型(Simple、DataFlow、Script)
- Spring 整合以及命名空間提供
- 運維平臺
4,項目架構
應用在各自的節點執行任務,通過 ZK 注冊中心協調。節點注冊、節點選舉、任務分片、監聽都在 E-Job 的代碼中完成。
三,基本操作
1,pom依賴
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version> </dependency>2,任務類型
1)SimpleJob
SimpleJob: 簡單實現,未經任何封裝的類型。需實現 SimpleJob 接口。
2)DataFlowJob
DataFlowJob:Dataflow 類型用于處理數據流,必須實現 fetchData()和processData()的方法,一個用來獲取數據,一個用來處理獲取到的數據。
3)ScriptJob
Script:Script 類型作業意為腳本類型作業,支持 shell,python,perl 等所有類型腳本。(只要指定腳本的內容或者位置)
3,E-Job配置
1)配置步驟
配置手冊
| Core | JobCoreConfiguration | 用于提供作業核心配置信息,如:作業名稱、CRON 表達式、分片總數等。 |
| Type | JobTypeConfiguration | 有 3 個子類分別對應 SIMPLE, DATAFLOW 和 SCRIPT 類型作業,提供 3 種作業需要的不同配置,如:DATAFLOW 類型是否流式處理或 SCRIPT 類型的命令行等。Simple 和 DataFlow 需要指定任務類的路徑。 |
| Root | JobRootConfiguration | 有 2 個子類分別對應 Lite 和 Cloud 部署類型,提供不同部署類型所需的配置,如:Lite 類型的是否需要覆蓋本地配置或 Cloud 占用 CPU 或 Memory數量等。可以定義分片策略。 |
作業配置分為 3 級,分別是 JobCoreConfiguration,JobTypeConfiguration 和LiteJobConfiguration 。 LiteJobConfiguration 使 用 JobTypeConfiguration ,JobTypeConfiguration 使用 JobCoreConfiguration,層層嵌套。
JobTypeConfiguration 根 據 不 同 實 現 類 型 分 為 SimpleJobConfiguration ,DataflowJobConfiguration 和 ScriptJobConfiguration。
E-Job 使用 ZK 來做分布式協調,所有的配置都會寫入到 ZK 節點。
2)ZK注冊中心數據結構
一個任務一個二級節點。
這里面有些節點是臨時節點,只有任務運行的時候才能看到。
[zk: localhost:2181(CONNECTED) 9] ls /elastic-job/com.yhd.ejob.tasktype.MyEJob [config, instances, leader, servers, sharding]注意:修改了任務重新運行任務不生效,是因為 ZK 的信息不會更新, 除非把overwrite 修改成 true。
①config節點
JSON格式存儲。
存儲任務的配置信息,包含執行類,cron 表達式,分片算法類,分片數量,分片參數等等。
{"jobName":"MySimpleJob","jobClass":"job.MySimpleJob","jobType":"SIMPLE","cron":"0/2 * * * * ?","shardingTotalCount":1,"shardingItemParameters":"","jobParameter":"","failover":false,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":false }config節點的數據是通過ConfigService持久化到zookeeper中去的。默認狀態下,如果你修改了 Job 的配置比如 cron 表達式、分片數量等是不會更新到 zookeeper 上去的,除非你在 Lite 級別的配置把參數 overwrite 修改成 true。
②instances節點
同一個 Job 下的 elastic-job 的部署實例。一臺機器上可以啟動多個 Job 實例,也就是 Jar 包。instances 的命名是 IP+@-@+PID。只有在運行的時候能看到。
③leader節點
任務實例的主節點信息,通過 zookeeper 的主節點選舉,選出來的主節點信息。在 elastic job 中,任務的執行可以分布在不同的實例(節點)中,但任務分片等核心控制,需要由主節點完成。因此,任務執行前,需要選舉出主節點。
下面有三個子節點:
- election:主節點選舉
- sharding:分片
- failover:失效轉移
election 下面的 instance 節點顯示了當前主節點的實例 ID:jobInstanceId。
election 下面的 latch 節點也是一個永久節點用于選舉時候的實現分布式鎖。
sharding 節點下面有一個臨時節點,necessary,是否需要重新分片的標記。如果分片總數變化,或任務實例節點上下線或啟用/禁用,以及主節點選舉,都會觸發設置重分片標記,主節點會進行分片計算。
④servers節點
任務實例的信息,主要是 IP 地址,任務實例的 IP 地址。跟 instances 不同,如果多個任務實例在同一臺機器上運行則只會出現一個 IP 子節點。可在 IP 地址節點寫入 DISABLED 表示該任務實例禁用。
⑤sharding節點
任務的分片信息,子節點是分片項序號,從 0 開始。分片個數是在任務配置中設置的。分片項序號的子節點存儲詳細信息。每個分片項下的子節點用于控制和記錄分片運行狀態。最主要的子節點就是 instance。
| instance | 否 | 執行該分片項的作業運行實例主鍵 |
| running | 是 | 分片項正在運行的狀態,僅配置 monitorExecution 時有效 |
| failover | 是 | 如果該分片項被失效轉移分配給其他作業服務器,則此節點值記錄執行此分片的作業服務器 IP |
| misfire | 否 | 是否開啟錯過任務重新執行 |
| disabled | 否 | 是否禁用此分片項 |
四,運維平臺
1,下載解壓運行
github地址 對 elastic-job-lite-console 打包得到安裝包
解壓縮 elastic-job-lite-console-${version}.tar.gz 并執行 bin\start.sh(Windows運行.bat)。打開瀏覽器訪問 http://localhost:8899/即可訪問控制臺。
8899 為默認端口號,可通過啟動腳本輸入-p 自定義端口號。
默認管理員用戶名和密碼是 root/root。右上角可以切換語言。
2,添加 ZK 注冊中心
第一步,舔加注冊中心,輸入 ZK 地址和命名空間,并連接。
運維平臺和 elastic-job-lite 并無直接關系,是通過讀取作業注冊中心數據展現作業狀態,或更新注冊中心數據修改全局配置。
控制臺只能控制作業本身是否運行,但不能控制作業進程的啟動,因為控制臺和作業本身服務器是完全分離的,控制臺并不能控制作業服務器。
可以對作業進行操作。
3,事件追蹤
官網說明
Elastic-Job 提供了事件追蹤功能,可通過事件訂閱的方式處理調度過程的重要事件,用于查詢、統計和監控。
Elastic-Job-Lite 在配置中提供了 JobEventConfiguration,目前支持數據庫方式配置。
BasicDataSource dataSource = new BasicDataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUrl("jdbc:mysql://localhost:3306/elastic_job_log");dataSource.setUsername("root");dataSource.setPassword("123456"); JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource); ………… new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init();事件追蹤的 event_trace_rdb_url 屬性對應庫自動創建 JOB_EXECUTION_LOG 和JOB_STATUS_TRACE_LOG 兩張表以及若干索引。
需要在運維平臺中添加數據源信息,并且連接:
在作業歷史中查詢:
五,集成Spring與分片
1,依賴
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency>2,配置文件
regCenter.serverList=121.xxx.31.160:2181 regCenter.namespace=elastic-job3,創建任務
/*** @author yhd* @email yinhuidong1@xiaomi.com* @description SimpleJob 簡單實現,未經過任何封裝。需要實現SimpleJob接口* @since 2021/3/29 10:20*/ @Component public class MyEJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {System.out.println(String.format("Item: %s | Time: %s | Thread: %s ",shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()),Thread.currentThread().getId()));} } /*** @author yhd* @email yinhuidong1@xiaomi.com* @description 處理數據流,必須實現fetchData 和 processData 方法。* 一個用來獲取數據流,一個用來處理數據流。* @since 2021/3/29 10:23*/ @Component public class MyDataFlowJob implements DataflowJob<String> {/*** @author yhd* @since 2021/3/29 10:25* @email yinhuidong1@xiaomi.com* @description 獲取數據流* @params* @return*/@Overridepublic List<String> fetchData(ShardingContext shardingContext) {return Arrays.asList("zs","ls","ww");}/*** @author yhd* @since 2021/3/29 10:25* @email yinhuidong1@xiaomi.com* @description 處理數據流* @params* @return*/@Overridepublic void processData(ShardingContext shardingContext, List<String> list) {list.forEach(System.out::println);} }4,配置注冊中心
Bean 的 initMethod 屬性用來指定 Bean 初始化完成之后要執行的方法,用來替代繼承 InitializingBean 接口,以便在容器啟動的時候創建注冊中心。
/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 定時任務注冊中心配置* @since 2021/3/29 11:03*/ @SpringBootConfiguration //不配置這個的話可能會出現空指針異常 @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0") public class JobRegistryCenterConfig {/*** @author yhd* @since 2021/3/29 11:06* @email yinhuidong1@xiaomi.com* @description 配置定時任務注冊中心* @params* @return*/@Bean(initMethod = "init")public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace){return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace));} }5,作業三級配置
Core——Type——Lite
/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 作業三級配置* @since 2021/3/29 11:20*/ @SpringBootConfiguration public class MyJobConfig {private String cron = "0/5 * * * * ?";//分片數private Integer shardingTotalCount = 3;private String shardingItemParameters = "0=A,1=B,2=C";private String jobParameters = "parameter";@Resourceprivate ZookeeperRegistryCenter regCenter;@Resourceprivate MyEJob myEJob;@Resourceprivate MyDataFlowJob myDataFlowJob;/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description SimpleJob* @params* @since 2021/3/29 16:49*/@Bean(initMethod = "init")public JobScheduler simpleJobScheduler(DataSource dataSource) {return new SpringJobScheduler(myEJob,regCenter,getLiteJobConfiguration(myEJob.getClass(),cron,shardingTotalCount,shardingItemParameters,jobParameters),new JobEventRdbConfiguration(dataSource));}/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description 啟動SimpleJob作業* @params* @since 2021/3/29 16:50*/private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,final String cron,final int shardingTotalCount,final String shardingItemParameters,final String jobParameters) {// 定義作業核心配置JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).build();// 定義SIMPLE類型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());// 定義Lite作業根配置return LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();}/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description DataFlowJob* @params* @since 2021/3/29 17:10*/@Bean(initMethod = "init")public JobScheduler dataFlowScheduler() {return new SpringJobScheduler(myDataFlowJob,regCenter,getDataFlowJobConfiguration(myDataFlowJob.getClass(),cron,shardingTotalCount,shardingItemParameters,jobParameters));}/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description DataFlowJobConfiguration* @params* @since 2021/3/29 17:10*/private LiteJobConfiguration getDataFlowJobConfiguration(Class<? extends MyDataFlowJob> aClass, String cron, Integer shardingTotalCount, String shardingItemParameters, String jobParameters) {return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder(aClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).build(),aClass.getCanonicalName(),false)).overwrite(true).build();}}6,分片策略
1)分片項與分片參數
任務分片,是為了實現把一個任務拆分成多個子任務,在不同的 ejob 實例上執行。
例如 100W 條數據,在配置文件中指定分成 10 個子任務(分片項),這 10 個子任務再按照一定的規則分配到 5 個實際運行的服務器上執行。除了直接用分片項 ShardingItem獲取分片任務之外,還可以用 item 對應的 parameter 獲取任務。
定義幾個分片項,一個任務就會有幾個線程去運行它。
注意:分片個數和分片參數要一一對應。通常把分片項設置得比 E-Job 服務器個數大一些,比如 3 臺服務器,分成 9 片,這樣如果有服務器宕機,分片還可以相對均勻。
2)分片驗證
多實例運行(單機):
- 多運行一個點,任務不會重跑(兩個節點各獲得一個分片項)
- 關閉一個節點,任務不會漏跑
3)分片策略
官網說明
分片項如何分配到服務器?這個跟分片策略有關。
| AverageAllocationJobShardingStrategy | 基于平均分配算法的分片策略,也是默認的分片策略。 | 如果分片不能整除,則不能整除的多余分片將依 次追加到序號小的服務器。如: ? 如果有 3 臺服務器,分成 9 片,則每臺服務 器 分 到 的 分 片 是 : 1=[0,1,2], 2=[3,4,5], 3=[6,7,8] ? 如果有 3 臺服務器,分成 8 片,則每臺服務 器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5] ? 如果有 3 臺服務器,分成 10 片,則每臺服務 器 分 到 的 分 片 是 : 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8] |
| OdevitySortByNameJobShardingStrategy | 根據作業名的哈希值奇偶數決定 IP 升降序算法的分片策略。 | 根據作業名的哈希值奇偶數決定 IP 升降序算法的 分片策略。 ? 作業名的哈希值為奇數則 IP 升序。 ? 作業名的哈希值為偶數則 IP 降序。 用于不同的作業平均分配負載至不同的服務器。 |
| RotateServerByNameJobShardingStrategy | 根據作業名的哈希值對服務器列表進行輪轉的分片策略。 | |
| 自定義分片策略 | 實現 JobShardingStrategy 接口并實現 sharding 方 法,接口方法參數為作業服務器 IP 列表和分片策 略選項,分片策略選項包括作業名稱,分片總數 以及分片序列號和個性化參數對照表,可以根據 需求定制化自己的分片策略。 |
AverageAllocationJobShardingStrategy 的缺點是,一旦分片數小于作業服務器數,作業將永遠分配至 IP 地址靠前的服務器,導致 IP 地址靠后的服務器空閑。而 OdevitySortByNameJobShardingStrategy 則可以根據作業名稱重新分配服務器負載。如:
如果有 3 臺服務器,分成 2 片,作業名稱的哈希值為奇數,則每臺服務器分到的分片是:1=[0], 2=[1], 3=[]
如果有 3 臺服務器,分成 2 片,作業名稱的哈希值為偶數,則每臺服務器分到的分片是:3=[0], 2=[1], 1=[]
在 Lite 配置中指定分片策略:
String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();4)分片方案
獲取到分片項 shardingItem 之后,怎么對數據進行分片?
1、對業務主鍵進行取模,獲取余數等于分片項的數據
舉例:獲取到的 sharding item 是 0,1
在 SQL 中加入過濾條件:where mod(id, 4) in (1, 2)。
這種方式的缺點:會導致索引失效,查詢數據時會全表掃描。
解決方案:在查詢條件中在增加一個索引條件進行過濾。
2、在表中增加一個字段,根據分片數生成一個 mod 值。取模的基數要大于機器數。否則在增加機器后,會導致機器空閑。例如取模基數是 2,而服務器有 5 臺,那么有三臺服務器永遠空閑。而取模基數是 10,生成 10 個 shardingItem,可以分配到 5 臺服務器。當然,取模基數也可以調整。
3、如果從業務層面,可以用 ShardingParamter 進行分片。
例如 0=RDP, 1=CORE, 2=SIMS, 3=ECIF
List<users> = SELECT * FROM user WHERE status = 0 AND SYSTEM_ID ='RDP' limit 0, 100。在 Spring Boot 中要 Elastic-Job 要配置的內容太多了,有沒有更簡單的添加任務的方法呢?比如在類上添加一個注解?這個時候我們就要用到 starter 了。
7,自定義ejob-spring-boot-starter
| 可以在啟動類上使用@Enable 功能開啟 E-Job 任務調度 | 注解@EnableElasticJob | 在自動配置類上用@ConditionalOnBean決定是否自動配置 |
| 可以在 properties 或 yml 中識別配置內容 | 配置類 RegCenterProperties.java | 支 持 在 properties 文 件 中 使 用elasticjob.regCenter 前綴,配置注冊中心參數 |
| 在類上加上注解,直接創建任務 | 注解 @JobScheduled | 配置任務參數,包括定分片項、分片參數等等 |
| 不用創建 ZK 注冊中心 | 自動配置類RegCentreAutoConfiguration.java | 注入從 RegCenterProperties.java 讀取到的參數,自動創 ZookeeperConfiguration |
| 不用創建三級(Core、Type、Lite)配置 | 自動配置類JobAutoConfiguration.java | 讀 取 注 解 的 參 數 , 創 建JobCoreConfiguration 、JobTypeConfiguration 、LiteJobConfiguration在注冊中心創建之后再創建 |
| Spring Boot 啟動時自動配置 | 創建Resource/META-INF/spring.factories | 指定兩個自動配置類 |
1)pom依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.yhd</groupId><artifactId>ejob-spring-boot-starter</artifactId><version>1.0</version><name>ejob-spring-boot-starter</name><description>for the project elastic-job autoconfiguration</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency><!-- Spring Boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><!-- @ConfigurationProperties annotation processing (metadata for IDEs) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.5.0</version></dependency></dependencies></project>2)resources/META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.yhd.ejobspringbootstarter.autoconfigure.JobAutoConfiguration,\ com.yhd.ejobspringbootstarter.autoconfigure.RegCentreAutoConfiguration3)annotation
/*** @author yhd* @since 2021/3/30 16:47* @email yinhuidong1@xiaomi.com* @description 開啟 e-job 自動配置功能* @params* @return*/ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface EnableElasticJob {} /*** @author yhd* @since 2021/3/30 16:48* @email yinhuidong1@xiaomi.com* @description 表示這是一個任務類,可以通過注解的方式對任務類進行配置* @params* @return*/ @Component @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface JobScheduled {/*** 作業名稱* @return*/String jobName();/*** cron表達式,用于控制作業觸發時間* @return*/String cron() default "";/*** 作業分片總數* @return*/int shardingTotalCount() default 1;/*** 分片序列號和參數用等號分隔,多個鍵值對用逗號分隔* <p>分片序列號從0開始,不可大于或等于作業分片總數<p>* <p>如:<p>* <p>0=a,1=b,2=c<p>* @return*/String shardingItemParameters() default "";/*** 作業自定義參數* <p>作業自定義參數,可通過傳遞該參數為作業調度的業務方法傳參,用于實現帶參數的作業<p>* <p>例:每次獲取的數據量、作業實例從數據庫讀取的主鍵等<p>* @return*/String jobParameter() default "";/*** 是否開啟任務執行失效轉移,開啟表示如果作業在一次任務執行中途宕機,允許將該次未完成的任務在另一作業節點上補償執行* @return*/boolean failover() default false;/*** 是否開啟錯過任務重新執行* @return*/boolean misfire() default false;/*** 作業是否禁止啟動,可用于部署作業時,先禁止啟動,部署結束后統一啟動* @return*/boolean disabled() default false;boolean overwrite() default false;/*** 作業描述信息* @return*/String description() default "";boolean streamingProcess() default false;String scriptCommandLine() default "";boolean monitorExecution() default true;int monitorPort() default -1;int maxTimeDiffSeconds() default -1;String jobShardingStrategyClass() default "";}4)數據模型
/*** @author yhd* @email yinhuidong1@xiaomi.com* @description TODO* @since 2021/3/30 20:22*/ @Data @NoArgsConstructor @AllArgsConstructor @Accessors(chain = true) public class Prop implements Serializable {String jobClass;String jobName;String cron;int shardingTotalCount;String shardingItemParameters;String jobParameter;String description;String jobShardingStrategyClass;String scriptCommandLine;boolean failover;boolean misfire;boolean overwrite;boolean disabled;boolean monitorExecution;boolean streamingProcess;int monitorPort;int maxTimeDiffSeconds;}5)配置文件映射類
/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 配置文件可配置參數* @since 2021/3/30 16:51*/ @Data @ConfigurationProperties(prefix = "ejob.regcenter") public class RegCenterProperties {/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 連接Zookeeper服務器的列表. 包括IP地址和端口號. 多個地址用逗號分隔. 如: host1:2181,host2:2181* @params* @return* @since 2021/3/30 20:10*/private String serverLists;/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 命名空間.* @params* @return* @since 2021/3/30 20:10*/private String namespace;/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 等待重試的間隔時間的初始值. 單位毫秒.* @params* @return* @since 2021/3/30 20:11*/private int baseSleepTimeMilliseconds = 1000;/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 等待重試的間隔時間的最大值. 單位毫秒.* @params* @return* @since 2021/3/30 20:11*/private int maxSleepTimeMilliseconds = 3000;/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 最大重試次數* @params* @return* @since 2021/3/30 20:11*/private int maxRetries = 3;/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 會話超時時間. 單位毫秒* @params* @return* @since 2021/3/30 20:11*/private int sessionTimeoutMilliseconds;/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 連接超時時間. 單位毫秒.* @params* @return* @since 2021/3/30 20:11*/private int connectionTimeoutMilliseconds;private String digest;}6)注冊中心自動配置類
/*** @author yhd* @email yinhuidong1@xiaomi.com* @description Zookeeper自動配置類* @since 2021/3/30 17:00*/ @Configuration @ConditionalOnClass(ZookeeperRegistryCenter.class) @EnableConfigurationProperties(RegCenterProperties.class) public class RegCentreAutoConfiguration {@Resourceprivate RegCenterProperties regCenterProperties;@Bean(initMethod = "init")public ZookeeperRegistryCenter registryCenter() {ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(regCenterProperties.getServerLists(),regCenterProperties.getNamespace());zkConfig.setBaseSleepTimeMilliseconds(regCenterProperties.getBaseSleepTimeMilliseconds());zkConfig.setConnectionTimeoutMilliseconds(regCenterProperties.getConnectionTimeoutMilliseconds());zkConfig.setDigest(regCenterProperties.getDigest());zkConfig.setMaxRetries(regCenterProperties.getMaxRetries());zkConfig.setMaxSleepTimeMilliseconds(regCenterProperties.getMaxSleepTimeMilliseconds());zkConfig.setSessionTimeoutMilliseconds(regCenterProperties.getSessionTimeoutMilliseconds());return new ZookeeperRegistryCenter(zkConfig);}}7)定時任務自動配置類
/*** @author yhd* @email yinhuidong1@xiaomi.com* @description 定時任務自動配置類* @since 2021/3/30 16:56*/ @Slf4j @SpringBootConfiguration @ConditionalOnBean(annotation = EnableElasticJob.class) @ConditionalOnClass({SimpleJob.class, DataflowJob.class}) @AutoConfigureAfter(RegCentreAutoConfiguration.class) public class JobAutoConfiguration {private final String prefix = "ejob";@Resourceprivate ZookeeperRegistryCenter registryCenter;@Resourceprivate ApplicationContext ioc;private Environment environment;private final AtomicInteger counter = new AtomicInteger();@PostConstructpublic void init() {this.environment = ioc.getEnvironment();log.info("scan JobScheduled Annotation start");Map<String, Object> beanMap = ioc.getBeansWithAnnotation(JobScheduled.class);if (!CollectionUtils.isEmpty(beanMap)) {beanMap.forEach(this::initElasticJobBean);}}/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description 處理每一個定時任務類* @params* @since 2021/3/30 20:16*/private void initElasticJobBean(String beanName, Object bean) throws BeansException {Class<?> clazz = bean.getClass();JobScheduled jobScheduled = clazz.getAnnotation(JobScheduled.class);Prop prop = getProp(clazz, jobScheduled);JobCoreConfiguration coreConfig = getCoreConfig(prop);LiteJobConfiguration jobConfig = getJobConfig(prop, getTypeConfig(clazz, prop, coreConfig));BeanDefinitionBuilder builder = this.builder(clazz, bean, jobConfig);postProcessor(prop, builder);}private void postProcessor(Prop prop, BeanDefinitionBuilder builder) {DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) ioc.getAutowireCapableBeanFactory();String jobSchedulerBeanName = String.format("%s_Scheduler_%s", prop.getJobClass(), counter.incrementAndGet());while (beanFactory.containsBeanDefinition(jobSchedulerBeanName)) {jobSchedulerBeanName = String.format("%s_Scheduler_%s", prop.getJobClass(), counter.incrementAndGet());}if (log.isDebugEnabled()) {log.debug("Add JobScheduler bean:{} for job:{}", jobSchedulerBeanName, prop.getJobClass());}beanFactory.registerBeanDefinition(jobSchedulerBeanName, builder.getBeanDefinition());beanFactory.getBean(jobSchedulerBeanName);}/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description 構建SpringJobScheduler對象來初始化任務* @params* @since 2021/3/30 20:55*/private BeanDefinitionBuilder builder(Class<?> clazz, Object bean, LiteJobConfiguration jobConfig) {BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);builder.setInitMethodName("init");builder.setScope(BeanDefinition.SCOPE_PROTOTYPE);if (ScriptJob.class.isAssignableFrom(clazz)) {builder.addConstructorArgValue(null);} else {builder.addConstructorArgValue(bean);}builder.addConstructorArgValue(registryCenter);builder.addConstructorArgValue(jobConfig);builder.addConstructorArgValue(Collections.emptyList());return builder;}/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description 屬性封裝* @params* @since 2021/3/30 20:44*/private Prop getProp(Class<?> clazz, JobScheduled jobScheduled) {Prop prop = new Prop();prop.setJobClass(clazz.getName()).setJobName(jobScheduled.jobName()).setCron(getEnvironmentStringValue(prop.getJobName(), "cron", jobScheduled.cron())).setShardingTotalCount(getEnvironmentIntValue(prop.getJobName(), "shardingTotalCount", jobScheduled.shardingTotalCount())).setShardingItemParameters(getEnvironmentStringValue(prop.getJobName(), "shardingItemParameters", jobScheduled.shardingItemParameters())).setJobParameter(getEnvironmentStringValue(prop.getJobName(), "jobParameter", jobScheduled.jobParameter())).setDescription(getEnvironmentStringValue(prop.getJobName(), "description", jobScheduled.description())).setJobShardingStrategyClass(getEnvironmentStringValue(prop.getJobName(), "jobShardingStrategyClass", jobScheduled.jobShardingStrategyClass())).setScriptCommandLine(getEnvironmentStringValue(prop.getJobName(), "scriptCommandLine", jobScheduled.scriptCommandLine())).setFailover(getEnvironmentBooleanValue(prop.getJobName(), "failover", jobScheduled.failover())).setMisfire(getEnvironmentBooleanValue(prop.getJobName(), "misfire", jobScheduled.misfire())).setOverwrite(getEnvironmentBooleanValue(prop.getJobName(), "overwrite", jobScheduled.overwrite())).setDisabled(getEnvironmentBooleanValue(prop.getJobName(), "disabled", jobScheduled.disabled())).setMonitorExecution(getEnvironmentBooleanValue(prop.getJobName(), "monitorExecution", jobScheduled.monitorExecution())).setStreamingProcess(getEnvironmentBooleanValue(prop.getJobName(), "streamingProcess", jobScheduled.streamingProcess())).setMonitorPort(getEnvironmentIntValue(prop.getJobName(), "monitorPort", jobScheduled.monitorPort())).setMaxTimeDiffSeconds(getEnvironmentIntValue(prop.getJobName(), "maxTimeDiffSeconds", jobScheduled.maxTimeDiffSeconds()));return prop;}/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description 核心配置* @params* @since 2021/3/30 20:44*/private JobCoreConfiguration getCoreConfig(Prop prop) {return JobCoreConfiguration.newBuilder(prop.getJobName(), prop.getCron(), prop.getShardingTotalCount()).shardingItemParameters(prop.getShardingItemParameters()).description(prop.getDescription()).failover(prop.isFailover()).jobParameter(prop.getJobParameter()).misfire(prop.isMisfire()).build();}/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description 不同類型的任務配置處理* @params* @since 2021/3/30 20:47*/private JobTypeConfiguration getTypeConfig(Class<?> clazz, Prop prop, JobCoreConfiguration coreConfig) {JobTypeConfiguration typeConfig = null;if (SimpleJob.class.isAssignableFrom(clazz)) {typeConfig = new SimpleJobConfiguration(coreConfig, prop.getJobClass());} else if (DataflowJob.class.isAssignableFrom(clazz)) {typeConfig = new DataflowJobConfiguration(coreConfig, prop.getJobClass(), prop.isStreamingProcess());} else if (ScriptJob.class.isAssignableFrom(clazz)) {typeConfig = new ScriptJobConfiguration(coreConfig, prop.getScriptCommandLine());}return typeConfig;}/*** @return* @author yhd* @email yinhuidong1@xiaomi.com* @description TODO* @params* @since 2021/3/30 20:50*/private LiteJobConfiguration getJobConfig(Prop prop, JobTypeConfiguration typeConfig) {return LiteJobConfiguration.newBuilder(typeConfig).overwrite(prop.isOverwrite()).disabled(prop.isDisabled()).monitorPort(prop.getMonitorPort()).monitorExecution(prop.isMonitorExecution()).maxTimeDiffSeconds(prop.getMaxTimeDiffSeconds()).jobShardingStrategyClass(prop.getJobShardingStrategyClass()).build();}/*** @param fieldName 屬性名稱* @param defaultValue 默認值* @return* @author yhd* @email yinhuidong1@xiaomi.com* @description 獲取配置中的任務屬性值,environment沒有就用注解中的值* @params jobName 任務名稱* @since 2021/3/30 20:17*/private String getEnvironmentStringValue(String jobName, String fieldName, String defaultValue) {String key = prefix + jobName + "." + fieldName;String value = environment.getProperty(key);if (StringUtils.hasText(value)) {return value;}return defaultValue;}private int getEnvironmentIntValue(String jobName, String fieldName, int defaultValue) {String key = prefix + jobName + "." + fieldName;String value = environment.getProperty(key);if (StringUtils.hasText(value)) {return Integer.parseInt(value);}return defaultValue;}private long getEnvironmentLongValue(String jobName, String fieldName, long defaultValue) {String key = prefix + jobName + "." + fieldName;String value = environment.getProperty(key);if (StringUtils.hasText(value)) {return Long.parseLong(value);}return defaultValue;}private boolean getEnvironmentBooleanValue(String jobName, String fieldName, boolean defaultValue) {String key = prefix + jobName + "." + fieldName;String value = environment.getProperty(key);if (StringUtils.hasText(value)) {return Boolean.parseBoolean(value);}return defaultValue;} }六,E-Job原理
1,啟動
public static void main(String[] args) {ZookeeperRegistryCenter center = new ZookeeperRegistryCenter(new ZookeeperConfiguration("121.199.31.160:2181", "e-job"));center.init();JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("myjob", "0/2 * * * * ?", 1).build();SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(simpleCoreConfig, MyJob.class.getCanonicalName());LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).build();new JobScheduler(center,liteJobConfiguration).init();}init()
/*** 初始化作業.*/public void init() {//更新作業配置,并返回更新后的作業配置LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);//設置當前分片總數JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());//構建任務,創建調度器JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());//在 ZK 上注冊任務JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);//添加任務信息并進行節點選取schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());//啟動調度器jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());}registerStartUpInfo()-添加任務信息并進行節點選取
/*** 注冊作業啟動信息.* * @param enabled 作業是否啟用*/public void registerStartUpInfo(final boolean enabled) {//開啟所有監聽器listenerManager.startAllListeners();//選舉主節點leaderService.electLeader();//服務信息持久化到zkserverService.persistOnline(enabled);//實例信息持久化到zkinstanceService.persistOnline();//設置需要重新分片的標記shardingService.setReshardingFlag();//初始化作業監聽服務monitorService.listen();//自診斷修復,使本地節點與 ZK 數據一致if (!reconcileService.isRunning()) {reconcileService.startAsync();}}監聽器用于監聽Zk節點的變化。
electLeader()-啟動的時候進行節點選擇
/*** 選舉主節點.*/public void electLeader() {log.debug("Elect a new leader now.");jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());log.debug("Leader election completed.");}Latch 是一個分布式鎖,選舉成功后在 instance 寫入服務器信息。
查看LeaderElectionExecutionCallback類
@RequiredArgsConstructorclass LeaderElectionExecutionCallback implements LeaderExecutionCallback {@Overridepublic void execute() {if (!hasLeader()) {//填充臨時節點數據jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());}}}executeInLeade()-選主邏輯
/*** 在主節點執行操作.* * @param latchNode 分布式鎖使用的作業節點名稱* @param callback 執行操作的回調*/public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {//step intolatch.start();latch.await();callback.execute();//CHECKSTYLE:OFF} catch (final Exception ex) {//CHECKSTYLE:ONhandleException(ex);}}調用了 curator 的邏輯進行選主。
latch.start()
/*** Add this instance to the leadership election and attempt to acquire leadership.** @throws Exception errors*/ public void start() throws Exception {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");startTask.set(AfterConnectionEstablished.execute(client, new Runnable(){@Overridepublic void run(){try{internalStart();}finally{startTask.set(null);}}})); }persistOnline()-服務信息持久化
/*** 持久化作業服務器上線信息.* * @param enabled 作業是否啟用*/public void persistOnline(final boolean enabled) {if (!JobRegistry.getInstance().isShutdown(jobName)) {jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());}}填充的是servers數據
persistOnline()-實例信息持久化
/*** 持久化作業運行實例上線相關信息.*/public void persistOnline() {jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");}填充的是instance節點
2,任務執行與分片原理
- LiteJob 是怎么被執行的?
- 分片項是怎么分配給不同的服務實例的?
在創建 Job 的時候(createJobDetail),創建的是實現了 Quartz 的 Job 接口的LiteJob 類,LiteJob 類實現了 Quartz 的 Job 接口。
private JobDetail createJobDetail(final String jobClass) {JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();if (elasticJobInstance.isPresent()) {result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {try {result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());} catch (final ReflectiveOperationException ex) {throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);}}return result;}在 LiteJob 的 execute 方法中獲取對應類型的執行器,調用 execute()方法。
/*** Lite調度作業.** @author zhangliang*/ public final class LiteJob implements Job {@Setterprivate ElasticJob elasticJob;@Setterprivate JobFacade jobFacade;@Overridepublic void execute(final JobExecutionContext context) throws JobExecutionException {JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();} }EJOB 提供管理任務執行器的抽象類 AbstractElasticJobExecutor,核心動作在execute()方法中執行。
/*** 獲取作業執行器.** @param elasticJob 分布式彈性作業* @param jobFacade 作業內部服務門面服務* @return 作業執行器*/@SuppressWarnings("unchecked")public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {if (null == elasticJob) {return new ScriptJobExecutor(jobFacade);}if (elasticJob instanceof SimpleJob) {return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);}if (elasticJob instanceof DataflowJob) {return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);}throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());}在AbstractElasticJobExecutor的execute()中調用了execute()
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);點進去,又調用了process()
process(shardingContexts, executionSource); private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();//只有一個分片時,直接執行if (1 == items.size()) {int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);process(shardingContexts, item, jobExecutionEvent);return;}final CountDownLatch latch = new CountDownLatch(items.size());//當前節點遍歷執行相應的分片信息for (final int each : items) {final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);if (executorService.isShutdown()) {return;}executorService.submit(new Runnable() {@Overridepublic void run() {try {//交給具體的實現類(SimpleJobExecutor、DataflowJobExecutor、ScriptJobExecutor)去處理。process(shardingContexts, each, jobExecutionEvent);} finally {latch.countDown();}}});}try {//等待所有分片項執行完畢latch.await();} catch (final InterruptedException ex) {Thread.currentThread().interrupt();}}又調用了另一個 process()方法,交給具體的實現類(SimpleJobExecutor、DataflowJobExecutor、ScriptJobExecutor)去處理。最終調用到任務類。
@Overrideprotected void process(final ShardingContext shardingContext) {simpleJob.execute(shardingContext);}3,失效轉移
所謂失效轉移,就是在執行任務的過程中發生異常時,這個分片任務可以在其他節點再次執行。
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("myjob", "0/2 * * * * ?", 1).failover(true).build();FailoverListenerManager 監聽的是 zk 的 instance 節點刪除事件。如果任務配置了 failover 等于 true,其中某個 instance 與 zk 失去聯系或被刪除,并且失效的節點又不是本身,就會觸發失效轉移邏輯。
Job 的失效轉移監聽來源于 FailoverListenerManager 中內部類 JobCrashedJobListener 的 dataChanged 方法。
當節點任務失效時會調用 JobCrashedJobListener 監聽器,此監聽器會根據實例 id獲取所有的分片,然后調用 FailoverService 的 setCrashedFailoverFlag 方法,將每個分片 id 寫到/jobName/leader/failover/items 下,例如原來的實例負責 1、2 分片項,那么 items 節點就會寫入 1、2,代表這兩個分片項需要失效轉移。
protected void dataChanged(final String path, final Type eventType, final String data) {if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {return;}List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);if (!failoverItems.isEmpty()) {for (int each : failoverItems) {//設置失效的分片項標記failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}} else {for (int each : shardingService.getShardingItems(jobInstanceId)) {failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}}} }然后接下來調用 FailoverService 的 failoverIfNessary 方法,首先判斷是否需要失敗轉移,如果可以需要則只需作業失敗轉移。
/*** 如果需要失效轉移, 則執行作業失效轉移.*/public void failoverIfNecessary() {if (needFailover()) {jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());}}條件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效轉移的作業分片項。
條件二:當前作業不在運行中。
private boolean needFailover() {return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()&& !JobRegistry.getInstance().isJobRunning(jobName); }在主節點執行操作
/*** 在主節點執行操作.* * @param latchNode 分布式鎖使用的作業節點名稱* @param callback 執行操作的回調*/public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {latch.start();latch.await();callback.execute();//CHECKSTYLE:OFF} catch (final Exception ex) {//CHECKSTYLE:ONhandleException(ex);}} class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {@Overridepublic void execute() {//判斷是否需要失效轉移if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {return;}//從${JOB_NAME}/leader/failover/items/${ITEM_ID}獲取到一個分片項int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));// 在注冊中心節點`${JOB_NAME}/sharding/${ITEM_ID}/failover`注冊作業分片項為當前作業節點jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());//移除任務轉移分片項jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));// TODO 不應使用triggerJob, 而是使用executor統一調度JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);if (null != jobScheduleController) {//提交任務jobScheduleController.triggerJob();}} } /*** 立刻啟動作業.*/public synchronized void triggerJob() {try {if (!scheduler.isShutdown()) {scheduler.triggerJob(jobDetail.getKey());}} catch (final SchedulerException ex) {throw new JobSystemException(ex);}}作業和消息中間件的區別是:作業是時間驅動的產品,消息中間件是事件驅動的產品。
總結
- 上一篇: c++三维静态数组的定义与作为函数的传递
- 下一篇: 栈出现的异常和设置栈的大小-Xss