当前位置: 首页 > news >正文

SpringCloud系列 - xxl-job 分布式任务调度 (七)

目录

一、基本概念

1.1 什么是任务调度

1.2 任务调度工具​

(1)Timer与TimerTask​

(2) ScheduledExecutorService​

(3)​​Spring TaskScheduler​

1.3 为什么要使用分布式调度?

(1)避免重复执行

(2)突破单机处理极限

(3)高可用与容错

(4)弹性扩展能力​​

(5)负载均衡优化

 (6)定时任务集中管理​

(7)任务分片

二、xxl-job

2.1 介绍

2.2 官方文档

2.3 工作原理

2.4 使用示例

(1)拉代码

(2)建数据库

(3)调度中心部署

1)改配置文件

​编辑2)启动调度中心

3)Docker部署(可选)

(4)创建自己的项目

1) 导入依赖

2)创建配置类

3)配置文件

4)任务开发与管理

 5)启动项目

(5)管理后台配置

1)执行器管理​​

2)任务管理

三、高级特性与最佳实践

3.1 集群与高可用

(1)模拟执行器集群部署

3.2 路由策略

3.3 任务分片

3.4 任务日志

四、问题:任务我都停止了,为什么还在执行?


一、基本概念

1.1 什么是任务调度

​任务调度​​(Task Scheduling)是指通过编程手段,按照预定的时间规则(如固定时间点、时间间隔或特定条件)自动执行任务(如方法、函数或作业)的机制。其核心目标是实现任务的自动化管理,提升系统效率、资源利用率和响应速度。

1.2 任务调度工具​

Java提供了多种实现任务调度的方式,适用于不同复杂度的需求:

(1)Timer与TimerTask​

Timer底层是使用一个单线来实现多个Timer任务处理的,所有任务都是由同一个线程来调度,所有任务都是串行执行,意味着同一时间只能有一个任务得到执行,而前一个任务的延迟或者异常会影响到之后的任务。

如果有一个定时任务在运行时,产生未处理的异常,那么当前这个线程就会停止,那么所有的定时任务都会停止,受到影响。

  • ​特点​​:Java早期提供的简单调度工具,单线程串行执行任务。
  • 缺点​​:任务延迟或异常会影响后续任务,且不支持并发。

因此,我们就没有必要学习了,几乎不用!

(2) ScheduledExecutorService

  • 特点​​:基于线程池的并发调度,支持多任务并行执行,避免Timer的单线程缺陷。
  • 核心方法​​:
    • scheduleAtFixedRate:固定频率执行(基于任务开始时间)。
    • scheduleWithFixedDelay:固定延迟执行(基于任务结束时间)
  • 示例:
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    scheduler.scheduleAtFixedRate(() -> System.out.println("任务执行"), 0, 3, TimeUnit.SECONDS)[1,3](@ref)。

(3)​​Spring TaskScheduler​

Spring生态的调度工具,支持注解(如@Scheduled)和Cron表达式,集成简便。

这个相信是绝大多数Java程序员都熟悉不过的任务调度工具了。举个例子:

@Scheduled(fixedRate = 5000)
public void task() {System.out.println("每5秒执行一次");
}

当然了,任务调度工具还有很多,比如xxl-job、elastic-job、​Quartz等等。我们后面慢慢讲。

1.3 为什么要使用分布式调度?

(1)避免重复执行

在单体项目单实例运行时,我们完全可以使用​​Spring TaskScheduler​这种任务调度即可。但是一旦多实例运行,那么这些任务就会重复运行,重复运行是可能产生负面后果的(如重复发放优惠券)。

那么这种情况下,我们也可以使用分布式锁结合必要的代码逻辑来保障不重复运行。

当然再有就是使用分布式调度。分布式调度其实也是通过锁机制或主节点选举确保同一任务只执行一次。使用分布式调度框架的好处就是不用自己去写复杂的实现过程。

(2)突破单机处理极限

当任务量超过单台机器的处理能力时(如从每分钟处理1万订单增长到10万订单),单机即使采用多线程也无法满足需求,分布式调度可以通过多台机器协同处理。

单机的CPU、内存和磁盘资源有限,分布式系统能够聚合多台机器的计算资源。

(3)高可用与容错

避免单点故障,节点故障时任务可自动转移到其他节点,保障业务连续性。

(4)弹性扩展能力​

分布式系统可以根据任务负载动态增减计算节点,实现资源的弹性伸缩,这是单机系统无法实现的。

(5)负载均衡优化

通过智能调度算法(如最小负载、轮询等),分布式系统可以将任务均匀分配到各节点,避免某些节点过载而其他节点闲置。

 (6)定时任务集中管理​

在微服务架构中,各服务可能有自己的定时任务(如数据同步、报表生成),分布式调度可以统一管理和监控这些任务。

