当前位置: 首页 > news >正文

kafka消费数据太慢了,给优化下

原代码

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1024"); // 最小获取1KB的数据props.put("fetch.max.wait.ms", "500"); // 最大等待500ms// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息; 如果消息处理逻辑允许,可以批量处理消息,而不是逐条处理while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));if (!records.isEmpty()) {for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 批量提交偏移量consumer.commitSync();}}}}
}  

在 Kafka 消费者代码中,有几个地方可以进行优化以提高消费速度:

增加消费者线程:你已经增加了消费者的数量,这是一个很好的开始。确保每个消费者线程都在不同的分区上消费,以实现并行处理。

调整 fetch.min.bytes 和 fetch.max.wait.ms:这两个参数控制了消费者从 Kafka
服务器获取数据的最小字节数和最大等待时间。如果你的消息较小,增加 fetch.min.bytes
可能会导致消费者等待更长时间以收集足够的数据,这可能会降低消费速度。如果你的消息较大,可以适当增加这个值。

异步提交偏移量:你当前使用的是 consumer.commitSync() 进行同步提交偏移量,这会阻塞当前线程直到提交完成。可以考虑使用
consumer.commitAsync() 异步提交,这样可以不阻塞消费线程。

增加 max.poll.records:poll
方法有一个参数可以设置一次拉取的最大记录数。如果设置得太低,可能会导致频繁的网络请求,增加延迟。

合理设置 auto.commit.interval.ms:自动提交偏移量的间隔设置得太短可能会导致不必要的性能开销,可以适当增加这个值。

使用多线程处理消息:如果你的消息处理逻辑是 CPU 密集型的,可以考虑使用线程池来并行处理消息。

监控和调优:使用 Kafka 的监控工具来观察消费者的行为,找出瓶颈并进行调优。

下面是根据上述建议优化后的代码示例:

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1"); // 减少最小获取字节数props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {processRecords(records); // 异步处理消息consumer.commitAsync(); // 异步提交偏移量}}}private void processRecords(ConsumerRecords<String, String> records) {// 异步处理消息的逻辑for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 这里可以添加消息处理逻辑,例如使用线程池并行处理}}}
}
http://www.lryc.cn/news/483801.html

相关文章:

  • ASUS/华硕灵耀X双屏Pro UX8402Z 原厂Win11-22H2系统 工厂文件 带ASUS Recovery恢复
  • 【含开题报告+文档+PPT+源码】基于springboot的毕业设计选题管理系统
  • fastadmin常用操作
  • IPguard与Ping32:谁是企业数据防泄密的最佳选择?
  • C++20新特性的补充讲解
  • uni-app移动端与PC端兼容预览PDF文件
  • Elman 神经网络算法详解
  • 卓胜微嵌入式面试题及参考答案(2万字长文)
  • 【Python】爬虫使用代理IP
  • 金融机构-业务架构方案(高光版)
  • ubuntu内核切换network unclaimed 网卡丢失
  • 【人工智能】揭秘可解释性AI(XAI):从原理到实战的终极指南
  • 小面馆叫号取餐流程 佳易王面馆米线店点餐叫号管理系统操作教程
  • 图形 2.6 伽马校正
  • LLM - 计算 多模态大语言模型 的参数量(Qwen2-VL、Llama-3.1) 教程
  • 数据可视化这样做,汇报轻松拿捏(附免费好用可视化工具推荐)
  • 杂七杂八之基于JSON Web Token (JWT) 进行API认证和鉴权(Java版)
  • 建设展示型网站企业渠道用户递达
  • 如何通过AB测试找到最适合的Yandex广告内容
  • AI写作(四)预训练语言模型:开启 AI 写作新时代(4/10)
  • 解决Anaconda出现CondaHTTPError: HTTP 000 CONNECTION FAILED for url
  • 员工绩效统计出现很多小数点,处理方法大全
  • 【启明智显分享】5G CPE为什么适合应用在连锁店中?
  • 十大经典排序算法-希尔排序与归并排序
  • gitlab和jenkins连接
  • Qt Event事件系统小探2
  • [2024最新] java八股文实用版(附带原理)---java集合篇
  • pytorch tensor在CPU和GPU之间转换,numpy之间的转换
  • 【电压分层控制】光储三相并网下垂控制,直流微电网协调母线电压分层控制
  • 【CSS】absolute定位的默认位置