springboot整合kafka
springboot整合kafka
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + Kafka sample application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.4.0</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>cn.lhz</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1</version>
  <name>kafka</name>
  <description>kafka</description>
  <url/>
  <licenses>
    <license/>
  </licenses>
  <developers>
    <developer/>
  </developers>
  <scm>
    <connection/>
    <developerConnection/>
    <tag/>
    <url/>
  </scm>
  <properties>
    <java.version>21</java.version>
    <jdk.version>21</jdk.version>
    <maven.compiler.source>${jdk.version}</maven.compiler.source>
    <maven.compiler.target>${jdk.version}</maven.compiler.target>
    <maven.compiler.compilerVersion>${jdk.version}</maven.compiler.compilerVersion>
    <maven.compiler.encoding>utf-8</maven.compiler.encoding>
    <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!-- Test failures do not fail the build, and test compilation/execution is skipped entirely. -->
    <maven.test.failure.ignore>true</maven.test.failure.ignore>
    <maven.test.skip>true</maven.test.skip>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-kafka</artifactId>
    </dependency>
    -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>com.github.xiaoymin</groupId>
      <artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId>
      <version>4.5.0</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>${project.name}</finalName>
    <plugins>
      <!-- Compiler level -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.13.0</version>
        <configuration>
          <!-- Source file encoding used by the compiler -->
          <encoding>UTF-8</encoding>
          <!-- JDK version used for compilation -->
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <repositories>
    <repository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>
application.yml配置kafka
本案例使用最少的配置,在
application.yml
中添加Kafka的连接配置:
# Kafka connection plus String (de)serializers for both consumer and producer.
spring:
  application:
    name: springboot-kafka
  kafka:
    bootstrap-servers: lihaozhe01:9092,lihaozhe02:9092,lihaozhe03:9092
    consumer:
      group-id: lihaozhe
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

# Swagger / springdoc settings (original note: whether swagger is enabled, true or false)
springdoc:
  swagger-ui:
    path: /swagger-ui
    tags-sorter: alpha
    operations-sorter: alpha
  api-docs:
    path: /v3/api-docs
  group-configs:
    - group: 'default'
      paths-to-match: '/**'
      packages-to-scan: cn.lhz.controller

# knife4j enhancements; may be omitted when no enhancement is needed
knife4j:
  enable: true
  setting:
    language: zh_cn
  basic:
    # Enable basic authentication
    enable: true
    # User name
    username: admin
    # Password
    # NOTE(review): credentials are hard-coded here; consider externalizing them outside of demos
    password: lihaozhe
Kafka配置类
创建 KafkaConfig.java 配置类,可用于创建Kafka主题(示例中的 Bean 均已注释,需要时取消注释即可):
package cn.lhz.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @author 李昊哲* @version 1.0.0*/
@Configuration
@EnableKafka
public class KafkaConfig {
  // Kafka configuration holder. Every bean definition below is commented out, so this
  // class currently declares no beans of its own.
  // NOTE(review): the producer service injects a KafkaTemplate, so one is presumably
  // supplied by Spring Boot auto-configuration from application.yml - confirm.

  // Optional: build the producer factory and template by hand.
  // @Bean
  // public ProducerFactory<String, String> producerFactory() {
  //   Map<String, Object> configProps = new HashMap<>();
  //   // configuration properties go here
  //   return new DefaultKafkaProducerFactory<>(configProps);
  // }
  //
  // @Bean
  // public KafkaTemplate<String, String> kafkaTemplate() {
  //   return new KafkaTemplate<>(producerFactory());
  // }

  // Optional: declare the topic "lihaozhe" (arguments: name, 1, (short) 1).
  // @Bean
  // public NewTopic myTopic() {
  //   return new NewTopic("lihaozhe", 1, (short) 1);
  // }
}
Kafka消费者
创建
KafkaConsumer.java
消费者类,用于从Kafka主题接收消息。以下两段代码二选一:
- 第一个案例代码只获取 value 值
- 第二个案例代码获取了 ConsumerRecord 完整信息
获取value
package cn.lhz.service;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
public class KafkaConsumer {

