当前位置: 首页 > news >正文

多线程优化API请求:CountDownLatch与PriorityBlockingQueue的应用

目录

前言

CountDownLatch是什么?

PriorityBlockingQueue是什么?

场景描述

解决方案

定义统一工厂制造类

定义制造厂

定义客户请求实现

定义控制器

定义启动类

结果呈现

启动项目

请求制造操作

总结


前言

写这篇文章的缘由是因为之前在面试期间经常被提到的一个场景题,“前端向后端发起一个API请求,该API需要处理复杂的业务逻辑,涉及多个相互独立的业务模块。每个业务模块都需要执行特定的操作,且这些操作彼此之间没有依赖关系。然而,每个模块的处理都需要一定的时间,导致整体的接口响应时间较长,请给出优化接口的方案,而且结果必须通过当前接口返回”。或许大家立马想到的都是通过多线程或者通过队列异步来完成,结果延迟返回,问题的难点在于怎么能在当前接口返回最终的结果呢?在学习完RocketMq源码后我找到了最佳方案。多线程(Runnable)结合CountDownLatch以及PriorityBlockingQueue就是答案。

CountDownLatch是什么?

CountDownLatch 是 Java 并发工具库中的一个类,用于同步一个或多个线程,确保某些操作在其他操作完成之前不会继续执行。它能够使一个线程等待其他线程完成各自的工作后再继续执行

读完CountDownLatch的概述我们大概能猜出它在方案中的作用了吧,描述得也很清晰了。就是在场景是主线程启动了多个工作线程,并等待所有工作线程完成工作后再继续。

主要方法

  • void await():使当前线程等待,直到计数到达零释放。
  • boolean await(long timeout, TimeUnit unit)使当前线程等待,直到计数到达零或者等待超时才释放。
  • void countDown()递减计数,如果计数到达零,则释放所有等待的线程。
  • long getCount():返回当前计数。

PriorityBlockingQueue是什么?

PriorityBlockingQueue 是 Java 并发包 (java.util.concurrent) 提供的一个线程安全的无界优先级队列。它结合了优先级队列和阻塞队列的特点,在多线程环境下非常有用。默认使用元素的自然顺序(通过实现 Comparable 接口的 compareTo 方法)。你也可以通过提供自定义的 Comparator 实现定制排序逻辑。

在本次方案中就是作为一个内存队列,通知其他独立的业务模块执行操作。

主要方法

  • boolean add(E e) / boolean offer(E e): 插入指定元素,返回 true 表示插入成功。
  • E take(): 检索并移除队列的头部元素,如果队列为空,则等待直到有元素可用。
  • E poll(long timeout, TimeUnit unit): 检索并移除队列的头部元素,如果队列为空,则等待指定的时间。
  • E peek(): 检索但不移除队列的头部元素,如果队列为空,则返回 null
  • int size(): 返回队列中的元素数量。

场景描述

根据上面问题,我们模拟一个场景:一个客户订购了一架飞机、一艘轮船、一辆汽车,但是飞机、轮船、汽车各自的工厂都相隔很远,客户想以最短的时间同时拥有它们,但生性多疑的他,为了防止制造厂商偷工减料他提出想亲自监工这三个交通工具的制造。请问客户与制造厂该怎么配合才能在客户亲自监工的情况下又能最快时间的同时拥有汽车、轮船和飞机呢?

  • 一架飞机的制造时间为4s
  • 一艘轮船的制造时间为3s
  • 一辆汽车的制造时间为2s

方案一:传统方案就是,客户先去飞机厂通知开始制造飞机并现场监督,飞机制造完成后再去轮船厂其次再去汽车厂(顺序任意),抛出中间路程的时间,总共用时至少得 4+3+2=9s,时间太长客户不接受。

方案二:客户通过手机同时给飞机、轮船以及汽车厂发生消息通知他们开始制造,并且在三个厂商制造间安装监控,客户通过同时监督它们制造,因为飞机制造时间最长,那么当飞机制造完成,轮船和汽车肯定也已经完成了,那么总共用时最快4s,完美解决。

解决方案

 GitHub源码地址:点击获取

