当前位置: 首页 > news >正文

高度细化的SAGA模式实现:基于Spring Boot与RabbitMQ的跨服务事务

场景与技术栈

场景:电商系统中的订单创建流程,涉及订单服务(Order Service)、库存服务(Inventory Service)、支付服务(Payment Service)。

技术栈:

  • Java 11

  • Spring Boot 2.7.0

  • MySQL 8.0

  • RabbitMQ

项目结构与组件
  • order-service

  • inventory-service

  • payment-service

  • saga-coordinator

代码实现

1. Saga Coordinator

@SpringBootApplicationpublic class SagaCoordinatorApplication {    public static void main(String[] args) {        SpringApplication.run(SagaCoordinatorApplication.class, args);    }}@Service@RequiredArgsConstructorpublic class SagaService {    private final SagaOrderService sagaOrderService;    private final SagaInventoryService sagaInventoryService;    private final SagaPaymentService sagaPaymentService;    private final SagaTransactionRepository sagaTransactionRepository;    public void placeOrder(Long orderId, Long productId, int quantity, double amount) throws ExecutionException, InterruptedException {        SagaTransaction transaction = new SagaTransaction();        transaction.setOrderId(orderId);        transaction.setProductId(productId);        transaction.setQuantity(quantity);        transaction.setAmount(amount);        transaction.setStatus(SagaTransaction.Status.INITIATED);        sagaTransactionRepository.save(transaction);        CompletableFuture.runAsync(() -> sagaOrderService.createOrder(transaction))            .thenRunAsync(() -> sagaInventoryService.decreaseInventory(transaction))            .thenRunAsync(() -> sagaPaymentService.charge(transaction))            .exceptionally(ex -> {                rollback(transaction);                throw new RuntimeException("Saga failed", ex);            });    }    private void rollback(SagaTransaction transaction) {        CompletableFuture.runAsync(() -> sagaPaymentService.refund(transaction))            .thenRunAsync(() -> sagaInventoryService.increaseInventory(transaction))            .thenRunAsync(() -> sagaOrderService.cancelOrder(transaction))            .exceptionally(ex -> {                transaction.setStatus(SagaTransaction.Status.FAILED);                sagaTransactionRepository.save(transaction);                throw new RuntimeException("Rollback failed", ex);            });    }}

2. SagaOrderService

