日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

任务调度之Elastic-Job2

發布時間:2024/4/13 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 任务调度之Elastic-Job2 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

運維平臺

下載解壓運行

git 下載源碼https://github.com/elasticjob/elastic-job-lite

對elastic-job-lite-console 打包得到安裝包

解壓縮elastic-job-lite-console-${version}.tar.gz 并執行bin\start.sh(Windows運行.bat)。打開瀏覽器訪問http://localhost:8899/即可訪問控制臺。

8899 為默認端口號,可通過啟動腳本輸入-p 自定義端口號。

默認管理員用戶名和密碼是root/root。右上角可以切換語言。

添加ZK 注冊中心

第一步,添加注冊中心,輸入ZK 地址和命名空間,并連接。

運維平臺和elastic-job-lite 并無直接關系,是通過讀取作業注冊中心數據展現作業狀態,或更新注冊中心數據修改全局配置。

控制臺只能控制作業本身是否運行,但不能控制作業進程的啟動,因為控制臺和作業本身服務器是完全分離的,控制臺并不能控制作業服務器。

可以對作業進行操作。

事件追蹤

http://elasticjob.io/docs/elastic-job-lite/02-guide/event-trace/

Elastic-Job 提供了事件追蹤功能,可通過事件訂閱的方式處理調度過程的重要事件,用于查詢、統計和監控。

Elastic-Job-Lite 在配置中提供了JobEventConfiguration,目前支持數據庫方式配置。

ejob-standalone:simple.SimpleJobTest

BasicDataSource dataSource = new BasicDataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUrl("jdbc:mysql://localhost:3306/elastic_job_log");dataSource.setUsername("root");dataSource.setPassword("123456");JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource); ………… new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init(); package simple;import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;public class SimpleJobTest {// TODO 如果修改了代碼,跑之前清空ZKpublic static void main(String[] args) {// ZK注冊中心CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "ejob-standalone"));regCenter.init();// 數據源,使用DBCP /* BasicDataSource dataSource = new BasicDataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUrl("jdbc:mysql://localhost:3306/elastic_job_log");dataSource.setUsername("root");dataSource.setPassword("123456");JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);*/// 定義作業核心配置// TODO 如果修改了代碼,跑之前清空ZKJobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0/2 * * * * ?", 4).shardingItemParameters("0=RDP, 1=CORE, 2=SIMS, 3=ECIF").failover(true).build();// 定義SIMPLE類型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());// 作業分片策略// 基于平均分配算法的分片策略String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();// 定義Lite作業根配置// LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();// 構建Jobnew JobScheduler(regCenter, simpleJobRootConfig).init();// new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init();}}

事件追蹤的event_trace_rdb_url 屬性對應庫自動創建JOB_EXECUTION_LOG 和JOB_STATUS_TRACE_LOG 兩張表以及若干索引。

需要在運維平臺中添加數據源信息,并且連接:

在作業歷史中查詢:

Spring 集成與分片詳解

ejob-springboot 工程

pom 依賴

<properties><elastic-job.version>2.1.5</elastic-job.version> </properties> <dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>${elastic-job.version}</version> </dependency> <!-- elastic-job-lite-spring --> <dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>${elastic-job.version}</version> </dependency>

application.properties

定義配置類和任務類中要用到的參數

server.port=${random.int[10000,19999]} regCenter.serverList = localhost:2181 regCenter.namespace = ejob-springboot leonJob.cron = 0/3 * * * * ? leonJob.shardingTotalCount = 2 leonJob.shardingItemParameters = 0=0,1=1

創建任務

創建任務類,加上@Component 注解

@Component public class SimpleJobDemo implements SimpleJob {public void execute(ShardingContext shardingContext) {System.out.println(String.format("------Thread ID: %s, %s,任務總片數: %s, " +"當前分片項: %s.當前參數: %s," +"當前任務名稱: %s.當前任務參數%s",Thread.currentThread().getId(),new SimpleDateFormat("HH:mm:ss").format(new Date()),shardingContext.getShardingTotalCount(),shardingContext.getShardingItem(),shardingContext.getShardingParameter(),shardingContext.getJobName(),shardingContext.getJobParameter()));} }

注冊中心配置

Bean 的initMethod 屬性用來指定Bean 初始化完成之后要執行的方法,用來替代繼承InitializingBean 接口,以便在容器啟動的時候創建注冊中心。

@Configuration public class ElasticRegCenterConfig {@Bean(initMethod = "init")public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,@Value("${regCenter.namespace}") final String namespace) {return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));} }

