日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

elasticjob disable JOB

發布時間:2023/12/10 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 elasticjob disable JOB 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

先上總結:

  • 可以disable job或者server,或者某個server上的job實例
  • 還是會定時調度,但調度過程發現為disable后不運行job
  • disable 入口代碼:

    private void disableOrEnableJobs(final Optional<String> jobName, final Optional<String> serverIp, final boolean disabled) {Preconditions.checkArgument(jobName.isPresent() || serverIp.isPresent(), "At least indicate jobName or serverIp.");if (jobName.isPresent() && serverIp.isPresent()) {persistDisabledOrEnabledJob(jobName.get(), serverIp.get(), disabled);} else if (jobName.isPresent()) {JobNodePath jobNodePath = new JobNodePath(jobName.get());for (String each : regCenter.getChildrenKeys(jobNodePath.getServerNodePath())) {if (disabled) {regCenter.persist(jobNodePath.getServerNodePath(each), "DISABLED");} else {regCenter.persist(jobNodePath.getServerNodePath(each), "");}}} else if (serverIp.isPresent()) {List<String> jobNames = regCenter.getChildrenKeys("/");for (String each : jobNames) {if (regCenter.isExisted(new JobNodePath(each).getServerNodePath(serverIp.get()))) {persistDisabledOrEnabledJob(each, serverIp.get(), disabled);}}}}

    disable的nodepath 為 /test-disablejobName/servers/192.168.157.1,即按server 進行disable

    JOB調度執行函數為類AbstractElasticJobExecutor的如下代碼:

    private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {if (shardingContexts.getShardingItemParameters().isEmpty()) {if (shardingContexts.isAllowSendJobEvent()) {jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));}return;}

    上面代碼的getShardingItemParameters 會返回empty,shardingContexts的獲取來自以下代碼:

    @Overridepublic ShardingContexts getShardingContexts() {boolean isFailover = configService.load(true).isFailover();if (isFailover) {List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();if (!failoverShardingItems.isEmpty()) {return executionContextService.getJobShardingContext(failoverShardingItems);}}shardingService.shardingIfNecessary();List<Integer> shardingItems = shardingService.getLocalShardingItems();if (isFailover) {shardingItems.removeAll(failoverService.getLocalTakeOffItems());}shardingItems.removeAll(executionService.getDisabledItems(shardingItems));return executionContextService.getJobShardingContext(shardingItems);}

    而關鍵的獲取getLocalShardingItems,本地sharding item的相關代碼如下:

    /*** 獲取運行在本作業實例的分片項集合.* * @return 運行在本作業實例的分片項集合*/public List<Integer> getLocalShardingItems() {if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {return Collections.emptyList();}return getShardingItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());}/*** 判斷作業服務器是否可用.* * @param ip 作業服務器IP地址* @return 作業服務器是否可用*/public boolean isAvailableServer(final String ip) {return isEnableServer(ip) && hasOnlineInstances(ip);}private boolean hasOnlineInstances(final String ip) {for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) {if (each.startsWith(ip)) {return true;}}return false;}/*** 判斷服務器是否啟用.** @param ip 作業服務器IP地址* @return 服務器是否啟用*/public boolean isEnableServer(final String ip) {return !ServerStatus.DISABLED.name().equals(jobNodeStorage.getJobNodeData(serverNode.getServerNode(ip)));}

    注意以上的hasOnlineInstances函數及isEnableServer,去zookeeper獲取的相關path里面的狀態被disabled。

    總結

    以上是生活随笔為你收集整理的elasticjob disable JOB的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。