当前位置: 首页 > article >正文

kafka学习笔记(三、消费者Consumer使用教程——消费性能多线程提升思考)

在这里插入图片描述


1.简介

KafkaConsumer是非线程安全的,它定义了一个acquire()方法来检测当前是否只有一个线程在操作,如不是则会抛出ConcurrentModifcationException异常。

acquire()可以看做是一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁操作和解锁操作。

KafkaConsumer虽然是单线程的执行方式,但是在某些情况下如:生产者发送消息的速度远大于消费者消费的速度,这样长时间可能会造成消息的丢失,此时我们就需要消费者采用多线程消费的方式增加消费速度。

2.多线程实现的方式

2.1.线程封闭多线程

即为每个线程实例化一个KafkaConsumer,如图所示,一个线程对应一个KafkaConsumer实例,所有的消费线程都属于同一个消费者组。

这种方式的并发度受限分区的实际个数

在这里插入图片描述
实现代码示例:

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}// 消费线程public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;public KafkaConsumerThread(Properties prop, String topic) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Arrays.asList(topic));}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record: records) {// 处理消息}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}
}

2.1.消息处理模块多线程

此方法是对上面方法的进一步优化,在消息处理模块增加多线程来处理消息,进一步提升消息消费的速度。
在这里插入图片描述

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties prop, String topic, int threadNumber) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {/// 处理records  }}
}

此方法需要引入一个共享的offsets来参与提交。

http://www.lryc.cn/news/2394351.html

相关文章:

  • mongodb删除字段
  • [JVM] JVM内存调优
  • Liunx部署ES单机集群
  • 秒出PPT正式改名秒出AI,开启AI赋能新体验!
  • Unity中的AudioManager
  • VM改MAC电脑密码(截图)
  • SpringBoot+Vue+微信小程序校园自助打印系统
  • 【论文精读】2024 CVPR--Upscale-A-Video现实世界视频超分辨率(RealWorld VSR)
  • 学术合作交流
  • 【线上故障排查】Redis缓存与数据库中数据不一致问题的排查与同步策略优化
  • 【Git命令】
  • 【LUT技术专题】图像自适应3DLUT
  • 德拜温度热容推导
  • 扫一扫的时候会经历哪些事
  • Typescript学习教程,从入门到精通,TypeScript 泛型与类型操作详解(二)(17)
  • 【iOS】源码阅读(五)——类类的结构分析
  • 基于CangjieMagic的RAG技术赋能智能问答系统
  • 算力租赁革命:弹性模式如何重构数字时代的创新门槛​
  • 图论回溯
  • 使用arthas热替换在线运行的java class文件
  • RFID测温芯片助力新能源产业安全与能效提升
  • S32K3 工具篇9:如何在无源码情况下灵活调试elf文件
  • Nacos 配置文件总结
  • ASP.NET Web Forms框架识别
  • LG P4119 [Ynoi2018] 未来日记 Solution
  • 流程引擎选型指南
  • 基于大模型预测带状疱疹(无并发症)诊疗方案的研究报告
  • 哈工大计统大作业-程序人生
  • 设计模式——装饰器设计模式(结构型)
  • 途景VR智拍APP:开启沉浸式VR拍摄体验