当前位置: 首页 > news >正文

kafka入门(三):kafka多线程消费

kafka消费积压

如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。

消费积压时,

(1) 可以增加Topic的分区数,并且增加消费组的消费者数量,让消费者数等于分区数。
(2) 还可以使用多线程消费,提高消费速度。

kafka多线程消费的代码:

public class ThirdMultiConsumerThreadDemo {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,Runtime.getRuntime().availableProcessors());consumerThread.start();}/**** kafka配置* @return*/public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}/*** kafka消费者线程*/public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties props, String topic, int threadNumber) {kafkaConsumer = new KafkaConsumer<>(props);kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records =kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {log.error("run error", e);} finally {kafkaConsumer.close();}}}/*** 处理消息*/public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {//处理records.for (ConsumerRecord<String, String> record : records) {System.out.println("==========>record:"+record.value() + ",thread:" + Thread.currentThread().getName());}}}}

发送消息后,使用多线程消息,运行结果如下:

==========>record:{"id":"1234","name":"lin"},thread:pool-1-thread-1
==========>record:{"id":"5678","name":"chen"},thread:pool-1-thread-2
==========>record:{"id":"91011","name":"wu"},thread:pool-1-thread-3

参考资料:

《深入理解Kafka:核心设计与实践原理》

http://www.lryc.cn/news/256101.html

相关文章:

  • android通过广播打印RAM信息
  • C++新经典模板与泛型编程:策略类模板
  • 微信小程序引入Vant Weapp修改样式不起作用,使用外部样式类进行覆盖
  • python核酸检测 青少年电子学会等级考试 中小学生python编程等级考试二级真题答案解析2022年6月
  • 搭建React项目,基于Vite+React+TS+ESLint+Prettier+Husky+Commitlint
  • ChatGPT在国内的使用限制,国内的ChatGPT替代工具
  • 服务器如何保证数据安全_Maizyun
  • sql2005日志文件过大如何清理
  • Linux--学习记录(2)
  • 字符串函数`strlen`、`strcpy`、`strcmp`、`strstr`、`strcat`的使用以及模拟实现
  • 插入排序与希尔排序(C语言实现)
  • 【微软技术栈】与其他.NET语言的互操作性 (C++/CLI)
  • TCPUDP使用场景讨论
  • C#最小二乘法线性回归
  • ULAM公链第九十六期工作总结
  • 基于Echarts的大数据可视化模板:智慧交通管理
  • C#-快速剖析文件和流,并使用
  • 【Linux】如何在Ubuntu 20.04上安装PostgreSQL
  • IT程序员面试题目汇总及答案-计算机面试
  • 【Flink on k8s】- 5 - 简要介绍 Flink
  • 物联网安全芯片ACL16 采用 32 位内核,片内集成多种安全密码模块 且低成本、低功耗
  • 【Linux top命令】
  • 深入理解 Promise:前端异步编程的核心概念
  • Linux 和 macOS 的主要区别在哪几个方面呢?
  • springboot(ssm寝室小卖部系统 宿舍小商店网站Java(codeLW)
  • 什么是web组态?一文读懂web组态
  • 华为OD机试真题-智能成绩表-2023年OD统一考试(C卷)
  • YOLOv5独家原创改进:SPPF自研创新 | 可变形大核注意力(D-LKA Attention),大卷积核提升不同特征感受野的注意力机制
  • 算法:进制之前的转换
  • VS2009和VS2022的错误列表可复制粘贴为表格