当前位置: 首页 > news >正文

多个线程处理不同的数据,等线程都完成后再进行下一步操作

现在有三个任务,三个任务之间没有关联关系,但是第四个任务要等前三个完成之后才能进行,于是使用多线程完成前三个任务节省时间

示例代码:

public void saveDataByOnlineTimeNew(LocalDateTime startTime, LocalDateTime endTime) {Objects.requireNonNull(startTime, "开始时间不能为空");Objects.requireNonNull(endTime, "结束时间不能为空");List<User> users = baseMapper.selectAllRightUser(startTime, endTime);if (users.isEmpty()) {return;}List<Integer> userIdList = users.stream().map(User::getId).collect(Collectors.toList());// 创建三个 CompletableFuture 分别处理三个列表CompletableFuture<List<ProUserStatisticsNew>> future1 = CompletableFuture.supplyAsync(() -> {List<ProUserStatisticsNew> proUserStatisticsNews = saveDataByOnlineTime(startTime, endTime, userIdList);users.parallelStream().forEach(user -> {log.trace("当前线程名字:"+Thread.currentThread().getName());List<ProUserStatisticsNew> versionNotInRelationList = this.saveDataByOnlineTimeByUser(startTime, endTime, user);if (!versionNotInRelationList.isEmpty()) {proUserStatisticsNews.addAll(versionNotInRelationList);}});return proUserStatisticsNews;});CompletableFuture<List<ProUserStatisticsNew>> future2 = CompletableFuture.supplyAsync(() -> saveNoVersionDataByOnlineTime(startTime, endTime, userIdList));CompletableFuture<List<ProUserStatisticsNew>> future3 = CompletableFuture.supplyAsync(() -> saveLearnDataByOnlineTime(startTime, endTime, userIdList));// 等待所有 CompletableFuture 完成CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);// 等待所有 CompletableFuture 完成后,进行合并操作CompletableFuture<List<ProUserStatisticsNew>> join = allFutures.thenApplyAsync(v ->mergeLists(future1.join(), future2.join(), future3.join()));List<ProUserStatisticsNew> result = join.join();//List<ProUserStatisticsNew> result = mergeLists(proUserStatisticsNews, noVersionDataList, learnDataList);SysConfigTime sysConfigTimeByTime = SysConfigTimeUtil.getSysConfigTimeByTimeForQuery(endTime);result.parallelStream().forEach(one -> {one.setRecordDate(sysConfigTimeByTime.getYearmonth());one.setProjectHoursDifference(one.getWorkHours().subtract(one.getActualHours()));});// 批量保存或更新int batchSize = 100; // 根据具体要求和系统能力设置适当的批处理大小for (int i = 0; i < result.size(); i += batchSize) {int end = Math.min(result.size(), i + batchSize);this.saveOrUpdateBatch(result.subList(i, end));}}

代码解析:

这段代码实现了一个方法 `saveDataByOnlineTimeNew`,它通过多个异步任务(使用 `CompletableFuture`)并行处理数据,并最终将结果批量保存或更新到数据库中。具体的逻辑流程可以分为几个部分,下面我会逐步解释每个部分。### 方法说明- **输入参数**: `startTime` 和 `endTime` 是查询的开始和结束时间(`LocalDateTime` 类型),指定了需要查询的数据范围。- **核心目标**: 这个方法的目标是根据 `startTime` 和 `endTime` 获取相关数据,并将结果合并后保存或更新到数据库。### 代码分析1. **非空检查**:```javaObjects.requireNonNull(startTime, "开始时间不能为空");Objects.requireNonNull(endTime, "结束时间不能为空");```这两行代码确保 `startTime` 和 `endTime` 不能为空。如果为 `null`,将抛出 `NullPointerException`,并带有指定的错误信息。2. **获取用户列表**:```javaList<User> users = baseMapper.selectAllRightUser(startTime, endTime);if (users.isEmpty()) {return;}```这里通过调用 `baseMapper.selectAllRightUser(startTime, endTime)` 查询所有符合条件的用户。如果查询结果为空,则直接返回,不进行后续处理。3. **获取用户 ID 列表**:```javaList<Integer> userIdList = users.stream().map(User::getId).collect(Collectors.toList());```使用 `stream` 获取所有用户的 ID,方便后续操作。4. **创建多个 `CompletableFuture`**:```javaCompletableFuture<List<ProUserStatisticsNew>> future1 = CompletableFuture.supplyAsync(() -> {// 异步任务1});CompletableFuture<List<ProUserStatisticsNew>> future2 = CompletableFuture.supplyAsync(() -> saveNoVersionDataByOnlineTime(startTime, endTime, userIdList));CompletableFuture<List<ProUserStatisticsNew>> future3 = CompletableFuture.supplyAsync(() -> saveLearnDataByOnlineTime(startTime, endTime, userIdList));```创建了三个异步任务,每个任务执行不同的操作:- **`future1`**:调用 `saveDataByOnlineTime` 来保存基于在线时间的数据,然后对每个用户并行处理数据,并将结果合并。- **`future2`**:调用 `saveNoVersionDataByOnlineTime` 来保存没有版本数据的数据。- **`future3`**:调用 `saveLearnDataByOnlineTime` 来保存学习数据。5. **等待所有异步任务完成**:```javaCompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);```使用 `CompletableFuture.allOf()` 来等待所有的 `future1`, `future2`, `future3` 完成。`allOf` 返回一个 `CompletableFuture<Void>`,表明所有任务已完成。6. **合并结果**:```javaCompletableFuture<List<ProUserStatisticsNew>> join = allFutures.thenApplyAsync(v ->mergeLists(future1.join(), future2.join(), future3.join()));```当所有的异步任务完成后,调用 `thenApplyAsync` 继续处理合并操作。`join` 方法会阻塞直到每个 `CompletableFuture` 返回结果,合并三个任务的结果列表。7. **结果后处理**:```javaList<ProUserStatisticsNew> result = join.join();SysConfigTime sysConfigTimeByTime = SysConfigTimeUtil.getSysConfigTimeByTimeForQuery(endTime);result.parallelStream().forEach(one -> {one.setRecordDate(sysConfigTimeByTime.getYearmonth());one.setProjectHoursDifference(one.getWorkHours().subtract(one.getActualHours()));});```- `join.join()` 等待合并操作完成,获得最终的 `result` 列表。- 使用 `SysConfigTimeUtil.getSysConfigTimeByTimeForQuery(endTime)` 获取系统配置的时间信息。- 对 `result` 列表中的每一项,设置其记录日期(`setRecordDate`)和工作时间差(`setProjectHoursDifference`)。8. **批量保存或更新**:```javaint batchSize = 100;for (int i = 0; i < result.size(); i += batchSize) {int end = Math.min(result.size(), i + batchSize);this.saveOrUpdateBatch(result.subList(i, end));}```为了避免一次性将大量数据写入数据库,采用批处理的方式分批保存或更新数据。每次处理 `batchSize` 条记录(这里设定为100),直到所有记录处理完。### 关键点总结- **异步任务并行执行**:使用 `CompletableFuture.supplyAsync()` 创建三个并行任务来处理数据,从而加快整个处理过程。
- **合并操作**:通过 `thenApplyAsync` 合并三个异步任务的结果。
- **批量操作**:为了提高性能,使用批量保存或更新的方法 `saveOrUpdateBatch`,避免一次性提交过多数据。### 性能优化
- 该方法使用了并行流 (`parallelStream()`) 和异步执行(`CompletableFuture`),这些可以显著提升性能,尤其是对于大规模数据的处理。
- 批量操作减少了数据库访问的次数,提高了数据库的写入效率。总的来说,这段代码实现了一个高效的数据处理和保存机制,使用了现代 Java 特性(如异步编程和流操作)来优化性能。

http://www.lryc.cn/news/514899.html

相关文章:

  • 聆听音乐 1.5.9 | 畅听全网音乐,支持无损音质下载
  • Rust 基础入门指南
  • 青少年编程与数学 02-006 前端开发框架VUE 03课题、编写APP组件
  • 基于Java的银行排号系统的设计与实现【源码+文档+部署讲解】
  • linux-26 文件管理(四)install
  • VS2015中使用boost库函数时报错问题解决error C4996 ‘std::_Copy_impl‘
  • pikachu靶场--目录遍历和敏感信息泄露
  • 植物大战僵尸杂交版3.0.2版本
  • kafka怎么保证顺序消费?
  • Makefile 模板 --- 内核模块
  • 仓库叉车高科技安全辅助设备——AI防碰撞系统N2024G-2
  • 计算机视觉CV期末总复习
  • 【微信小程序获取用户手机号
  • WFP Listbox绑定数据后,数据变化的刷新
  • Android Camera压力测试工具
  • 【华为OD-E卷 - 最优资源分配 100分(python、java、c++、js、c)】
  • 字符串格式时间(HH-MM)添加间隔时间后转为HH-MM输出
  • SQL 基础教程 - SQL ORDER BY 关键字
  • STM32 软件I2C读写
  • neo4j学习笔记
  • 【动手学电机驱动】STM32-MBD(2)将 Simulink 模型部署到 STM32G431 开发板
  • Nginx代理本地exe服务http为https
  • C++: glibc: pthread: pthread_cond_destroy,程序hang一例
  • 【中间件】docker+kafka单节点部署---zookeeper模式
  • 深入Android架构(从线程到AIDL)_08 认识Android的主线程
  • 集线器,交换机,路由器,mac地址和ip地址知识记录总结
  • 【VUE】使用create-vue快速创建一个vue + vite +vue-route 等其他查看的工程
  • Jetpack Compose 学习笔记(一)—— 快速上手
  • Kafka3.x KRaft 模式 (没有zookeeper) 常用命令
  • Leetcode 最大正方形