Integrating a Kafka Cluster with Spring Boot 3
1. Add the dependency in build.gradle
implementation 'org.springframework.kafka:spring-kafka:3.2.0'
2. Create the kafka-log.yml file
Add kafka-log.yml under src/main/resources/config to configure the topic and consumer group:
# Kafka consumer group
kafka:
  consumer:
    group:
      log-data: log-data-group
    topic:
      log-data: log-data-topic
    auto-startup: false
Load the custom yml file:
@Configuration
public class YmlConfiguration {

    @Bean
    public PropertySourcesPlaceholderConfigurer properties() {
        PropertySourcesPlaceholderConfigurer configurer = new PropertySourcesPlaceholderConfigurer();
        // Load config/kafka-log.yml from the classpath and expose its entries to ${...} placeholders
        YamlPropertiesFactoryBean yaml = new YamlPropertiesFactoryBean();
        yaml.setResources(new ClassPathResource("config/kafka-log.yml"));
        configurer.setProperties(yaml.getObject());
        return configurer;
    }
}
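With the placeholder configurer in place, values from kafka-log.yml can be injected anywhere with ${...} placeholders. A minimal sketch (the LogKafkaProperties class is illustrative, not part of the original setup):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class LogKafkaProperties {

    // Resolved from kafka-log.yml via the PropertySourcesPlaceholderConfigurer above
    @Value("${kafka.consumer.topic.log-data}")
    private String logDataTopic;

    @Value("${kafka.consumer.group.log-data}")
    private String logDataGroup;

    public String getLogDataTopic() {
        return logDataTopic;
    }

    public String getLogDataGroup() {
        return logDataGroup;
    }
}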
3. Configure application.yml
spring:
  kafka:
    bootstrap-servers: 192.168.0.81:9092,192.168.0.82:9092,192.168.0.83:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 254288
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      auto-topic-creation:
        auto-create: true
      properties:
        linger.ms: 1
        session.timeout.ms: 15000
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: log-data-group
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 15000
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";
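Note that the SASL settings above are duplicated under producer.properties and consumer.properties. As an alternative sketch, Spring Boot also accepts client properties once under spring.kafka.properties, which it applies to producers, consumers, and the admin client alike:

spring:
  kafka:
    properties:
      sasl.mechanism: PLAIN
      security.protocol: SASL_PLAINTEXT
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="your-password";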
# Set this per environment as needed, e.g. do not start the consumer by default in development
kafka:
  consumer:
    auto-startup: false
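For example, a production profile file (application-prod.yml here is illustrative) can switch the listener back on:

# application-prod.yml — start the log consumer automatically in production
kafka:
  consumer:
    auto-startup: true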
4. Producer implementation
@Service
@Slf4j
public class KafkaProducer {

    // Keys use StringSerializer per application.yml, so the template is typed <String, String>
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String data) {
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.whenComplete((sendResult, ex) -> {
            if (ex != null) {
                log.error("Kafka send message error = {}, topic = {}, data = {}", ex.getMessage(), topic, data);
            } else {
                // Handle the successful send
                log.info("Message sent successfully: {}", sendResult);
            }
        });
    }
}
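A quick usage sketch (the LogReportService class is hypothetical): any bean can inject KafkaProducer and publish log records to the topic defined in kafka-log.yml:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class LogReportService {

    private final KafkaProducer kafkaProducer;

    @Value("${kafka.consumer.topic.log-data}")
    private String logDataTopic;

    public LogReportService(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    public void report(String logJson) {
        // Fire-and-forget; failures are logged inside KafkaProducer's callback
        kafkaProducer.sendMessage(logDataTopic, logJson);
    }
}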
5. Consumer implementation
@Component
public class KafkaConsumerGroupManager {

    private final KafkaAdmin kafkaAdmin;
    private final ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;

    public KafkaConsumerGroupManager(KafkaAdmin kafkaAdmin,
                                     ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {
        this.kafkaAdmin = kafkaAdmin;
        this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
    }

    public void ensureConsumerGroupExists(String groupId) {
        // Create an AdminClient from the KafkaAdmin configuration; try-with-resources closes it
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            // Check whether the consumer group already exists
            ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
            Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get();
            boolean groupExists = consumerGroupListings.stream()
                    .anyMatch(listing -> listing.groupId().equals(groupId));
            if (!groupExists) {
                // If not, set the group id on the listener container factory; the broker
                // creates the group itself once a consumer with this id subscribes
                kafkaListenerContainerFactory.getContainerProperties().setGroupId(groupId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to check consumer group existence", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to check consumer group existence", e);
        }
    }
}
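The manager above is not invoked automatically; one way to call it (a sketch assuming an ApplicationRunner, which the original does not show) is at application startup:

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class ConsumerGroupInitializer implements ApplicationRunner {

    private final KafkaConsumerGroupManager groupManager;

    public ConsumerGroupInitializer(KafkaConsumerGroupManager groupManager) {
        this.groupManager = groupManager;
    }

    @Override
    public void run(ApplicationArguments args) {
        // Group id taken from kafka-log.yml in this article
        groupManager.ensureConsumerGroupExists("log-data-group");
    }
}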
@Service
@Slf4j
public class KafkaConsumer {

    private final ElasticsearchOperations elasticsearchOperations;

    public KafkaConsumer(ElasticsearchOperations elasticsearchOperations) {
        this.elasticsearchOperations = elasticsearchOperations;
    }

    /**
     * Consume log data.
     *
     * @param message the raw JSON log record
     */
    @KafkaListener(topics = {"${kafka.consumer.topic.log-data}"},
                   groupId = "${kafka.consumer.group.log-data}",
                   autoStartup = "${kafka.consumer.auto-startup}")
    public void consumer(String message) {
        this.bulkIndexJsonData(message);
    }

    public void bulkIndexJsonData(String jsonData) {
        List<IndexQuery> queries = new ArrayList<>();
        IndexQuery query = new IndexQuery();
        query.setSource(jsonData);
        query.setOpType(IndexQuery.OpType.INDEX);
        queries.add(query);
        // Bulk-index the raw JSON into the "log" index
        elasticsearchOperations.bulkIndex(queries, IndexCoordinates.of("log"));
    }
}
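Because auto-startup is false in development, the listener has to be started by hand when needed. A minimal sketch, assuming the @KafkaListener above is additionally given id = "logDataListener" (an id is not present in the original):

import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class ListenerLifecycleService {

    private final KafkaListenerEndpointRegistry registry;

    public ListenerLifecycleService(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    public void startLogListener() {
        // Look up the container by the (assumed) listener id and start it if idle
        MessageListenerContainer container = registry.getListenerContainer("logDataListener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}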
That completes the setup. After one cluster outage, however, we found that logs could no longer be queried. Investigation traced it to the original setting of auto-offset-reset: earliest: since auto.offset.reset only applies when the broker has no committed offset for the group, which was evidently the state the outage left us in, every consumer went back to the beginning of the topic and re-consumed everything. Fortunately the Elasticsearch writes were idempotent, so the replay produced no duplicates.
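The idempotency mentioned above comes down to using a deterministic Elasticsearch document id, so a re-consumed message overwrites the same document instead of creating a duplicate. A sketch of that idea (the id parameter is hypothetical; the original code indexes without an explicit id):

import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;

public class IdempotentLogIndexer {

    private final ElasticsearchOperations elasticsearchOperations;

    public IdempotentLogIndexer(ElasticsearchOperations elasticsearchOperations) {
        this.elasticsearchOperations = elasticsearchOperations;
    }

    public void index(String logId, String jsonData) {
        // Same logId -> same ES document; replaying the message is a harmless overwrite
        IndexQuery query = new IndexQueryBuilder()
                .withId(logId)
                .withSource(jsonData)
                .build();
        elasticsearchOperations.index(query, IndexCoordinates.of("log"));
    }
}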