当前位置: 首页 > news >正文

Spring Boot + Spring Kafka 集成

Spring Boot 集成 org.springframework.kafka 的完整代码示例,包含 pom 依赖、配置类、生产者、消费者、测试入口,做到开箱即用。


1. pom.xml 依赖

<dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring for Apache Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.2.4</version> <!-- 选用最新稳定版本 --></dependency><!-- 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId></dependency>
</dependencies>

2. application.yml 配置

spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allconsumer:group-id: test-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliest

3. Kafka 配置类(可选,定制用)

package com.example.kafkademo.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class KafkaTopicConfig {// 自动创建一个 topic(分区=3,副本=1)@Beanpublic NewTopic topic() {return new NewTopic("test-topic", 3, (short) 1);}
}

4. 生产者 Service

package com.example.kafkademo.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;private static final String TOPIC = "test-topic";public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}// 发送消息public void sendMessage(String key, String message) {kafkaTemplate.send(TOPIC, key, message).thenAccept(result -> {System.out.printf("Sent message=[%s] with key=[%s] to partition=[%d] offset=[%d]%n",message, key,result.getRecordMetadata().partition(),result.getRecordMetadata().offset());});}
}

5. 消费者 Listener

package com.example.kafkademo.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumerListener {@KafkaListener(topics = "test-topic", groupId = "test-group")public void listen(ConsumerRecord<String, String> record) {System.out.printf("Consumed message: key=%s value=%s partition=%d offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}
}

6. 控制器测试入口

package com.example.kafkademo.controller;import com.example.kafkademo.producer.KafkaProducerService;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/kafka")
public class KafkaController {private final KafkaProducerService producerService;public KafkaController(KafkaProducerService producerService) {this.producerService = producerService;}@GetMapping("/send")public String send(@RequestParam String key, @RequestParam String msg) {producerService.sendMessage(key, msg);return "Message sent: " + msg;}
}

7. 启动类

package com.example.kafkademo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);}
}

8. 使用方式

  1. 启动 Kafka & Zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    
  2. 启动 Spring Boot 应用

    mvn spring-boot:run
    
  3. 在浏览器/命令行发送消息

    curl "http://localhost:8080/kafka/send?key=1&msg=HelloKafka"
    
  4. 控制台会看到消费者打印出消费到的消息。


http://www.lryc.cn/news/624682.html

相关文章:

  • SMTPman,smtp ssl助力安全高效邮件传输!
  • Java 中表示数据集的常用集合类
  • 低端设备加载webp ANR
  • 安全存储之 SAES+HUK 使用技巧和常见问题 LAT1543
  • Rust 教程之简介000
  • CSS:水平垂直居中
  • 【银河麒麟桌面系统】配置匿名文件夹与用户认证共享服务
  • 2025年秋招Java后端面试场景题+八股文题目
  • AI 推荐系统云端部署实战:基于亚马逊云科技免费资源的工程实现
  • 从财务整合到患者管理:德国医疗集团 Asklepios完成 SAP S/4HANA 全链条升级路径
  • CAN总线的安全性
  • Linux小白加油站,第三周周考
  • 世界模型之自动驾驶
  • 想找出版社出书?这样选就对了!
  • 《P1195 口袋的天空》
  • OVS:ovn是如何支持组播的?
  • GPT-5之后:当大模型更新不再是唯一焦点
  • 多硬盘构建lvm存储
  • GPT-5博士级AI使用教程及国内平替方案
  • 基于SpringBoot+Uniapp的互联网订餐小程序(协同过滤算法、Echarts图形化分析)
  • “Let it Crash“:分布式系统设计的涅槃重生哲学
  • 【笔记】位错的定义和分类
  • 【2025CVPR-目标检测方向】学习稳健且硬件自适应的对象检测器,以应对边缘设备的延迟攻击
  • Image-to-Music API 接入文档(图片生成音乐)
  • 综合布线系统的网络分线箱计量-文字查找精准定位
  • 区块链技术原理(16)-以太坊节点与客户端
  • 从0-1使用Fastmcp开发一个MCP服务,并部署到阿里云百炼 -持续更新中
  • 深入理解浏览器渲染机制:重排(Reflow)与重绘(Repaint)
  • 深入剖析以太坊虚拟机(EVM):区块链世界的计算引擎
  • 【低空安全】低空安全简介