(7)任务分片

将大任务拆分为多个子任务并行执行,提升处理效率(如XXL-JOB的分片参数功能)

总之

分布式任务调度是指在分布式计算环境下,通过协调多个节点(服务实例)对任务进行动态分配、执行和监控的过程,旨在提高系统吞吐量、可用性和资源利用率。其核心是通过分布式架构解决传统单机调度在性能、容错和扩展性上的瓶颈。

提一嘴:分布式任务调度不仅仅是分布式架构(包含微服务)可以用,集群架构也可以用。甚至是单机运行也可以。

二、xxl-job

2.1 介绍

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。如下内容来源于https://gitee.com/xuxueli0323/xxl-job

支持如下特性:

1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;

2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效;

3、调度中心HA(中心式):调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心HA;

4、执行器HA(分布式):任务分布式执行,任务"执行器"支持集群部署,可保证任务执行HA;

5、注册中心: 执行器会周期性自动注册任务, 调度中心将会自动发现注册的任务并触发执行。同时,也支持手动录入执行器地址;

6、弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;

7、触发策略:提供丰富的任务触发策略,包括:Cron触发、固定间隔触发、固定延时触发、API(事件)触发、人工触发、父子任务触发;

8、调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等;

9、阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度;

10、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务;

11、任务失败重试:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;其中分片任务支持分片粒度的失败重试;

12、任务失败告警;默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式;

13、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;

14、分片广播任务:执行器集群部署时,任务路由策略选择"分片广播"情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务;

15、动态分片:分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。

16、故障转移:任务路由策略选择"故障转移"情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。

17、任务进度监控:支持实时监控任务进度;

18、Rolling实时日志:支持在线查看调度结果,并且支持以Rolling方式实时查看执行器输出的完整的执行日志;

19、GLUE:提供Web IDE,支持在线开发任务逻辑代码,动态发布,实时编译生效,省略部署上线的过程。支持30个版本的历史版本回溯。

20、脚本任务:支持以GLUE模式开发和运行脚本任务,包括Shell、Python、NodeJS、PHP、PowerShell等类型脚本;

21、命令行任务:原生提供通用命令行任务Handler(Bean任务,"CommandJobHandler");业务方只需要提供命令行即可;

22、任务依赖:支持配置子任务依赖,当父任务执行结束且执行成功后将会主动触发一次子任务的执行, 多个子任务用逗号分隔;

23、一致性:“调度中心”通过DB锁保证集群分布式调度的一致性, 一次任务调度只会触发一次执行;

24、自定义任务参数:支持在线配置调度任务入参,即时生效;

25、调度线程池:调度系统多线程触发调度运行,确保调度精确执行,不被堵塞;

26、数据加密:调度中心和执行器之间的通讯进行数据加密,提升调度信息安全性;

27、邮件报警:任务失败时支持邮件报警,支持配置多邮件地址群发报警邮件;

28、推送maven中央仓库: 将会把最新稳定版推送到maven中央仓库, 方便用户接入和使用;

29、运行报表:支持实时查看运行数据,如任务数量、调度次数、执行器数量等;以及调度报表,如调度日期分布图,调度成功分布图等;

30、全异步:任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰,理论上支持任意时长任务的运行;

31、跨语言:调度中心与执行器提供语言无关的 RESTful API 服务,第三方任意语言可据此对接调度中心或者实现执行器。除此之外,还提供了 “多任务模式”和“httpJobHandler”等其他跨语言方案;

32、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文;

33、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;

34、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;

35、用户管理:支持在线管理系统用户,存在管理员、普通用户两种角色;

36、权限控制:执行器维度进行权限控制,管理员拥有全量权限,普通用户需要分配执行器权限后才允许相关操作;

2.2 官方文档

本教程从官网找到了相关文档,供各位学习使用,也可以自行浏览下载。后面的讲解会结合文档进行学习。

2.3 工作原理

XXL-JOB分为两大模块

​调度中心(调度模块)​

负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。

​执行器(执行模块)​

负责接收调度请求并执行任务逻辑。

2.4 使用示例

(1)拉代码

从官网拉取代码到本地idea中。

(2)建数据库

执行/xxl-job/doc/db/tables_xxl_job.sql脚本初始化数据库,会创建以下主要表:

  • xxl_job_group:执行器信息表
  • xxl_job_info:调度扩展信息表
  • xxl_job_log:调度日志表
  • xxl_job_lock:任务调度锁表

(3)调度中心部署

1)改配置文件

修改xxl-job-admin/src/main/resources/application.properties配置文件,主要是自己的数据库连接信息。

### 调度中心JDBC链接
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=your_password### 报警邮箱(可选)
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.password=邮箱授权码### 调度中心通讯TOKEN(可选)
xxl.job.accessToken=your_token
2)启动调度中心

