当前位置: 首页 > news >正文

CountDownLatch阻塞后countDown未执行会如何?

背景

某项目封装了 Kafka 消费者 API,根据传递的消费者线程数,创建 N 个消费者线程同时消费对应 topic 的数据,并在线程启动后收集到全局列表中,方便在程序调用 stop 流程时逐个停止。

主控类在创建 Kafka 消费线程时使用了 CountDownLatch ,将启动的线程收集到全局列表,并阻塞等待所有线程初始化完成;消费者线程指定 Kafka 订阅方法后对计数器减一,然后轮询消费 Kafka 的数据。

近日因某场景下不想消费某类 topic 数据而将 topic 设置为空,预想其他几类的 topic 数据应该正常消费,结果发现第一个 topic 设置为空后,其他几类消费线程都没有正常启动。

封装逻辑

程序封装了一个 KafkaConsumerThread 类,根据配置的线程数启动 N 个线程消费目标 topic 数据,基本代码如下:
在这里插入图片描述
用 CountDownLatch 控制消费者线程的初始化,本意是在 run 方法执行的时候就对计数器减一,标识本消费线程初始化完成的。

  1. 根据线程数创建 CountDownLatch 计数器。
  2. 订阅 Kafka topic。
  3. 计数器减一。
  4. 记录启动的线程对象。
  5. 主程序阻塞等待消费线程 run 方法执行到计数器减一。

问题排查

有一个 topic 设置为空后,对应的消费者线程启动报异常了:

java.lang.IllegalArgumentException: 
Topic collection to subscribe to 
cannot contain null or empty topic

一个消费异常,但其他消费者没有启动,为什么呢?理论上它们并不相干才对。

打印程序堆栈信息,发现程序阻塞了:
在这里插入图片描述

封装的 Kafka API 是顺次启动几类 topic 消费线程的,因为启动第一个 topic 消费线程时,因 topic 设置为空,consumer.subscribe(config.getTopics()) 这句代码异常了,其后面的 countDown 未执行而引发阻塞

第一个 topic 消费启动异常后,程序因调用了 countDownLatch.await() 而阻塞了,因此后面代码就不执行了,继而程序呈现异常状态。

基础巩固

CountDownLatch 是 JUC 包同步工具类,用于协调多个线程。它允许一个或多个线程等待,直到其他线程中执行的一组操作完成。CountDownLatch 通过一个计数器来实现,该计数器由线程递减,计数器值到达零后,所有调用过 await 方法的线程将解除阻塞状态。

  • 创建:new CountDownLatch 对象时,指定计数器的初始值。
  • 阻塞:一个或多个线程调用 await 方法,进入阻塞等待状态,直到计数器的值变为零。
  • 倒计数:其他线程在完成各自任务后调用 countDown 方法,将计数器的值减一。当计数器的值减到零时,所有在 await 上等待的线程会被唤醒,继续执行。

启示录

同步锁使用不当容易引发死锁问题,阿里开发者规范在 countDown() 方法处有一个提示:
在这里插入图片描述
这个提示也不准确,因为这个是一个 Kafka 消费线程,它以线程中断状态为标识,循环从 Kafka 中 poll 数据处理的,所以不能在 finally 中调用。但是也不能在 subscribe 之后调用,因为该语句会异常。

到底应该在哪里对计数器减一才能保证即使异常,也能正常减一呢?有两个方法:

  1. 简化处理,在线程的 run 方法第一行调用。
  2. 稍微复杂一点,添加一个开关,在 countDown 后面设置为 true,然后再 finally 里面判断,如果这个开关的代码没有走到,说明后面异常了,就在对计数器再补充减一:在这里插入图片描述
    其实这个问题产生的根源是没有对 topic 进行判空,如果源头控制了,就不会出现这种异常了。

PS:真心再推荐一下 utools 工具,整理本文时堆栈信息是从七天前的剪切板里面找出来的:
在这里插入图片描述
对我这种一天不知道复制粘贴多少次的人来说,这个工具真的很好用啊!

http://www.lryc.cn/news/502151.html

相关文章:

  • k8s,operator
  • 使用 pyperclip 进行跨平台剪贴板操作
  • 20 设计模式之职责链模式(问题处理案例)
  • SpringBoot3集成MybatisPlus3和knife4j(swagger3兼容增强版)
  • 【MIT-OS6.S081作业1.3】Lab1-utilities primes
  • 游戏引擎学习第35天
  • learn-(Uni-app)输入框u-search父子组件与input输入框(防抖与搜索触发)
  • 设置IMX6ULL开发板的网卡IP的两种方法(临时生效和永久有效两种方法)
  • 流量转发利器之Burpsuite概述(1)
  • Transformer入门(6)Transformer编码器的前馈网络、加法和归一化模块
  • element-plus中的resetFields()方法
  • 【过滤器】.NET开源 ORM 框架 SqlSugar 系列
  • Jmeter Address already in use: connect 解决
  • C#常见错误—空对象错误
  • Leetcode数学部分笔记
  • 微信小程序web-view 嵌套h5界面 实现文件预览效果
  • 【汽车】-- 燃油发动机3缸和4缸
  • 轻量级的 HTML 模板引擎
  • Mysql | 尚硅谷 | 第02章_MySQL环境搭建
  • Maven学习(传统Jar包管理、Maven依赖管理(导入坐标)、快速下载指定jar包)
  • CTF: 在本地虚拟机内部署CTF题目docker
  • 视频推拉流EasyDSS无人机直播技术巡查焚烧、烟火情况
  • SpringBoot【十一】mybatis-plus实现多数据源配置,开箱即用!
  • 【嵌入式linux基础】关于linux文件多次的open
  • TPAMI 2023:When Object Detection Meets Knowledge Distillation: A Survey
  • 2024前端面试题(持续更新)
  • apache转nginx访问变成下载解决方法
  • 【iOS】OC高级编程 iOS多线程与内存管理阅读笔记——自动引用计数(三)
  • Oracle数据库使用dblink是时出现 ORA-12170:TNS:连接超时
  • OpenHarmony系统中实现Android虚拟化、模拟器相关的功能,包括桌面显示,详细解决方案