当前位置: 首页 > news >正文

基于SpringBoot3集成Kafka集群

1. build.gradle依赖引入

implementation 'org.springframework.kafka:spring-kafka:3.2.0'

2. 新增kafka-log.yml文件

在resource/config下面新增kafka-log.yml,配置主题与消费者组

# Kafka消费者群组
kafka:consumer:group:log-data: log-data-grouptopic:log-data: log-data-topicauto-startup: false

加载自定义yml文件

@Configuration
public class YmlConfiguration {@Beanpublic PropertySourcesPlaceholderConfigurer properties() {PropertySourcesPlaceholderConfigurer configurer = new PropertySourcesPlaceholderConfigurer();YamlPropertiesFactoryBean yaml = new YamlPropertiesFactoryBean();yaml.setResources(new ClassPathResource[]{new ClassPathResource("config/kafka-log.yml")});configurer.setProperties(yaml.getObject());return configurer;}
}

3. application.yml文件配置

spring:kafka:bootstrap-servers: 192.168.0.81:9092,192.168.0.82:9092,192.168.0.83:9092producer:retries: 0batch-size: 16384buffer-memory: 254288key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerauto-topic-creation:auto-create: trueproperties:linger.ms: 1session.timeout.ms: 15000sasl.mechanism: PLAINsecurity.protocol: SASL_PLAINTEXTsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: log-data-groupauto-offset-reset: latestproperties:session.timeout.ms: 15000sasl.mechanism: PLAINsecurity.protocol: SASL_PLAINTEXTsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required  username="admin"  password="your-password";
# 按需在不同环境配置值,如开发环境默认不启动
kafka:consumer:auto-startup: false

4. 生产者实现

@Service
@Slf4j
public class KafkaProducer {private final KafkaTemplate<Integer, String> kafkaTemplate;public KafkaProducer(KafkaTemplate kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String data) {CompletableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, data);future.whenComplete((sendResult, ex) -> {if (ex != null) {log.error("Kafka send message error = {}, topic = {}, data = {}", ex.getMessage(), topic, data);} else {// Handle the successful sendSystem.out.println("Message sent successfully: " + sendResult);}});}
}

5. 消费者实现

@Component
public class KafkaConsumerGroupManager {private KafkaAdmin kafkaAdmin;private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;public KafkaConsumerGroupManager(KafkaAdmin kafkaAdmin, ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {this.kafkaAdmin = kafkaAdmin;this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;}public void ensureConsumerGroupExists(String groupId) {try {// 获取 AdminClientAdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());// 检查消费者组是否存在ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();Collection<org.apache.kafka.clients.admin.ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get();boolean groupExists = consumerGroupListings.stream().anyMatch(consumerGroupListing -> consumerGroupListing.groupId().equals(groupId));if (!groupExists) {// 如果不存在,则创建消费者组kafkaListenerContainerFactory.getContainerProperties().setGroupId(groupId);}} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Failed to check consumer group existence", e);}}
}
@Service
@Slf4j
public class KafkaConsumer {private ElasticsearchOperations elasticsearchOperations206;public KafkaConsumer(ElasticsearchOperations elasticsearchOperations206) {this.elasticsearchOperations206 = elasticsearchOperations206;}/*** 日志数据消费** @param message*/@KafkaListener(topics = {"${kafka.consumer.topic.log-data}"}, groupId = "${kafka.consumer.group.log-data}", autoStartup = "${kafka.consumer.auto-startup}")public void consumer(String message) {this.bulkIndexJsonData(message);}public void bulkIndexJsonData(String jsonData) {List<IndexQuery> queries = new ArrayList<>();IndexQuery query = new IndexQuery();query.setSource(jsonData);query.setOpType(IndexQuery.OpType.INDEX);queries.add(query);elasticsearchOperations206.bulkIndex(queries, IndexCoordinates.of("log"));}
}

OK, 至此完毕。在某次集群宕机后,我们发现日志无法查询,经排查,是因为最初配置了auto-offset-reset: earliest 导致从头开始重新消费,幸好ES做了幂等性处理

http://www.lryc.cn/news/586695.html

相关文章:

  • CentOS 7 升级系统内核级库 glibc 2.40 完整教程
  • docker运行redis指定配置+jdk17安装在centos7
  • C#单例模式管理全局变量
  • 【Linux 学习指南】网络基础概念(一):从协议到分层,看透计算机通信的底层逻辑
  • 【源力觉醒 创作者计划】文心开源大模型ERNIE-4.5私有化部署保姆级教程与多功能界面窗口部署
  • 文心一言大模型4.5系列开源测评
  • 开源链动2+1模式、AI智能名片与S2B2C商城小程序在私域运营中的协同创新研究
  • 笔记-极客-DDD实战-基于DDD的微服务拆分与设计
  • mysql复合条件匹配的查询优化
  • jeepay开源项目开发中金支付如何像其他支付渠道对接那样简单集成,集成服务商模式,极简集成工具。
  • (dp、贪心)洛谷 P8179 Tyres 题解
  • 012_PDF处理与文档分析
  • hash表的模拟--开放定址法
  • AI 助力:如何批量提取 Word 表格字段并导出至 Excel
  • 学习C++、QT---23(QT中QFileDialog库实现文件选择框打开、保存讲解)
  • 行测速算之假设分配法
  • 在 JetBrains 系列 IDE(如 IntelliJ IDEA、PyCharm 等)中如何新建一个 PlantUML 文件
  • Java集合框架深度解析:LinkedList vs ArrayList 的对决
  • 【Linux | 网络】应用层(HTTP)
  • Linux|服务器|二进制部署nacos(不是集群,单实例)(2025了,不允许还有人不会部署nacos)
  • 【PTA数据结构 | C语言版】简单计算器
  • 【Linux】线程机制深度实践:创建、等待、互斥与同步
  • 详解Linux下多进程与多线程通信(二)
  • ARC 02 runner scale set chart:对接集群与 Github Action 服务器
  • linux上的软挂载操作方法
  • DAY02:【ML 第一弹】KNN算法
  • 分类问题-机器学习
  • 掌握系统设计的精髓:12个核心设计模式的通俗解读
  • NW756NW815美光固态闪存NW821NW828
  • 设计模式深度解析:单例、工厂、适配器与代理模式