当前位置: 首页 > news >正文

记一次数据修复,需要生成十万条sql进行数据回滚

一、背景

数据回滚

二、难点

2.1 需要处理的数据涉及多达数万个用户,每个用户涉及的表达到10个
2.2 时间紧急,需要快速回滚,数据需要完整
2.3 数据存在重复或空缺问题

三、解决方案

3.1 数据多,使用分批处理,把大任务分割成若干个小任务
3.2 时间紧,使用多线程CompletableFuture处理,提高处理效率
3.3 mysql数据有些是重复,需要去重,使用not exist处理,保障数据完整

四、案例代码
@Slf4j
public class DataRollBackProcessTest {// 自定义线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 600,TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));@Testpublic void startTest() throws ExecutionException, InterruptedException {List<Integer> list = new ArrayList<>();for (int i = 1; i <= 100; i++) {list.add(i);}concurrentProcess(list);}/*** * 并行处理,全部异步任务执行完才一起返回** @param list* @throws ExecutionException* @throws InterruptedException*/public void concurrentProcess(List<Integer> list) throws ExecutionException, InterruptedException {// 定义一个集合切割为小任务时每个任务的大小,int taskSize = 5;List<List<Integer>> divideList = divide(list, taskSize);// 创建一个CompletableFuture数组,用于存储异步操作的结果CompletableFuture<Void>[] futures = new CompletableFuture[divideList.size()];// 循环10次,每次执行一次异步操作for (int i = 0; i < divideList.size(); i++) {int index = i;CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 异步操作,可以在这里执行你的任务try {simulateLongDurationTasks(divideList.get(index));} catch (InterruptedException e) {e.printStackTrace();}System.out.println("异步操作 " + index + " 执行完成");}, threadPoolExecutor);// 将CompletableFuture对象存储到数组中futures[i] = future;}// 使用CompletableFuture.allOf等待所有异步操作完成CompletableFuture<Void> allOf = CompletableFuture.allOf(futures);// 阻塞,等待所有异步操作完成allOf.get();System.out.println("所有异步操作执行完成");}/*** 集合切分** @param origin* @param size* @param <T>* @return*/public <T> List<List<T>> divide(List<T> origin, int size) {if (origin == null || origin.size() == 0) {return Collections.emptyList();}int block = (origin.size() + size - 1) / size;return IntStream.range(0, block).boxed().map(i -> {int start = i * size;int end = Math.min(start + size, origin.size());return origin.subList(start, end);}).collect(Collectors.toList());}/*** 模拟耗时的任务* <p>* 需求背景:* 需要把一组用户的数据复制到另一组用户,生成sql脚本如下,为了简略,* 使用Thread.sleep替换耗时任务* <p>* -- 把B用户的数据插入到A用户,且A用户不存在相同的数据* sql使用点1: INSERT INTO student  from* sql使用点2: NOT EXISTS** INSERT INTO student (uid, STATUS, age, sex) SELECT* 61442, -- A用户* STATUS,* age,* sex* FROM* student t1* WHERE* t1.uid = 682801 -- B用户* AND t1. STATUS = 1* AND NOT EXISTS (* SELECT* t2.id* FROM* student t2* WHERE* t2.uid = 61442* AND t2.age = t1.age* AND t2.sex = t1.sex* );*/public void simulateLongDurationTasks(List<Integer> subList) throws InterruptedException {if (subList == null || subList.size() == 0) {return;}int sleepSeconds = subList.stream().mapToInt(e -> e).reduce(0, Integer::sum);log.info("thread id:{}, thread name:{}, thread states:{}, Thread.activeCount:{}, thread sleep:{}",Thread.currentThread().getId(),Thread.currentThread().getName(),Thread.currentThread().getState(),Thread.activeCount(),sleepSeconds);Thread.sleep(sleepSeconds);}
}
五、总结

使用分批处理,结合多线程,提高处理效率
多线程处理需要考虑系统资源竞争问题、顺序问题

http://www.lryc.cn/news/280094.html

相关文章:

  • [paddle]paddlehub部署paddleocr的hubserving服务
  • 2024校招,网易互娱游戏测试工程师一面
  • Linux Ubuntu搭建我的世界Minecraft服务器实现好友远程联机MC游戏
  • Springboot对接ceph集群以及java利用s3对象网关接口与ceph集群交互
  • nrm使用
  • 06-微服务OpenFeigh和Sentinel持久化
  • docker 安装redis (亲测有效)
  • 利用GitHub开源项目ChatGPTNextWeb构建属于自己的ChatGPT - Docker
  • Vue3使用ElementPlus中的el-upload手动上传并调用上传接口
  • 【Github3k+⭐️】《CogAgent: A Visual Language Model for GUI Agents》译读笔记
  • FF的异步清零端口需要时钟吗?--不需要
  • 【conda】pip安装报错,网络延时问题解决记录(亲测有效)
  • Spring Boot整理-Spring Boot的优势
  • C++标准学习--decltype
  • Linux之静态库和动态库
  • erlang/OTP 平台(学习笔记)(三)
  • Spring整理-Spring框架中用了哪些设计模式
  • Poi实现根据word模板导出-图表篇
  • windows或mac端口转发
  • Linux工具-搭建文件服务器
  • 深入理解@DubboReference与@DubboService【三】
  • linux主机的免密登录
  • Git常用命令和QA(网摘)
  • PHP AES 加密示例
  • 第十九章:特殊工具与技术
  • 大数据深度学习卷积神经网络CNN:CNN结构、训练与优化一文全解
  • RabbitMQ(九)死信队列
  • KEI5许可证没到期,编译却出现Error: C9555E: Failed to check out a license.问题解决
  • 南京观海微电子----时序图绘制工具
  • Gin CORS 跨域请求资源共享与中间件