当前位置: 首页 > news >正文

Springboot 集成 SpringBatch 批处理组件

Springboot 集成 SpringBatch 批处理组件

  • 1.Spring Batch 简介
  • 2.批处理工具架构和示例
    • 2.1 批处理任务持久化控制
    • 2.2 实现一个读取器
    • 2.3 定义批处理JOB
    • 2.4数据实体
    • 2.5 接口类
    • 4.6 H2 配置
    • 4.7 H2 数据库脚本
  • 3.测试

1.Spring Batch 简介

Spring Batch 是 Spring 生态系统中的​​企业级批处理框架​​,专门设计用于处理大规模数据作业。它提供了批处理应用所需的核心功能,解决了传统批处理应用开发中的重复性问题,使开发人员能够专注于业务逻辑而非基础设施。

  • 核心价值与定位​​

    ​​问题解决​​:自动化处理​​周期性的、数据密集型的​​任务(如报表生成、数据迁移、对账结算)
    ​​典型场景​​:
    每月财务报表生成
    银行日终批量交易处理
    电商平台每日用户行为分析
    百万级数据迁移(如旧系统到新系统)

2.批处理工具架构和示例

接口
ItemReader
处理ItemProcessor
ItemWriter

项目结构

在这里插入图片描述

依赖包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>SpringBatcher</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>21</maven.compiler.source><maven.compiler.target>21</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><spring-boot.version>3.5.3</spring-boot.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.38</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>4.0.3</version></dependency><dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><scope>runtime</scope> <!-- 通常只需运行时依赖 --></dependency></dependencies></project>

启动类

package org.example;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@EnableBatchProcessing
@SpringBootApplication
public class BatchApp {public static void main(String[] args) {SpringApplication.run(BatchApp.class, args);}
}

2.1 批处理任务持久化控制

示例代码基于 H2 存储

package org.example.config;import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;import javax.sql.DataSource;/*** @Author zhx && moon* @Since 21* @Date 2025-06-20 PM 4:21*/
@Configuration
public class BatchJobConfig {@Beanpublic JobRepository jobRepository(DataSource dataSource) throws Exception {JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean();bean.setDataSource(dataSource);bean.setDatabaseType("H2");bean.setTransactionManager(new DataSourceTransactionManager(dataSource));bean.afterPropertiesSet();return bean.getObject();}}

2.2 实现一个读取器

以 Excel 文件读取为例

