Elastic Job入门(1) - 简介
1、如果業務工程采用集群化的部署,可能會多次重復執行定時任務而導致系統的業務邏輯錯誤,并產生系統故障。
2、Quartz的集群方案具備HA功能,可以實現定時任務的分發,但是通過增加機器節點數量的方式并不能提高每次定時任務的執行效率,無法實現任務的彈性分片。
開源產品Elastic-Job是當當開源的一款分布式彈性定時任務調度框架,在2.X版本以后主要分為Elastic-Job-Lite和Elastic-Job-Cloud兩個子項目。其中,Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務的協調服務。而Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及進程隔離等服務。本文主要學習Elastic-Job-Lite。
GitHub地址為:https://github.com/elasticjob/elastic-job-lite
中文官網為:http://elasticjob.io/index_zh.html
從Elastic-Job的架構圖上基本就可以看出,其以Jar的形式為業務工程(諸如,Spring Boot工程)的快速集成提供了簡便的方式。同時,其提供的定時任務分片、彈性擴縮容、失效轉移、作業監控和支持多種作業模式等強大的功能,使業務開發人員無需在這些方面花費較大多的精力,而可以更加專注于平臺的業務開發。其主要的功能如下:
1、定時任務:基于成熟的定時任務作業框架Quartz cron表達式執行定時任務;
2、作業注冊中心:基于Zookeeper和其客戶端Curator實現的全局作業注冊控制中心;作業注冊中心僅用于作業任務注冊和監控信息的暫存;
3、定時任務分片:可以將原本一個較大任務分片成為多小的子任務項分別在多個服務器上同時執行,提高總任務的執行處理效率;
4、彈性擴容縮容:運行中定時任務所在的服務器崩潰,或新增加n臺作業服務器,作業框架將在下次任務執行前重新進行任務調度分發,不影響當前任務的處理與執行;
5、支持多種任務模式:分別支持Simple、Dataflow和Script類型的定時任務;
6、失效轉移:運行中的定時任務所在的服務器崩潰不會導致重新分片,會在下次定時任務啟動時重新分發和調度;
7、運行時定時任務狀態收集:監控任務運行時的狀態,統計最近一段時間任務處理成功和失敗的數量,記錄作業上次運行開始時間,結束時間和下次運行時間;
8、支持配置定時任務停止、恢復和禁用:用于操作定時任務的啟停,并可以禁止某任務的執行;
9、Spring支持:Elastic-Job-Lite項目完美支持spring的容器,自定義命名空間,支持占位符
10、運維平臺:提供運維界面,方便開發和運維人員管理生產環境上已經發布的定時任務和注冊中心; 水平分片 橫向拓容 Springboot2默認數據庫連接池選擇了HikariCP <dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId> </dependency> spring:datasource:url: jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=falsedriver-class-name: com.mysql.jdbc.Driverusername: rootpassword: roottype: com.zaxxer.hikari.HikariDataSource# 自動創建更新驗證數據庫結構jpa:hibernate:ddl-auto: updateshow-sql: truedatabase: mysql
?
==========================================API配置啟動==========================================
引入Maven
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>${latest.release.version}</version> </dependency>?
方法參數ShardingContext包含作業配置,分片和運行時信息。可通過getShardingTotalCount(),getShardingItems()等方法分別獲取分片總數,運行在本作業服務器的分片序列號集合等。
Simple類型作業
與Quartz接口相似,用于執行普通的定時任務,只是增加了彈性拓容和分片等功能:
public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {switch (context.getShardingItem()) {case 0: // do something by sharding item 0break;case 1: // do something by sharding item 1break;case 2: // do something by sharding item 2break;// case n: ... }} }?
Dataflow類型作業
Dataflow類型用于處理數據流,需實現DataflowJob接口。該接口提供2個方法可供覆蓋,分別用于抓取(fetchData)和處理(processData)數據。
public class MyElasticJob implements DataflowJob<Foo> {@Overridepublic List<Foo> fetchData(ShardingContext context) {switch (context.getShardingItem()) {case 0: List<Foo> data = // get data from database by sharding item 0return data;case 1: List<Foo> data = // get data from database by sharding item 1return data;case 2: List<Foo> data = // get data from database by sharding item 2return data;// case n: ... }}@Overridepublic void processData(ShardingContext shardingContext, List<Foo> data) {// process data// ... } }流式處理
可通過DataflowJobConfiguration配置是否流式處理。流式處理數據只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直運行下去; 非流式處理數據則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。如果采用流式作業處理方式,建議processData處理數據后更新其狀態,避免fetchData再次抓取到,從而使得作業永不停止。 流式數據處理參照TbSchedule設計,適用于不間歇的數據處理。
?
作業配置
Elastic-Job配置分為3個層級,分別是Core, Type和Root。每個層級使用相似于裝飾者模式的方式裝配。
Core對應JobCoreConfiguration,用于提供作業核心配置信息,如:作業名稱、分片總數、CRON表達式等。
Type對應JobTypeConfiguration,有3個子類分別對應SIMPLE, DATAFLOW和SCRIPT類型作業,提供3種作業需要的不同配置,如:DATAFLOW類型是否流式處理或SCRIPT類型的命令行等。
Root對應JobRootConfiguration,有2個子類分別對應Lite和Cloud部署類型,提供不同部署類型所需的配置,如:Lite類型的是否需要覆蓋本地配置或Cloud占用CPU或Memory數量等。
// 定義作業核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build(); // 定義SIMPLE類型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName()); // 定義Lite作業根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();?
作業啟動
public class JobDemo {public static void main(String[] args) {new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}private static CoordinatorRegistryCenter createRegistryCenter() {CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));regCenter.init();return regCenter;}private static LiteJobConfiguration createJobConfiguration() {// 創建作業配置// ... } }?
==========================================Spring配置啟動==========================================
引入Maven
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>${latest.release.version}</version> </dependency>?
作業配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"xmlns:job="http://www.dangdang.com/schema/ddframe/job"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.dangdang.com/schema/ddframe/reghttp://www.dangdang.com/schema/ddframe/reg/reg.xsdhttp://www.dangdang.com/schema/ddframe/jobhttp://www.dangdang.com/schema/ddframe/job/job.xsd"><!--配置作業注冊中心 --><reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /><!-- 配置作業--><job:simple id="demoSimpleSpringJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> </beans>?
作業啟動
將配置Spring命名空間的xml通過Spring啟動,作業將自動加載。
轉載于:https://www.cnblogs.com/ijavanese/p/9956094.html
總結
以上是生活随笔為你收集整理的Elastic Job入门(1) - 简介的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [转]使用HttpOnly提升Cooki
- 下一篇: 「PKUWC2018」随机游走