当前位置: 首页 > news >正文

SpringBoot Kafka消费者 多kafka配置

一、配置文件

xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-commit: falseoffset-reset: earliestrecords: 10session-timeout: 150000poll-interval: 360000request-timeout: 60000

二、KafkaConfig

package com.xxxxxx.xxxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {@Value("${xxxxxx.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${xxxxxx.kafka.consumer.poll-timeout}")private Integer pollTimeout;@Value("${xxxxxx.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${xxxxxx.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${xxxxxx.kafka.consumer.auto-commit}")private String autoCommit;@Value("${xxxxxx.kafka.consumer.offset-reset}")private String offsetReset;@Value("${xxxxxx.kafka.consumer.records}")private Integer records;@Value("${xxxxxx.kafka.consumer.session-timeout}")private Integer sessionTimeout;@Value("${xxxxxx.kafka.consumer.poll-interval}")private Integer pollInterval;@Value("${xxxxxx.kafka.consumer.request-timeout}")private Integer requestTimeout;@Bean(name = "ixxxxxxKafkaListenerContainerFactory")public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//并发数量factory.setConcurrency(3);//设置在消费者中等待记录的最大阻塞时间。factory.getContainerProperties().setPollTimeout(pollTimeout);//ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map consumerConfigs() {Map props = new HashMap<>();//Kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//消费者组,只要group.id相同,就属于同一个消费者组//props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交offset,默认为true,设置为falseprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);//key反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);//value反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);//一次消费信息条数props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);//earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//session超时时间props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);//客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);return props;}
}

三、消费者

@KafkaListener(containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",id = "itsId",idIsGroup = false,groupId = "itsGroupId",topics = "itsTopic")public void consumerUser(@Payload String data,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Acknowledgment ack,Consumer<?, ?> consumer){try{}catch (Exception e){}ack.acknowledge();}

http://www.lryc.cn/news/222950.html

相关文章:

  • git 标签相关命令
  • 我在Vscode学OpenCV 图像运算(权重、逻辑运算、掩码、位分解、数字水印)
  • 【 Docker: 数据卷挂载】
  • windows上的静态链接和动态链接的区别与作用(笔记)
  • MySQL和Postgresql数据库备份和恢复
  • 使用MCU上的I2C总线进行传感器应用
  • 汽车标定技术(七)--基于模型开发如何生成完整的A2L文件(2)
  • ZZ308 物联网应用与服务赛题第E套
  • web相关框架
  • 安装dubbo-admin报错node版本和test错误
  • HTML使用canvas绘制海报(网络图片)
  • 20道高频JavaScript面试题快问快答
  • 【STM32】HAL库UART含校验位的串口通信配置BUG避坑
  • Python实用技巧:将 Excel转为PDF
  • 【面经】讲一下你对jvm和jmm的了解
  • 《网络协议》03. 传输层(TCP UDP)
  • ZooKeeper调优
  • 改进YOLOv5:结合ICCV2023|动态蛇形卷积,构建不规则目标识别网络
  • 开发知识点-NodeJs-npm/Pnpm/Vite/Yarn包管理器
  • Mac上好用的翻译软件推荐 兼容m
  • 软件下载网站
  • java获取近期视频流关键帧与截图
  • arcgis 批量删除Table中的某些Field
  • 工厂设备扫码使用售卖联网开发需要怎么开发开源代码?
  • 软考高级之132个工具和技术
  • 算法通过村第十八关-回溯|白银笔记|经典问题
  • vue2 集成 - 超图 - SuperMap iClient3D for WebGL 及常用方法
  • 应用程序服务器/事件驱动编程/CommonJS介绍
  • 第二十九章 目标检测中的测试模型评价指标(车道线感知)
  • OceanBase 如何通过日志观测冻结转储流程?