当前位置: 首页 > news >正文

Kafka java 配置

前言:
        大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。

介绍:

        我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。

给大家解释配置含义。

1.Kafka配置代码

public KafkaConsumer<String, String> getCustomer() {// 1. 配置属性参数Properties properties = new Properties();// 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");// 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);// 设置消费者是否自动提交offset,true表示自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 设置自动提交offset的时间间隔(单位:毫秒)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 设置每次poll操作返回的最大记录数properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);// 根据配置属性创建Kafka消费者实例return new KafkaConsumer<>(properties);
}

2.Kafka消费者代码

@Test
void KafkaConsumerTest() {// 创建Kafka消费者实例,通过getCustomer()方法获取KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();// 订阅要消费的主题,这里是 "test-topic"consumer.subscribe(Collections.singletonList("test-topic"));// 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));for (ConsumerRecord<String, String> record : records) {// 处理消息的逻辑// 打印消息的offset、key和valueSystem.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());//以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。boolean flag = true;if (flag){// 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息// 如果需要手动提交offset,可以取消注释下面的代码// consumer.commitAsync();// 由于flag为true,这里会跳出循环,不再处理后续的消息break;}}// 关闭消费者,释放资源consumer.close();// 打印结束消费的日志System.out.println("结束消费");
}

http://www.lryc.cn/news/481623.html

相关文章:

  • 网络安全现状:复杂的威胁形势导致压力水平飙升
  • 【机器学习】强化学习(1)——强化学习原理浅析(区分强化学习、监督学习和启发式算法)
  • 【SoC设计指南 基于Arm Cortex-M】学习笔记1——AMBA
  • flutter鸿蒙模拟器 Win环境调试报错问题记录(暂未解决)
  • 详解Rust标准库:HashSet
  • 记录学习react的一些内容
  • json绘制热力图
  • linux 下查看程序启动的目录
  • 书生浦语第四期基础岛L1G2000-玩转书生「多模态对话」与「AI搜索」产品
  • 保护Kubernetes免受威胁:容器安全的有效实践
  • 【客观理性深入讨论国产中间件及数据库-科创基础软件】
  • MFC中Excel的导入以及使用步骤
  • AWS S3在客户端应用不能使用aws-sdk场景下的文件上传与下载
  • 深入解析 Transformers 框架(四):Qwen2.5/GPT 分词流程与 BPE 分词算法技术细节详解
  • 【Python-AI篇】K近邻算法(KNN)
  • aws xray如何实现应用log和trace的关联关系
  • centos服务器登录失败次数设定
  • 实时高效,全面测评快递100API的物流查询功能
  • 第14张 GROUP BY 分组
  • 笔记整理—linux驱动开发部分(10)input子系统与相关框架
  • [算法初阶]埃氏筛法与欧拉筛
  • 【THM】linux取证 DisGruntled
  • SpringBoot整合Freemarker(四)
  • centos docker 安装 rabbitmq
  • 手动实现promise的all,race,finally方法
  • H5移动端预览PDF方法
  • uniapp—android原生插件开发(1环境准备)
  • 《潜行者2切尔诺贝利之心》游戏引擎介绍
  • winform 加载 office excel 插入QRCode图片如何设定位置
  • 简易入手《SOM神经网络》的本质与原理