当前位置: 首页 > news >正文

响应式编程Reactor优化Callback回调地狱

1. Reactor是什么

  • Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono表示一个包含零或一个元素的异步序列)。
  • Reactor 通过提供响应式的操作符,如mapfilterflatMap等,使得开发者能够方便地进行数据流的转换和处理。

2. Reactor、Callback、CompletableFuture三种形式异步编码对比

  • 编码简洁程度Reactor最优
  • Reactor线程利用率最高(因实现了Reactive Streams规范,拥有背压+事件驱动特性,此处暂不展开)

代码如下:

pom依赖

<dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency>
</dependencies>

Callback回调地狱

interface FirstCallback {void onCompleteFirst(String result);void onErrorFirst(Exception e);
}interface SecondCallback {void onCompleteSecond(String result);void onErrorSecond(Exception e);
}interface ThirdCallback {void onCompleteThird(String result);void onErrorThird(Exception e);
}class AsyncOperations {static void firstOperation(FirstCallback firstCallback) {new Thread(() -> {try {// 模拟异步操作Thread.sleep(2000);// 操作完成后调用回调函数firstCallback.onCompleteFirst("First operation completed");} catch (Exception e) {// 发生异常时调用错误回调firstCallback.onErrorFirst(e);}}).start();}static void secondOperation(String input, SecondCallback secondCallback) {new Thread(() -> {try {// 模拟异步操作Thread.sleep(2000);// 操作完成后调用回调函数secondCallback.onCompleteSecond("Second operation completed with input: " + input);} catch (Exception e) {// 发生异常时调用错误回调secondCallback.onErrorSecond(e);}}).start();}static void thirdOperation(String input, ThirdCallback thirdCallback) {new Thread(() -> {try {// 模拟异步操作Thread.sleep(2000);// 操作完成后调用回调函数thirdCallback.onCompleteThird("Third operation completed with input: " + input);} catch (Exception e) {// 发生异常时调用错误回调thirdCallback.onErrorThird(e);}}).start();}
}public class CallbackHellExample {public static void main(String[] args) {AsyncOperations.firstOperation(new FirstCallback() {@Overridepublic void onCompleteFirst(String result) {System.out.println("First Callback: " + result);// 第一次操作完成后调用第二次操作AsyncOperations.secondOperation(result, new SecondCallback() {@Overridepublic void onCompleteSecond(String result) {System.out.println("Second Callback: " + result);// 第二次操作完成后调用第三次操作AsyncOperations.thirdOperation(result, new ThirdCallback() {@Overridepublic void onCompleteThird(String result) {System.out.println("Third Callback: " + result);}@Overridepublic void onErrorThird(Exception e) {System.out.println("Error in Third Callback: " + e.getMessage());}});}@Overridepublic void onErrorSecond(Exception e) {System.out.println("Error in Second Callback: " + e.getMessage());}});}@Overridepublic void onErrorFirst(Exception e) {System.out.println("Error in First Callback: " + e.getMessage());}});// 主线程继续执行其他操作System.out.println("Main thread continues...");}
}

CompletableFuture优化Callback回调地狱

public class CompletableFutureExample {public static void main(String[] args) {CompletableFuture<String> firstOperation = CompletableFuture.supplyAsync(() -> {try {// 模拟异步操作Thread.sleep(2000);return "First operation completed";} catch (InterruptedException e) {throw new RuntimeException(e);}});CompletableFuture<String> secondOperation = firstOperation.thenApplyAsync(result -> {System.out.println("First CompletableFuture: " + result);try {// 模拟异步操作Thread.sleep(2000);return "Second operation completed with input: " + result;} catch (InterruptedException e) {throw new RuntimeException(e);}});CompletableFuture<String> thirdOperation = secondOperation.thenApplyAsync(result -> {System.out.println("Second CompletableFuture: " + result);try {// 模拟异步操作Thread.sleep(2000);return "Third operation completed with input: " + result;} catch (InterruptedException e) {throw new RuntimeException(e);}});thirdOperation.whenComplete((result, throwable) -> {if (throwable == null) {System.out.println("Third CompletableFuture: " + result);} else {System.out.println("Error in CompletableFuture: " + throwable.getMessage());}});// 主线程继续执行其他操作System.out.println("Main thread continues...");// 等待所有操作完成CompletableFuture.allOf(firstOperation, secondOperation, thirdOperation).join();}
}

Reactor优化Callback回调地狱

public class ReactorOptimizedExample {public static void main(String[] args) {Mono.fromCallable(() -> {// 模拟异步操作Thread.sleep(2000);return "First operation completed";}).subscribeOn(Schedulers.boundedElastic()).flatMap(result -> {System.out.println("First Reactor: " + result);return Mono.fromCallable(() -> {// 模拟异步操作Thread.sleep(2000);return "Second operation completed with input: " + result;}).subscribeOn(Schedulers.boundedElastic());}).flatMap(result -> {System.out.println("Second Reactor: " + result);return Mono.fromCallable(() -> {// 模拟异步操作Thread.sleep(2000);return "Third operation completed with input: " + result;}).subscribeOn(Schedulers.boundedElastic());}).doOnSuccess(result -> System.out.println("Third Reactor: " + result)).doOnError(error -> System.out.println("Error in Reactor: " + error.getMessage())).block(); // 阻塞等待操作完成// 主线程继续执行其他操作System.out.println("Main thread continues...");}
}

学习打卡:Java学习笔记-day06-响应式编程Reactor优化Callback回调地狱

http://www.lryc.cn/news/280251.html

相关文章:

  • React项目实战--------极客园项目PC端
  • Jerry每次能向前或向后走n*n步(始终不能超过初始位置1e5),q(q <= 1e5)次询问,求向前走d最少要几次
  • 【Spring Boot 3】【Flyway】数据库版本管理
  • 蓝桥杯基础数据结构(java版)
  • 39 C++ 模版中的参数如果 是 vector,list等集合类型如何处理呢?
  • 5.Pytorch模型单机多GPU训练原理与实现
  • 想成为一名C++开发工程师,需要具备哪些条件?
  • Qat++,轻量级开源C++ Web框架
  • openssl3.2 - 官方demo学习 - digest - EVP_MD_demo.c
  • uniapp 编译后文字乱码的解决方案
  • iOS中利用KeyChain永久保存用户信息的方法示例
  • 基于时域有限差分法的FDTD的计算电磁学算法(含Matlab代码)-YEE网格下的更新公式推导
  • win10使用debug,汇编初学
  • 怎么投稿各大媒体网站?
  • chatgpt免费使用的网站
  • 音频编辑软件:Studio One 6 中文
  • MySQL语句|使用UNION和UNION ALL合并两个或多个 SELECT 语句的结果集
  • UNRAID 优盘制作
  • 二、Java中SpringBoot组件集成接入【MySQL和MybatisPlus】
  • 银行测试--------转账
  • 阿里云最新优惠券领取方法及优惠活动汇总
  • 动态分配内存的风险
  • 多行SQL转成单行SQL
  • wpf的资源路径
  • shell 脚本之一键部署安装 Nginx
  • 第01章_Java语言概述拓展练习(为什么要设置path?)
  • 手机直连卫星及NTN简介
  • 对git中tag, branch的重新理解
  • python中none的替换方法:pandasnumpy
  • 您与此网站之间建立的连接不安全