Spring Boot + Spring Kafka 集成
本文给出 Spring Boot 集成 Spring for Apache Kafka(org.springframework.kafka:spring-kafka)的完整代码示例,包含 pom 依赖、application.yml 配置、配置类、生产者、消费者、测试入口和启动类,做到开箱即用。
1. pom.xml
依赖
<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring for Apache Kafka.
         No explicit <version> here: like the other version-less entries in this list, it is
         expected to be resolved by Spring Boot's dependency management (assumes the project
         inherits spring-boot-starter-parent or imports the Boot BOM), which keeps spring-kafka
         and kafka-clients aligned with the Boot release.
         The producer in this example calls thenAccept/whenComplete on the result of
         KafkaTemplate.send, so a Spring Boot 3.x / spring-kafka 3.x baseline is required. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Logging API only.
         spring-boot-starter-web already brings Logback as the SLF4J backend, so slf4j-simple
         is intentionally NOT declared: a second SLF4J backend on the classpath conflicts with
         Spring Boot's logging initialisation. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
</dependencies>
2. application.yml
配置
# Spring Kafka connection and (de)serialization settings for the demo.
spring:
  kafka:
    # Address of the Kafka broker the application connects to.
    bootstrap-servers: localhost:9092
    producer:
      # Keys and values are sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Wait for acknowledgement from all in-sync replicas before a send counts as successful.
      acks: all
    consumer:
      # Default consumer group; the listener in this example also names the same group explicitly.
      group-id: test-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # With no committed offset for the group, start reading from the beginning of the topic.
      auto-offset-reset: earliest
3. Kafka 配置类(可选,定制用)
package com.example.kafkademo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Optional configuration that declares the Kafka topic used by this demo.
 */
@Configuration
public class KafkaTopicConfig {

    /** Name of the topic shared by the demo producer and consumer. */
    private static final String TOPIC_NAME = "test-topic";

    /** Number of partitions requested for the topic. */
    private static final int PARTITION_COUNT = 3;

    /** Replication factor requested for the topic. */
    private static final short REPLICATION_FACTOR = 1;

    /**
     * Exposes the demo topic definition as a bean so that the topic
     * (3 partitions, replication factor 1) is created automatically.
     *
     * @return the topic definition for {@code test-topic}
     */
    @Bean
    public NewTopic topic() {
        return new NewTopic(TOPIC_NAME, PARTITION_COUNT, REPLICATION_FACTOR);
    }
}
4. 生产者 Service
package com.example.kafkademo.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Publishes string messages to the demo topic through a {@link KafkaTemplate}.
 */
@Service
public class KafkaProducerService {

    /** Topic every message from this service is written to. */
    private static final String TOPIC = "test-topic";

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate template used to send records; injected by Spring
     */
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a message asynchronously and reports the outcome on the console.
     *
     * <p>The method returns as soon as the send has been handed to the template; the
     * outcome is printed later from the completion callback. A successful send prints the
     * partition and offset to standard output, a failed send prints the cause to standard
     * error instead of being dropped silently.
     *
     * @param key     record key
     * @param message record value
     */
    public void sendMessage(String key, String message) {
        // whenComplete (rather than thenAccept) so the failure path is observed as well.
        kafkaTemplate.send(TOPIC, key, message).whenComplete((result, failure) -> {
            if (failure != null) {
                System.err.printf("Failed to send message=[%s] with key=[%s]: %s%n",
                        message, key, failure);
                return;
            }
            System.out.printf("Sent message=[%s] with key=[%s] to partition=[%d] offset=[%d]%n",
                    message, key,
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
        });
    }
}
5. 消费者 Listener
package com.example.kafkademo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Listens on the demo topic and prints every record it receives.
 */
@Component
public class KafkaConsumerListener {

    /**
     * Prints the key, value, partition and offset of a consumed record to standard output.
     *
     * @param record the record delivered from {@code test-topic} for group {@code test-group}
     */
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listen(ConsumerRecord<String, String> record) {
        final String key = record.key();
        final String value = record.value();
        final int partition = record.partition();
        final long offset = record.offset();

        System.out.printf(
                "Consumed message: key=%s value=%s partition=%d offset=%d%n",
                key, value, partition, offset);
    }
}
6. 控制器测试入口
package com.example.kafkademo.controller;

import com.example.kafkademo.producer.KafkaProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP entry point for manually pushing a test message into Kafka.
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaProducerService producerService;

    /**
     * @param producerService service that performs the actual send; injected by Spring
     */
    public KafkaController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    /**
     * Hands the given key/message pair to the producer and echoes the message back.
     *
     * <p>The response only confirms that the message was handed to the producer; the send
     * itself completes asynchronously inside {@link KafkaProducerService}.
     *
     * @param key record key, taken from the {@code key} query parameter
     * @param msg record value, taken from the {@code msg} query parameter
     * @return a confirmation text containing the submitted message
     */
    // The response echoes caller-supplied text, so the content type is pinned to plain text:
    // without it, a client that asks for text/html may get the echoed value served as HTML.
    @GetMapping(value = "/send", produces = "text/plain;charset=UTF-8")
    public String send(@RequestParam String key, @RequestParam String msg) {
        producerService.sendMessage(key, msg);
        return "Message sent: " + msg;
    }
}
7. 启动类
package com.example.kafkademo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point for the Kafka demo.
 */
@SpringBootApplication
public class KafkaDemoApplication {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(KafkaDemoApplication.class);
        application.run(args);
    }
}
8. 使用方式
- 启动 ZooKeeper 和 Kafka(在 Kafka 安装目录下,分别在两个终端中依次执行):
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
- 启动 Spring Boot 应用:
mvn spring-boot:run
- 在浏览器或命令行中发送消息:
curl "http://localhost:8080/kafka/send?key=1&msg=HelloKafka"
- 应用控制台会先后打印生产者的发送结果和消费者消费到的消息。