0%

Spring Boot集成Elastic-Job实现分布式任务调度系统

Elastic-Job

Elastic-Job是由当当网基于quartz二次开发之后的分布式调度解决方案 , 由两个相对独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成 。
Elastic-Job主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。
开源地址:https://github.com/elasticjob

功能列表

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 自诊断并修复分布式不稳定造成的问题
  • 支持并行调度
  • 支持作业生命周期操作
  • 丰富的作业类型
  • Spring整合以及命名空间提供
  • 运维平台

简介

lastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供最轻量级的分布式任务的协调服务,外部依赖仅Zookeeper。

基本概念

分片概念

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。

分片项与业务处理解耦

Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

个性化参数的适用场景

个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。

例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

核心理念

分布式调度

Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。

注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

作业高可用

Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。

一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

最大限度利用资源

Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。

例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。

整体架构图

整体架构图

搭建运维平台

解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh。打开浏览器访问http://localhost:8899/即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号。

elastic-job-lite-console-${version}.tar.gz可通过mvn install编译获取。

登录

提供两种账户,管理员及访客,管理员拥有全部操作权限,访客仅拥有察看权限。默认管理员用户名和密码是root/root,访客用户名和密码是guest/guest,可通过conf\auth.properties修改管理员及访客用户名及密码。

功能列表

  • 登录安全控制
  • 注册中心、事件追踪数据源管理
  • 快捷修改作业设置
  • 作业和服务器维度状态查看
  • 操作作业禁用\启用、停止和删除等生命周期
  • 事件追踪查询

设计理念

运维平台和elastic-job-lite并无直接关系,是通过读取作业注册中心数据展现作业状态,或更新注册中心数据修改全局配置。

控制台只能控制作业本身是否运行,但不能控制作业进程的启动,因为控制台和作业本身服务器是完全分离的,控制台并不能控制作业服务器。

不支持项

加作业 作业在首次运行时将自动添加。Elastic-Job-Lite以jar方式启动,并无作业分发功能。如需完全通过运维平台发布作业,请使用Elastic-Job-Cloud。
注意:依赖注册中心,要先启动zookeeper。
运维平台图

快速开始

引入Maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>

<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>

<!-- Elastic-Job提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。Elastic-Job目前提供了基于关系型数据库两种事件订阅方式记录事件。添加如下依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

配置文件application.properties添加如下配置:

#作业注册中心地址
regCenter.serverList=127.0.0.1:2181
regCenter.namespace=elasticjob-demo

simpleJob.cron=0/5 * * * * ?
simpleJob.shardingTotalCount=1
simpleJob.shardingItemParameters=0=Shanghai

#数据源配置
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/elastic_job_log?autoReconnect=true&useSSL=false&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&failOverReadOnly=false
spring.datasource.username=root
spring.datasource.password=111111

配置注册中心

新建RegistryCenterConfig类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {

@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}

Spring xml方式配置作业

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!--配置作业注册中心 -->
<reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />

<!-- 配置作业-->
<job:simple id="demoSimpleSpringJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
</beans>

将配置Spring命名空间的xml通过Spring启动,作业将自动加载。

基于注解的方式配置作业

新建名为ElasticSimpleJob注解类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Component
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {

/**
* 作业名称
*/
String jobName() default "";

/**
* cron表达式,用于控制作业触发时间
*/
@AliasFor("cron")
String value() default "";

@AliasFor("value")
String cron() default "";

/**
* 作业分片总数
*/
int shardingTotalCount() default 1;

/**
* 分片序列号和参数用等号分隔,多个键值对用逗号分隔
* 分片序列号从0开始,不可大于或等于作业分片总数
* 如:
* 0=a,1=b,2=c
*/
String shardingItemParameters() default "";

/**
* 作业自定义参数
* 作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业
* 例:每次获取的数据量、作业实例从数据库读取的主键等
*/
String jobParameter() default "";

/**
* 是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行
*/
boolean failover() default true;

/**
* 是否开启错过任务重新执行
*/
boolean misfire() default true;

/**
* 作业描述信息
*/
String description() default "";

/**
* 监控作业运行时状态
* 每次作业执行时间和间隔时间均非常短的情况,建议不监控作业运行时状态以提升效率。因为是瞬时状态,所以无必要监控。请用户自行增加数据堆积监控。并且不能保证数据重复选取,应在作业中实现幂等性。
* 每次作业执行时间和间隔时间均较长的情况,建议监控作业运行时状态,可保证数据不会重复选取。
*/
boolean monitorExecution() default true;

/**
* 最大允许的本机与注册中心的时间误差秒数
* 如果时间误差超过配置秒数则作业启动时将抛异常
* 配置为-1表示不校验时间误差
*/
int maxTimeDiffSeconds() default -1;

/**
* 作业监控端口
* 建议配置作业监控端口, 方便开发者dump作业信息。
* 使用方法: echo “dump” | nc 127.0.0.1 9888
*/
int monitorPort() default -1;

/**
* 作业分片策略实现类全路径
* 默认使用平均分配策略
* 如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器。如:
* 如果有3台服务器,分成9片,则每台服务器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
* 如果有3台服务器,分成8片,则每台服务器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
* 如果有3台服务器,分成10片,则每台服务器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
*/
String jobShardingStrategyClass() default "";

/**
* 修复作业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复
* 单位:分钟
*/
int reconcileIntervalMinutes() default 10;

/**
* 作业是否禁止启动
* 可用于部署作业时,先禁止启动,部署结束后统一启动
*/
boolean disabled() default false;

/**
* 本地配置是否可覆盖注册中心配置
* 如果可覆盖,每次启动作业都以本地配置为准
*/
boolean overwrite() default false;

/**
* 作业事件追踪的数据源Bean引用
*/
String dataSource() default "";
}

