SpringBoot实现动态Job实战
功能说明
本系统实现了一个动态 Job 调度框架,支持通过数据库配置 Job 信息(如执行周期、目标方法等),并能实时监控 Job 的增删改状态,无需重启应用即可动态调整 Job 的执行策略。核心功能包括:
- Job 全生命周期管理:支持创建、更新、删除 Job,以及启动 / 停止 Job 执行
- 动态感知配置变化:每隔 5 秒自动检测数据库中 Job 配置的变更,并同步更新执行状态
- 基于 Cron 表达式的调度:通过标准 Cron 表达式定义 Job 执行周期,满足复杂调度需求
技术栈选型
- 核心框架:Spring Boot(提供依赖注入、启动回调等基础能力)
- 持久层:MyBatis-Plus(简化数据库操作,支持 CRUD 及条件查询)
- 任务调度:ThreadPoolTaskScheduler(Spring 内置的任务调度组件,支持 Cron 触发)
- 数据库:关系型数据库(通过 Job 表存储配置信息)
- 工具类:Hutool(提供 ID 生成、集合处理等工具方法)
- 表达式解析:Spring 的 CronExpression(校验 Cron 表达式合法性)
数据库设计与初始化
1. Job 表结构设计
-- 创建Job表(存储Job配置信息)create table if not exists t_job(id varchar(50) primary key comment '主键ID',name varchar(100) not null comment 'Job名称(具有业务含义的标识)',cron varchar(50) not null comment '执行周期(Cron表达式)',bean_name varchar(100) not null comment '目标Bean名称(Spring容器中的Bean标识)',bean_method varchar(100) not null comment '目标方法名称(Bean中需要执行的方法)',status smallint not null default 0 comment '执行状态:0-停止,1-运行中');
2. 测试数据初始化
-- 清理历史数据(测试环境使用)delete from t_job;-- 插入测试Job:-- job1:每1秒执行一次-- job2:每2秒执行一次insert ignore into t_job values('1', '测试Job1', '* * * * * *', 'job1', 'execute', 1),('2', '测试Job2', '*/2 * * * * *', 'job2', 'execute', 1);
实现原理
系统启动后,通过以下流程实现动态 Job 调度:
- 初始化阶段:应用启动时读取数据库中状态为 “运行中” 的 Job,通过 ThreadPoolTaskScheduler 初始化调度任务
- 监控阶段:每 5 秒执行一次数据库扫描,对比内存中运行的 Job 与数据库配置的差异
- 同步阶段:根据差异动态调整 Job 执行状态(新增 Job 启动执行、删除 Job 停止调度、修改 Job 重新加载配置)
核心代码解析
1. 数据模型与 DTO
(1)数据库实体(JobPO)
package com.practical.po;import com.baomidou.mybatisplus.annotation.TableName;import lombok.Data;/*** Job数据库实体类* 对应表t_job,存储Job的配置信息** @author chen* @version 1.0* @date 2025-07-08 17:37*/@TableName("t_job")@Datapublic class JobPO {// Job唯一标识private String id;// Job名称(用于可视化展示)private String name;// 执行周期的Cron表达式private String cron;// 目标Bean在Spring容器中的名称private String beanName;// 目标Bean中需要执行的方法名private String beanMethod;// 执行状态(0-停止,1-运行中)private Integer status;}
(2)数据传输对象(DTO)
package com.practical.dto;import com.practical.po.JobPO;import lombok.Data;import org.springframework.beans.BeanUtils;/*** Job数据传输对象* 用于服务层与控制层之间的数据交互** @author chen* @version 1.0* @date 2025-07-08 16:57*/@Datapublic class Job {private String id;private String name;private String cron;private String beanName;private String beanMethod;/*** 从数据库实体转换为DTO*/public static Job of(JobPO jobPO) {if (jobPO == null) return null;Job job = new Job();BeanUtils.copyProperties(jobPO, job);return job;}}
2. 核心服务实现
(1)Job 服务接口(JobService)
package com.practical.service;import com.practical.dto.Job;import com.practical.dto.JobCreateRequest;import com.practical.dto.JobUpdateRequest;import java.util.List;/*** Job服务接口* 定义Job的CRUD及状态管理操作** @author chen* @version 1.0* @date 2025-07-08 17:39*/public interface JobService {/*** 创建新Job* @param request 创建参数* @return 新生成的Job ID*/String jobCreate(JobCreateRequest request);/*** 更新Job配置* @param request 更新参数(包含Job ID)* @return 是否更新成功*/boolean jobUpdate(JobUpdateRequest request);/*** 删除指定Job* @param id Job ID* @return 是否删除成功*/boolean jobDelete(String id);/*** 启动指定Job* @param id Job ID* @return 是否启动成功*/boolean jobStart(String id);/*** 停止指定Job* @param id Job ID* @return 是否停止成功*/boolean jobStop(String id);/*** 获取所有Job列表* @return Job列表*/List<Job> jobList();/*** 获取所有处于运行状态的Job* @return 运行中Job列表*/List<Job> getStartJobList();}
(2)服务实现类(JobServiceImpl)
package com.practical.service.impl;import cn.hutool.core.util.IdUtil;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;import com.baomidou.mybatisplus.core.toolkit.StringUtils;import com.baomidou.mybatisplus.core.toolkit.Wrappers;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;import com.practical.common.ServiceExceptionUtils;import com.practical.dto.Job;import com.practical.dto.JobCreateRequest;import com.practical.dto.JobUpdateRequest;import com.practical.enums.JobStatusEnums;import com.practical.mapper.JobMapper;import com.practical.po.JobPO;import com.practical.service.JobService;import org.springframework.beans.BeanUtils;import org.springframework.scheduling.support.CronExpression;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.util.List;import java.util.stream.Collectors;/*** Job服务实现类* 实现Job的数据库操作及业务校验** @author chen* @version 1.0* @date 2025-07-08 17:39*/@Servicepublic class JobServiceImpl extends ServiceImpl<JobMapper, JobPO> implements JobService {@Resourceprivate JobMapper jobMapper;@Overridepublic String jobCreate(JobCreateRequest request) {// 参数合法性校验validCreateRequest(request);// 转换为数据库实体并保存JobPO jobPO = new JobPO();BeanUtils.copyProperties(request, jobPO);jobPO.setId(IdUtil.fastSimpleUUID()); // 生成唯一IDjobMapper.insert(jobPO);return jobPO.getId();}@Overridepublic boolean jobUpdate(JobUpdateRequest request) {if (StringUtils.isBlank(request.getId())) {throw ServiceExceptionUtils.exception("Job ID不能为空");}validCreateRequest(request); // 复用创建时的参数校验逻辑JobPO jobPO = new JobPO();BeanUtils.copyProperties(request, jobPO);return updateById(jobPO);}@Overridepublic boolean jobDelete(String id) {return removeById(id);}@Overridepublic boolean jobStart(String id) {// 更新状态为"运行中"LambdaUpdateWrapper<JobPO> updateWrapper = Wrappers.lambdaUpdate(JobPO.class).eq(JobPO::getId, id).set(JobPO::getStatus, JobStatusEnums.START.getStatus());return update(updateWrapper);}@Overridepublic boolean jobStop(String id) {// 更新状态为"停止"LambdaUpdateWrapper<JobPO> updateWrapper = Wrappers.lambdaUpdate(JobPO.class).eq(JobPO::getId, id).set(JobPO::getStatus, JobStatusEnums.STOP.getStatus());return update(updateWrapper);}@Overridepublic List<Job> jobList() {// 转换数据库实体为DTO返回return list().stream().map(Job::of).collect(Collectors.toList());}@Overridepublic List<Job> getStartJobList() {// 查询所有状态为"运行中"的JobLambdaQueryWrapper<JobPO> queryWrapper = Wrappers.lambdaQuery(JobPO.class).eq(JobPO::getStatus, JobStatusEnums.START.getStatus());return list(queryWrapper).stream().map(Job::of).collect(Collectors.toList());}/*** 校验Job创建/更新参数*/private void validCreateRequest(JobCreateRequest request) {if (StringUtils.isBlank(request.getName())) {throw ServiceExceptionUtils.exception("Job名称不能为空");}if (StringUtils.isBlank(request.getBeanName())) {throw ServiceExceptionUtils.exception("目标Bean名称不能为空");}if (StringUtils.isBlank(request.getBeanMethod())) {throw ServiceExceptionUtils.exception("目标方法名称不能为空");}if (StringUtils.isBlank(request.getCron()) || !CronExpression.isValidExpression(request.getCron())) {throw ServiceExceptionUtils.exception("Cron表达式格式不正确");}if (!JobStatusEnums.isValid(request.getStatus())) {throw ServiceExceptionUtils.exception("状态值无效(0-停止,1-运行中)");}}}
3. 调度核心组件
(1)任务调度器配置(ThreadPoolTaskScheduler)
package com.practical.config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;/*** 任务调度器配置类* 配置Spring的线程池任务调度器,用于执行Job** @author chen* @version 1.0* @date 2025-07-09 14:20*/@Configurationpublic class SpringJobConfiguration {/*** 配置线程池任务调度器*/@Beanpublic ThreadPoolTaskScheduler threadPoolTaskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(100); // 线程池大小scheduler.setThreadNamePrefix("job-executor-"); // 线程名称前缀,便于日志追踪scheduler.setAwaitTerminationSeconds(60); // 关闭时等待任务完成的超时时间scheduler.setWaitForTasksToCompleteOnShutdown(true); // 关闭时是否等待任务完成return scheduler;}}
(2)Job 执行管理器(SpringJobRunManager)
package com.practical.service.impl;import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.practical.common.ServiceExceptionUtils;
import com.practical.dto.Job;
import com.practical.dto.JobCreateRequest;
import com.practical.dto.JobUpdateRequest;
import com.practical.enums.JobStatusEnums;
import com.practical.mapper.JobMapper;
import com.practical.po.JobPO;
import com.practical.service.JobService;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;/*
@description:
@ClassName JobServiceImpl
@author chen
@create 2025-07-08 17:39
@Version 1.0
*/
@Service
public class JobServiceImpl extends ServiceImpl<JobMapper, JobPO> implements JobService
{@Resourceprivate JobMapper jobMapper;@Overridepublic String jobCreate(JobCreateRequest request){// 参数校验this.valid(request);// 入库JobPO jobPO = new JobPO();BeanUtils.copyProperties(request, jobPO);jobPO.setId(IdUtil.fastSimpleUUID());jobMapper.insert(jobPO);return jobPO.getId();}@Overridepublic boolean jobUpdate(JobUpdateRequest request){// 参数校验if (StringUtils.isBlank(request.getId())){throw ServiceExceptionUtils.exception("id不能为空");}this.valid(request);JobPO jobPO = new JobPO();BeanUtils.copyProperties(request, jobPO);return updateById(jobPO);}@Overridepublic boolean jobDelete(String id){return this.removeById(id);}@Overridepublic boolean jobStart(String id){LambdaUpdateWrapper<JobPO> updateWrapper = Wrappers.lambdaUpdate(JobPO.class).eq(JobPO::getId, id).set(JobPO::getStatus, JobStatusEnums.START.getStatus());return this.update(updateWrapper);}@Overridepublic boolean jobStop(String id){LambdaUpdateWrapper<JobPO> updateWrapper = Wrappers.lambdaUpdate(JobPO.class).eq(JobPO::getId, id).set(JobPO::getStatus, JobStatusEnums.STOP.getStatus());return update(updateWrapper);}@Overridepublic List<Job> jobList(){return this.list().stream().map(Job::of).collect(Collectors.toList());}@Overridepublic List<Job> getStartJobList(){LambdaQueryWrapper<JobPO> qw = Wrappers.lambdaQuery(JobPO.class).eq(JobPO::getStatus, JobStatusEnums.START.getStatus());return this.list(qw).stream().map(Job::of).collect(Collectors.toList());}private void valid(JobCreateRequest request){if (StringUtils.isBlank(request.getName())){throw ServiceExceptionUtils.exception("job名称必填");}if (StringUtils.isBlank(request.getBeanName())){throw ServiceExceptionUtils.exception("beanName名称不能为空");}if (StringUtils.isBlank(request.getBeanMethod())){throw ServiceExceptionUtils.exception("beanMethod名称不能为空");}if (StringUtils.isBlank(request.getCron()) || !CronExpression.isValidExpression(request.getCron())){throw ServiceExceptionUtils.exception("cron格式不正确");}if (!JobStatusEnums.isValid(request.getStatus())){throw ServiceExceptionUtils.exception("status值无效");}}
}
(3)用到的dto
package com.practical.job;
import cn.hutool.core.util.ReflectUtil;
import com.practical.dto.Job;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.util.concurrent.ScheduledFuture;
/*
@description:
@ClassName SpringJobTask
@author chen
@create 2025-07-08 17:46
@Version 1.0
*/@Data
@Slf4j
public class SpringJobTask implements Runnable
{private ScheduledFuture scheduledFuture;private Job job;private ApplicationContext applicationContext;public SpringJobTask(Job job, ApplicationContext applicationContext){this.job = job;this.applicationContext = applicationContext;}@Overridepublic void run(){// 从spring容器中拿到bean,然后通过反射调用其需要执行的方法Object bean = this.applicationContext.getBean(this.job.getBeanName());ReflectUtil.invoke(bean, this.job.getBeanMethod());}public ScheduledFuture getScheduledFuture() {return scheduledFuture;}public void setScheduledFuture(ScheduledFuture scheduledFuture) {this.scheduledFuture = scheduledFuture;}public Job getJob() {return job;}public ApplicationContext getApplicationContext() {return applicationContext;}
}package com.practical.dto;import com.practical.po.JobPO;
import lombok.Data;
import org.springframework.beans.BeanUtils;/*
@description:
@ClassName Job
@author chen
@create 2025-07-08 16:57
@Version 1.0
*/
@Data
public class Job
{//job的idprivate String id;//job的名称private String name;//job的执行周期,cron表达式private String cron;//job需要执行那个bean,对应spring中bean的名称private String beanName;//job执行的bean的方法private String beanMethod;public static Job of(JobPO jobPO){if (jobPO == null){return null;}Job job = new Job();BeanUtils.copyProperties(jobPO, job);return job;}
}package com.practical.dto;import lombok.Data;import java.util.List;/*
@description:
@ClassName JobChange
@author chen
@create 2025-07-09 14:28
@Version 1.0
*/
@Data
public class JobChange
{// 新增的jobprivate List<Job> addJobList;// 删除的jobprivate List<Job> deleteJobList;// 更新的jobprivate List<Job> updateJobList;
}package com.practical.dto;import lombok.Data;/*
@description:
@ClassName JobCreateRequest
@author chen
@create 2025-07-08 17:41
@Version 1.0
*/
@Data
public class JobCreateRequest
{//job的名称private String name;//job的执行周期,cron表达式private String cron;//job需要执行那个bean,对应spring中bean的名称private String beanName;//job执行的bean的方法private String beanMethod;//job的状态,0:停止,1:执行中private Integer status;
}package com.practical.dto;import lombok.Data;/*
@description:
@ClassName JobUpdateRequest
@author chen
@create 2025-07-08 17:42
@Version 1.0
*/
@Data
public class JobUpdateRequest extends JobCreateRequest
{private String id;
}