当前位置: 首页 > news >正文

kafka 超详细的消息订阅与消息消费几种方式

kafka 消息订阅与消息消费几种方式

本文主要内容

  • 消费者订阅几种方式

    • 订阅多个主题

    • 按正则表达式订阅

  • 消息消费几种方式

    • 按分区消费

    • 按主题消费

    • 不区分

笔者建议一开始学习Kafka最好不要用SpringBoot 集成方式,因为SpringBoot推崇用注解方式,比如@KafkaListener 等,就可以直接消费,这样不能直接接触kafka-client一些api, 且SpringBoot 给我们提供了很多默认配置,我们几乎零配置也可以使用,实际上kafka很多配置很重要的,不容忽视。

消费者订阅几种方式

KafkaConsumer 给我们提供了几种订阅消息方式,我们可以订阅多个消息。示例代码如下

kafkaConsumer.subscribe(Arrays.asList("topicA","topicB"));kafkaConsumer.subscribe(Pattern.compile("topic-*"));kafkaConsumer.assign(Arrays.asList(new TopicPartition("topicA",0)));

订阅多个主题

void subscribe(Collection<String> topics) 对应上面第一行代码,这是最常见的订阅方式

按正则表达式订阅

void subscribe(Pattern pattern) 符合正则的主题都会被消费

有人创建了新的主题,并且与正则匹配,消费者也可以消费到

这种方式需要能对多种消息处理,对于一些能通用处理,不感知具体业务数据的场景比较合适。比如B系统需要同步A系统数据,我们按正则订阅,当A系统有新的数据需要同步,这是只需要A发满足条件正则的消息,B系统无需任何改动。

订阅指定分区

void assign(Collection<TopicPartition> partitions);正常业务不会使用,如果订阅的分区不存在,会报错。一些特殊场景,比如需要精确控制消费者消费消息,自定义分区分配策略时 可能会用到assign 方法



消息消费

kafka 采用客户端 拉取模式进行消息消费

poll() 返回所订阅的主题上一组消息ConsumerRecords ,我们可以对消息进行按主题、按分区进行处理,当然可以统一处理,不分主题和分区

ConsumerRecords<String,String> records =    kafkaConsumer.poll(Duration.ofMillis(1000));

不区分主题、分区

for(ConsumerRecord<String,String> record : records){// 处理消息
}

按partition 处理

Set<TopicPartition> topicPartitions = records.partitions();
for(TopicPartition topicPartition : topicPartitions){List<ConsumerRecord<String, String>> tpRecords = records.records(topicPartition);
}

按主题

  Iterable<ConsumerRecord<String,String>> iter = records.records(topic);

思考

kafka 给我们提供了灵活的消息订阅以及消息消费方式,我们需要根据实际业务场景选择。无论哪种场景都离不开 主题分区 ,最主要的是分区,当我们选择了某种订阅方式如果主题分区  发生了变化 ,消息还能正常消费吗

选择了按正则订阅消息方式, 后面创建了新的主题,该消息能被正常消费吗

选择了指定分区订阅, 如果后面扩容了新的分区,新分区消息能消费吗?

List<PartitionInfo> partitionsFor(String topic) 能获取分区情况,如果需要按分区订阅,该方法一定用的上


按分区维度消费消息,对于手动提交消息位移场景非常有用

主题分类处理消息也很常见,因为不同主题消息格式可能是不一样的,根据主题区分,很容易将不同的消息分类处理。

http://www.lryc.cn/news/441316.html

相关文章:

  • C++ 第三讲:内存管理
  • LeeCode打卡第二十九天
  • 阿里云专业翻译api对接
  • 基于Spring Boot的能源管理系统+建筑能耗+建筑能耗监测系统+节能监测系统+能耗监测+建筑能耗监测
  • 大数据新视界 --大数据大厂之 Cassandra 分布式数据库:高可用数据存储的新选择
  • ROS第五梯:ROS+VSCode+C++单步调试
  • SLA 概念和计算方法
  • C++比大小游戏
  • PCIe进阶之TL:Memory, I/O, and Configuration Request Rules TPH Rules
  • 【初阶数据结构】一文讲清楚 “堆” 和 “堆排序” -- 树和二叉树(二)(内含TOP-K问题)
  • sqli-lab靶场学习(二)——Less8-10(盲注、时间盲注)
  • Dijkstra算法和BFS算法(单源最短路径)
  • 在WordPress中最佳Elementor主题推荐:专家级指南
  • 关于RabbitMQ消息丢失的解决方案
  • c语言动态内存分配
  • 零基础制作一个ST-LINK V2 附PCB文件原理图 AD格式
  • nginx基础篇(一)
  • 监控系列之-Grafana面板展示及制作
  • 值传递和地址传递
  • Docker vs. containerd 深度剖析容器运行时
  • ARM32 base instruction -- blx
  • sql数据库
  • 2024/9/19 408大题专训之五段式指令流水线题型总结
  • Android SPN/PLMN 显示逻辑简介
  • 1.使用 VSCode 过程中的英语积累 - File 菜单(每一次重点积累 5 个单词)
  • 什么是数字化转型升级?
  • JAVA开源项目 校园美食分享平台 计算机毕业设计
  • MyBatis 增删改查【后端 17】
  • 计算机网络(运输层)
  • Linux 线程控制