当前位置: 首页 > news >正文

Kafka消费者相关原理

前言

前面已经介绍了Kafka的架构知识并引出了Kafka的相关专业名称进行解释

也介绍了Kafka生产者相关的原理

这次分享一下Kafka对消费者接收消息进行处理的运行机制和原理

消费者接收到消息之后,就是进行消费,但是消费者要维护Kafka中消费者偏移量主题的消息

所以消费者消费的时候一定是需要给消费者偏移量主题提交 消费者组/消费主题分区/偏移量这些消费者消费的信息,让Kafka维护消息偏移量信息的

消费者提交消息偏移量

自动提交

每隔 5 秒钟自动把最近拉取到的消息的 offset 提交给 Kafka 的 消费者偏移量主题

可以自己设置是否自动提交和自动提交消息偏移量的间隔

优点: 批量提交,减少提交压力

缺点:

        实时同步性差,若消费者挂掉,重新启动

                如果之前的消息没有消费完就提交了,会造成消息丢失

                如果之前的消息消费完了还没来得及提交,下次消费者还按之前的消息偏移量进行消费,会造成消息重复消费

手动提交

消费者消费完消息之后手动对偏移量进行提交

设置完手动提交需要自己提交消息偏移量,如果设置了手动提交但是不手动提交消息的偏移量此时消息偏移量没有维护,会导致重复消费

手动同步提交

手动同步提交就是当前线程提交消息偏移量阻塞,等待提交成功

手动异步提交

手动异步提交就是当前线程提交消息偏移量不阻塞,异步调用消费者提供回调方法来对异步提交结果进行处理

消费者长轮询拉取消息

拉取消息数量:默认每次拉取最多 500 条消息

长轮询等待时间:如果当前批次的消息不足 500 条,消费者会阻塞等待 最多 1 秒,期间 Broker 会推送新到达的消息

总结: 够500条直接返回,不够最多拉取一秒也返回

其中消费者长轮询拉取消息条数和长轮询等待时间是可以设置的

拉取完之后消费者就开始对消息进行消费了,当消费者消费完这次拉取的消息就会再次长轮询拉取消息

所以就引出了下面的消费者运行机制

消费者两次长轮询拉取消息间隔如果超过30秒就认定这个消费者处理业务能力差就把它剔除出消费者组。触发rebalance机制,rebalance机制会造成性能开销

消费者两次长轮询拉取消息间隔时间是可以设置的,需要根据消费者能力快慢进行设置

消费者的健康状态检查

消费者每隔1s向kafka集群发送⼼跳,集群发现如果有超过10s没有续约的消费者,将被踢出 消费组,触发该消费组的rebalance机制,将该分区交给消费组⾥的其他消费者进⾏消费

新消费者指定分区和偏移量、时间消费

Kafka可以指定具体主题的分区进行消息的拉取

并且指定从这个主题分区的那个消息偏移量开始消费

如: 从头开始消费、指定从第几个偏移量开始消费、指定从过去的某个具体时间进行消费

其中,指定从过去某个时间进行消费的原理步骤

先通过指定过去的某个时间找到主题分区的偏移量位置,然后使用这个偏移量调用指定偏移量开始消费

所以需要配置消费者的消息偏移量配置

新消费组的消费偏移量规则

新消费组中的消费者在启动以后,默认消费新消息                          

        Latest:默认的,消费新消息

当然也可以通过配置让新消费组中的消费组从头开始消费消息

        earliest:第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)

所以需要配置消费组的消息偏移量设置

其中,当消费者消息偏移量配合和消费组消息偏移量配置冲突的时候,优先消费者偏移量配置

http://www.lryc.cn/news/616405.html

相关文章:

  • 纳维 - 斯托克斯方程的存在性与光滑性:流体世界的千年谜题
  • Python训练营打卡DAY 26 函数专题1:函数定义与参数
  • 大模型工具集成四层架构:识别、协议、执行与实现
  • JS中typeof与instanceof的区别
  • 专题三_二分_二分查找
  • 单片机捷径
  • Shell脚本-了解i++和++i
  • Linux常用命令(后端开发版)
  • NVIDIA Jetson AGX Orin 全景解析——边缘计算的高性能选择
  • 6A 工作流:让 Cursor、Trae 等AI编程助手按流程交付的实战手册
  • 机器学习——多元线性回归
  • React Profiler
  • HarmonyOS NEXT系列之编译三方C/C++库
  • 【Jenkins入门以及安装】
  • 《动手学深度学习》读书笔记—10.4 Bahdanau注意力
  • 移动端音频处理实践:59MB变声应用的技术实现分析
  • MySQL中的in和exists的区别
  • C++多线程服务器
  • Spring循环依赖详解
  • MySQL面试题及详细答案 155道(041-060)
  • LeeCode 46. 全排列
  • 冒泡排序实现以及优化
  • 20250810 | 深度学习入门笔记1
  • 大型动作模型LAM:让企业重复任务实现80%效率提升的AI技术架构与实现方案
  • 五种 IO 模型与阻塞 IO
  • 数组中的第K个最大元素
  • MyBatisPlus插件原理
  • Leetcode 3646. Next Special Palindrome Number
  • 代码随想录算法训练营第六十天|图论part10
  • 【Nginx②】 | Nginx部署前端静态文件指南(基于虚拟机环境)