xxl-job的分片广播
目录
xxl-job的分片广播
场景引入
xxl-job简介
xxl-job的部署安装
代码编写
1.导入依赖
2.yml文件编写
3.编写xxl-job执行器配置类,维护一个xxl-job执行器的bean
4.编写第一个任务,任务名字叫firstJob
5.进入服务端,增加执行器和任务
编辑 6.启动两个服务实例
7.执行任务
8.使用分片广播的任务(重点操作)
xxl-job的分片广播
这里分享以下xxl-job的分片广播的使用
场景引入
在分布式架构下,一个服务往往会部署多个实例来运行我们的业务,如果在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度。
在单体项目中,可以直接使用SpringTask来完成,但如果在集群项目中,不同的项目之间无法知道任务的完成情况。那为什么要使用分布式集群项目,以下是分布式的优点:
分布式系统的特点,并且提高任务的调度处理能力:
- 并行任务调度
-
- 集群部署单个服务,这样就可以多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行(这就是分片广播),来提高任务调度的处理效率。
- 高可用
-
- 若某一个实例宕机,不影响其他实例来执行任务。
- 弹性扩容
-
- 当集群中增加实例就可以提高并执行任务的处理效率。
- 任务管理与监测
-
- 对系统中存在的所有定时任务进行统一的管理及监测。
- 让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
xxl-job简介
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
xxl-job的部署安装
xxl-job想要使用,需要安装admin服务端
这里我们在docker安装
docker run \
-e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.150.101:3306/xxl_job?Unicode=true&characterEncoding=UTF-8 \
--spring.datasource.username=root \
--spring.datasource.password=123" \
--restart=always \
-p 28080:8080 \
-v xxl-job-admin-applogs:/data/applogs \
--name xxl-job-admin \
-d \
xuxueli/xxl-job-admin:2.3.0
- 默认端口映射到28080
- 日志挂载到/var/lib/docker/volumes/xxl-job-admin-applogs
- 通过PARAMS环境变量设置数据库链接参数
- 数据库脚本:doc/db/tables_xxl_job.sql · 许雪里/xxl-job - Gitee.com
xxl-job使用的8张表
- xxl_job_lock:任务调度锁表;
- xxl_job_group:执行器信息表,维护任务执行器信息;
- xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
- xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
- xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
- xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
- xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
- xxl_job_user:系统用户表;
代码编写
1.导入依赖
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId></dependency>
2.yml文件编写
application:version: v1.0
spring:application:name: sl-express-xxl-job
server:port: 9901
xxl:job:admin:addresses: http://192.168.150.101:28080/xxl-job-adminexecutor:ip: 192.168.150.1appname: ${spring.application.name}#执行器运行日志文件存储磁盘路径logpath: /data/applogs/xxl-job/jobhandler#执行器日志文件保存天数logretentiondays: 30
这里我们设置了执行器的ip地址,让执行器与xxl-job的服务端同处一个网络下,让服务端可以扫描到这个执行器,这样一来我们就不用自己配置了。
3.编写xxl-job执行器配置类,维护一个xxl-job执行器的bean
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken:}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address:}")private String address;@Value("${xxl.job.executor.ip:}")private String ip;@Value("${xxl.job.executor.port:0}")private int port;@Value("${xxl.job.executor.logpath:}")private String logPath;@Value("${xxl.job.executor.logretentiondays:}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}}
4.编写第一个任务,任务名字叫firstJob
@Component
public class JobHandler {private List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5);/*** 普通任务*/@XxlJob("firstJob")public void firstJob() throws Exception {System.out.println("firstJob执行了.... " + LocalDateTime.now());for (Integer data : dataList) {XxlJobHelper.log("data= {}", data);Thread.sleep(RandomUtil.randomInt(100, 500));}System.out.println("firstJob执行结束了.... " + LocalDateTime.now());}
}
5.进入服务端,增加执行器和任务
增加执行器
增加任务,我们这里先使用轮询策略
6.启动两个服务实例
7.执行任务
第一个服务器把所有任务都执行了
再执行一次,第二个服务器又把所有任务执行了
这样一来我们可以发现并没有把分布式的优势利用上,如果任务特别多,都是只让一个服务器(执行器)执行任务 ,效率十分低下,所以我们需要使用分片广播
8.使用分片广播的任务(重点操作)
/*** 分片式任务*/@XxlJob("shardingJob")public void shardingJob() throws Exception {// 分片参数// 分片节点总数int shardTotal = XxlJobHelper.getShardTotal();// 当前节点下标,从0开始int shardIndex = XxlJobHelper.getShardIndex();System.out.println("shardingJob执行了.... " + LocalDateTime.now());for (Integer data : dataList) {if (data % shardTotal == shardIndex) {System.out.println("data= {}"+ data);Thread.sleep(RandomUtil.randomInt(100, 500));}}System.out.println("shardingJob执行结束了.... " + LocalDateTime.now());}
如何把任务分片给不同的服务器(执行器)呢,xxl-job给了两个方法
// 分片节点总数
int shardTotal = XxlJobHelper.getShardTotal();
// 当前节点下标,从0开始
int shardIndex = XxlJobHelper.getShardIndex();这两个方法分别获取了执行器的总数量,和当前执行器的下标,然后我们可以让任务的某一个唯一参数对总数量进行取模,最后让对应的执行器执行任务,这样一来就完成了任务的分片执行
任务选择分片广播
执行一次,任务成功完成分片