新建JobEventConfig类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import javax.sql.DataSource;

@Configuration
public class JobEventConfig {

@Resource
private DataSource dataSource;

@Bean
public JobEventConfiguration jobEventConfiguration() {
return new JobEventRdbConfiguration(dataSource);
}
}

新建SimpleJobConfig类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.example.elasticjobdemo.job.MySimpleJob;
import org.apache.commons.lang3.StringUtils;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;

@Configuration
public class SimpleJobConfig {

@Resource
private ZookeeperRegistryCenter regCenter;

@Resource
private JobEventConfiguration jobEventConfiguration;

// @Bean
// public SimpleJob simpleJob() {
// return new MySimpleJob();
// }
//
// @Bean(initMethod = "init")
// public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron, @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount, @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
// return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
// }
//
// private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
// return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).disabled(true).build();
// }

@Autowired
private ApplicationContext applicationContext;

@PostConstruct
public void initElasticJob() {
Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
SimpleJob simpleJob = entry.getValue();
if (AopUtils.isAopProxy(simpleJob)) {
try {
simpleJob = (SimpleJob) ((Advised) simpleJob).getTargetSource().getTarget();
} catch (Exception var1) {
throw new RuntimeException(var1);
}
}

ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
if (null != elasticSimpleJobAnnotation) {
String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
String jobName = StringUtils.isBlank(elasticSimpleJobAnnotation.jobName()) ? simpleJob.getClass().getName() : elasticSimpleJobAnnotation.jobName();
boolean overwrite = elasticSimpleJobAnnotation.overwrite();
boolean monitorExecution = elasticSimpleJobAnnotation.monitorExecution();

//SimpleJob任务配置
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(jobName, cron, elasticSimpleJobAnnotation.shardingTotalCount())
.shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters())
.description(elasticSimpleJobAnnotation.description())
.failover(elasticSimpleJobAnnotation.failover())
.jobParameter(elasticSimpleJobAnnotation.jobParameter()).build(), simpleJob.getClass().getCanonicalName());
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(overwrite).monitorExecution(monitorExecution).build();

JobListener jobListener = new JobListener();

//配置数据源
String dataSourceRef = elasticSimpleJobAnnotation.dataSource();
if (StringUtils.isNotBlank(dataSourceRef)) {

if (!applicationContext.containsBean(dataSourceRef)) {
throw new RuntimeException("not exist datasource [" + dataSourceRef + "]");
}

DataSource dataSource = (DataSource) applicationContext.getBean(dataSourceRef);
JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration, jobListener);
jobScheduler.init();
} else {
SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventConfiguration, jobListener);
jobScheduler.init();
}

SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobListener);
jobScheduler.init();
}
}
}
}

新建作业监听器JobListener类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class JobListener implements ElasticJobListener {

private static final Logger logger = LoggerFactory.getLogger(JobListener.class);

private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

private long beginTime = 0;

@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
beginTime = System.currentTimeMillis();
logger.info("开始执行 ==> 线程ID:{},作业任务ID:{},作业名称:{},分片总数:{},作业自定义参数:{},分片项和参数:{},作业事件采样统计数:{},当前作业事件采样统计数:{},时间:{}",
Thread.currentThread().getId(),
shardingContexts.getTaskId(),
shardingContexts.getJobName(),
shardingContexts.getShardingTotalCount(),
shardingContexts.getJobParameter(),
shardingContexts.getShardingItemParameters(),
shardingContexts.getJobEventSamplingCount(),
shardingContexts.getCurrentJobEventSamplingCount(),
LocalDateTime.now().format(DATE_TIME_FORMATTER));
}

@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
long endTime = System.currentTimeMillis();
logger.info("执行结束 ==> 线程ID:{},作业任务ID:{},作业名称:{},分片总数:{},作业自定义参数:{},分片项和参数:{},作业事件采样统计数:{},当前作业事件采样统计数:{},时间:{},总耗时:{} 毫秒",
Thread.currentThread().getId(),
shardingContexts.getTaskId(),
shardingContexts.getJobName(),
shardingContexts.getShardingTotalCount(),
shardingContexts.getJobParameter(),
shardingContexts.getShardingItemParameters(),
shardingContexts.getJobEventSamplingCount(),
shardingContexts.getCurrentJobEventSamplingCount(),
LocalDateTime.now().format(DATE_TIME_FORMATTER),
endTime - beginTime);
}
}

新建名为TestSimpleJob任务类

1
2
3
4
5
6
7
8
9
10
11
12
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.example.elasticjobdemo.config.ElasticSimpleJob;

@ElasticSimpleJob(jobName = "TestSimpleJob", cron = "0/20 * * * * ?")
public class TestSimpleJob implements SimpleJob {

@Override
public void execute(ShardingContext shardingContext) {
System.out.println("TestSimpleJob 进来了...");
}
}

到此为止,启动程序,在运维控制台就可以看到TestSimpleJob这个任务了,平台上可以对任务进行一些操作。
运维控制台图

更多关于Elastic-Job的内容,请前往官网:http://elasticjob.io