首先通过idea创建一个springboot项目

定义统一工厂制造类

每个工厂都有制造方法FactoryService

public interface FactoryService {//制造boolean factoryOfManufacture();
}

定义制造厂

飞机、轮船、汽车都是独立的制造厂家,所以需要三个独立的线程来处理.

定义公共抽象线程类ServiceThread并实现多线程Runnablel类的启动方法start()

@Slf4j
public abstract class ServiceThread implements Runnable{protected Thread thread;protected boolean isDaemon = false;//Make it able to restart the threadprivate final AtomicBoolean started = new AtomicBoolean(false);public ServiceThread() {}//获取线程名称public abstract String getServiceName();//线程启动方法public void start() {log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);if (!started.compareAndSet(false, true)) {return;}this.thread = new Thread(this, getServiceName());this.thread.setDaemon(isDaemon);this.thread.start();log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);}
}

定义飞机厂类AirPlaneService基础线程类ServiceThread以及实现工厂类FactoryService

@Service
@Slf4j
public class AirPlaneService extends ServiceThread implements FactoryService {public static ConcurrentMap<String, AirplaneRequest> requestTable =new ConcurrentHashMap<>();//接收制造请求通知public static PriorityBlockingQueue<AirplaneRequest> requestQueue =new PriorityBlockingQueue<>();//请求通知静态内部类对象public static class AirplaneRequest implements Comparable<AirplaneRequest> {//线程完成设置为0private CountDownLatch countDownLatch = new CountDownLatch(1);//用户idprivate String userId;public CountDownLatch getCountDownLatch() {return countDownLatch;}public void setCountDownLatch(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}@Overridepublic int compareTo(AirplaneRequest o) {return 0;}}//获取当前线程名称并设置线程名称@Overridepublic String getServiceName() {return AirPlaneService.class.getSimpleName();}//执行线程@Overridepublic void run() {log.info("飞机工厂启动-----------");//循环处理不同的请求通知while (this.factoryOfManufacture()) ;}public boolean factoryOfManufacture() {boolean isSuccess = false;AirplaneRequest airplaneRequest = null;try {//等待飞机制造请求airplaneRequest = requestQueue.take();log.info("开始飞机制造-----------");//校验数据是否合法AirplaneRequest expectedRequest = this.requestTable.get(airplaneRequest.getUserId());if (null == expectedRequest) {log.warn("this mmap request expired, maybe cause timeout " + airplaneRequest.getUserId());return true;}if (expectedRequest != airplaneRequest) {log.warn("never expected here,  maybe cause timeout " + airplaneRequest.getUserId());return true;}//...业务处理Thread.sleep(4000);//...isSuccess = true;} catch (InterruptedException e) {log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");return false;} finally {if (airplaneRequest != null && isSuccess) {log.info("飞机制造完成啦-----------");airplaneRequest.getCountDownLatch().countDown();}}return true;}
}

定义轮船厂类ShipService基础线程类ServiceThread以及实现工厂类FactoryService

