当前位置: 首页 > news >正文

kafka服务端允许生产者发送最大消息体大小

1、kafka config服务端配置文件server.properties

        server.properties中加上的message.max.bytes配置,我目前设置为5242880,即5MB,可以根据实际情况增大。

message.max.bytes=5242880

        在生产者端配置max.request.size,这是单个消息最大字节数,根据实际调整,max.request.size 必须小于 message.max.bytes 以及消费者的 max.partition.fetch.bytes。这样消息就能不断发送。

2、重启kafka服务

3、生产者配置

# 发送所有ISR
acks=all
# 重试次数
retries=2
# 批量发送大小(128KB)
batch.size=131072
# 消息体大小(5M)
max.request.size=5242880
# 缓存大小,根据本机内存大小配置(64M)
buffer.memory=67108864
# 发送频率,和batch.size参数满足任一条件发送
linger.ms=5
# 发送端id,便于统计(单线程时)
client.id=producer-asyn
# Key序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Value序列化
value.serializer=org.apache.kafka.common.serialization.StringSerializer

4、springboot kafka 配置 max.request.size

1、在 application.properties 文件中添加如下配置

spring.kafka.producer.max-request-size=5242880

 2、在 KafkaTemplate 中添加如下配置

        Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));

 3、在 KafkaProducer 中添加如下配置

        Properties props = new Properties();props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);KafkaProducer<String, String> producer = new KafkaProducer<>(props);

5、配置类参数设置

        Properties props = new Properties();props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("buffer.memory", 67108864);props.put("batch.size", 131072);props.put("linger.ms", 100);props.put("max.request.size", 10485760);props.put("retries", 10);props.put("retry.backoff.ms", 500);props.put("acks", "1");KafkaProducer<String, String> producer = new KafkaProducer<>(props);

6、参数详解

  • buffer.memory

  Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲的,也就是说,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,这样性能才可能高。

        buffer.memory的本质就是用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB。

  如果buffer.memory设置的太小,可能导致的问题是:消息快速的写入内存缓冲里,但Sender线程来不及把Request发送到Kafka服务器,会造成内存缓冲很快就被写满。而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。 

  所以“buffer.memory”参数需要结合实际业务情况压测,需要测算在生产环境中用户线程会以每秒多少消息的频率来写入内存缓冲。经过压测,调试出来一个合理值。

  • batch.size

  每个Batch要存放batch.size大小的数据后,才可以发送出去。比如说batch.size默认值是16KB,那么里面凑够16KB的数据才会发送。

  理论上来说,提升batch.size的大小,可以允许更多的数据缓冲在里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。

  但是batch.size也不能过大,要是数据老是缓冲在Batch里迟迟不发送出去,那么发送消息的延迟就会很高。

  一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。

  • linger.ms

  一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。

  比如说batch.size是16KB,但是现在某个低峰时间段,发送消息量很小。这会导致可能Batch被创建之后,有消息进来,但是迟迟无法凑够16KB,难道此时就一直等着吗?

  当然不是,假设设置“linger.ms”是50ms,那么只要这个Batch从创建开始到现在已经过了50ms了,哪怕他还没满16KB,也会被发送出去。 

  所以“linger.ms”决定了消息一旦写入一个Batch,最多等待这么多时间,他一定会跟着Batch一起发送出去。 

  linger.ms配合batch.size一起来设置,可避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况。

  • max.request.size

  决定了每次发送给Kafka服务器请求消息的最大大小。

  如果发送的消息都是大报文消息,每条消息都是数据较大,例如一条消息可能要20KB。此时batch.size需要调大些,比如设置512KB,buffer.memory也需要调大些,比如设置128MB。 

  只有这样,才能在大消息的场景下,还能使用Batch打包多条消息的机制。

  此时“max.request.size”也得同步增加。

  • retries和retries.backoff.ms

  重试机制,也就是如果一个请求失败了可以重试几次,每次重试的间隔是多少毫秒,根据业务场景需要设置。

  • acks

acks

含义
0 Producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
1 Producer 往集群发送数据只要 Leader 应答就可以发送下一条,只确保 Leader 接收成功。
-1 或 all Producer 往集群发送数据需要所有的ISR Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader 发送成功和所有的副本都成功接收。安全性最高,但是效率最低。

附:

Kafka参数调优,解决The request included a message larger than the max message size the server will accept._Pallas_Cat的博客-CSDN博客

转载请注明出处:BestEternity亲笔。

http://www.lryc.cn/news/99353.html

相关文章:

  • 台阶型Nim游戏博弈论
  • NestJS 的 中间件 学习
  • 搭建自己第一个golang程序
  • Mysql加锁过程
  • 财经界杂志财经界杂志社财经界编辑部2023年第19期目录
  • Linux常用命令——dpkg-split命令
  • 常见的二十种软件测试方法详解
  • Python(一)
  • git pull无效,显示 * branch master -> FETCH_HEADAlready up to date. pull无效解决方法
  • SK5代理与socks5代理
  • 【【51单片机红外遥控小风车】】
  • 如何连接远程服务器?快解析内内网穿透可以吗?
  • 【云边有个小卖部】上新《探秘Linux》第三章 Linux 软件包管理器 yum
  • 【深度学习】【Image Inpainting】Free-Form Image Inpainting with Gated Convolution
  • 游戏引擎UE如何革新影视行业?创意云全面支持UE云渲染
  • DB-GPT:强强联合Langchain-Vicuna的应用实战开源项目,彻底改变与数据库的交互方式
  • STM32CubeMX v6.9.0 BUG:FLASH_LATENCY设置错误导致初始化失败
  • K8s-资源管理(二)
  • 脉冲信号测试应如何选择示波器带宽?
  • OpenCV DNN模块推理YOLOv5 ONNX模型方法
  • ThirdAI 的私有和可个性化神经数据库:增强检索增强生成(第 3/3 部分)
  • C# 解决TCP Server 关不掉客户端连接的问题
  • JS判断类型的方法和对应的局限性(typeof、instanceof和Object.prototype.toString.call()的用法)
  • mongostat跟踪Mongodb运行的状态
  • 华为数通HCIA-数通网络基础
  • 【设计模式】详解单例设计模式(包含并发、JVM)
  • 监控和可观察性在 DevOps 中的作用!
  • 论文分享:PowerTCP: Pushing the Performance Limits of Datacenter Networks
  • 浏览器的同源策略 - 跨域问题
  • go 查询采购单设备事项[小示例]V2-两种模式{严格,包含模式}