本地事务 + 消息队列事务方案设计
Spring Boot 和 RocketMQ
在Spring Boot项目中实现“本地事务 + 消息队列事务”的方案,可以按照以下步骤实现:
- 先执行MySQL本地事务操作(未提交)
- 随后发送消息到消息队列(如RocketMQ事务消息)
- 等待消息队列确认消息投递成功
- 提交MySQL事务
以下是基于Spring Boot和RocketMQ的完整代码示例:
确保pom.xml中包含RocketMQ的依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version>
</dependency>
业务场景:订单创建和库存更新
需求:创建订单时,使用本地事务处理订单操作,并发送事务消息给库存服务,通知更新库存。
1. 订单服务:OrderService
@Slf4j
@Service
public class OrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate OrderRepository orderRepository;/*** 创建订单并发送事务消息*/@Transactionalpublic void createOrderAndSendMessage(String orderId, String productId, int quantity) {// Step 1: 先执行本地事务(保存订单)log.info("开始创建订单...");Order order = new Order();order.setOrderId(orderId);order.setProductId(productId);order.setQuantity(quantity);order.setStatus("PENDING");orderRepository.save(order);// Step 2: 构造事务消息Message<OrderMessage> message = MessageBuilder.withPayload(new OrderMessage(orderId, productId, quantity)).build();// Step 3: 发送事务消息rocketMQTemplate.sendMessageInTransaction("order-topic", // 消息主题message,null // 额外参数);}
}
2. 事务监听器:OrderTransactionListener
事务监听器中,包含本地事务执行逻辑和事务状态回查逻辑。
@Slf4j
@Component
public class OrderTransactionListener implements TransactionListener {@Autowiredprivate OrderRepository orderRepository;/*** 执行本地事务逻辑*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {OrderMessage orderMessage = (OrderMessage) SerializationUtils.deserialize(msg.getBody());try {log.info("执行本地事务 - 更新订单状态: {}", orderMessage.getOrderId());// 本地事务:更新订单状态为“CONFIRMED”orderRepository.updateStatus(orderMessage.getOrderId(), "CONFIRMED");return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {log.error("本地事务执行失败: {}", e.getMessage());return LocalTransactionState.ROLLBACK_MESSAGE;}}/*** 回查本地事务状态*/@Overridepublic LocalTransactionState checkLocalTransaction(Message msg) {OrderMessage orderMessage = (OrderMessage) SerializationUtils.deserialize(msg.getBody());String orderId = orderMessage.getOrderId();log.info("回查本地事务状态 - 订单ID: {}", orderId);String status = orderRepository.findStatusByOrderId(orderId);if ("CONFIRMED".equals(status)) {return LocalTransactionState.COMMIT_MESSAGE;} else {return LocalTransactionState.ROLLBACK_MESSAGE;}}
}
3. RocketMQ配置
将事务监听器和生产者绑定。
@Configuration
public class RocketMQConfig {@Beanpublic TransactionMQProducer transactionMQProducer(OrderTransactionListener listener) {TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");producer.setNamesrvAddr("127.0.0.1:9876");producer.setTransactionListener(listener);return producer;}
}
4. 消息对象:OrderMessage
用于传递订单信息的消息对象。
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage implements Serializable {private String orderId;private String productId;private int quantity;
}
5. 数据库操作:OrderRepository
@Repository
public interface OrderRepository extends JpaRepository<Order, String> {@Modifying@Transactional@Query("UPDATE Order o SET o.status = :status WHERE o.orderId = :orderId")void updateStatus(@Param("orderId") String orderId, @Param("status") String status);@Query("SELECT o.status FROM Order o WHERE o.orderId = :orderId")String findStatusByOrderId(@Param("orderId") String orderId);
}
运行流程
- 客户端调用createOrderAndSendMessage方法:
- 先在MySQL数据库中插入订单数据。
- 发送“半消息”到RocketMQ。
- RocketMQ事务监听器executeLocalTransaction执行:
- 更新订单状态,表示本地事务已完成。
- RocketMQ提交或回滚事务消息:
- 若本地事务成功,则消息被消费者消费。
- 若本地事务失败,消息被回滚,不可消费。
- RocketMQ自动触发回查逻辑(若消息超时未确认):
- 查询订单状态,判断事务状态。
优点
- 保证强一致性:通过事务消息,确保MySQL和消息队列状态一致。
- 容灾能力:通过回查机制避免网络异常或服务故障导致消息丢失。
- 解耦性:消息队列将订单服务和库存服务解耦。
注意事项
- 幂等性处理:消费者侧必须支持幂等逻辑,避免重复消费。
- 回查性能优化:本地事务状态应快速可查,如可使用缓存或事务日志表。
- 事务超时:根据业务需求设置合理的事务超时参数,避免长时间占用资源。
Spring Boot 和 RabbitMQ
使用 RabbitMQ 也可以实现“本地事务 + 消息队列事务”的一致性方案,但 RabbitMQ 本身不支持事务消息(不像 RocketMQ)。因此,可以通过以下方式实现类似的机制:
核心思路
- 先执行 MySQL 的本地事务(未提交)。
- 发送消息到 RabbitMQ,但消息暂存(不被消费者消费)。
- 本地事务提交后,确认 RabbitMQ 消息投递(通过 RabbitMQ 的 ConfirmCallback 和手动 ACK)。
- 如果 MySQL 事务失败,则丢弃消息或不确认消息投递。
依赖配置
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
使用 RabbitMQ 的 Confirm 模式,确保消息投递到交换机和队列的可靠性。
@Configuration
public class RabbitMQConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 开启消息投递到交换机的确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 消息成功到达交换机log.info("消息成功投递到交换机: {}", correlationData);} else {// 消息投递到交换机失败log.error("消息投递到交换机失败: {}, 原因: {}", correlationData, cause);}});// 开启消息投递到队列的回退回调rabbitTemplate.setReturnsCallback(returned -> {log.error("消息未成功投递到队列: {}", returned.getMessage());});return rabbitTemplate;}@Beanpublic Queue orderQueue() {return new Queue("order-queue", true);}@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order-exchange", true, false);}@Beanpublic Binding binding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.routing.key");}
}
1. 订单服务:OrderService
@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate OrderRepository orderRepository;@Transactionalpublic void createOrderAndSendMessage(String orderId, String productId, int quantity) {// Step 1: 本地事务 - 保存订单到数据库log.info("开始创建订单...");Order order = new Order();order.setOrderId(orderId);order.setProductId(productId);order.setQuantity(quantity);order.setStatus("PENDING");orderRepository.save(order);// Step 2: 发送 RabbitMQ 消息String messageContent = String.format("订单ID: %s, 产品ID: %s, 数量: %d", orderId, productId, quantity);Message message = MessageBuilder.withBody(messageContent.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setCorrelationId(orderId).build();try {rabbitTemplate.convertAndSend("order-exchange", "order.routing.key", message);log.info("事务消息发送成功: {}", messageContent);} catch (Exception e) {log.error("消息发送失败: {}", e.getMessage());throw new RuntimeException("消息发送失败,事务回滚");}}
}
消费者:OrderConsumer
@Slf4j
@Component
public class OrderConsumer {@Autowiredprivate StockService stockService;@RabbitListener(queues = "order-queue")public void handleMessage(String message) {log.info("收到订单消息: {}", message);// Step 1: 解析消息内容String[] parts = message.split(",");String orderId = parts[0].split(":")[1].trim();String productId = parts[1].split(":")[1].trim();int quantity = Integer.parseInt(parts[2].split(":")[1].trim());// Step 2: 执行库存更新逻辑stockService.updateStock(productId, quantity);log.info("库存更新成功,订单ID: {}", orderId);}
}
库存服务:StockService
@Service
public class StockService {@Autowiredprivate RedisTemplate<String, Integer> redisTemplate;public void updateStock(String productId, int quantity) {String redisStockKey = "product_stock_" + productId;Integer stock = redisTemplate.opsForValue().get(redisStockKey);if (stock == null || stock < quantity) {throw new RuntimeException("库存不足");}redisTemplate.opsForValue().set(redisStockKey, stock - quantity);log.info("库存更新成功,产品ID: {}, 剩余库存: {}", productId, stock - quantity);}
}
数据库模型
@Entity
@Data
public class Order {@Idprivate String orderId;private String productId;private int quantity;private String status; // PENDING, CONFIRMED
}
关键点解析
- 事务控制
- Spring 的 @Transactional 确保本地数据库操作是事务性的。
- 如果 RabbitMQ 消息发送失败,直接抛出异常回滚数据库事务。
- 消息可靠性
- 开启 RabbitMQ 的 Confirm 模式,确保消息成功到达交换机和队列。
- 消息发送失败时,本地事务回滚,确保 MySQL 和 RabbitMQ 的数据一致性。
- 消费者幂等性
- 需要确保消息消费的幂等性(如使用 Redis 或数据库记录已消费消息的ID)。
虽然 RabbitMQ 不原生支持事务消息,但通过这种“本地事务 + 消息确认机制”的组合,仍可以保证 MySQL 和 RabbitMQ 的一致性。相比 RocketMQ 的事务消息,RabbitMQ 的实现稍复杂,但性能更高,适合对消息投递延迟要求较高的场景。
Spring Boot 和 Kafka
使用 Kafka 同样可以实现“本地事务 + 消息队列事务”的一致性方案,得益于 Kafka 的 事务功能(Kafka Transactions)。Kafka 的事务支持允许将生产消息和消费消息的处理绑定在一个事务中,这样可以保证消息的原子性和一致性。
核心思路
- 使用 Kafka 的事务性生产者(TransactionalProducer),确保消息生产的事务性。
- 在本地事务中完成 MySQL 数据库操作,同时向 Kafka 提交消息。
- 如果事务失败,回滚本地事务,同时 Kafka 消息不会被提交。
Maven 依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
Kafka 配置
@Configuration
@EnableTransactionManagement
public class KafkaConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transactional-id"); // 配置事务IDreturn new DefaultKafkaProducerFactory<>(props);}@Beanpublic KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {return new KafkaTemplate<>(producerFactory);}
}
订单服务实现
@Service
public class OrderService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Autowiredprivate OrderRepository orderRepository;@Transactionalpublic void createOrderAndSendMessage(String orderId, String productId, int quantity) {// Step 1: 本地事务 - 保存订单到数据库log.info("开始创建订单...");Order order = new Order();order.setOrderId(orderId);order.setProductId(productId);order.setQuantity(quantity);order.setStatus("PENDING");orderRepository.save(order);// Step 2: 向 Kafka 发送事务性消息try {kafkaTemplate.executeInTransaction(operations -> {String message = String.format("订单ID: %s, 产品ID: %s, 数量: %d", orderId, productId, quantity);operations.send("order-topic", orderId, message);log.info("事务消息已发送: {}", message);return true;});} catch (Exception e) {log.error("消息发送失败,事务回滚: {}", e.getMessage());throw new RuntimeException("消息发送失败,事务回滚");}}
}
消费者服务实现
消费者服务需要保证幂等性,避免重复消费消息对业务数据造成影响。以下是一个简单的消费者实现示例。
@Slf4j
@Component
public class OrderConsumer {@Autowiredprivate StockService stockService;@KafkaListener(topics = "order-topic", groupId = "order-consumer-group")public void handleOrderMessage(ConsumerRecord<String, String> record) {log.info("收到订单消息: {}", record.value());// Step 1: 解析消息内容String[] parts = record.value().split(",");String orderId = parts[0].split(":")[1].trim();String productId = parts[1].split(":")[1].trim();int quantity = Integer.parseInt(parts[2].split(":")[1].trim());// Step 2: 执行库存更新逻辑stockService.updateStock(productId, quantity);log.info("库存更新成功,订单ID: {}", orderId);}
}
库存服务实现
@Service
public class StockService {@Autowiredprivate RedisTemplate<String, Integer> redisTemplate;public void updateStock(String productId, int quantity) {String redisStockKey = "product_stock_" + productId;Integer stock = redisTemplate.opsForValue().get(redisStockKey);if (stock == null || stock < quantity) {throw new RuntimeException("库存不足");}redisTemplate.opsForValue().set(redisStockKey, stock - quantity);log.info("库存更新成功,产品ID: {}, 剩余库存: {}", productId, stock - quantity);}
}
数据库模型
@Entity
@Data
public class Order {@Idprivate String orderId;private String productId;private int quantity;private String status; // PENDING, CONFIRMED
}
关键点解析
- Kafka 的事务支持
- Kafka 支持事务性生产者,kafkaTemplate.executeInTransaction 确保消息生产和本地事务绑定在一起,保证了最终一致性。
- 幂等性消费
- 消费者需要设计幂等性逻辑,比如通过 Redis 或数据库记录已消费的消息 ID,避免重复消费导致库存多次扣减。
- 性能和可靠性
- Kafka 的事务性能优于 RabbitMQ,但需要注意事务超时和重复消费的情况。
Kafka 的事务支持可以较为优雅地实现本地事务和消息队列事务的统一,与 RabbitMQ 相比,Kafka 的事务机制更适合处理分布式一致性问题,尤其是在高吞吐量场景中表现更加出色。