当前位置: 首页 > news >正文

Spring Batch 高级篇-并行步骤

目录

引言

概念

案例

转视频版


引言

接着上篇:Spring Batch 高级篇-多线程步骤,了解Spring Batch多线程步骤后,接下来一起学习一下Spring Batch 高级功能-并行步骤

概念

并行步骤,指的是某2个或者多个步骤同时执行。比如下图

图中,流程从步骤1执行,然后执行步骤2, 步骤3,当步骤2/3执行结束之后,在执行步骤4.

设想一种场景,当读取2个或者多个互不关联的文件时,可以多个文件同时读取,这个就是并行步骤。

案例

需求:现有user-parallel.txt, user-parallel.json 2个文件将它们中数据读入内存  

1>编写user-parallel.txt, user-parallel.json

6#zhangsan#14
7#lisi#13
8#wangwu#12
9#zhaoliu#11
10#qianqi#10
[{"id":1, "name":"dafei", "age":18},{"id":2, "name":"xiaofei", "age":17},{"id":3, "name":"zhongfei", "age":16},{"id":4, "name":"laofei", "age":15},{"id":5, "name":"feifei", "age":14}
]

2>编写实体对象

@Getter
@Setter
@ToString
public class User {private Long id;private String name;private int age;
}

3>代码实现

package com.langfeiyes.batch._36_step_parallel;import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;import java.util.List;@SpringBootApplication
@EnableBatchProcessing
public class ParallelStepJob {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic JsonItemReader<User> jsonItemReader(){ObjectMapper objectMapper = new ObjectMapper();JacksonJsonObjectReader<User> jsonObjectReader = new JacksonJsonObjectReader<>(User.class);jsonObjectReader.setMapper(objectMapper);return new JsonItemReaderBuilder<User>().name("userJsonItemReader").jsonObjectReader(jsonObjectReader).resource(new ClassPathResource("user-parallel.json")).build();}@Beanpublic FlatFileItemReader<User> flatItemReader(){return new FlatFileItemReaderBuilder<User>().name("userItemReader").resource(new ClassPathResource("user-parallel.txt")).delimited().delimiter("#").names("id", "name", "age").targetType(User.class).build();}@Beanpublic ItemWriter<User> itemWriter(){return new ItemWriter<User>() {@Overridepublic void write(List<? extends User> items) throws Exception {items.forEach(System.err::println);}};}@Beanpublic Step jsonStep(){return stepBuilderFactory.get("jsonStep").<User, User>chunk(2).reader(jsonItemReader()).writer(itemWriter()).build();}@Beanpublic Step flatStep(){return stepBuilderFactory.get("step2").<User, User>chunk(2).reader(flatItemReader()).writer(itemWriter()).build();}@Beanpublic Job parallelJob(){//线程1-读user-parallel.txtFlow parallelFlow1 = new FlowBuilder<Flow>("parallelFlow1").start(flatStep()).build();//线程2-读user-parallel.jsonFlow parallelFlow2 = new FlowBuilder<Flow>("parallelFlow2").start(jsonStep()).split(new SimpleAsyncTaskExecutor()).add(parallelFlow1).build();return jobBuilderFactory.get("parallel-step-job").start(parallelFlow2).end().build();}public static void main(String[] args) {SpringApplication.run(ParallelStepJob.class, args);}
}

结果

User(id=6, name=zhangsan, age=14)
User(id=7, name=lisi, age=13)
User(id=8, name=wangwu, age=12)
User(id=9, name=zhaoliu, age=11)
User(id=1, name=dafei, age=18)
User(id=2, name=xiaofei, age=17)
User(id=10, name=qianqi, age=10)
User(id=3, name=zhongfei, age=16)
User(id=4, name=laofei, age=15)
User(id=5, name=feifei, age=14)

解析:

1:jsonItemReader() flatItemReader() 定义2个读入操作,分别读json格式跟普通文本格式

2:parallelJob() 配置job,需要指定并行的flow步骤,先是parallelFlow1然后是parallelFlow2 , 2个步骤间使用.split(new SimpleAsyncTaskExecutor()) 隔开,表示线程池开启2个线程,分别处理parallelFlow1, parallelFlow2 2个步骤。

到这,本篇就结束了,欲知后事如何,请听下回分解~

转视频版

看文字不过瘾可以切换视频版:Spring Batch高效批处理框架实战

http://www.lryc.cn/news/18474.html

相关文章:

  • 对spring的@Cacheable缓存理解
  • 力扣-市场分析
  • 【2357. 使数组中所有元素都等于零】
  • 什么品牌的游戏蓝牙耳机比较好?玩游戏延迟低的蓝牙耳机推荐
  • day 33 状态压缩dp
  • 扬帆优配|超3600股飘绿,人民币贬值近300点!外资净卖近38亿
  • 【编程基础之Python】6、Python基础知识
  • selenium基本操作
  • 思科设备命令讲解(超基础二)
  • HTML基础(3)
  • 鸿蒙3.0 APP混合开发闪退问题笔记
  • 批量操作文件功能-课后程序(JAVA基础案例教程-黑马程序员编著-第七章-课后作业)
  • Hadoop3.3.1完全分布式部署
  • SpringMVC中的注解
  • python+Vue学生作业系统 django课程在线学习网站系统
  • CSS 美化网页元素【快速掌握知识点】
  • Tableau连接openGauss实践
  • RabbitMQ 实现延迟队列
  • Spring Bean 生命周期,好像人的一生
  • C++算法基础课 05 —— 数据结构1_单链表/双链表/栈/单调栈/队列/单调队列/KMP
  • 小型水库大坝安全监测的主要对象
  • 常见软件开源(alpha,beta等)版本介绍
  • 凌恩生物资讯|抗性宏基因组又一力作|抗性基因+可移动元件研究新成果!
  • 常见前端基础面试题(HTML,CSS,JS)(二)
  • 按关键词搜索,商品详情采集,API接口
  • C++的纯虚函数使用与接口实现
  • Exception has occurred: ModuleNotFoundErrorNo module named ‘urllib3‘【已解决】
  • CSS 盒子模型【快速掌握知识点】
  • 公网远程连接Oracle数据库【内网穿透】
  • 国内售价仅10元的鸭子滑梯玩具TK卖到20美元,相关视频获400万+播放!