Orchestrating Asynchronous Tasks with CompletableFuture

  • runAsync and supplyAsync
    • allOf and anyOf
    • join and get
    • whenComplete, whenCompleteAsync and exceptionally
    • handle and handleAsync
  • Sequential chaining
    • runAsync().thenRunAsync()
    • supplyAsync().thenAcceptAsync((res) -> {})
    • supplyAsync().thenApplyAsync((res) -> {return ...})
  • Do something after both tasks complete
    • runAfterBothAsync
    • thenAcceptBothAsync
    • thenCombine
  • Do something after either task completes
    • runAfterEitherAsync
    • acceptEitherAsync
    • applyToEitherAsync
  • Summary

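runAsync and supplyAsync

  • runAsync(runnable): no return value
  • runAsync(runnable, executor): no return value; runs on a custom thread pool
  • supplyAsync(supplier): returns a value
  • supplyAsync(supplier, executor): returns a value; runs on a custom thread pool

Code demonstration: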

    // Requires: import java.util.concurrent.CompletableFuture;
    public static void testOne() {
        // runAsync takes a Runnable, so the future carries no value.
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
        });
        // supplyAsync takes a Supplier, so the future carries its return value.
        CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return "this is twoFuture";
        });
        // Block until both futures have completed.
        CompletableFuture.allOf(oneFuture, twoFuture).join();
        System.out.println(oneFuture.join());
        System.out.println(twoFuture.join());
    }
start1
start2
end1
end2
null
this is twoFuture

Explanation: oneFuture.join() returns null because runAsync produces no result.
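The demo above only uses the single-argument overloads. As a minimal sketch of the executor overload, the example below runs a supplier on a caller-provided pool and reports which thread executed it. The class name CustomExecutorDemo and the thread name "demo-pool-1" are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorDemo {
    // Runs a supplier on a custom pool and returns the name of the worker thread.
    static String workerThreadName() {
        // Hypothetical pool: a single thread with a recognizable name.
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-pool-1"));
        try {
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);
            return future.join();
        } finally {
            // Shut the pool down so its non-daemon thread does not keep the JVM alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName()); // prints "demo-pool-1"
    }
}
```

Without the second argument, the task would run on the default asynchronous executor (normally the common ForkJoinPool) instead.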

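allOf and anyOf

  • allOf(future1, future2, future3, …): waits until every future has completed before continuing. The combined future carries no value (CompletableFuture<Void>)
  • anyOf(future1, future2, future3, …): continues as soon as any one future completes. The combined future carries that result as an Object

allOf usage example: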

    public static void testTwo() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
        });
        CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return "this is twoFuture";
        });
        CompletableFuture<Integer> threeFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start3");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end3");
            return 100;
        });
        CompletableFuture.allOf(oneFuture, twoFuture, threeFuture).join();
        System.out.println(twoFuture.join() + threeFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
start2
start3
end1
end3
end2
this is twoFuture100
cost:2067ms

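Explanation: the join after allOf blocks the main thread. The output shows that the main thread's logic runs only after all futures have completed, and the total cost is close to the slowest task (2000 ms) rather than the sum of all three.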
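Because allOf itself carries no value, the individual results still have to be read from each future afterwards. One possible way to gather them into a list is sketched below; the class AllOfCollectDemo and its method names are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfCollectDemo {
    // Waits for every future, then collects the results in the original order.
    static List<Integer> collectAll(List<CompletableFuture<Integer>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // Once "all" is done, each join() returns immediately without blocking.
        return all.thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    static List<Integer> demo() {
        return collectAll(Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3)));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[1, 2, 3]"
    }
}
```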

anyOf usage example:

    public static void testThree() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
        });
        CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return "this is twoFuture";
        });
        CompletableFuture<Integer> threeFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start3");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end3");
            return 100;
        });
        Object result = CompletableFuture.anyOf(oneFuture, twoFuture, threeFuture).join();
        System.out.println("result:" + result);
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
start2
start3
end1
result:null
cost:1058ms

Explanation: oneFuture completes first; since it has no return value, the result obtained is null.
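When the winning future does carry a value, anyOf hands it back typed as Object, so the caller has to check or cast it. The sketch below makes the outcome deterministic by racing against a future that is never completed; the class AnyOfDemo and the variable names are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfDemo {
    // Returns the value of whichever future finishes first, as a String.
    static String firstResult() {
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");
        // Never completed in this sketch, so "fast" always wins the race.
        CompletableFuture<String> pending = new CompletableFuture<>();

        Object result = CompletableFuture.anyOf(pending, fast).join();
        // anyOf loses the element type, so check before casting.
        return (result instanceof String) ? (String) result : "unexpected type";
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // prints "fast"
    }
}
```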

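join and get

Both are used to obtain the result of a CompletableFuture.

  • join may throw an unchecked exception (CompletionException), so no try/catch is required
  • get declares checked exceptions (InterruptedException and ExecutionException), which forces the caller to handle them explicitly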
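The difference is easiest to see on a future that fails. The following sketch calls both methods on a failing task and reports which exception type wraps the original cause; the class JoinVsGetDemo and the helper failing() are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinVsGetDemo {
    // A task that always fails, used only for this illustration.
    static CompletableFuture<Integer> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // join(): the catch block is optional because CompletionException is unchecked.
    static String viaJoin() {
        try {
            failing().join();
            return "no exception";
        } catch (CompletionException e) {
            return "join: " + e.getCause().getClass().getSimpleName();
        }
    }

    // get(): the compiler insists that both checked exceptions are handled.
    static String viaGet() {
        try {
            failing().get();
            return "no exception";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin()); // prints "join: IllegalStateException"
        System.out.println(viaGet());  // prints "get: IllegalStateException"
    }
}
```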

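whenComplete, whenCompleteAsync and exceptionally

  • whenComplete: the callback runs on the thread that completed the current task (or on the calling thread if the task had already finished)
  • whenCompleteAsync: the callback is submitted to a thread pool
  • whenComplete and whenCompleteAsync run even if the CompletableFuture completes with an exception
  • exceptionally handles the exception and supplies a fallback value

Taking whenComplete as an example.
Normal flow: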

    public static void testFour() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
            return 100;
        }).whenComplete((res, e) -> {
            System.out.println("res:" + res);
            System.out.println("e:" + e);
        }).exceptionally((e) -> {
            System.out.println("error:" + e);
            return -1;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
res:100
e:null
result:100
cost:1084ms

Catching and handling an exception:

    public static void testFive() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
            return 100 / 0; // throws ArithmeticException at run time
        }).whenComplete((res, e) -> {
            // Observes the outcome; does not change it.
            System.out.println("res:" + res);
            System.out.println("e:" + e);
        }).exceptionally((e) -> {
            // Replaces the failure with a fallback value.
            System.out.println("error:" + e);
            return -1;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
res:null
e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
error:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
result:-1
cost:1073ms
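Note that in the run above it is exceptionally, not whenComplete, that turns the failure into -1. A small sketch of that distinction follows: with whenComplete alone the returned future still fails, and only adding exceptionally recovers. The class WhenCompleteDemo and its method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteDemo {
    static CompletableFuture<Integer> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // whenComplete only observes: the exception is passed on to the next stage.
    static String observeOnly() {
        CompletableFuture<Integer> future = failing().whenComplete((res, e) -> {
            // res is null here and e is non-null; nothing is recovered.
        });
        try {
            future.join();
            return "completed normally";
        } catch (CompletionException e) {
            return "still failed: " + e.getCause().getMessage();
        }
    }

    // exceptionally supplies a fallback, so join() returns normally.
    static int withFallback() {
        return failing()
                .whenComplete((res, e) -> { /* observe only */ })
                .exceptionally(e -> -1)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(observeOnly());  // prints "still failed: boom"
        System.out.println(withFallback()); // prints "-1"
    }
}
```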

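handle and handleAsync

  • The difference between handle and handleAsync is that the latter runs its callback on a thread pool
  • handle is roughly whenComplete and exceptionally combined: it receives both the result and the exception, and its return value replaces the outcome, so it can both catch and recover from an exception

Catching and handling an exception with handle: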

    public static void testSix() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
            return 100 / 0; // throws ArithmeticException at run time
        }).handle((res, e) -> {
            System.out.println("res:" + res);
            System.out.println("e:" + e);
            // Fall back to -1 only on failure; otherwise pass the result through.
            return e != null ? -1 : res;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
res:null
e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
result:-1
cost:1081ms
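Since handle is invoked on both outcomes, the same callback can cover the success path and the failure path. A minimal sketch, using a hypothetical safeDivide helper that falls back to -1 on any failure:

```java
import java.util.concurrent.CompletableFuture;

class HandleDemo {
    // Divides asynchronously; returns -1 if the division throws.
    static int safeDivide(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a / b)
                // e is non-null only when the previous stage failed.
                .handle((res, e) -> e != null ? -1 : res)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(safeDivide(10, 2)); // prints "5"
        System.out.println(safeDivide(1, 0));  // prints "-1"
    }
}
```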

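Sequential chaining

  • Every API used in this section comes in a plain version and an Async version, so the differences are not repeated for each one. The Async versions can also take an executor, in which case the callback runs on that thread pool.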
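To illustrate the Async overload that accepts an executor, the sketch below runs a thenApplyAsync callback on a named single-thread pool and returns the name of the thread that executed it. The class AsyncVariantDemo and the thread name "callback-pool" are invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncVariantDemo {
    // Returns the name of the thread that ran the thenApplyAsync callback.
    static String callbackThread() {
        // Hypothetical pool with a recognizable thread name.
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            return CompletableFuture.supplyAsync(() -> 100)
                    // The second argument selects where the callback runs.
                    .thenApplyAsync(res -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThread()); // prints "callback-pool"
    }
}
```

With the plain thenApply, the callback would instead run on whichever thread completes the previous stage, or on the caller's thread if that stage has already finished.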

runAsync().thenRunAsync()

  • runAsync has no return value, and neither does thenRunAsync

    public static void testSeven() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
        }).thenRunAsync(() -> {
            System.out.println("do something");
        });
        oneFuture.join();
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
do something
cost:72ms

supplyAsync().thenAcceptAsync((res) ->{})

  • thenAcceptAsync receives the return value of supplyAsync and returns nothing itself

    public static void testEight() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        }).thenAcceptAsync((res) -> {
            System.out.println("res:" + res);
        });
        oneFuture.join();
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
res:100
cost:83ms

supplyAsync().thenApplyAsync((res) -> {return ...})

  • thenApplyAsync receives the return value of supplyAsync and also returns a value of its own

    public static void testNine() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        }).thenApplyAsync((res) -> {
            // Use the upstream result rather than a hard-coded 100.
            return res * 10;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
result:1000
cost:75ms
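The three chaining methods above can also be strung together in one pipeline. The following sketch records each step in a list to show that the stages run strictly one after another; the class SerialChainDemo and the log labels are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class SerialChainDemo {
    // Runs a four-stage chain and returns the order in which the stages executed.
    static List<String> run() {
        // Thread-safe list, because stages may run on different pool threads.
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> chain = CompletableFuture
                .supplyAsync(() -> { log.add("supply"); return 100; })
                // thenApplyAsync: takes the value, returns a new value.
                .thenApplyAsync(res -> { log.add("apply:" + res); return res * 10; })
                // thenAcceptAsync: takes the value, returns nothing.
                .thenAcceptAsync(res -> log.add("accept:" + res))
                // thenRunAsync: takes nothing, returns nothing.
                .thenRunAsync(() -> log.add("run"));
        chain.join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[supply, apply:100, accept:1000, run]"
    }
}
```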

Do something after both tasks complete

runAfterBothAsync

  • The callback takes no arguments and returns nothing

    public static void testTen() {
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start2");
            System.out.println("end2");
        });
        // Runs the callback once both oneFuture and twoFuture have completed.
        oneFuture.runAfterBothAsync(twoFuture, () -> {
            System.out.println("do something");
        });
        System.out.println("result:" + oneFuture.join());
    }
start1
end1
start2
end2
do something
result:100

thenAcceptBothAsync

  • The callback receives both results and returns nothing

    public static void testEleven() {
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start2");
            System.out.println("end2");
        });
        // res2 is null because twoFuture was created with runAsync.
        oneFuture.thenAcceptBothAsync(twoFuture, (res1, res2) -> {
            System.out.println("res1:" + res1);
            System.out.println("res2:" + res2);
        });
        System.out.println("result:" + oneFuture.join());
    }
start1
end1
start2
end2
result:100
res1:100
res2:null

thenCombine

  • The callback receives both results and returns a value

    public static void testTwelve() {
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start2");
            System.out.println("end2");
        });
        CompletableFuture<Integer> combineFuture = oneFuture.thenCombine(twoFuture, (res1, res2) -> {
            System.out.println("res1:" + res1);
            System.out.println("res2:" + res2);
            return res1 == 100 ? res1 : -1;
        });
        System.out.println("result1:" + oneFuture.join());
        System.out.println("combine:" + combineFuture.join());
    }
start1
end1
start2
end2
res1:100
res2:null
result1:100
combine:100
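In the example above the second future has no value, so res2 is always null. thenCombine is more typically useful when both sides produce a value. A minimal sketch, assuming two independent lookups with made-up names price and quantity:

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {
    // Combines two independently computed values once both are available.
    static int total() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 25);
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 4);
        // The callback receives both results and returns the combined value.
        return price.thenCombine(quantity, (p, q) -> p * q).join();
    }

    public static void main(String[] args) {
        System.out.println(total()); // prints "100"
    }
}
```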

Do something after either task completes

runAfterEitherAsync

  • The callback takes no arguments and returns nothing

    public static void testThirteen() {
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
        });
        // Runs the callback as soon as either future has completed.
        oneFuture.runAfterEitherAsync(twoFuture, () -> {
            System.out.println("do something");
        });
        System.out.println("result:" + oneFuture.join());
    }
start1
end1
start2
result:100
do something

acceptEitherAsync

  • The callback receives the result of whichever task finished first and returns nothing

    public static void testFourteen() {
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return 10;
        });
        oneFuture.acceptEitherAsync(twoFuture, (res) -> {
            System.out.println("res:" + res);
        });
        System.out.println("result:" + oneFuture.join());
    }
start1
end1
start2
result:100
res:100

applyToEitherAsync

  • The callback receives the result of whichever task finished first and returns a value

    public static void testFifteen() {
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return 10;
        });
        CompletableFuture<Integer> applyFuture = oneFuture.applyToEitherAsync(twoFuture, (res) -> {
            System.out.println("res:" + res);
            return res * 10;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("applyFuture:" + applyFuture.join());
    }
start1
end1
start2
result:100
res:100
applyFuture:1000
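The examples in this group rely on sleep timing to decide which task wins. For a deterministic illustration, the sketch below pairs an already completed future with one that is never completed, so the callback can only ever see the first one's value. The class EitherDemo and the variable names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class EitherDemo {
    // Applies the callback to whichever future completes first.
    static int firstTimesTen() {
        CompletableFuture<Integer> ready = CompletableFuture.completedFuture(100);
        // Never completed in this sketch, so "ready" always wins.
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        return pending.applyToEitherAsync(ready, res -> res * 10).join();
    }

    public static void main(String[] args) {
        System.out.println(firstTimesTen()); // prints "1000"
    }
}
```

It does not matter which of the two futures the method is called on; the callback receives the result of whichever one finishes first.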

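Summary

With the APIs above, multiple tasks can be combined in any arrangement (in sequence, after both, or after either) to run logic asynchronously, which shortens total execution time when the tasks are independent of one another.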
