
Spring Boot 3.x from Beginner to Expert Series: 4.2 Integrating Kafka in Detail

Integrating Kafka with Spring Boot 3.x in Detail

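🎯 Kafka Overview

Apache Kafka is a distributed event streaming platform used mainly to build real-time data pipelines and streaming applications. It offers high throughput, low latency, scalability, and fault tolerance.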

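Core Concepts

  • Producer: sends messages to the Kafka cluster
  • Consumer: reads messages from the Kafka cluster
  • Topic: a named category of messages, comparable to a queue in a traditional message broker
  • Partition: an ordered, physical subdivision of a topic that enables parallel processing
  • Broker: a server node in the Kafka cluster
  • Consumer Group: a set of consumers that share the work of consuming a topic; each partition is read by at most one consumer in the group
  • Offset: the position of a message within a partition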
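
To make the relationship between keys and partitions concrete, here is a minimal sketch of how a record key can be mapped to a partition so that records with the same key always land in the same partition and keep their relative order. The class and method names are hypothetical, and `String.hashCode()` is only a stand-in: Kafka's built-in partitioner hashes the serialized key bytes with a different hash function.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of any Kafka API. */
class PartitionSketch {

    // Stand-in for the real partitioner: hash the key, then reduce it to a
    // valid partition index. floorMod keeps the result non-negative.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("user-1", "user-2", "user-1", "user-3", "user-2");
        for (String key : keys) {
            // Repeated keys print the same partition every time.
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

This is why the producer later in this article uses the user ID or order ID as the record key: all events for one entity stay in one partition.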

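Core Features

  • High throughput: a suitably sized cluster can handle millions of messages per second
  • Low latency: message delivery latency in the millisecond range
  • Durability: messages are persisted to disk
  • Distributed: supports cluster deployment and horizontal scaling
  • Fault tolerance: supports data replication and failure recovery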

🚀 Quick Start

1. Add Dependencies

<dependencies>
    <!-- Spring Boot Web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- JSON processing (also pulled in transitively by spring-boot-starter-web) -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>

    <!-- Bean validation -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>

    <!-- Test dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Kafka test support -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

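2. Kafka Configuration

spring:
  # Kafka settings
  kafka:
    # Kafka broker address
    bootstrap-servers: localhost:9092

    # Producer settings
    producer:
      # Number of retries
      retries: 3
      # Batch size in bytes
      batch-size: 16384
      # Send buffer size in bytes
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # Acknowledgment mode
      acks: all
      # Compression type
      compression-type: gzip
      # Send timeouts
      properties:
        delivery.timeout.ms: 120000
        request.timeout.ms: 30000

    # Consumer settings
    consumer:
      # Consumer group ID
      group-id: demo-group
      # Disable auto-commit; offsets are acknowledged manually
      enable-auto-commit: false
      # Auto-commit interval (only takes effect when enable-auto-commit is true)
      auto-commit-interval: 1000
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Start from the earliest message when no committed offset exists
      auto-offset-reset: earliest
      # Maximum number of records per poll
      max-poll-records: 500
      # Maximum time the broker waits before answering a fetch
      fetch-max-wait: 500
      # JSON deserialization settings
      properties:
        spring.json.trusted.packages: "com.example.demo.dto"
        spring.json.type.mapping: "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto"

    # Listener settings (applied to Spring Boot's auto-configured container factory;
    # a container factory bean defined in your own configuration class takes precedence)
    listener:
      # Acknowledgment mode
      ack-mode: manual_immediate
      # Concurrency
      concurrency: 3
      # Poll timeout
      poll-timeout: 3000
      # Listener type: single or batch
      type: single

# Logging
logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG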

🔧 Kafka Configuration Class

package com.example.demo.config;

import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /**
     * Producer factory.
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * KafkaTemplate.
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Default consumer factory.
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        // JSON deserialization settings
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.dto");
        props.put(JsonDeserializer.TYPE_MAPPINGS,
                "userEvent:com.example.demo.dto.UserEventDto,orderEvent:com.example.demo.dto.OrderEventDto");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Default listener container factory.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Concurrency
        factory.setConcurrency(3);
        // Acknowledgment mode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Error handler
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }

    /**
     * Consumer factory for user events.
     */
    @Bean
    public ConsumerFactory<String, UserEventDto> userEventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-event-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.dto");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserEventDto.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory for user events.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEventDto> userEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEventDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userEventConsumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

📊 Message DTO Classes

1. User Event DTO

package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.time.LocalDateTime;

public class UserEventDto {

    @NotBlank(message = "Event ID must not be blank")
    private String eventId;

    @NotBlank(message = "Event type must not be blank")
    private String eventType; // CREATE, UPDATE, DELETE

    @NotNull(message = "User ID must not be null")
    private Long userId;

    private String username;
    private String email;
    private String operation;
    private String operatorId;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    private Object data; // additional data

    // Constructors
    public UserEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    public UserEventDto(String eventId, String eventType, Long userId, String operation) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.userId = userId;
        this.operation = operation;
    }

    // Getters and setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }

    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }

    public String getOperation() { return operation; }
    public void setOperation(String operation) { this.operation = operation; }

    public String getOperatorId() { return operatorId; }
    public void setOperatorId(String operatorId) { this.operatorId = operatorId; }

    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }

    public Object getData() { return data; }
    public void setData(Object data) { this.data = data; }

    @Override
    public String toString() {
        return "UserEventDto{" +
                "eventId='" + eventId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", userId=" + userId +
                ", username='" + username + '\'' +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

2. Order Event DTO

package com.example.demo.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;

public class OrderEventDto {

    @NotBlank(message = "Event ID must not be blank")
    private String eventId;

    @NotBlank(message = "Event type must not be blank")
    private String eventType; // CREATED, PAID, SHIPPED, DELIVERED, CANCELLED

    @NotNull(message = "Order ID must not be null")
    private Long orderId;

    @NotNull(message = "User ID must not be null")
    private Long userId;

    private String orderNo;
    private BigDecimal totalAmount;
    private String status;
    private List<OrderItem> items;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    // Order item
    public static class OrderItem {
        private Long productId;
        private String productName;
        private Integer quantity;
        private BigDecimal price;

        // Constructors
        public OrderItem() {}

        public OrderItem(Long productId, String productName, Integer quantity, BigDecimal price) {
            this.productId = productId;
            this.productName = productName;
            this.quantity = quantity;
            this.price = price;
        }

        // Getters and setters
        public Long getProductId() { return productId; }
        public void setProductId(Long productId) { this.productId = productId; }

        public String getProductName() { return productName; }
        public void setProductName(String productName) { this.productName = productName; }

        public Integer getQuantity() { return quantity; }
        public void setQuantity(Integer quantity) { this.quantity = quantity; }

        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }
    }

    // Constructors
    public OrderEventDto() {
        this.timestamp = LocalDateTime.now();
    }

    public OrderEventDto(String eventId, String eventType, Long orderId, Long userId) {
        this();
        this.eventId = eventId;
        this.eventType = eventType;
        this.orderId = orderId;
        this.userId = userId;
    }

    // Getters and setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }

    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }

    public Long getOrderId() { return orderId; }
    public void setOrderId(Long orderId) { this.orderId = orderId; }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }

    public String getOrderNo() { return orderNo; }
    public void setOrderNo(String orderNo) { this.orderNo = orderNo; }

    public BigDecimal getTotalAmount() { return totalAmount; }
    public void setTotalAmount(BigDecimal totalAmount) { this.totalAmount = totalAmount; }

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    public List<OrderItem> getItems() { return items; }
    public void setItems(List<OrderItem> items) { this.items = items; }

    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "OrderEventDto{" +
                "eventId='" + eventId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", orderId=" + orderId +
                ", userId=" + userId +
                ", orderNo='" + orderNo + '\'' +
                ", totalAmount=" + totalAmount +
                ", status='" + status + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

📤 Message Producer

package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Topic name constants
    public static final String USER_EVENTS_TOPIC = "user-events";
    public static final String ORDER_EVENTS_TOPIC = "order-events";
    public static final String NOTIFICATION_TOPIC = "notifications";

    /**
     * Sends a user event, keyed by user ID.
     */
    public void sendUserEvent(UserEventDto userEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(USER_EVENTS_TOPIC, userEvent.getUserId().toString(), userEvent);

            // The send is asynchronous; the outcome is reported in this callback.
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("User event sent: " + userEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send user event: " + userEvent.getEventId()
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Error while sending user event: " + e.getMessage());
        }
    }

    /**
     * Sends an order event, keyed by order ID.
     */
    public void sendOrderEvent(OrderEventDto orderEvent) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(ORDER_EVENTS_TOPIC, orderEvent.getOrderId().toString(), orderEvent);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Order event sent: " + orderEvent.getEventId()
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send order event: " + orderEvent.getEventId()
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Error while sending order event: " + e.getMessage());
        }
    }

    /**
     * Sends a notification message with a random key.
     */
    public void sendNotification(String message) {
        try {
            String messageId = UUID.randomUUID().toString();
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(NOTIFICATION_TOPIC, messageId, message);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Notification sent: " + messageId
                            + " with offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send notification: " + messageId
                            + " " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Error while sending notification: " + e.getMessage());
        }
    }

    /**
     * Sends a message to a specific partition.
     */
    public void sendMessageToPartition(String topic, int partition, String key, Object message) {
        try {
            CompletableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send(topic, partition, key, message);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Message sent to partition: partition=" + partition
                            + " offset=[" + result.getRecordMetadata().offset() + "]");
                } else {
                    System.err.println("Failed to send message to partition: " + ex.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Error while sending message to partition: " + e.getMessage());
        }
    }

    /**
     * Sends a batch of messages.
     */
    public void sendBatchMessages(String topic, List<Object> messages) {
        messages.forEach(message -> {
            String key = UUID.randomUUID().toString();
            kafkaTemplate.send(topic, key, message);
        });

        // Flush the buffer so that pending records are sent before returning
        kafkaTemplate.flush();
        System.out.println("Batch of " + messages.size() + " messages sent");
    }
}
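
The producer methods above rely on a `CompletableFuture` callback, which means a failed send is reported inside the callback rather than in the surrounding `try`/`catch`, and the calling method returns before the outcome is known. The following dependency-free sketch illustrates that behaviour. `fakeSend` is a hypothetical stand-in, not a Kafka or Spring API, and the offset value is made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical stand-in for an asynchronous send; not part of any Kafka API. */
class AsyncSendSketch {

    // Completes with a made-up offset, or exceptionally when the payload is empty.
    static CompletableFuture<Long> fakeSend(String payload) {
        return CompletableFuture.supplyAsync(() -> {
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
            return 42L; // made-up offset, for illustration only
        });
    }

    // Turns either outcome into a status string inside the callback.
    static CompletableFuture<String> sendAndDescribe(String payload) {
        return fakeSend(payload).handle((offset, ex) -> {
            if (ex == null) {
                return "sent, offset=" + offset;
            }
            // Asynchronous failures are usually wrapped in CompletionException.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            return "failed: " + cause.getMessage();
        });
    }

    public static void main(String[] args) {
        System.out.println(sendAndDescribe("hello").join());
        System.out.println(sendAndDescribe("").join());
    }
}
```

If a caller needs to know the result before responding, it can return the future or wait on it, at the cost of added latency.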

📥 Message Consumer

package com.example.demo.service;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class KafkaConsumerService {

    /**
     * Consumes user events.
     */
    @KafkaListener(topics = "user-events", groupId = "user-event-group",
            containerFactory = "userEventKafkaListenerContainerFactory")
    public void consumeUserEvent(@Payload UserEventDto userEvent,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                 @Header(KafkaHeaders.OFFSET) long offset,
                                 Acknowledgment acknowledgment) {
        try {
            System.out.println("Received user event: " + userEvent);
            System.out.println("Topic: " + topic + ", Partition: " + partition + ", Offset: " + offset);

            // Business logic for the user event
            processUserEvent(userEvent);

            // Acknowledge the message manually
            acknowledgment.acknowledge();
            System.out.println("User event processed: " + userEvent.getEventId());
        } catch (Exception e) {
            System.err.println("Failed to process user event: " + e.getMessage());
            // Retry logic or forwarding to a dead-letter queue could be implemented here.
            // The exception is swallowed, so the container's error handler is not invoked.
        }
    }

    /**
     * Consumes order events.
     */
    @KafkaListener(topics = "order-events", groupId = "order-event-group")
    public void consumeOrderEvent(ConsumerRecord<String, OrderEventDto> record,
                                  Acknowledgment acknowledgment) {
        try {
            OrderEventDto orderEvent = record.value();
            System.out.println("Received order event: " + orderEvent);
            System.out.println("Key: " + record.key() + ", Partition: " + record.partition()
                    + ", Offset: " + record.offset());

            // Business logic for the order event
            processOrderEvent(orderEvent);

            // Acknowledge the message manually
            acknowledgment.acknowledge();
            System.out.println("Order event processed: " + orderEvent.getEventId());
        } catch (Exception e) {
            System.err.println("Failed to process order event: " + e.getMessage());
        }
    }

    /**
     * Consumes notification messages.
     */
    @KafkaListener(topics = "notifications", groupId = "notification-group")
    public void consumeNotification(@Payload String message,
                                    // spring-kafka 3.x renamed RECEIVED_MESSAGE_KEY to RECEIVED_KEY
                                    @Header(KafkaHeaders.RECEIVED_KEY) String key,
                                    Acknowledgment acknowledgment) {
        try {
            System.out.println("Received notification: " + message);
            System.out.println("Message Key: " + key);

            // Business logic for the notification
            processNotification(message);

            // Acknowledge the message manually
            acknowledgment.acknowledge();
            System.out.println("Notification processed");
        } catch (Exception e) {
            System.err.println("Failed to process notification: " + e.getMessage());
        }
    }

    /**
     * Consumes messages in batches.
     * batch = "true" switches this listener to batch mode so that it can receive a List of records.
     */
    @KafkaListener(topics = "batch-topic", groupId = "batch-group", batch = "true")
    public void consumeBatchMessages(List<ConsumerRecord<String, Object>> records,
                                     Acknowledgment acknowledgment) {
        try {
            System.out.println("Received batch, size: " + records.size());

            for (ConsumerRecord<String, Object> record : records) {
                System.out.println("Processing message: Key=" + record.key() + ", Value=" + record.value()
                        + ", Partition=" + record.partition() + ", Offset=" + record.offset());
                // Process a single message
                processBatchMessage(record.value());
            }

            // Acknowledge the whole batch
            acknowledgment.acknowledge();
            System.out.println("Batch processed");
        } catch (Exception e) {
            System.err.println("Failed to process batch: " + e.getMessage());
        }
    }

    /**
     * Consumes from multiple topics.
     */
    @KafkaListener(topics = {"user-events", "order-events"}, groupId = "multi-topic-group")
    public void consumeMultiTopicEvents(ConsumerRecord<String, Object> record,
                                        Acknowledgment acknowledgment) {
        try {
            String topic = record.topic();
            Object value = record.value();
            System.out.println("Received multi-topic message: Topic=" + topic + ", Value=" + value);

            // Dispatch on the topic
            switch (topic) {
                case "user-events":
                    if (value instanceof UserEventDto) {
                        processUserEvent((UserEventDto) value);
                    }
                    break;
                case "order-events":
                    if (value instanceof OrderEventDto) {
                        processOrderEvent((OrderEventDto) value);
                    }
                    break;
                default:
                    System.out.println("Unknown topic: " + topic);
            }

            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("Failed to process multi-topic message: " + e.getMessage());
        }
    }

    // Business logic

    private void processUserEvent(UserEventDto userEvent) {
        // Dispatch on the event type
        switch (userEvent.getEventType()) {
            case "CREATE":
                System.out.println("Handling user created event: " + userEvent.getUserId());
                // Send a welcome email, initialize user data, etc.
                break;
            case "UPDATE":
                System.out.println("Handling user updated event: " + userEvent.getUserId());
                // Sync user information to other systems
                break;
            case "DELETE":
                System.out.println("Handling user deleted event: " + userEvent.getUserId());
                // Clean up user-related data
                break;
            default:
                System.out.println("Unknown user event type: " + userEvent.getEventType());
        }
    }

    private void processOrderEvent(OrderEventDto orderEvent) {
        // Dispatch on the event type
        switch (orderEvent.getEventType()) {
            case "CREATED":
                System.out.println("Handling order created event: " + orderEvent.getOrderId());
                // Deduct stock, send a confirmation email, etc.
                break;
            case "PAID":
                System.out.println("Handling order paid event: " + orderEvent.getOrderId());
                // Update the order status, prepare shipment, etc.
                break;
            case "SHIPPED":
                System.out.println("Handling order shipped event: " + orderEvent.getOrderId());
                // Send tracking information, update the status, etc.
                break;
            case "DELIVERED":
                System.out.println("Handling order delivered event: " + orderEvent.getOrderId());
                // Confirm receipt, send a review reminder, etc.
                break;
            case "CANCELLED":
                System.out.println("Handling order cancelled event: " + orderEvent.getOrderId());
                // Process the refund, restore stock, etc.
                break;
            default:
                System.out.println("Unknown order event type: " + orderEvent.getEventType());
        }
    }

    private void processNotification(String message) {
        System.out.println("Handling notification: " + message);
        // Send email, SMS, push notifications, etc.
    }

    private void processBatchMessage(Object message) {
        System.out.println("Handling batch item: " + message);
        // Batch processing logic
    }
}
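
The `catch` blocks in the consumer leave retry and dead-letter handling open. As a rough, dependency-free illustration of that idea, the sketch below retries a handler a fixed number of times and then parks the message in a dead-letter list. All names here are hypothetical, and the in-memory list merely stands in for publishing to a dead-letter topic. In a real application, Spring for Apache Kafka's `DefaultErrorHandler` and `DeadLetterPublishingRecoverer` are the usual building blocks, and this sketch does not use them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical retry-then-dead-letter helper; not a Spring Kafka class. */
class RetryThenDeadLetterSketch {

    // Stand-in for a dead-letter topic.
    final List<String> deadLetters = new ArrayList<>();
    private final int maxAttempts;

    RetryThenDeadLetterSketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the handler succeeded, so the offset may be acknowledged. */
    boolean handle(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                System.err.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        // All attempts failed: park the message instead of blocking the partition.
        deadLetters.add(message);
        return false;
    }
}
```

A caller would acknowledge the offset in both cases, since a dead-lettered message has been handed off for later inspection rather than lost.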

🎮 Controller Layer

package com.example.demo.controller;

import com.example.demo.dto.OrderEventDto;
import com.example.demo.dto.UserEventDto;
import com.example.demo.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import jakarta.validation.Valid;
import java.math.BigDecimal;
import java.util.*;

@RestController
@RequestMapping("/api/kafka")
@CrossOrigin(origins = "*")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Sends a user event.
     */
    @PostMapping("/user-events")
    public ResponseEntity<Map<String, String>> sendUserEvent(@RequestBody @Valid UserEventDto userEvent) {
        kafkaProducerService.sendUserEvent(userEvent);

        // The send is asynchronous, so this response only confirms submission.
        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "User event submitted");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Sends an order event.
     */
    @PostMapping("/order-events")
    public ResponseEntity<Map<String, String>> sendOrderEvent(@RequestBody @Valid OrderEventDto orderEvent) {
        kafkaProducerService.sendOrderEvent(orderEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Order event submitted");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Sends a notification message.
     */
    @PostMapping("/notifications")
    public ResponseEntity<Map<String, String>> sendNotification(@RequestBody Map<String, String> request) {
        String message = request.get("message");
        kafkaProducerService.sendNotification(message);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Notification submitted");
        return ResponseEntity.ok(response);
    }

    /**
     * Builds and sends a user event from a few request fields.
     */
    @PostMapping("/quick/user-event")
    public ResponseEntity<Map<String, String>> quickUserEvent(@RequestBody Map<String, Object> request) {
        String eventType = (String) request.get("eventType");
        Long userId = Long.valueOf(request.get("userId").toString());
        String username = (String) request.get("username");
        String email = (String) request.get("email");

        UserEventDto userEvent = new UserEventDto(
                UUID.randomUUID().toString(),
                eventType,
                userId,
                "API_OPERATION");
        userEvent.setUsername(username);
        userEvent.setEmail(email);
        userEvent.setOperatorId("system");

        kafkaProducerService.sendUserEvent(userEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "User event created and submitted");
        response.put("eventId", userEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Builds and sends an order event from a few request fields.
     */
    @PostMapping("/quick/order-event")
    public ResponseEntity<Map<String, String>> quickOrderEvent(@RequestBody Map<String, Object> request) {
        String eventType = (String) request.get("eventType");
        Long orderId = Long.valueOf(request.get("orderId").toString());
        Long userId = Long.valueOf(request.get("userId").toString());
        String orderNo = (String) request.get("orderNo");
        BigDecimal totalAmount = new BigDecimal(request.get("totalAmount").toString());

        OrderEventDto orderEvent = new OrderEventDto(
                UUID.randomUUID().toString(),
                eventType,
                orderId,
                userId);
        orderEvent.setOrderNo(orderNo);
        orderEvent.setTotalAmount(totalAmount);
        orderEvent.setStatus(eventType.toLowerCase());

        kafkaProducerService.sendOrderEvent(orderEvent);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Order event created and submitted");
        response.put("eventId", orderEvent.getEventId());
        return ResponseEntity.ok(response);
    }

    /**
     * Sends a batch of messages.
     */
    @PostMapping("/batch")
    public ResponseEntity<Map<String, String>> sendBatchMessages(@RequestBody Map<String, Object> request) {
        String topic = (String) request.get("topic");
        @SuppressWarnings("unchecked")
        List<String> messages = (List<String>) request.get("messages");

        List<Object> messageObjects = new ArrayList<>(messages);
        kafkaProducerService.sendBatchMessages(topic, messageObjects);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Batch submitted");
        response.put("count", String.valueOf(messages.size()));
        return ResponseEntity.ok(response);
    }

    /**
     * Sends a message to a specific partition.
     */
    @PostMapping("/partition")
    public ResponseEntity<Map<String, String>> sendToPartition(@RequestBody Map<String, Object> request) {
        String topic = (String) request.get("topic");
        Integer partition = (Integer) request.get("partition");
        String key = (String) request.get("key");
        Object message = request.get("message");

        kafkaProducerService.sendMessageToPartition(topic, partition, key, message);

        Map<String, String> response = new HashMap<>();
        response.put("status", "success");
        response.put("message", "Message submitted to the specified partition");
        response.put("partition", partition.toString());
        return ResponseEntity.ok(response);
    }
}

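📊 Best Practices

1. Message Design

  • Design a clear, consistent message format
  • Version the message structure
  • Include the necessary metadata (for example an event ID, event type, and timestamp)
  • Keep messages backward compatible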
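
One possible way to combine versioning with backward compatibility is to carry a version number in each message and let readers tolerate older layouts. The sketch below assumes, purely for illustration, a `schemaVersion` field and a rename from `name` (version 1) to `username` (version 2); none of these names come from the DTOs in this article.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical reader for a versioned event payload. */
class VersionedEventSketch {

    // Reads the user name regardless of which (assumed) schema version wrote the event.
    static String readUsername(Map<String, Object> event) {
        Object rawVersion = event.get("schemaVersion");
        // Events written before versioning was introduced are treated as version 1.
        int version = rawVersion instanceof Number ? ((Number) rawVersion).intValue() : 1;
        if (version >= 2) {
            return (String) event.get("username");
        }
        return (String) event.get("name");
    }

    public static void main(String[] args) {
        Map<String, Object> oldEvent = new HashMap<>();
        oldEvent.put("name", "alice");

        Map<String, Object> newEvent = new HashMap<>();
        newEvent.put("schemaVersion", 2);
        newEvent.put("username", "bob");

        System.out.println(readUsername(oldEvent));
        System.out.println(readUsername(newEvent));
    }
}
```

Keeping the old read path around lets producers and consumers be upgraded independently.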

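2. Performance Tuning

  • Choose a sensible batch size
  • Use compression to reduce network traffic
  • Optimize the serialization format
  • Choose a sensible number of partitions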
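
Batching and compression work together, because the producer compresses whole batches and similar messages in one batch share a lot of repeated structure. The following sketch gives a feel for the effect using only the JDK's gzip support. The sample payloads are made up, and the numbers it prints say nothing about real Kafka throughput.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

/** Hypothetical demo of how repetitive JSON batches shrink under gzip. */
class BatchCompressionSketch {

    // Returns the gzip-compressed size of the given bytes.
    static int gzipSize(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    // Builds a batch of similar, made-up JSON events.
    static byte[] sampleBatch(int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            sb.append("{\"eventType\":\"CREATE\",\"userId\":").append(i).append("}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] batch = sampleBatch(500);
        System.out.println("raw bytes:     " + batch.length);
        System.out.println("gzipped bytes: " + gzipSize(batch));
    }
}
```

The trade-off is extra CPU time on the producer and consumer, so the compression type is worth measuring against the actual workload.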

3. Reliability

  • Enable producer acknowledgments
  • Make consumers idempotent
  • Handle duplicate messages
  • Implement a dead-letter queue
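
With manual acknowledgment, a message can be delivered more than once, for example when processing succeeds but the offset is not committed before a restart. A simple way to make a consumer idempotent is to remember which event IDs have already been handled. The sketch below keeps them in an in-memory set, which is an assumption made for brevity: a real service would need a durable store such as a database table with a unique constraint.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical idempotent handler keyed by event ID. */
class IdempotentConsumerSketch {

    // In-memory only; lost on restart, so not sufficient for production use.
    private final Set<String> processedEventIds = new HashSet<>();
    private int sideEffects = 0;

    /** Returns true if the event was processed, false if it was a duplicate. */
    boolean handle(String eventId) {
        // Set.add returns false when the ID is already present.
        if (!processedEventIds.add(eventId)) {
            return false;
        }
        sideEffects++; // stands in for the real business logic
        return true;
    }

    int sideEffects() {
        return sideEffects;
    }
}
```

The `eventId` field carried by the DTOs in this article is a natural candidate for the deduplication key.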

4. Monitoring and Operations

  • Monitor message backlog
  • Track consumer lag
  • Monitor cluster health
  • Set up alerting
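
Consumer lag is commonly defined per partition as the log end offset minus the group's committed offset, summed over all partitions. The sketch below shows that arithmetic on plain maps. The class and its inputs are hypothetical, and treating a partition without a committed offset as starting from 0 is an assumption; how the offsets are obtained (for example through an admin client or a metrics exporter) is outside its scope.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lag calculation over per-partition offsets. */
class ConsumerLagSketch {

    // Sums (logEndOffset - committedOffset) over all partitions.
    static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            // Assumption: a partition with no committed offset counts from 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            total += Math.max(0L, entry.getValue() - committed);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> ends = new HashMap<>();
        ends.put(0, 100L);
        ends.put(1, 50L);

        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 90L);
        committed.put(1, 50L);

        System.out.println("total lag: " + totalLag(ends, committed)); // prints "total lag: 10"
    }
}
```

An alert on lag that keeps growing is usually more useful than an alert on any fixed absolute value.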

Keywords: Kafka, message queue, stream processing, distributed systems, event-driven architecture, microservice communication

