当前位置: 首页 > news >正文

Spring Batch 高级篇-分区步骤

目录

引言

概念

分区器

分区处理器

案例

转视频版


引言

接着上篇:Spring Batch 高级篇-并行步骤了解Spring Batch并行步骤后,接下来一起学习一下Spring Batch 高级功能-分区步骤

概念

分区:有划分,区分意思,在SpringBatch 分区步骤讲的是给执行步骤区分上下级。

上级: 主步骤(Master Step)

下级: 从步骤--工作步骤(Work Step)

主步骤是领导,不用干活,负责管理从步骤,从步骤是下属,必须干活。

一个主步骤下辖管理多个从步骤。

注意: 从步骤,不管多小,它也一个完整的Spring Batch 步骤,负责各自的读入、处理、写入等。

分区步骤结构图

 分区步骤一般用于海量数据的处理上,其采用是分治思想。主步骤将大的数据划分多个小的数据集,然后开启多个从步骤,要求每个从步骤负责一个数据集。当所有从步骤处理结束,整作业流程才算结束。

分区器

主步骤核心组件,负责数据分区,将完整的数据拆解成多个数据集,然后指派给从步骤,让其执行。

拆分规则由Partitioner分区器接口定制,默认的实现类:MultiResourcePartitioner

public interface Partitioner {Map<String, ExecutionContext> partition(int gridSize);
}

Partitioner 接口只有唯一的方法:partition 参数gridSize表示要分区的大小,可以理解为要开启多个worker步骤,返回值是一个Map, 其中key:worker步骤名称, value:worker步骤启动需要参数值,一般包含分区元数据,比如起始位置,数据量等。

分区处理器

主步骤核心组件,统一管理work 步骤, 并给work步骤指派任务。

管理规则由PartitionHandler 接口定义,默认的实现类:TaskExecutorPartitionHandler

案例

需求:下面几个文件将数据读入内存

 

步骤1:准备数据

user1-10.txt

1#dafei#18
2#dafei#18
3#dafei#18
4#dafei#18
5#dafei#18
6#dafei#18
7#dafei#18
8#dafei#18
9#dafei#18
10#dafei#18

user11-20.txt

11#dafei#18
12#dafei#18
13#dafei#18
14#dafei#18
15#dafei#18
16#dafei#18
17#dafei#18
18#dafei#18
19#dafei#18
20#dafei#18

user21-30.txt

21#dafei#18
22#dafei#18
23#dafei#18
24#dafei#18
25#dafei#18
26#dafei#18
27#dafei#18
28#dafei#18
29#dafei#18
30#dafei#18

user31-40.txt

31#dafei#18
32#dafei#18
33#dafei#18
34#dafei#18
35#dafei#18
36#dafei#18
37#dafei#18
38#dafei#18
39#dafei#18
40#dafei#18

user41-50.txt

41#dafei#18
42#dafei#18
43#dafei#18
44#dafei#18
45#dafei#18
46#dafei#18
47#dafei#18
48#dafei#18
49#dafei#18
50#dafei#18

步骤2:准备实体类

@Getter
@Setter
@ToString
public class User {private Long id;private String name;private int age;
}

步骤3:配置分区逻辑

