当前位置: 首页 > news >正文

【并发编程】CountDownLatch

       📝个人主页:五敷有你      

 🔥系列专栏:并发编程

⛺️稳中求进,晒太阳

CountDownLatch 概念

CountDownLatch可以使一个获多个线程等待其他线程各自执行完毕后再执行。

CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

CountDownLatch 常用方法说明

CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。 ​ await();//阻塞当前线程,将当前线程加入阻塞队列。 ​ await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行, ​ countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。

用CountDownLatch 来优化我们的报表统计

功能现状

运营系统有统计报表、业务为统计每日的用户新增数量、订单数量、商品的总销量、总销售额......等多项指标统一展示出来,因为数据量比较大,统计指标涉及到的业务范围也比较多,所以这个统计报表的页面一直加载很慢,所以需要对统计报表这块性能需进行优化。

问题分析

统计报表页面涉及到的统计指标数据比较多,每个指标需要单独的去查询统计数据库数据,单个指标只要几秒钟,但是页面的指标有10多个,所以整体下来页面渲染需要将近一分钟。

解决方案

任务时间长是因为统计指标多,而且指标是串行的方式去进行统计的,我们只需要考虑把这些指标从串行化的执行方式改成并行的执行方式,那么整个页面的时间的渲染时间就会大大的缩短, 如何让多个线程同步的执行任务,我们这里考虑使用多线程,每个查询任务单独创建一个线程去执行,这样每个统计指标就可以并行的处理了。

要求

因为主线程需要每个线程的统计结果进行聚合,然后返回给前端渲染,所以这里需要提供一种机制让主线程等所有的子线程都执行完之后再对每个线程统计的指标进行聚合。 这里我们使用CountDownLatch 来完成此功能。

模拟代码

1、分别统计4个指标用户新增数量、订单数量、商品的总销量、总销售额;

2、假设每个指标执行时间为3秒。如果是串行化的统计方式那么总执行时间会为12秒。

3、我们这里使用多线程并行,开启4个子线程分别进行统计

4、主线程等待4个子线程都执行完毕之后,返回结果给前端。

​//用于聚合所有的统计指标private static Map map=new HashMap();//创建计数器,这里需要统计4个指标private static CountDownLatch countDownLatch=new CountDownLatch(4);
​public static void main(String[] args) {
​//记录开始时间long startTime=System.currentTimeMillis();
​Thread countUserThread=new Thread(new Runnable() {public void run() {try {System.out.println("正在统计新增用户数量");Thread.sleep(3000);//任务执行需要3秒map.put("userNumber",1);//保存结果值countDownLatch.countDown();//标记已经完成一个任务System.out.println("统计新增用户数量完毕");} catch (InterruptedException e) {e.printStackTrace();}
​}});Thread countOrderThread=new Thread(new Runnable() {public void run() {try {System.out.println("正在统计订单数量");Thread.sleep(3000);//任务执行需要3秒map.put("countOrder",2);//保存结果值countDownLatch.countDown();//标记已经完成一个任务System.out.println("统计订单数量完毕");} catch (InterruptedException e) {e.printStackTrace();}
​}});
​Thread countGoodsThread=new Thread(new Runnable() {public void run() {try {System.out.println("正在商品销量");Thread.sleep(3000);//任务执行需要3秒map.put("countGoods",3);//保存结果值countDownLatch.countDown();//标记已经完成一个任务System.out.println("统计商品销量完毕");} catch (InterruptedException e) {e.printStackTrace();}
​}});
​Thread countmoneyThread=new Thread(new Runnable() {public void run() {try {System.out.println("正在总销售额");Thread.sleep(3000);//任务执行需要3秒map.put("countmoney",4);//保存结果值countDownLatch.countDown();//标记已经完成一个任务System.out.println("统计销售额完毕");} catch (InterruptedException e) {e.printStackTrace();}
​}});//启动子线程执行任务countUserThread.start();countGoodsThread.start();countOrderThread.start();countmoneyThread.start();
​try {//主线程等待所有统计指标执行完毕countDownLatch.await();long endTime=System.currentTimeMillis();//记录结束时间System.out.println("------统计指标全部完成--------");System.out.println("统计结果为:"+map.toString());System.out.println("任务总执行时间为"+(endTime-startTime)/1000+"秒");
​} catch (InterruptedException e) {e.printStackTrace();}
​
​}
​

执行结果

CountDownLatch实现原理

1、创建计数器

当我们调用CountDownLatch countDownLatch=new CountDownLatch(4) 时候,此时会创建一个AQS的同步队列,并把创建CountDownLatch 传进来的计数器赋值给AQS队列的 state,所以state的值也代表CountDownLatch所剩余的计数次数;

  public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);//创建同步队列,并设置初始计数器值}

2、阻塞线程

当我们调用countDownLatch.wait()的时候,会创建一个节点,加入到AQS阻塞队列,并同时把当前线程挂起。

  public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}

判断计数器是技术完毕,未完毕则把当前线程加入阻塞队列

  public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//锁重入次数大于0 则新建节点加入阻塞队列,挂起当前线程if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

构建阻塞队列的双向链表,挂起当前线程

 private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//新建节点加入阻塞队列final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//获得当前节点pre节点final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);//返回锁的stateif (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//重组双向链表,清空无效节点,挂起当前线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

3、计数器递减

当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放锁的方式,对state进行减1操作,当state=0的时候证明计数器已经递减完毕,此时会将AQS阻塞队列里的节点线程全部唤醒。

http://www.lryc.cn/news/331536.html

相关文章:

  • 2024-HW --->SSRF
  • 该主机与 Cloudera Manager Server 失去联系的时间过长。 该主机未与 Host Monitor 建立联系
  • 【BUG】No module named ‘dnf‘
  • Ubuntu pycharm配置Conda环境
  • 工作体验记录
  • YOLO火灾烟雾检测数据集:20000多张,yolo标注完整
  • 基于Spring Boot的餐厅点餐系统
  • tkinter控件教程使用说明(三)
  • Electron 打包自定义NSIS脚本为安装向导增加自定义页面增加输入框
  • Idea2023创建Servlet项目
  • Day57:WEB攻防-SSRF服务端请求Gopher伪协议无回显利用黑白盒挖掘业务功能点
  • 【Qt】使用Qt实现Web服务器(十):前端基础
  • 使用vuepress搭建个人的博客(一):基础构建
  • ArcGIS Pro导出布局时去除在线地图水印
  • 启动mysql
  • C++实现二叉搜索树的增删查改(非递归玩法)
  • 软件架构复用
  • 【初阶数据结构】——leetcode:160. 相交链表
  • 【Go】goroutine并发常见的变量覆盖案例
  • 基于SSM+Jsp+Mysql的快递管理系统
  • 如何动态往Spring容器注册/移除bean?
  • C语言交换二进制位的奇数偶数位
  • 爬虫实战三、PyCharm搭建Scrapy开发调试环境
  • 2012年认证杯SPSSPRO杯数学建模C题(第一阶段)碎片化趋势下的奥运会商业模式全过程文档及程序
  • 【Next.js】连接 MongoDB 实现基本的接口
  • 中值滤波算法与SSE2指令集并行优化
  • 2012年认证杯SPSSPRO杯数学建模B题(第二阶段)节能减排全过程文档及程序
  • NOI - OpenJudge - 2.5基本算法之搜索 - 2753:走迷宫 - 超级无敌详细题解(含多个不同算法AC代码)
  • 什么是Redis数据一致性?如何解决?
  • 【办公软件】开发常用网站