@Servicepublic class SagaOrderService {    private final OrderService orderService;    private final RabbitTemplate rabbitTemplate;    @Autowired    public SagaOrderService(OrderService orderService, RabbitTemplate rabbitTemplate) {        this.orderService = orderService;        this.rabbitTemplate = rabbitTemplate;    }    public void createOrder(SagaTransaction transaction) {        // 创建订单逻辑        // ...        // 发送创建订单完成的消息        rabbitTemplate.convertAndSend("order.create", transaction);    }    public void cancelOrder(SagaTransaction transaction) {        // 取消订单逻辑        // ...        // 发送取消订单完成的消息        rabbitTemplate.convertAndSend("order.cancel", transaction);    }}

3. SagaInventoryService

@Servicepublic class SagaInventoryService {    private final InventoryService inventoryService;    private final RabbitTemplate rabbitTemplate;    @Autowired    public SagaInventoryService(InventoryService inventoryService, RabbitTemplate rabbitTemplate) {        this.inventoryService = inventoryService;        this.rabbitTemplate = rabbitTemplate;    }    public void decreaseInventory(SagaTransaction transaction) {        // 扣减库存逻辑        // ...        // 发送扣减库存完成的消息        rabbitTemplate.convertAndSend("inventory.decrease", transaction);    }    public void increaseInventory(SagaTransaction transaction) {        // 增加库存逻辑        // ...        // 发送增加库存完成的消息        rabbitTemplate.convertAndSend("inventory.increase", transaction);    }}

4. SagaPaymentService

@Servicepublic class SagaPaymentService {    private final PaymentService paymentService;    private final RabbitTemplate rabbitTemplate;    @Autowired    public SagaPaymentService(PaymentService paymentService, RabbitTemplate rabbitTemplate) {        this.paymentService = paymentService;        this.rabbitTemplate = rabbitTemplate;    }    public void charge(SagaTransaction transaction) {        // 收款逻辑        // ...        // 发送收款完成的消息        rabbitTemplate.convertAndSend("payment.charge", transaction);    }    public void refund(SagaTransaction transaction) {        // 退款逻辑        // ...        // 发送退款完成的消息        rabbitTemplate.convertAndSend("payment.refund", transaction);    }}
SagaTransaction Entity
@Entitypublic class SagaTransaction {    @Id    @GeneratedValue(strategy = GenerationType.IDENTITY)    private Long id;    private Long orderId;    private Long productId;    private int quantity;    private double amount;    private Status status; // INITIATED, COMPLETED, ROLLING_BACK, ROLLED_BACK, FAILED    // Getters & Setters}
消息队列监听

在每个微服务中,需要添加消息队列的监听器,以便在接收到消息时执行相应的操作。例如,在order-service中:

@Componentpublic class OrderSagaListener {    private final SagaOrderService sagaOrderService;    @Autowired    public OrderSagaListener(SagaOrderService sagaOrderService) {        this.sagaOrderService = sagaOrderService;    }    @RabbitListener(queues = "order.create")    public void handleOrderCreate(SagaTransaction transaction) {        sagaOrderService.createOrder(transaction);    }    @RabbitListener(queues = "order.cancel")    public void handleOrderCancel(SagaTransaction transaction) {        sagaOrderService.cancelOrder(transaction);    }}
额外细节
  • 为确保事务的一致性,可以使用RabbitMQ的发布确认(Publisher Confirms)机制。

  • 每个微服务的数据库事务应该使用@Transactional注解来保证ACID属性。

  • 需要设计失败重试和事务状态检查机制,确保在故障恢复时能够正确地执行补偿操作。

通过上述设计,SAGA模式与RabbitMQ的结合,不仅能够处理跨服务的事务,还能够通过消息队列实现服务解耦和消息的异步处理,提高系统的稳定性和可扩展性。

http://www.lryc.cn/news/453684.html

相关文章:

  • Vue工程化开发
  • Ray_Tracing_The_Next_Week下
  • ES索引生命周期管理
  • Oracle数据库体系结构基础
  • QT学习笔记4.5(文件、参数文件)
  • 服务器虚拟化的详细学习要点
  • 创建一个Java Web API项目
  • 对称加密算法的使用Java和C#
  • 10款好用的开源 HarmonyOS 工具库
  • ubuntu22.04中备份Iptables的设置
  • (PyTorch) 深度学习框架-介绍篇
  • 若依从redis中获取用户列表
  • 文件上传之%00截断(00截断)以及pikachu靶场
  • Chainlit集成LlamaIndex并使用通义千问实现和数据库交互的网页对话应用(text2sql)
  • 计组复习笔记
  • 62. 环境贴图2
  • MATLAB中数据导入与导出的全面指南
  • Jenkins从入门到精通,构建高效自动化流程
  • 【Android 13源码分析】Activity生命周期之onCreate,onStart,onResume-2
  • 如何在电脑上浏览手机界面
  • 国产RISC-V案例分享,基于全志T113-i异构多核平台!
  • Day(16)--File
  • Axios入门使用
  • 大数据实时数仓Hologres(四):基于Flink+Hologres搭建实时数仓
  • 关于HTML 案例_个人简历展示02
  • Windows 11 24H2 v26100.1742 官方简体中文版
  • 【AIGC半月报】AIGC大模型启元:2024.10(上)
  • Codeforces Beta Round 14 (Div. 2) E. Camels (DP)
  • CSID-GAN:基于生成对抗网络的定制风格室内平面设计框架论文阅读
  • 02SQLite