package org.example.job.common;import com.alibaba.excel.EasyExcel;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.InitializingBean;import java.io.File;
import java.util.List;/*** @Author zhx && moon* @Since 21* @Date 2025-06-24 PM 2:16*/
public class EasyExcelItemReader<T> implements ItemReader<T>, InitializingBean {private final Class<T> clazz;private final String filePath;private List<T> cacheList;private int index = 0;public EasyExcelItemReader(Class<T> clazz, String filePath) {this.clazz = clazz;this.filePath = filePath;}@Overridepublic void afterPropertiesSet() {try {// 一次性读取Excel所有数据(适用于中小文件)cacheList = EasyExcel.read(new File(filePath)).head(clazz).sheet().headRowNumber(1) // 跳过标题行.doReadSync();} catch (Exception e) {throw new RuntimeException("read excel failed ", e);}}@Overridepublic T read() {if (index < cacheList.size()) {return cacheList.get(index++);}// 重置读取的位置index = 0;return null;}
}

2.3 定义批处理JOB

package org.example.job;import org.example.entity.User;
import org.example.job.common.EasyExcelItemReader;
import org.springframework.batch.core.*;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;/*** @Author zhx && moon* @Since 21* @Date 2025-06-24 PM 2:08*/
@Component
public class SVCJob {/*** Excel 读取* @return*/@Bean("easyExcelItemReader")public EasyExcelItemReader<User> easyExcelItemReader() {return new EasyExcelItemReader<>(User.class, "C:\\Users\\Administrator\\Desktop\\Test.xlsx");}/*** 数据处理器 对读取的数据进行加工* @return*/@Bean("getNameProcessors")public ItemProcessor<User, String> getNameProcessors() {return item -> {return item.getName();};}/*** 配置写入器(保持不变)* @return*/@Bean("nameWriter")public ItemWriter<String> nameWriter() {return items -> {for (String item : items) {System.out.println("User Name: " + item);}};}/*** 配置批处理步骤(使用新版API)* @param jobRepository* @param transactionManager* @param reader* @param processor* @param writer* @return*/@Bean("easyExcelStep")public Step easyExcelStep(JobRepository jobRepository,PlatformTransactionManager transactionManager,@Qualifier("easyExcelItemReader") EasyExcelItemReader<User> reader,@Qualifier("getNameProcessors") ItemProcessor<User, String> processor,@Qualifier("nameWriter") ItemWriter<String> writer) {return new StepBuilder("easyExcelStep", jobRepository).<User, String>chunk(100, transactionManager).reader(reader).processor(processor).writer(writer).faultTolerant().skipLimit(1).skip(IllegalArgumentException.class).listener(new StepExecutionListener() {@Overridepublic void beforeStep(StepExecution stepExecution) {System.out.println("start to processor data ...");}}).build();}/*** 配置批处理作业* @param jobRepository* @param importStep* @return*/@Bean("easyExcelImportJobs")public Job customerImportJob(JobRepository jobRepository, @Qualifier("easyExcelStep") Step importStep) {return new JobBuilder("easyExcelImportJobs", jobRepository).incrementer(new RunIdIncrementer()).start(importStep).listener(new JobExecutionListener() {@Overridepublic void afterJob(JobExecution jobExecution) {System.out.println("Job Finished!State: " + jobExecution.getStatus());}}).build();}}

2.4数据实体

package org.example.entity;import com.alibaba.excel.annotation.ExcelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @Author zhx && moon* @Since 21* @Date 2025-06-24 PM 2:20*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {@ExcelProperty("姓名")private String name;@ExcelProperty("编号")private String employeeId;@ExcelProperty("年龄")private Integer age;}

2.5 接口类

package org.example.controller;import jakarta.annotation.Resource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author zhx && moon* @Since 21* @Date 2025-06-23 PM 4:40*/
@RestController
@RequestMapping("/job")
public class JobManage {@Autowiredprivate JobLauncher jobLauncher;@Resource(name = "easyExcelImportJobs")Job job;@GetMapping("/start")public void start(){try {JobParameters params = new JobParametersBuilder().addLong("uniqueId", System.nanoTime()).toJobParameters();jobLauncher.run(job, params);} catch (Exception e) {throw new RuntimeException(e);}}}

4.6 H2 配置

spring:datasource:url: jdbc:h2:file:Z:/IdeaProjects/SpringBatcher/SpringBatcher/springbatchdb  #jdbc:h2:tcp://localhost/mem:springbatchdb;DB_CLOSE_DELAY=-1 #jdbc:h2:mem:springbatchdbdriver-class-name: org.h2.Driverusername: sapassword: sah2:console:enabled: truepath: /h2/db-consolesettings:web-allow-others: truebatch:jdbc:initialize-schema: always

4.7 H2 数据库脚本

-- Autogenerated: do not edit this fileCREATE TABLE BATCH_JOB_INSTANCE  (JOB_INSTANCE_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,VERSION BIGINT ,JOB_NAME VARCHAR(100) NOT NULL,JOB_KEY VARCHAR(32) NOT NULL,constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;CREATE TABLE BATCH_JOB_EXECUTION  (JOB_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,VERSION BIGINT  ,JOB_INSTANCE_ID BIGINT NOT NULL,CREATE_TIME TIMESTAMP(9) NOT NULL,START_TIME TIMESTAMP(9) DEFAULT NULL ,END_TIME TIMESTAMP(9) DEFAULT NULL ,STATUS VARCHAR(10) ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED TIMESTAMP(9),constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (JOB_EXECUTION_ID BIGINT NOT NULL ,PARAMETER_NAME VARCHAR(100) NOT NULL ,PARAMETER_TYPE VARCHAR(100) NOT NULL ,PARAMETER_VALUE VARCHAR(2500) ,IDENTIFYING CHAR(1) NOT NULL ,constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;CREATE TABLE BATCH_STEP_EXECUTION  (STEP_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,VERSION BIGINT NOT NULL,STEP_NAME VARCHAR(100) NOT NULL,JOB_EXECUTION_ID BIGINT NOT NULL,CREATE_TIME TIMESTAMP(9) NOT NULL,START_TIME TIMESTAMP(9) DEFAULT NULL ,END_TIME TIMESTAMP(9) DEFAULT NULL ,STATUS VARCHAR(10) ,COMMIT_COUNT BIGINT ,READ_COUNT BIGINT ,FILTER_COUNT BIGINT ,WRITE_COUNT BIGINT ,READ_SKIP_COUNT BIGINT ,WRITE_SKIP_COUNT BIGINT ,PROCESS_SKIP_COUNT BIGINT ,ROLLBACK_COUNT BIGINT ,EXIT_CODE VARCHAR(2500) ,EXIT_MESSAGE VARCHAR(2500) ,LAST_UPDATED TIMESTAMP(9),constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT LONGVARCHAR ,constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,SHORT_CONTEXT VARCHAR(2500) NOT NULL,SERIALIZED_CONTEXT LONGVARCHAR ,constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;

3.测试

启动服务

在这里插入图片描述

测试 H2 连接

在这里插入图片描述

测试数据

在这里插入图片描述

触发 JOB

在这里插入图片描述

JOB 执行记录

在这里插入图片描述

http://www.lryc.cn/news/577026.html

相关文章:

  • 2.安装Docker
  • 力扣第87题-扰乱字符串
  • 如何通过自动化减少重复性工作
  • Vue中的v-if与emit事件传递:一个常见陷阱分析
  • 推荐几本关于网络安全的书
  • FastAPI+Sqlite+HTML的登录注册与文件上传系统:完整实现指南
  • 6月28日记
  • Re:从0开始的 空闲磁盘块管理(考研向)
  • H3C-路由器交换机-中继
  • 用户行为序列建模(篇六)-【阿里】DSIN
  • DeepSeek五子棋游戏与AI对战
  • 【unity游戏开发——网络】网络游戏通信方案——强联网游戏(Socket长连接)、 弱联网游戏(HTTP短连接)
  • WebRTC(十三):信令服务器
  • Qt Windows下编译动态库生成的.a文件是什么?
  • 新生代潜力股刘小北:演艺路上的璀璨新星
  • Function Calling与MCP的区别
  • Ubuntu开放mysql 3306端口
  • X-Search:Spring AI实现的AI智能搜索
  • SpringMVC实战:从配置到JSON处理全解析
  • AlphaFold3安装报错
  • SpringCloud系列(40)--SpringCloud Gateway的Filter的简介及使用
  • cron 表达式 0 10 0/2 * * ? 的含义
  • Linux基本命令篇 —— head命令
  • 5 c++核心——文件操作
  • Origin绘制复合子母饼状图—复合柱饼图、复合环饼图及复合饼图
  • [Linux] PXE
  • es6特性-第一部分
  • Tomcat 安装使用教程
  • mybatis-plus从入门到入土(一):快速开始
  • 云端可视化耦合电磁场:麦克斯韦方程组的应用-AI云计算数值分析和代码验证