@Service
@Slf4j
public class ShipService extends ServiceThread implements FactoryService {public static ConcurrentMap<String, ShipRequest> requestTable =new ConcurrentHashMap<>();public static PriorityBlockingQueue<ShipRequest> requestQueue =new PriorityBlockingQueue<>();public static class ShipRequest implements Comparable<ShipRequest> {private CountDownLatch countDownLatch = new CountDownLatch(1);private String userId;public CountDownLatch getCountDownLatch() {return countDownLatch;}public void setCountDownLatch(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}@Overridepublic int compareTo(ShipRequest o) {return 0;}}@Overridepublic String getServiceName() {return ShipService.class.getSimpleName();}@Overridepublic void run() {log.info("轮船工厂启动-----------");while (this.factoryOfManufacture()) ;}@Overridepublic boolean factoryOfManufacture() {boolean isSuccess = false;ShipRequest shipRequest = null;try {shipRequest = requestQueue.take();log.info("开始制造轮船-----------");//校验数据是否合法ShipRequest expectedRequest = this.requestTable.get(shipRequest.getUserId());if (null == expectedRequest) {log.warn("this mmap request expired, maybe cause timeout " + shipRequest.getUserId());return true;}if (expectedRequest != shipRequest) {log.warn("never expected here,  maybe cause timeout " + shipRequest.getUserId());return true;}//...业务处理Thread.sleep(3000);//...isSuccess = true;} catch (InterruptedException e) {log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");return false;} finally {if (shipRequest != null && isSuccess) {log.info("轮船制造完成啦-----------");shipRequest.getCountDownLatch().countDown();}}return true;}
}

定义汽车厂类CarService基础线程类ServiceThread以及实现工厂类FactoryService

@Service
@Slf4j
public class CarService extends ServiceThread implements FactoryService {public static ConcurrentMap<String, CarRequest> requestTable =new ConcurrentHashMap<>();public static PriorityBlockingQueue<CarRequest> requestQueue =new PriorityBlockingQueue<>();public static class CarRequest implements Comparable<CarRequest> {private CountDownLatch countDownLatch = new CountDownLatch(1);private String userId;public CountDownLatch getCountDownLatch() {return countDownLatch;}public void setCountDownLatch(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}@Overridepublic int compareTo(CarRequest o) {return 0;}}@Overridepublic String getServiceName() {return CarService.class.getSimpleName();}@Overridepublic void run() {log.info("汽车工厂启动-----------");while (this.factoryOfManufacture());}@Overridepublic boolean factoryOfManufacture() {boolean isSuccess = false;CarRequest carRequest = null;try {carRequest = requestQueue.take();log.info("开始汽车制造-----------");//校验数据是否合法CarRequest expectedRequest = this.requestTable.get(carRequest.getUserId());if (null == expectedRequest) {log.warn("this mmap request expired, maybe cause timeout " + carRequest.getUserId());return true;}if (expectedRequest != carRequest) {log.warn("never expected here,  maybe cause timeout " + carRequest.getUserId());return true;}//...业务处理Thread.sleep(2000);//...isSuccess = true;} catch (InterruptedException e) {log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");return false;} finally {if (carRequest != null && isSuccess) {log.info("汽车制造完成啦-----------");carRequest.getCountDownLatch().countDown();}}return true;}
}

定义客户请求实现

@Service
public interface PurchaseService {//请求制造Boolean manufacturing(String idCard);
}

实现逻辑类

@Service
@Slf4j
public class PurchaseServiceImpl implements PurchaseService {//超时时间private static int waitTimeOut = 1000 * 5;@Autowiredprivate AirPlaneService airPlaneService;@Autowiredprivate CarService carService;@Autowiredprivate ShipService shipService;@Overridepublic Boolean manufacturing(String userId) {long startTime = System.currentTimeMillis();//通知飞机厂计算飞机制造AirPlaneService.AirplaneRequest airplaneRequest = new AirPlaneService.AirplaneRequest();airplaneRequest.setUserId(userId);airPlaneService.requestTable.put(userId, airplaneRequest);airPlaneService.requestQueue.offer(airplaneRequest);//通知汽车厂计算汽车制造CarService.CarRequest carRequest = new CarService.CarRequest();carRequest.setUserId(userId);carService.requestTable.put(userId, carRequest);carService.requestQueue.offer(carRequest);//通知轮船厂计算轮船制造ShipService.ShipRequest shipRequest = new ShipService.ShipRequest();shipRequest.setUserId(userId);shipService.requestTable.put(userId, shipRequest);shipService.requestQueue.offer(shipRequest);//获取飞机制造AirPlaneService.AirplaneRequest airplaneOK = airPlaneService.requestTable.get(userId);//获取飞机制造CarService.CarRequest carOK = carService.requestTable.get(userId);//获取轮船制造ShipService.ShipRequest shipOK = shipService.requestTable.get(userId);try {//等待获取制造结果if (airplaneOK != null && carOK!= null && shipOK!= null) {boolean waitOK = airplaneOK.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);boolean waitOK2 = carOK.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);boolean waitOK3 = shipOK.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);//如果都成功了,返回trueif (waitOK && waitOK2 && waitOK3) {log.info("总共用时:"+(System.currentTimeMillis() - startTime));airPlaneService.requestTable.remove(userId);carService.requestTable.remove(userId);shipService.requestTable.remove(userId);return true;}}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException(e);}System.out.println("失败了总共用时:"+(System.currentTimeMillis() - startTime));return false;}
}

定义控制器

@RestController
public class UserController {@Autowiredprivate PurchaseService purchaseService;@GetMapping("/manufacturing")public Boolean manufacturing(String userId) {return  purchaseService.manufacturing(userId);}
}

定义启动类

同时启动飞机、轮船以及汽车的厂家线程

@SpringBootApplication
public class AsyncOneApplication {public static void main(String[] args) {SpringApplication.run(AsyncOneApplication.class, args);//启动飞机制造线程AirPlaneService airPlaneService = new AirPlaneService();airPlaneService.start();//启动汽车制造线程CarService carService = new CarService();carService.start();//启动轮船制造线程ShipService shipService = new ShipService();shipService.start();}
}

结果呈现

启动项目

可以看到飞机、汽车以及轮船各自的线程都启动了并成功设置了对应的线程名称

并且第一条日志输出成功,因为内存队列requestQueue中目前没有数据,所以线程阻塞。其他两个同理。

请求制造操作

请求接口返回true

查看日志

可以发现,飞机、轮船、汽车的制造都完成了,并且和前面说的一样用时4s左右,完美实现。

可能有人会发出疑问,为何每个服务中的请求对象中都要单独定义CountDownLatch,难道不可以定义一个全局的CountDownLatch数字设置为3就可以了,每个线程完成了就减1就可以了,其实这种方式也可以,但考虑到实际业务中3个服务线程并不一定会被同时投递,或者小明只单独订购飞机呢是吧,所以对每个线程的请求对象单独进行维护CountDownLatch。


总结

在本文中,我们深入探讨了如何优化一个复杂的API请求,涉及多个独立业务模块且需要尽快返回结果的问题。通过结合多线程(Runnable)、CountDownLatch以及PriorityBlockingQueue,我们实现了高效的并行处理有序的任务管理,成功地在当前接口返回了最终的结果。这一解决方案不仅提高了系统性能,还确保了结果的及时返回,具有很强的实用性和可操作性。希望本文提供的思路和实践能为大家在类似场景中提供有效的解决方案。

http://www.lryc.cn/news/408087.html

相关文章:

  • 谷粒商城实战笔记-54-商品服务-API-三级分类-拖拽效果
  • AI大模型学习必备十大网站
  • Elasticsearch:Golang ECS 日志记录 - zap
  • 关于线性代数(考研)
  • 【java基础】spring springMVC springboot 的区别
  • 【2024最新华为OD-C/D卷试题汇总】[支持在线评测] 开源项目热度排行榜(100分) - 三语言AC题解(Python/Java/Cpp)
  • 大模型算法面试题(十一)
  • CSS 基础知识
  • IntelliJ IDEA 和 Eclipse的区别
  • Ansible之playbook剧本编写(二)
  • 力扣第二十九题——两数相除
  • 解析三款热门的文献翻译工具:优势与使用指南
  • git 过滤LFS文件下载
  • 内存泄漏详解
  • 多角度解析高防CDN防御DDOS及CC攻击
  • (7) cmake 编译C++程序(二)
  • C语言系统调用linux文件系统
  • LeetCode142 环形链表 II
  • 逆向案例二十八——某高考志愿网异步请求头参数加密,以及webpack
  • WebKit的文本装饰艺术:CSS Text Decoration全解析
  • 【linux】Shell脚本三剑客之sed命令的详细用法攻略
  • 解析class字节码文件获取魔数和版本号
  • 技术文档总结----思维导图
  • 【iOS】—— retain\release实现原理和属性关键字
  • 这一文,关于Java泛型的点点滴滴 一
  • 微信小程序之调查问卷
  • 基于Qt的视频剪辑
  • electron 网页TodoList工具打包成win桌面应用exe
  • 数据结构之判断二叉树是否为搜索树(C/C++实现)
  • golang长连接的误用