当前位置: 首页 > news >正文

Kafka3.0.0版本——生产者如何提高吞吐量

目录

    • 一、生产者提高吞吐量参数设置
    • 二、产者提高吞吐量代码示例

一、生产者提高吞吐量参数设置

  • batch.size:设置批次大小,默认16k
  • linger.ms:设置等待时间,修改为5-100ms
  • buffer.memory:设置缓冲区大小, 默认 32M
  • compression.type:设置压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd

二、产者提高吞吐量代码示例

  • 代码示例

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    /*** 生产者提高吞吐量* 1、设置批次大小,batch.size 默认 16K* 2、设置等待时间,linger.ms 默认 0* 3、设置缓冲区大小,buffer.memory 默认 32M* 4、设置压缩, compression.type 默认 none,可配置值 gzip、snappy、lz4 和 zstd* */
    public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());/*** 4、提高吞吐量* *///设置缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//设置批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//设置等待时间 linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);//设置压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");//5、创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//6、调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}//7、关闭资源kafkaProducer.close();}
    }
  • 在三台服务器上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]#  bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.29:9092 --topic news
    
  • 在 IDEA 中执行代码,观察 三台服务器控制台中是否接收到消息,如下图:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

http://www.lryc.cn/news/106862.html

相关文章:

  • js精度丢失的问题
  • C++ 编译预处理
  • 备战秋招 | 笔试强化22
  • LeetCode ACM模式——哈希表篇(二)
  • hadoop 3.1.3集群搭建 ubuntu20
  • 备忘录模式——撤销功能的实现
  • Golang 函数参数的传递方式 值传递,引用传递
  • K8s影响Pod调度和Deployment
  • 透明代理和不透明代理
  • 1424. 对角线遍历 II;2369. 检查数组是否存在有效划分;1129. 颜色交替的最短路径
  • 【漏洞复现】Metabase 远程命令执行漏洞(CVE-2023-38646)
  • Linux 9的repo for OVS build
  • DOCTYPE 是什么作用?
  • KubeSphere 3.4.0 发布:支持 K8s v1.26
  • 自然语言文本分类模型代码
  • Prometheus实现系统监控报警邮件
  • could not import go.etcd.io/etcd/clientv3-go
  • MySQL的行锁、表锁触发
  • mysql-入门笔记-3
  • 3分钟创建超实用的中小学新生录取查询系统,现在可以实现了
  • Redis 变慢了 解决方案
  • 远程仓库的操作
  • 一个监控系统的典型架构
  • 让GPT人工智能变身常用工具-中
  • HCIP中期实验
  • 《向量数据库指南》——向量数据库Milvus Cloud、Pinecone、Vespa、Weaviate、Vald、GSI 、 Qdrant选哪个?
  • python与深度学习(十一):CNN和猫狗大战
  • 经典CNN(三):DenseNet算法实战与解析
  • 学习笔记——压力测试案例,监控平台
  • sqlite 踩坑