直接运行XxlJobAdminApplication的main方法

或打包成jar后执行

java -jar xxl-job-admin-2.4.0.jar

启动成功后访问 http://localhost:8080/xxl-job-admin/,默认账号密码:admin/123456

如果不对,可以自己看源码,有可能版本不一样账号密码也不一样。

 

3)Docker部署(可选)

对于Linux环境,可以使用Docker部署:

# 拉取镜像
docker pull xuxueli/xxl-job-admin:2.4.0# 运行容器
docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://mysql:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai --spring.datasource.username=root --spring.datasource.password=123456" -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin -d xuxueli/xxl-job-admin:2.4.0

(4)创建自己的项目

这里我们以使用xxl-job-executor-sample-springboot这个样例来演示

1) 导入依赖
        <!-- xxl-job-core --><dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.5.0</version></dependency>
2)创建配置类
/*** xxl-job config** @author xuxueli 2017-04-28*/
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.admin.accessToken}")private String accessToken;@Value("${xxl.job.admin.timeout}")private int timeout;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setTimeout(timeout);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}/*** 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;**      1、引入依赖:*          <dependency>*             <groupId>org.springframework.cloud</groupId>*             <artifactId>spring-cloud-commons</artifactId>*             <version>${version}</version>*         </dependency>**      2、配置文件,或者容器启动变量*          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'**      3、获取IP*          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();*/}
3)配置文件

配置application.properties,加入配置类依赖的属性

# web port
server.port=8081
# no web
#spring.main.web-environment=false# log config
logging.config=classpath:logback.xml### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job, access token
xxl.job.admin.accessToken=default_token
### xxl-job timeout by second, default 3s
xxl.job.admin.timeout=3### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
4)任务开发与管理

 XXL-JOB支持两种任务开发方式:

 Bean模式(基于方法)

@Component
public class SampleXxlJob {@XxlJob("demoJobHandler")public void demoJobHandler() throws Exception {XxlJobHelper.log("XXL-JOB, Hello World.");System.out.println("执行任务逻辑...");}
}

 继承IJobHandler

@Component
public class DemoGlueJobHandler extends IJobHandler {@Overridepublic void execute() throws Exception {XxlJobLogger.log("XXL-JOB, Hello World.");System.out.println("执行任务逻辑...");}
}

 为了演示效果更好,我们可以参考它给的样例,自己写几个定时任务。

@Component
public class HssyXxlJob {@XxlJob("test1JobHandler")public void test1() throws Exception {for (int i = 0; i < 5; i++) {System.out.println("test1:" + i);TimeUnit.SECONDS.sleep(1);}System.out.println("【测试方法 1】 --- 执行完毕!");}@XxlJob("test2JobHandler")public void test2() throws Exception {for (int i = 0; i < 5; i++) {System.out.println("test2:" + i);TimeUnit.SECONDS.sleep(1);}System.out.println("【测试方法 2】 --- 执行完毕!");}@XxlJob("test3JobHandler")public void test3() throws Exception {for (int i = 0; i < 5; i++) {System.out.println("test3:" + i);TimeUnit.SECONDS.sleep(1);}System.out.println("【测试方法 3】 --- 执行完毕!");}
}

 5)启动项目

(5)管理后台配置

1)执行器管理​

添加执行器,AppName需与配置文件中一致。

所谓执行器,我们每个项目都可以叫做一个执行器。

2)任务管理

新增任务管理,主要配置项包括:

  • 任务描述
  • 路由策略(第一个、轮询、随机等)
  • Cron表达式
  • 运行模式(BEAN模式需填写JobHandler名称)
  • 任务参数等

 

三、高级特性与最佳实践

3.1 集群与高可用

​调度中心集群​​:部署多个调度中心实例,使用同一数据库

​执行器集群​​:执行器支持集群部署,调度中心会自动发现

(1)模拟执行器集群部署

执行器集群部署的核心在于多个执行器实例使用相同的appname,这样它们会被视为同一个执行器集群。

多个执行器实例(可以是不同服务器或同一服务器的不同端口)

第一台执行器配置

在application.properties中配置如下内容

# 项目端口
server.port=8081
server.servlet.context-path=/xxl-job-executor-sample# 调度中心地址
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin# 执行器配置
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.port=9999
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30

第二台执行器配置

在同一网络或不同服务器上配置第二个执行器实例:

# 项目端口(必须不同)
server.port=8082# 调度中心地址(与第一台相同)
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin# 执行器配置(appname相同,端口不同)
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.port=9998
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30

关键点​​:集群中的执行器必须使用相同的appname但不同的执行器端口,这样它们会被识别为同一集群的不同节点 

