当前位置: 首页 > news >正文

批量异步任务处理

当我们在项目中遇到很多业务同时处理,如果是串行肯定是影响性能的,这时候就需要异步执行了,说道异步肯定就有很多方案了

方案一:

比如使用spring的异步注解,比如下面的代码,每个方法上面都是异步注解,当时这种方案很多不足点,首先就是他的显示创建线程无法实现线程复用,然后就是无法统一处理异常及任务是否执行完了

class void test(){test01();test02();test03();
}

方案二 使用线程池

CountDownLatch 使用

线程池可以实现线程复用肯定是异步执行的不二选择
使用CountDownLatch:您可以在每个任务完成时递减 CountDownLatch,然后主线程等待 CountDownLatch 的计数为零,以确定所有任务都已经完成。这需要一些额外的编程工作,但允许更灵活的控制

int numberOfTasks = 10;
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch countDownLatch = new CountDownLatch(numberOfTasks);for (int i = 0; i < numberOfTasks; i++) {executorService.submit(() -> {// 执行任务countDownLatch.countDown();});
}try {countDownLatch.await();System.out.println("所有任务已经执行完毕");
} catch (InterruptedException e) {System.err.println("等待被中断");
}
executorService.shutdown();

awaitTermination

使用awaitTermination方法:ExecutorService 接口提供了 awaitTermination 方法,该方法允许您等待一段时间来检查线程池中的任务是否已经执行完。例如:

ExecutorService executorService = Executors.newFixedThreadPool(5);
// 提交任务到线程池executorService.shutdown(); // 停止接受新任务
try {if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {System.out.println("所有任务已经执行完毕");} else {System.out.println("等待超时,仍有任务未执行完");}
} catch (InterruptedException e) {System.err.println("awaitTermination被中断");
}

invokeAll

ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Callable<Void>> tasks = new ArrayList<>();
// 添加任务到tasksList<Future<Void>> futures = executorService.invokeAll(tasks);
for (Future<Void> future : futures) {if (!future.isDone()) {System.out.println("仍有任务未执行完");break;}
}
executorService.shutdown();

方案三

既然使用了线程池能否再优化下呢使用java8 毕竟流行的异步编程CompletableFuture
比如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureExample {public static void main(String[] args) {CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {此处可以将任务结果存放在一个集合或者一个对象中// 执行任务1System.out.println("Task 1 is running...");});CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {// 执行任务2此处可以将任务结果存放在一个集合或者一个对象中System.out.println("Task 2 is running...");});CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {// 执行任务3此处可以将任务结果存放在一个集合或者一个对象中System.out.println("Task 3 is running...");});CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);try {allOf.get(); // 等待所有任务完成System.out.println("All tasks are completed.");} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}

但是这种写法也不够优雅,因为每个异步任务都需要把结果存放出来 ,可以优化下

    private List<Object> asyncHandleTask(Map<String, TaskHandlerService> handleServiceMap, Map<String, TaskResultParserDTO> taskResultParserMap) {//提交任务List<CompletableFuture<List<Object>>> completableFutureList = handleServiceMap.entrySet().stream().map(entry -> CompletableFuture.supplyAsync(() -> entry.getValue().handle(taskResultParserMap.get(entry.getKey())), threadPoolTaskExecutor)).collect(Collectors.toList());//等待所有任务完成CompletableFuture<Void> allTaskFeatureList = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0]));//等待所有任务完成或者CompletableFuture<Void>[] futures = new CompletableFuture[]{future1, future2, future3};CompletableFuture<Void> allOf = CompletableFuture.allOf(futures);   //等待所有任务完成或者CompletableFuture<Void> allTaskFeatureList = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size]));//获取所有任务的结果数据CompletableFuture<List<List<Object>>> listCompletableFuture = allTaskFeatureList.thenApply(v -> completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList()));List<List<Object>> resultList;try {resultList = listCompletableFuture.get();} catch (InterruptedException | ExecutionException e) {log.error("处理任务结果数据失败,{}", ExceptionUtils.getStackTrace(e));throw new CustomException(TaskProcessStatusEnum.WAIT_PARSER_RESULT_FAIL);}//整合数据return resultList.stream().flatMap(Collection::stream).collect(Collectors.toList());}
http://www.lryc.cn/news/221198.html

相关文章:

  • 宜昌市公安局、点军区政府与中科升哲达成战略合作,共建视频图像联合创新实验室
  • java版小程序商城免费搭建-直播商城平台规划及常见的营销模式有哪些?电商源码/小程序/三级分销
  • Linux下yum源配置实战
  • JSONP 跨域访问(2), JSONP劫持
  • 【java】实现自定义注解校验——方法一
  • JavaScript基础入门03
  • P1903 [国家集训队] 数颜色 / 维护队列
  • uniapp 请求接口的方式
  • 怎么查看当前vue项目,要求的node.js版本
  • QT5自适应
  • 蓝桥杯官网练习题(日期问题)
  • PDF文件解析
  • 初识微服务技术栈
  • windows 下运行正常,但是linux下报错 : Could not find or load main class
  • MySQL 数据目录和 InnoDB 表空间补充知识:详细结构
  • 移远EC600U-CN开发板 day02
  • visual studio Python 配置QGIS(qgis)教程
  • 第二证券:消费电子概念活跃,博硕科技“20cm”涨停,天龙股份斩获10连板
  • petalinux 2022.2 在 ubantu18.04 下的安装
  • 【进程与线程】进程与线程 QA
  • 电脑风扇控制软件 Macs Fan Control Pro mac中文版功能介绍
  • 【13】c++11新特性 —>call_once
  • 解决logstash插件logstash-outputs-mongodb一条数据失败后一直重复尝试
  • 【网络协议】聊聊HTTPDNS如何工作的
  • TikTok与老年用户:社交媒体的跨代交流
  • 如何在Linux机器上使用ssh远程连接Windows Server服务器
  • NLP常见任务的分类指标
  • node插件express(路由)的插件使用(二)——body-parser和ejs插件的基本使用
  • 学习c++的第十天
  • 895. 最长上升子序列