作業三級配置

Core——Type——Lite

return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(

@Configuration public class ElasticJobConfig {@Autowiredprivate ZookeeperRegistryCenter regCenter;@Bean(initMethod = "init")public JobScheduler simpleJobScheduler(final SimpleJobDemo simpleJob,@Value("${leonJob.cron}") final String cron,@Value("${leonJob.shardingTotalCount}") final int shardingTotalCount,@Value("${leonJob.shardingItemParameters}") final StringshardingItemParameters) {return new SpringJobScheduler(simpleJob, regCenter,getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));}private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,final String cron,final int shardingTotalCount,final String shardingItemParameters) {return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();} }

作業運行

先把application.properties 中的分片數全部改成1

啟動com.leon.EjobApp 的main 方法

package com.leon;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class EjobApp {public static void main(String[] args) {SpringApplication.run(EjobApp.class, args);} }

分片策略

分片項與分片參數

任務分片,是為了實現把一個任務拆分成多個子任務,在不同的ejob 示例上執行。例如100W 條數據,在配置文件中指定分成10 個子任務(分片項),這10 個子任務再按照一定的規則分配到5 個實際運行的服務器上執行。除了直接用分片項ShardingItem獲取分片任務之外,還可以用item 對應的parameter 獲取任務。

standalone 工程:simple.SimpleJobTest

JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0/2 * * * * ?", 4).shardingItemParameters("0=RDP, 1=CORE, 2=SIMS, 3=ECIF").build(); package simple;import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;public class SimpleJobTest {// TODO 如果修改了代碼,跑之前清空ZKpublic static void main(String[] args) {// ZK注冊中心CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "leon-ejob-standalone"));regCenter.init();// 數據源,使用DBCP /* BasicDataSource dataSource = new BasicDataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUrl("jdbc:mysql://localhost:3306/elastic_job_log");dataSource.setUsername("root");dataSource.setPassword("123456");JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);*/// 定義作業核心配置// TODO 如果修改了代碼,跑之前清空ZKJobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0/2 * * * * ?", 4).shardingItemParameters("0=RDP, 1=CORE, 2=SIMS, 3=ECIF").failover(true).build();// 定義SIMPLE類型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());// 作業分片策略// 基于平均分配算法的分片策略String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();// 定義Lite作業根配置// LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();// 構建Jobnew JobScheduler(regCenter, simpleJobRootConfig).init();// new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init();}}

springboot 工程,在application.properties 中定義。

定義幾個分片項,一個任務就會有幾個線程去運行它。

注意:分片個數和分片參數要一一對應。通常把分片項設置得比E-Job 服務器個數大一些,比如3 臺服務器,分成9 片,這樣如果有服務器宕機,分片還可以相對均勻。4.7.2 分片驗證

為避免運行的任務太多看不清楚運行結果,可以注釋在ElasticJobConfig 中注釋DataFlowJob 和ScriptJob。SimpleJob 的分片項改成2。

直接運行com.leon.EjobApp。

或者打成jar 包:mvn package -Dmaven.test.skip=true

Jar 包路徑:ejob-springboot\target\ejob-springboot-0.0.1-SNAPSHOT.jar

修改名稱為ejob.jar 放到D 盤下。

