整合XXL-Job任务调度平台
- 创建数据库
tables_xxl_job.sql
- 引入依赖
<!-- XXL-Job core library -->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.4.0</version>
</dependency>
- 编写配置文件
server:
  port: 8081

xxl:
  job:
    admin:
      # Address of the running xxl-job-admin instance (called the "client" in these notes)
      addresses: http://127.0.0.1:8080/xxl-job-admin
    # Must be identical to the token configured on the xxl-job-admin side
    accessToken: token
    executor:
      # Executor name as created in the xxl-job-admin console
      appname: xxl-job-executor
      address: ""
      ip: ""
      port: 9999
      logpath: ./data/logs/xxl-job/executor
      logretentiondays: 30
`accessToken`、`appname`: 与客户端里的保持一致
- 在 `resources` 目录下添加 `logback.xml` 文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="1 seconds">

    <contextName>logback</contextName>

    <!-- Target log file; also used as the prefix of the rolled archives below -->
    <property name="log.path" value="/data/applogs/xxl-job/xxl-job-admin.log"/>

    <!-- Console output -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- File output, rolled over by date (yyyy-MM-dd) into zip archives -->
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Everything at INFO and above goes to both appenders -->
    <root level="info">
        <appender-ref ref="console"/>
        <appender-ref ref="file"/>
    </root>

</configuration>
- 编写配置类
package com.example.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author Ccoo* 2024/5/16*/
/**
 * Spring configuration that builds the {@link XxlJobSpringExecutor} bean.
 *
 * <p>Every setting is injected from the {@code xxl.job.*} properties and copied
 * onto the executor unchanged.
 */
@Configuration
public class XxlJobConfig {

    private static final Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appName;

    /**
     * Explicit registry address of this executor. The empty default keeps start-up
     * working when the key is absent.
     * NOTE(review): per the XXL-JOB docs a blank value is expected to fall back to
     * {@code ip:port} - confirm against the library version in use.
     */
    @Value("${xxl.job.executor.address:}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    /**
     * Creates the executor and applies the injected settings to it.
     *
     * @return the configured executor, exposed as a Spring bean
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setAppname(appName);
        // Previously the configured "address" key was silently ignored.
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}
- 注册Job
package com.example.job;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.LocalDate;/*** @author Ccoo* 2024/5/16*/
/**
 * Demo job handler registered under the name {@code demoJobHandler}, with
 * {@code init} and {@code destroy} named as its lifecycle methods.
 */
@Component
public class MyJobHandler {

    private static final Logger log = LoggerFactory.getLogger(MyJobHandler.class);

    /**
     * Job entry point: logs the current date and reports success.
     *
     * @param param not read by this method.
     *              NOTE(review): with xxl-job-core 2.3.0+ method handlers are reportedly
     *              invoked without a real argument and the return value is ignored; job
     *              parameters are read via {@code XxlJobHelper.getJobParam()} - confirm
     *              before relying on either.
     * @return always {@link ReturnT#SUCCESS}
     */
    @XxlJob(value = "demoJobHandler", init = "init", destroy = "destroy")
    public ReturnT<String> execute(String param) {
        // Parameterized logging; the emitted message is the same as with concatenation.
        log.info("小滴课堂execute任务触发成功:{}", LocalDate.now());
        return ReturnT.SUCCESS;
    }

    /** Referenced by name from the {@code init} attribute of the annotation above. */
    private void init() {
        log.info("init方法调用成功");
    }

    /** Referenced by name from the {@code destroy} attribute of the annotation above. */
    private void destroy() {
        log.info("destroy方法调用成功");
    }
}
- 客户端注册调度任务
查看客户端是否注册成功:
部署调度中心集群
- 多节点部署执行器
修改idea的相关配置
修改yml配置文件对应的端口号
启动多份实例, 部署多个节点
查看是否注册成功
- 实现任务调度策略
选择合适的路由策略可以达到负载均衡的效果
LVS + Keepalived
- 海量数据分片处理
/**
 * Sharded job: each executor handles only the user ids whose remainder modulo
 * the shard total equals its own shard index.
 */
@XxlJob(value = "shardingJobHandler")
public void shardingJobHandler() {
    // Index of the executor running this invocation
    int shardIndex = XxlJobHelper.getShardIndex();
    // Total number of shards, i.e. the number of executors in the cluster
    int shardTotal = XxlJobHelper.getShardTotal();
    log.info("分片总数:{},当前分片数:{}", shardTotal, shardIndex);

    List<Integer> allUserIds = getAllUserIds();
    for (int userId : allUserIds) {
        // floorMod keeps the remainder non-negative for a positive shardTotal, so a
        // negative id still maps to a shard; with '%' it would match no executor.
        if (Math.floorMod(userId, shardTotal) == shardIndex) {
            log.info("第{}片,命中分片开始处理用户id={}", shardIndex, userId);
        }
    }
}