  /**
   * Handles each record received from topic {@code lihaozhe} (consumer group
   * {@code lihaozhe}) by writing its value to standard output.
   *
   * @param value the value of the consumed record
   */
  @KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")
  public void consume(String value) {
    // Format first, then emit the finished line in one call.
    String line = String.format("Consumed: %s\n", value);
    System.out.print(line);
  }
}
获取ConsumerRecord
package cn.lhz.service;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;/*** @author 李昊哲* @version 1.0.0*/
@Service
public class KafkaConsumer {

  /**
   * Handles each record received from topic {@code lihaozhe} (consumer group
   * {@code lihaozhe}) by writing its topic, partition, key and value to standard output.
   *
   * @param record the complete consumed record
   */
  @KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")
  public void consume(ConsumerRecord<String, String> record) {
    // Pull the individual parts out first so the format call reads left to right.
    String topic = record.topic();
    int partition = record.partition();
    String key = record.key();
    String payload = record.value();
    System.out.print(String.format("Consumed: %s-%d-%s:%s\n", topic, partition, key, payload));
  }
}
Kafka生产者
package cn.lhz.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * @author 李昊哲
 * @version 1.0.0
 */
@Service
@RequiredArgsConstructor
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;/*** 或者 topic 中的 value** @param topic kafka topic* @param value kafka topic 中的 value*/public void sendMessage(String topic, String value) {kafkaTemplate.send(topic, value);}/*** 或者 topic 中的 value** @param topic kafka topic* @param key kafka topic 中的 key* @param value kafka topic 中的 value*/public void sendMessage(String topic, String key, String value) {kafkaTemplate.send(topic, key, value);}
}
创建控制器发送消息
package cn.lhz.controller;import cn.lhz.service.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;/*** @author 李昊哲* @version 1.0.0*/
@RestController
@RequiredArgsConstructor
public class KafkaController {private final KafkaProducer kafkaProducer;@GetMapping("/send/{message}")public String sendMessage(@PathVariable(value = "message") String message) {// 只发送 valuekafkaProducer.sendMessage("lihaozhe", message);return "消息:" + message;}@GetMapping("/send/{key}/{message}")public String sendMessage(@PathVariable(value = "key") String key,@PathVariable(value = "message") String message) {// 只发送 key 和 valuekafkaProducer.sendMessage("lihaozhe", key, message);return "消息:" + message;}
}
配置OpenApi
package cn.lhz.config;import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Contact;
import io.swagger.v3.oas.models.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.net.Inet4Address;
import java.net.UnknownHostException;/*** @author 李昊哲* @version 1.0.0*/
@Configuration
@Slf4j
public class OpenApiConfig implements ApplicationListener<WebServerInitializedEvent> {@Beanpublic OpenAPI springOpenAPI() {Contact contact = new Contact();contact.setName("李昊哲");contact.setUrl("https://space.bilibili.com/480308139");contact.setEmail("646269678@qq.com");// 访问路径:http://localhost:8080/doc.html// 访问路径:http://localhost:8080/swagger-ui/index.htmlreturn new OpenAPI().info(new Info().title("SpringBoot Kafka API").description("SpringBoot Kafka Simple Application").contact(contact).version("1.0.0"));}@Overridepublic void onApplicationEvent(WebServerInitializedEvent event) {try {//获取IPString hostAddress = Inet4Address.getLocalHost().getHostAddress();//获取端口号int port = event.getWebServer().getPort();//获取应用名String applicationName = event.getApplicationContext().getApplicationName();// TODO:这个localhost改成host地址log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/doc.html", hostAddress, port, applicationName);log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/swagger-ui/index.html", hostAddress, port, applicationName);} catch (UnknownHostException e) {e.printStackTrace();}}
}
启动项目测试
使用第一个消费者(只获取 value)时,例如访问 /send/hello world,控制台输出
Consumed: hello world
使用第二个消费者(获取 ConsumerRecord)时,例如访问 /send/hello/world,控制台输出
Consumed: lihaozhe-0-hello:world