当前位置: 首页 > news >正文

springboot集成kafka消费数据

springboot集成kafka消费数据

文章目录

  • springboot集成kafka消费数据
  • 1.引入pom依赖
  • 2.添加配置文件
    • 2.1.添加KafkaConsumerConfig.java
    • 2.2.添加KafkaIotCustomProperties.java
    • 2.3.添加application.yml配置
  • 3.消费者代码

1.引入pom依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.2</version></dependency>

2.添加配置文件

2.1.添加KafkaConsumerConfig.java

@Configuration
@EnableConfigurationProperties(KafkaIotCustomProperties.class)
@Slf4j
public class KafkaConsumerConfig {@AutowiredKafkaIotCustomProperties kafkaIotCustomProperties;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 并发数 多个微服务实例会均分factory.setConcurrency(3);factory.setBatchListener(true);ContainerProperties containerProperties = factory.getContainerProperties();// 是否设置手动提交containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<String, String> consumerFactory() {Map<String, Object> consumerConfigs = consumerConfigs();log.info("消费者的配置信息:{}",JSONObject.toJSONString(consumerConfigs));return new DefaultKafkaConsumerFactory<>(consumerConfigs);}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();// 服务器地址propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIotCustomProperties.getBootstrapServers());// 是否自动提交propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaIotCustomProperties.isEnableAutoCommit());// 自动提交间隔propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getAutoCommitInterval());//会话时间propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaIotCustomProperties.getSessionTimeOut());//key序列化propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getKeyDeserializer());//value序列化propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getValueDeserializer());// 心跳时间propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getHeartbeatInterval());// 分组idpropsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaIotCustomProperties.getGroupId());//消费策略propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaIotCustomProperties.getAutoOffsetReset());// poll记录数propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaIotCustomProperties.getMaxPollRecords());//poll时间propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getMaxPollInterval());return propsMap;}}

2.2.添加KafkaIotCustomProperties.java

@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {private List<String> topics;private String groupId;private String sessionTimeOut;private String bootstrapServers;private String autoOffsetReset;private boolean enableAutoCommit;private String autoCommitInterval;private String fetchMinSize;private String fetchMaxWait;private String maxPollRecords;private String maxPollInterval;private String heartbeatInterval;private String keyDeserializer;private String valueDeserializer;
}

2.3.添加application.yml配置

fxyh:realdata:kafka:bootstrapServers:  192.168.80.251:9092topics: ["test1","test2"]groupId: shengtingrealdatagroup#后台的心跳线程必须在30秒之内提交心跳,否则会reBalancesessionTimeOut: 30000#      autoOffsetReset: earliest#取消自动提交,即便如此 spring会帮助我们自动提交enableAutoCommit: false#自动提交间隔autoCommitInterval: 1000#拉取的最小字节fetchMinSize: 1#拉去最小字节的最大等待时间fetchMaxWait: 500maxPollRecords: 50#300秒的提交间隔,如果程序大于300秒提交,会报错maxPollInterval: 300000#心跳间隔heartbeatInterval: 10000keyDeserializer: org.apache.kafka.common.serialization.StringDeserializervalueDeserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latest

3.消费者代码


@Slf4j
@Component
public class DeviceDataConsumer {@Autowiredprivate KafkaIotCustomProperties kafkaIotCustomProperties;@KafkaListener(topics = {"#{@kafkaIotCustomProperties.topics}"}, groupId = "#{@kafkaIotCustomProperties.groupId}", containerFactory = "kafkaListenerContainerFactory",properties = {"#{@kafkaIotCustomProperties.autoOffsetReset}"})public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {log.info("topic_test 消费了: Topic:" + record.topic() + ",groupId:" + kafkaIotCustomProperties.getGroupId() + ",Message:" + record.value());//手动提交偏移量ack.acknowledge();}}
}
http://www.lryc.cn/news/280760.html

相关文章:

  • 单例模式---JAVA
  • maven管理使用
  • 如何在一个系统中同时访问异构的多种数据库
  • 半监督学习 - 半监督聚类(Semi-Supervised Clustering)
  • 实现STM32烧写程序-(3) Hex文件结构
  • 精品量化公式——“区域突破”,应对当下行情较好的主图看盘策略
  • 自然语言处理5——发掘隐藏规律 - Python中的关联规则挖掘
  • 【记录】重装系统后的软件安装
  • Android 13 - Media框架(31)- ACodec(七)
  • 快速了解VR全景拍摄技术运用在旅游景区的优势
  • 分布形态的度量_峰度系数的探讨
  • HCIP 重发布
  • FX图中的节点代表什么操作
  • 【Java 设计模式】创建型之单例模式
  • FlinkAPI开发之窗口(Window)
  • 【Unity】Joystick Pack摇杆插件实现锁四向操作
  • 29 旋转工具箱
  • WeNet2.0:提高端到端ASR的生产力
  • 第九部分 使用函数 (四)
  • 一文读懂「Prompt Engineering」提示词工程
  • 微信小程序(一)简单的结构及样式演示
  • 【设计模式】外观模式
  • 优先级队列(Priority Queue)
  • 12-桥接模式(Bridge)
  • Zookeeper+Kafka概述
  • 架构师 - 架构师是做什么的 - 学习总结
  • 全链路压测方案(一)—方案调研
  • c++关键字const
  • 分布式计算平台 Hadoop 简介
  • 系统学习Python——警告信息的控制模块warnings:常见函数-[warnings.warn]