public class UserPartitioner  implements Partitioner {@Overridepublic Map<String, ExecutionContext> partition(int gridSize) {Map<String, ExecutionContext> result = new HashMap<>(gridSize);int range = 10; //文件间隔int start = 1; //开始位置int end = 10;  //结束位置String text = "user%s-%s.txt";for (int i = 0; i < gridSize; i++) {ExecutionContext value = new ExecutionContext();Resource resource = new ClassPathResource(String.format(text, start, end));try {value.putString("file", resource.getURL().toExternalForm());} catch (IOException e) {e.printStackTrace();}start += range;end += range;result.put("user_partition_" + i, value);}return result;}
}

步骤4:全部代码

package com.langfeiyes.batch._37_step_part;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;import java.util.List;@SpringBootApplication
@EnableBatchProcessing
public class PartStepJob {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;//每个分区文件读取@Bean@StepScopepublic FlatFileItemReader<User> flatItemReader(@Value("#{stepExecutionContext['file']}") Resource resource){return new FlatFileItemReaderBuilder<User>().name("userItemReader").resource(resource).delimited().delimiter("#").names("id", "name", "age").targetType(User.class).build();}@Beanpublic ItemWriter<User> itemWriter(){return new ItemWriter<User>() {@Overridepublic void write(List<? extends User> items) throws Exception {items.forEach(System.err::println);}};}//文件分区器-设置分区规则@Beanpublic UserPartitioner userPartitioner(){return new UserPartitioner();}//文件分区处理器-处理分区@Beanpublic PartitionHandler userPartitionHandler() {TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();handler.setGridSize(5);handler.setTaskExecutor(new SimpleAsyncTaskExecutor());handler.setStep(workStep());try {handler.afterPropertiesSet();} catch (Exception e) {e.printStackTrace();}return handler;}//每个从分区操作步骤@Beanpublic Step workStep() {return stepBuilderFactory.get("workStep").<User, User>chunk(10).reader(flatItemReader(null)).writer(itemWriter()).build();}//主分区操作步骤@Beanpublic Step masterStep() {return stepBuilderFactory.get("masterStep").partitioner(workStep().getName(),userPartitioner()).partitionHandler(userPartitionHandler()).build();}@Beanpublic Job partJob(){return jobBuilderFactory.get("part-step-job").start(masterStep()).build();}public static void main(String[] args) {SpringApplication.run(PartStepJob.class, args);}
}

结果:

User(id=31, name=dafei, age=18)
User(id=32, name=dafei, age=18)
User(id=33, name=dafei, age=18)
User(id=34, name=dafei, age=18)
User(id=35, name=dafei, age=18)
User(id=36, name=dafei, age=18)
User(id=37, name=dafei, age=18)
User(id=38, name=dafei, age=18)
User(id=39, name=dafei, age=18)
User(id=40, name=dafei, age=18)
User(id=41, name=dafei, age=18)
User(id=42, name=dafei, age=18)
User(id=43, name=dafei, age=18)
User(id=44, name=dafei, age=18)
User(id=45, name=dafei, age=18)
User(id=46, name=dafei, age=18)
User(id=47, name=dafei, age=18)
User(id=48, name=dafei, age=18)
User(id=49, name=dafei, age=18)
User(id=50, name=dafei, age=18)
User(id=21, name=dafei, age=18)
User(id=22, name=dafei, age=18)
User(id=23, name=dafei, age=18)
User(id=24, name=dafei, age=18)
User(id=25, name=dafei, age=18)
User(id=26, name=dafei, age=18)
User(id=27, name=dafei, age=18)
User(id=28, name=dafei, age=18)
User(id=29, name=dafei, age=18)
User(id=30, name=dafei, age=18)
User(id=1, name=dafei, age=18)
User(id=2, name=dafei, age=18)
User(id=3, name=dafei, age=18)
User(id=4, name=dafei, age=18)
User(id=5, name=dafei, age=18)
User(id=6, name=dafei, age=18)
User(id=7, name=dafei, age=18)
User(id=8, name=dafei, age=18)
User(id=9, name=dafei, age=18)
User(id=10, name=dafei, age=18)
User(id=11, name=dafei, age=18)
User(id=12, name=dafei, age=18)
User(id=13, name=dafei, age=18)
User(id=14, name=dafei, age=18)
User(id=15, name=dafei, age=18)
User(id=16, name=dafei, age=18)
User(id=17, name=dafei, age=18)
User(id=18, name=dafei, age=18)
User(id=19, name=dafei, age=18)
User(id=20, name=dafei, age=18)

解析:核心点

1>文件分区器:userPartitioner(), 分别加载5个文件进入到程序

2>文件分区处理器:userPartitionHandler() ,指定要分几个区,由谁来处理

3>分区从步骤:workStep() 指定读逻辑与写逻辑

4>分区文件读取:flatItemReader(),需要传入Resource对象,这个对象在userPartitioner()已经标记为file

5>分区主步骤:masterStep() ,指定分区名称与分区器,指定分区处理器

到这,本篇就结束了,欲知后事如何,请听下回分解~

转视频版

看文字不过瘾可以切换视频版:Spring Batch高效批处理框架实战

http://www.lryc.cn/news/20461.html

相关文章:

  • ES数据迁移_snapshot(不需要安装其他软件)
  • 【Vue3 第二十章】异步组件 代码分包 Suspense内置组件 顶层 await
  • 「媒体邀约」四川有哪些媒体,成都活动媒体邀约
  • @Autowired和@Resource的区别
  • Linux系列:glibc程序设计规范与内存管理思想
  • Redis 集群
  • EF 框架的简介、发展历史;ORM框架概念
  • 注解原理剖析与实战
  • 《STL源码剖析》理解之将类成员函数和for_each等算法结合
  • 如何构建应用标准化体系
  • 【RabbitMQ笔记03】消息队列RabbitMQ七种模式之WorkQueues工作队列模式
  • 认识html
  • 在外包公司熬了 3 年终于进了字节,竭尽全力....
  • 绝对让你明明白白,脚把脚带你盯着 I2C 时序图将 I2C 程序给扣出来(基于STM32的模拟I2C)
  • 2023年全国最新工会考试精选真题及答案5
  • 一文2000字手把手教你自动化测试Selenium+pytest+数据驱动
  • windows安装Ubuntu子系统以及图形化界面记录
  • 通俗易懂,十分钟读懂DES,详解DES加密算法原理,DES攻击手段以及3DES原理。Python DES实现源码
  • 为多态基类声明virtual析构函数
  • 啊哈 算法读书笔记 第 2 章 栈、队列、链表
  • Git ---- IDEA 集成 Git
  • 【LeetCode 704】【Go】二分查找
  • 【代码随想录训练营】【Day23】第六章|二叉树|669. 修剪二叉搜索树 |108.将有序数组转换为二叉搜索树|538.把二叉搜索树转换为累加树
  • CV——day78 读论文:通过静态背景构建扩展低通道路边雷达的探测距离(目标是规避风险)
  • 【编程入门】应用市场(go语言版)
  • Linux(openEuler)没有界面连接互联网方法
  • 第一天 软考中级--嵌入式系统设计师考试复习教程开始了
  • 分享 10 个高频 Python 面试题
  • ThreadLocal原理、结构、源码解析
  • 分布式之PBFT算法