当前位置: 首页 > news >正文

xxl-job的分片广播

目录

xxl-job的分片广播

场景引入

xxl-job简介

xxl-job的部署安装

代码编写

 1.导入依赖

2.yml文件编写

3.编写xxl-job执行器配置类,维护一个xxl-job执行器的bean

4.编写第一个任务,任务名字叫firstJob

5.进入服务端,增加执行器和任务

​编辑 6.启动两个服务实例

7.执行任务

8.使用分片广播的任务(重点操作)


xxl-job的分片广播

这里分享以下xxl-job的分片广播的使用

场景引入

在分布式架构下,一个服务往往会部署多个实例来运行我们的业务,如果在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度

在单体项目中,可以直接使用SpringTask来完成,但如果在集群项目中,不同的项目之间无法知道任务的完成情况。那为什么要使用分布式集群项目,以下是分布式的优点:

分布式系统的特点,并且提高任务的调度处理能力:

  • 并行任务调度
    • 集群部署单个服务,这样就可以多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行(这就是分片广播),来提高任务调度的处理效率。
  • 高可用
    • 若某一个实例宕机,不影响其他实例来执行任务。
  • 弹性扩容
    • 当集群中增加实例就可以提高并执行任务的处理效率。
  • 任务管理与监测
    • 对系统中存在的所有定时任务进行统一的管理及监测。
    • 让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。

xxl-job简介

 XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

xxl-job的部署安装

xxl-job想要使用,需要安装admin服务端

 这里我们在docker安装

docker run \
-e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.150.101:3306/xxl_job?Unicode=true&characterEncoding=UTF-8 \
--spring.datasource.username=root \
--spring.datasource.password=123" \
--restart=always \
-p 28080:8080 \
-v xxl-job-admin-applogs:/data/applogs \
--name xxl-job-admin \
-d \
xuxueli/xxl-job-admin:2.3.0

  • 默认端口映射到28080
  • 日志挂载到/var/lib/docker/volumes/xxl-job-admin-applogs
  • 通过PARAMS环境变量设置数据库链接参数
  • 数据库脚本:doc/db/tables_xxl_job.sql · 许雪里/xxl-job - Gitee.com

 

xxl-job使用的8张表

 

  • xxl_job_lock:任务调度锁表;
  • xxl_job_group:执行器信息表,维护任务执行器信息;
  • xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
  • xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
  • xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
  • xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
  • xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
  • xxl_job_user:系统用户表;

代码编写

 1.导入依赖

   <dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId></dependency>

2.yml文件编写

application:version: v1.0
spring:application:name: sl-express-xxl-job
server:port: 9901
xxl:job:admin:addresses: http://192.168.150.101:28080/xxl-job-adminexecutor:ip: 192.168.150.1appname: ${spring.application.name}#执行器运行日志文件存储磁盘路径logpath: /data/applogs/xxl-job/jobhandler#执行器日志文件保存天数logretentiondays: 30

这里我们设置了执行器的ip地址,让执行器与xxl-job的服务端同处一个网络下,让服务端可以扫描到这个执行器,这样一来我们就不用自己配置了。 

3.编写xxl-job执行器配置类,维护一个xxl-job执行器的bean

@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken:}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address:}")private String address;@Value("${xxl.job.executor.ip:}")private String ip;@Value("${xxl.job.executor.port:0}")private int port;@Value("${xxl.job.executor.logpath:}")private String logPath;@Value("${xxl.job.executor.logretentiondays:}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}}

4.编写第一个任务,任务名字叫firstJob

@Component
public class JobHandler {private List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5);/*** 普通任务*/@XxlJob("firstJob")public void firstJob() throws Exception {System.out.println("firstJob执行了.... " + LocalDateTime.now());for (Integer data : dataList) {XxlJobHelper.log("data= {}", data);Thread.sleep(RandomUtil.randomInt(100, 500));}System.out.println("firstJob执行结束了.... " + LocalDateTime.now());}
}

5.进入服务端,增加执行器和任务

增加执行器

增加任务,我们这里先使用轮询策略

 6.启动两个服务实例

 

7.执行任务

第一个服务器把所有任务都执行了

 再执行一次,第二个服务器又把所有任务执行了

这样一来我们可以发现并没有把分布式的优势利用上,如果任务特别多,都是只让一个服务器(执行器)执行任务 ,效率十分低下,所以我们需要使用分片广播

8.使用分片广播的任务(重点操作)

    /*** 分片式任务*/@XxlJob("shardingJob")public void shardingJob() throws Exception {// 分片参数// 分片节点总数int shardTotal = XxlJobHelper.getShardTotal();// 当前节点下标,从0开始int shardIndex = XxlJobHelper.getShardIndex();System.out.println("shardingJob执行了.... " + LocalDateTime.now());for (Integer data : dataList) {if (data % shardTotal == shardIndex) {System.out.println("data= {}"+ data);Thread.sleep(RandomUtil.randomInt(100, 500));}}System.out.println("shardingJob执行结束了.... " + LocalDateTime.now());}

 如何把任务分片给不同的服务器(执行器)呢,xxl-job给了两个方法

// 分片节点总数
        int shardTotal = XxlJobHelper.getShardTotal();
        // 当前节点下标,从0开始
        int shardIndex = XxlJobHelper.getShardIndex();

这两个方法分别获取了执行器的总数量,和当前执行器的下标,然后我们可以让任务的某一个唯一参数对总数量进行取模,最后让对应的执行器执行任务,这样一来就完成了任务的分片执行

任务选择分片广播 

执行一次,任务成功完成分片

 

http://www.lryc.cn/news/534488.html

相关文章:

  • MobaXterm破解会话上限限制
  • vscode设置保存时自动缩进和格式化
  • 一键查看电脑各硬件详细信息 轻松查看电脑硬件参数
  • 【C++11】lambda和包装器
  • react redux用法学习
  • 前端HTML标签 meta中常见的一些属性
  • 127,【3】 buuctf [NPUCTF2020]ReadlezPHP
  • 继承(python)
  • 驱动开发系列36 - Linux Graphics 2D 绘制流程
  • STL函数算法笔记
  • 【Vue】在Vue3中使用Echarts的示例 两种方法
  • 小红书自动化:如何利用Make批量生成爆款笔记
  • 学习率调整策略 | PyTorch 深度学习实战
  • DeepSeekMoE 论文解读:混合专家架构的效能革新者
  • 以下是基于巨控GRM241Q-4I4D4QHE模块的液位远程控制系统技术方案:
  • 【JVM详解五】JVM性能调优
  • 2.10日学习总结
  • 疯狂前端面试题(四)
  • YOLOv11-ultralytics-8.3.67部分代码阅读笔记-metrics.py
  • SuperCopy解除网页禁用复制功能插件安装和使用
  • UP-VLA:具身智体的统一理解与预测模型
  • Unity 基于状态机的逻辑控制详解
  • 傅里叶单像素成像技术研究进展
  • IDEA接入DeepSeek
  • 前端如何判断浏览器 AdBlock/AdBlock Plus(最新版)广告屏蔽插件已开启拦截
  • macOS 上部署 RAGFlow
  • 如何在Kickstart自动化安装完成后ISO内拷贝文件到新系统或者执行命令
  • 在服务器部署JVM后,如何评估JVM的工作能力,比如吞吐量
  • 攻防世界32 very_easy_sql【SSRF/SQL时间盲注】
  • STM32G474--Whetstone程序移植(双精度)笔记