多實例運行(單機):

java –jar ejob.jar

1、多運行一個點,任務不會重跑(兩個節點各獲得一個分片項)

2、關閉一個節點,任務不會漏跑

分片策略

http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/

分片項如何分配到服務器?這個跟分片策略有關。

策略類描述具體規則
AverageAllocationJobShardin
gStrategy
基于平均分配算法的分片策
略,也是默認的分片策略。
如果分片不能整除,則不能整除的多余分片將依
次追加到序號小的服務器。如:
? 如果有3 臺服務器,分成9 片,則每臺服務
器分到的分片是: 1=[0,1,2], 2=[3,4,5],
3=[6,7,8]
? 如果有3 臺服務器,分成8 片,則每臺服務
器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
? 如果有3 臺服務器,分成10 片,則每臺服務
器分到的分片是: 1=[0,1,2,9], 2=[3,4,5],
3=[6,7,8]
OdevitySortByNameJobShar
dingStrategy
根據作業名的哈希值奇偶數
決定IP 升降序算法的分片策
略。
根據作業名的哈希值奇偶數決定IP 升降序算法的
分片策略。
? 作業名的哈希值為奇數則IP 升序。
? 作業名的哈希值為偶數則IP 降序。
用于不同的作業平均分配負載至不同的服務器。
RotateServerByNameJobSha
rdingStrategy
根據作業名的哈希值對服務
器列表進行輪轉的分片策
略。
?
自定義分片策略?實現JobShardingStrategy 接口并實現sharding 方
法,接口方法參數為作業服務器IP 列表和分片策
略選項,分片策略選項包括作業名稱,分片總數
以及分片序列號和個性化參數對照表,可以根據
需求定制化自己的分片策略。

AverageAllocationJobShardingStrategy 的缺點是,一旦分片數小于作業服務器數,作業將永遠分配至IP 地址靠前的服務
器,導致IP 地址靠后的服務器空閑。而OdevitySortByNameJobShardingStrategy 則可以根據作業名稱重新分配服務器負
載。如:

如果有3 臺服務器,分成2 片,作業名稱的哈希值為奇數,則每臺服務器分到的分片是:1=[0], 2=[1], 3=[]

如果有3 臺服務器,分成2 片,作業名稱的哈希值為偶數,則每臺服務器分到的分片是:3=[0], 2=[1], 1=[]

在Lite 配置中指定分片策略:

String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();

分片方案

獲取到分片項shardingItem 之后,怎么對數據進行分片嗯?

1、對業務主鍵進行取模,獲取余數等于分片項的數據

舉例:獲取到的sharding item 是0,1

在SQL 中加入過濾條件:where mod(id, 4) in (1, 2)。

這種方式的缺點:會導致索引失效,查詢數據時會全表掃描。

解決方案:在查詢條件中在增加一個索引條件進行過濾。

2、在表中增加一個字段,根據分片數生成一個mod 值。取模的基數要大于機器數。否則在增加機器后,會導致機器空閑。例如取模基數是2,而服務器有5 臺,那么有三臺服務器永遠空閑。而取模基數是10,生成10 個shardingItem,可以分配到5 臺服務器。當然,取模基數也可以調整。

3、如果從業務層面,可以用ShardingParamter 進行分片。

例如0=RDP, 1=CORE, 2=SIMS, 3=ECIF

List<users> = SELECT * FROM user WHERE status = 0 AND SYSTEM_ID = 'RDP' limit 0, 100。

在Spring Boot 中要Elastic-Job 要配置的內容太多了,有沒有更簡單的添加任務的方法呢?比如在類上添加一個注解?這個時候我們就要用到starter 了。

e-job starter

Git 上有一個現成的實現

https://github.com/TFdream/elasticjob-spring-boot-starter

工程:elasticjob-spring-boot-starter

需求(一個starter 應該有什么樣子):

