当前位置: 首页 > news >正文

kafka的处理的一些问题 消费延迟

kafka的处理的一些问题

  • 消费者客户端不但没有背压而且内存充足,但产生的消费延迟越来越大
  • 在Kafka的Leader副本宕机时

消费者客户端不但没有背压而且内存充足,但产生的消费延迟越来越大

在这里插入图片描述

比如我们这个kakfa集群一共有3个Broker节点

TOp1有5个分区,P0、P1、P2、P3、P4,这些分区分布在3个不同Broker节点上,而我们创建了包含两个消费者的消费者组。

消费者1同时消费P0、P1和P4分区的数据。
消费者2消费P2和P3分区的数据
看到消费延迟,大家想去就是增加消费者数量和分区数量,让我消费者数量增加到和Partition的数量一样多,这样每个消费者就可以仅仅消费一个分区的数据,可以达到消费能力1最大化 。

了解消费者背后的执行原理。该如何优化消费者消费数据的吞吐量。

在这里插入图片描述

消费者在调用poll()方法到远端的Broker节点拉去数据时。优先从nextInLineFetch中获取数据,这个nextInLineFetch就是数据接收缓冲区,
如果数据接收缓冲区中没有待消费的数据,这个时候才会调用SendFetches方法,到Broker端拉去数据,

kafka是向响应的Broker节点发送拉取数据的网络请求,我们都知道网路请求对于内存请求是比较慢的,因此这些拉取数据的网络请求是由Broker端异步执行的,异步执行拉取数据请求,就必须通过future监听数据是否已经准备好,当数据准备好之后,会异步将数放到数据接收缓存completedFetches中,

在这里插入图片描述
这是因为IO请求比较耗时,所以尽量一次批量拉取更多的数据放到缓存中,这样就可以降低发起网络的IO次数,进而提升消费能力,现在缓冲区completedFetches中已经有数据了,就会把completedFetches中队头的数据解析到nextInLineFetch中

在这里插入图片描述
解析成消费者可以消费的数据格式,然后清除completedFetches中队头的元素

在这里插入图片描述
随后如果有消费调用poll()方法拉取数,就会优先从nextInLineFetch中获取数据,注意,消费者客户端每次获取的数据量是由参数 max.poll.records控制的,默认值是500。 相当于每次从nextInLineFetch获取500条数据并返回给消费者。

在这里插入图片描述
当消费者消费完500条数据之后,会再次调用poll()方法,
在这里插入图片描述
再拉取500条数据 ,当消费者把nextlnLineFetch缓存的数据都消费完之后,相当于再调用poll()方式时,nextInLineFetch已经咩有待消费的数据了,这个时候,就会把completedFetch的新的队头元素解析解析成nextInLineFetch。可以适当的将该参数增加到16KB或者32KB

而参数fetch.max.bytes标识每次poll操作,从Broker端最多拉取数据量,默认值时50MB,如果我们内存资源充足,建议增大fetch.max.bytes增加到200MB以上.参数max.partition.fetch.bytes的默认值是1MB。表示每次poll返回的,每个Broker节点上每个分区的最大字节数。因此我们再回头看这个例子。

那么每次从Broker-102上最多能拉取到的数据也就是1MB。数据量未免太小了,有的时候刚消费完1MB,就得再次经过一次网络IO拉取下一批数据,这可能是造成消费延迟的主要原因。大家可以根据自己的Topic的实际分区数,来合理设置每个分区每次拉取数据的大小,因此建议可以将每个分区每次拉取数据的大小设置成10MB以上。 max.partition.fetch.bytes增加到10MB以上

但有的时候只是提高每个分区每次最大拉取到的数量也是不够的,因为每个Broker最多返回的最大字节数由参数fetch.max.bytes控制,这个参数的默认值是50MB,有时候也可以适当的提升这个参数的默认值,比如增加到200MB
这样就能再本地尽量缓存更多的数据,以提升消费者消费数据的能力,降低消费延迟,主要适用于内存充足,你消费能力不足的场景

消费客户端根本不能修改啦这个参数,因为设置了静态的
在这里插入图片描述

在Kafka的Leader副本宕机时

在这里插入图片描述

http://www.lryc.cn/news/505822.html

相关文章:

  • 旅游创业,千益畅行,开启新的旅游模式!
  • 集成自然语言理解服务,让应用 “听得懂人话”
  • 利用notepad++删除特定关键字所在的行
  • [HNOI2002] 营业额统计 STL - set集合
  • fastAPI接口(普通流式响应和大模型流式响应)
  • Linux系统安装node.js
  • 《解决两道有趣的编程问题:交替数字和与简单回文》
  • 2412d,d的8月会议
  • WEB自动化测试(selenium工具)框架、面试题
  • 前端自动化部署之ssh2和ssh2-sftp-client
  • python pandas 优化内存占用(一)
  • FutureCompletableFuture实战
  • Loki 微服务模式组件介绍
  • peerDependencies对等依赖
  • 贪心算法 part01
  • java开发入门学习二 - 变量
  • Qt Q_ENUM enum 转 QString 枚举字符串互转; C++模板应用
  • 0004.基于springboot+elementui的在线考试系统
  • 基于 iAP2 协议 的指令协议,用于对安防设备的 MCU 进行操作
  • 02-5.python入门基础一控制流(while)
  • Go语言开发入门与实战
  • HarmonyOS Next应用开发实战:ArkWeb组件使用介绍及使用举例
  • 【已解决】在Visual Studio里将应用与Microsoft Store关联时提示网络异常
  • springcloud-gateway获取应用响应信息乱码
  • [笔记]关于Qt的nativeEvent事件无法接收window消息的Bug
  • LeetCode 热题 100_K 个一组翻转链表(31_25_困难_C++)(四指针法)
  • Pytorch | 从零构建MobileNet对CIFAR10进行分类
  • CSS系列(18)-- 工程化实践详解
  • 日拱一卒(18)——leetcode学习记录:二叉树中的伪回文路径
  • hive—炸裂函数explode/posexplode