ok,基本要求是appname必须相同但是执行器的端口不能相同。我准备在本地模拟,所以服务的端口也应该不同。那么我们在idea中复制一份配置修改再运行。

分别启动两个执行器项目:

  • 第一台执行器:端口8081,执行器端口9999
  • 第二台执行器:端口8082,执行器端口9998

创建测试任务

在调度中心创建新任务:

  • 执行器:选择刚才配置的执行器
  • 任务描述:自定义
  • 路由策略:选择需要的策略(如轮询、随机等)
  • Cron表达式:设置调度频率
  • JobHandler:填写test1JobHandler(与代码中@XxlJob注解值一致)

这一步其实前面演示的时候已经做好了,这里就跳过。

路由策略演示

XXL-JOB提供了多种路由策略,适用于不同场景:

  1. ​轮询:依次调用集群中的每个执行器

    效果:任务会在9999和9998端口交替执行
  2. ​随机​​:随机选择集群中的一个执行器

    效果:任务随机在某一台执行器上执行
  3. ​故障转移:按顺序检测执行器可用性,选择第一个可用的执行器

    效果:如果9999端口执行器宕机,会自动转移到9998端口
  4. ​忙碌转移​​:检测执行器是否忙碌,选择空闲的执行器

    效果:当一台执行器任务积压时,会将新任务分配给空闲执行器
  5. ​分片广播​​:所有执行器同时执行,每个执行器处理部分数据

    效果:所有执行器同时执行任务,可通过分片参数区分处理数据

这样我们多个示例都能运行啦。 

3.2 路由策略

XXL-JOB提供丰富的路由策略:

  • 第一个、最后一个
  • 轮询、随机
  • 一致性HASH
  • 故障转移、忙碌转移等

前面模拟执行器集群部署已经说明过了,可以自行演示。

3.3 任务分片

支持任务分片处理,适用于大数据量处理场景:

@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();// 业务逻辑List<Long> allItems = findAllItems();for(int i = 0; i < allItems.size(); i++) {if(i % shardTotal == shardIndex) {processItem(allItems.get(i));}}
}

3.4 任务日志

  • 执行日志会持久化到数据库
  • 支持日志报表和日志回溯
  • 可通过XxlJobHelper.log()记录业务日志

四、问题:任务我都停止了,为什么还在执行?

我们看代码,发现这个方法的执行时间大概5s,但是定时任务的周期是3秒和1s。

本次任务时间到了,但是上一次的任务没有执行完,这次就堆积了起来。虽然看上去调度中心已经关闭了,但是堆积的任务还必须执行完。

这一点和spring的定时任务不太一样,通过定时任务线程池配置的方式,执行完成后,需要等到下一次触发时间到了才会执行,并不会堆积。具体可以看看我之前写的文章。

这一点不知道其他版本会不会有变化,需要注意。

如果不希望堆积执行的话,怎么办呢? 

这其实是阻塞处理策略我们默认的是单机串行。

尝试给它改成丢弃后续调度,就可以了。

http://www.lryc.cn/news/583099.html

相关文章:

  • Docker高级管理
  • Wireshark抓包实验之TCP连接
  • 使用 Docker Compose 简化 INFINI Console 与 Easysearch 环境搭建
  • 数据管理新范式:基于Docker的私有云存储系统构建指南
  • 7.9 note| dfs
  • 【Linux】Rocky Linux 安装 Docker 与 Docker-Compose
  • 【vLLM 学习】Eagle
  • 多代理混战?用 PAC(Proxy Auto-Config) 优雅切换代理场景
  • 选哪个数据恢复软件?六款深度数据恢复软件介绍
  • 数据基础练习
  • 【Linux】权限的概念及理解
  • 进程于线程-3
  • 代码审计-springel表达式注入
  • JSP动态网页开发基础
  • 前后端集合如何传递
  • 主流大模型Agent框架 AutoGPT详解
  • thinkphp使用redis抢单实例
  • 如何将华为手机中的照片传输到电脑
  • 超越公有云:在裸金属服务器上构建低成本、高性能的静态资源服务
  • 【RK3568+PG2L50H开发板实验例程】FPGA部分 | Pango 的时钟资源——锁相环
  • 川翔云电脑:突破硬件极限,重构设计生产力范式
  • 使用DDR4控制器实现多通道数据读写(十九)
  • Amazon S3 对象存储服务深度解析:存储原理、应用场景与实战指南
  • 1.1 ARMv8/ARMv9安全扩展
  • ReactNative【实战】轮播图(含组件封装 ImageSlider)
  • 洛谷P1044 栈(学习向)
  • react16-react19都更新哪些内容?
  • clickhouse 各个引擎适用的场景
  • 【TCP/IP】2. 计算机网络与因特网体系结构
  • 手机文件夹隐藏工具,一键保护隐私