需求實現作用
可以在啟動類上使用@Enable 功
能開啟E-Job 任務調度
注解@EnableElasticJob在自動配置類上用@ConditionalOnBean
決定是否自動配置
可以在properties 或yml 中識別配
置內容
配置類RegCenterProperties.java支持在properties 文件中使用
elasticjob.regCenter 前綴,配置注冊中心
參數
在類上加上注解,直接創建任務注解@JobScheduled配置任務參數,包括定分片項、分片參
數等等
不用創建ZK注冊中心自動配置類
RegCentreAutoConfiguration.java
注入從RegCenterProperties.java 讀取到
的參數,自動創ZookeeperConfiguration
不用創建三級(Core、Type、Lite)
配置
自動配置類
JobAutoConfiguration.java
讀取注解的參數, 創建
JobCoreConfiguration 、
JobTypeConfiguration 、
LiteJobConfiguration
在注冊中心創建之后再創建
Spring Boot 啟動時自動配置創建
Resource/META-INF/spring.factori
es
指定兩個自動配置類
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ io.dreamstudio.elasticjob.autoconfigure.RegCentreAutoConfiguration,\ io.dreamstudio.elasticjob.autoconfigure.JobAutoConfiguration

打包starter 的工程,引入starter 的依賴,即可在項目中使用注解開啟任務調度功能。

E-Job 原理

啟動

standalone 工程

new JobScheduler(regCenter, simpleJobRootConfig).init();

init 方法

public void init() {LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);// 設置分片數JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(),liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());// 構建任務,創建調度器JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(),createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()),liteJobConfigFromRegCenter.getJobName());// 在ZK 上注冊任務JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);// 添加任務信息并進行節點選舉schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());// 啟動調度器jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); }

registerStartUpInfo 方法

public void registerStartUpInfo(final boolean enabled) {// 啟動所有的監聽器listenerManager.startAllListeners();// 節點選舉leaderService.electLeader();// 服務信息持久化(寫到ZK)serverService.persistOnline(enabled);// 實例信息持久化(寫到ZK)instanceService.persistOnline();// 重新分片shardingService.setReshardingFlag();// 監控信息監聽器monitorService.listen();// 自診斷修復,使本地節點與ZK 數據一致if (!reconcileService.isRunning()) {reconcileService.startAsync();} }

監聽器用于監聽ZK 節點信息的變化。

啟動的時候進行主節點選舉

/** * 選舉主節點. */ public void electLeader() {log.debug("Elect a new leader now.");jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());log.debug("Leader election completed."); }

Latch 是一個分布式鎖,選舉成功后在instance 寫入服務器信息。

// 服務信息持久化(寫到ZK servers 節點)

serverService.persistOnline(enabled);

以下是單機運行多個實例:

// 實例信息持久化(寫到ZK instances 節點)

instanceService.persistOnline();

運行了兩個實例:

任務執行與分片原理

關注兩個問題:

1、LiteJob 是怎么被執行的?

2、分片項是怎么分配給不同的服務實例的?

在創建Job 的時候(createJobDetail),創建的是實現了Quartz 的Job 接口的LiteJob 類,LiteJob 類實現了Quartz 的Job 接口。

在LiteJob 的execute 方法中獲取對應類型的執行器,調用execute()方法。

public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {if (null == elasticJob) {return new ScriptJobExecutor(jobFacade);}if (elasticJob instanceof SimpleJob) {return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);}if (elasticJob instanceof DataflowJob) {return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);}throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName()); }

EJOB 提供管理任務執行器的抽象類AbstractElasticJobExecutor,核心動作在execute()方法中執行。

public final void execute() {

調用了另一個execute()方法,122 行:

execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {

在這個execute 方法中又調用了process()方法,150 行

private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();// 只有一個分片項時,直接執行if (1 == items.size()) {int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName,executionSource, item);process(shardingContexts, item, jobExecutionEvent);return;}final CountDownLatch latch = new CountDownLatch(items.size());// 本節點遍歷執行相應的分片信息for (final int each : items) {final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName,executionSource, each);if (executorService.isShutdown()) {return;}executorService.submit(new Runnable() {@Overridepublic void run() {try {process(shardingContexts, each, jobExecutionEvent);} finally {latch.countDown();}}});}try {// 等待所有的分片項任務執行完畢latch.await();} catch (final InterruptedException ex) {Thread.currentThread().interrupt();} }

又調用了另一個process()方法,206 行

protected abstract void process(ShardingContext shardingContext);

交給具體的實現類(SimpleJobExecutor、DataflowJobExecutor、ScriptJobExecutor)去處理。

最終調用到任務類

@Override protected void process(final ShardingContext shardingContext) {simpleJob.execute(shardingContext); }

失效轉移

所謂失效轉移,就是在執行任務的過程中發生異常時,這個分片任務可以在其他節點再次執行。

simple.SimpleJobTest,failover 方法:

// 設置失效轉移 JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0/2 * * * * ?", 4).shardingItemParameters("0=RDP, 1=CORE, 2=SIMS, 3=ECIF").failover(true).build();

FailoverListenerManager 監聽的是zk 的instance 節點刪除事件。如果任務配置了failover 等于true,其中某個instance 與zk 失去聯系或被刪除,并且失效的節點又不是本身,就會觸發失效轉移邏輯。

Job 的失效轉移監聽來源于FailoverListenerManager 中內部類JobCrashedJobListener 的dataChanged 方法。

當節點任務失效時會調用JobCrashedJobListener 監聽器,此監聽器會根據實例id獲取所有的分片,然后調用FailoverService 的setCrashedFailoverFlag 方法,將每個分片id 寫到/jobName/leader/failover/items 下,例如原來的實例負責1、2 分片項,
那么items 節點就會寫入1、2,代表這兩個分片項需要失效轉移。

protected void dataChanged(final String path, final Type eventType, final String data) {if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {return;}List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);if (!failoverItems.isEmpty()) {for (int each : failoverItems) {// 設置失效的分片項標記failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}} else {for (int each : shardingService.getShardingItems(jobInstanceId)) {failoverService.setCrashedFailoverFlag(each);failoverService.failoverIfNecessary();}}} }

然后接下來調用FailoverService 的failoverIfNessary 方法,首先判斷是否需要失敗轉移,如果可以需要則只需作業失敗轉移。

public void failoverIfNecessary() {if (needFailover()) {jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());} }

條件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效轉移的作業分片項。

條件二:當前作業不在運行中。

private boolean needFailover() {return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT)&& !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()&& !JobRegistry.getInstance().isJobRunning(jobName); }

在主節點執行操作

public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {latch.start();latch.await();callback.execute();//CHECKSTYLE:OFF} catch (final Exception ex) {//CHECKSTYLE:ONhandleException(ex);} }

1、再次判斷是否需要失效轉移;

2、從注冊中心獲得一個`${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項;

3、在注冊中心節點`${JOB_NAME}/sharding/${ITEM_ID}/failover` 注冊作業分片項為當前作業節點;

4、然后移除任務轉移分片項;

5、最后調用執行,提交任務。

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {@Overridepublic void execute() {// 判斷是否需要失效轉移if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {return;}// 從${JOB_NAME}/leader/failover/items/${ITEM_ID}獲得一個分片項int crashedItem =Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);// 在注冊中心節點`${JOB_NAME}/sharding/${ITEM_ID}/failover`注冊作業分片項為當前作業節點jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem),JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());// 移除任務轉移分片項jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);if (null != jobScheduleController) {// 提交任務jobScheduleController.triggerJob();}} }

這里僅僅是觸發作業,而不是立即執行。

?

總結

以上是生活随笔為你收集整理的任务调度之Elastic-Job2的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。