当前位置: 首页 > news >正文

六、RocketMQ发送事务消息

事务消息介绍

在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

当主分支订单系统状态更新失败后,物流、积分、购物车系统都不应该接收到消息

事务消息的发送流程

使用普通消息是做不到的,因为他会直接将消息发送到topic中

而事务消息参考了两阶段提交的原理,

  1. 先把消息发送broker中
  2. 当消息发送成功后,会执行本地事务
  3. 通过本地事务的执行情况,返回一个状态
  4. 状态对应三种情况
    • LocalTransactionState.UNKNOW:需要broker调用发送端的回查机制
    • LocalTransactionState.COMMIT_MESSAGE:broker将消息发送到指定的topic,此时消费端可以接收到消息
    • LocalTransactionState.ROLLBACK_MESSAGE:broker丢弃消息,不发送到指定的topic,消费端接收不到消息

整个事务消息的详细交互流程如下图所示:
在这里插入图片描述

@Test
public void sendTrans() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {// 创建事务消息发送客户端TransactionMQProducer transProducer = new TransactionMQProducer("test-trans-producer");transProducer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);// 指定回查事务消息时的线程池ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});transProducer.setExecutorService(executorService);// 设置事务监听器transProducer.setTransactionListener(new TransactionListener() {// 执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println(Thread.currentThread().getName() + ":执行本地事务");// 触发回查机制return LocalTransactionState.UNKNOW;}// 回查本地事务,如果执行本地事务返回UNKNOW状态或者生产者应用退出导致本地事务未提交任何状态@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println(Thread.currentThread().getName() + ":触发事务回查");// 提交事务return LocalTransactionState.COMMIT_MESSAGE;}});transProducer.start();Message message = new Message(RocketMQConfig.TEST_TOPIC, "hello world".getBytes());// 发送事务消息SendResult send = transProducer.sendMessageInTransaction(message,null);System.out.println(send.getSendStatus());Thread.sleep(Integer.MAX_VALUE);
}

注:需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。

http://www.lryc.cn/news/193505.html

相关文章:

  • Node.js初体验
  • 激活函数理解
  • 【docker - 安装】windows 10 专业版 安装docker,以及 WSL kernel version too low 解决方案
  • 洛谷P1601
  • Elasticsearch:使用 LangChain 对话链和 OpenAI 的聊天机器人
  • 铜死亡+机器学习+WGCNA+分型生信思路
  • GB28181平台简介
  • JVM基础:初识JVM
  • 至强服务器BIOS/UEFI驱动开发笔记
  • Linux:Termius连接本地虚拟机与虚拟机快照
  • 高校教务系统登录页面JS分析——四川大学
  • Kafka SASL认证授权(四)认证源码解析
  • 软件测试学习(一)基础概念、实质、说明书测试、分类、动态黑盒测试
  • 在fastapi中实现异步
  • js数组去重
  • 【前端】根据后端返回的url进行下载并设置文件下载名称
  • 《视觉SLAM十四讲》公式推导(一)
  • 简单好用的解压缩软件:keka 中文 for mac
  • 【UE 插件】UE4 虚幻引擎 插件开发(带源码插件打包、无源码插件打包) 有这一篇文章就够了!!!
  • C# CodeFormer 图像修复
  • Android Studio的笔记--HttpURLConnection使用GET下载zip文件
  • phantom3D模体
  • 贪心算法解决批量开票限额的问题
  • Unity后台登录/获取数据——BestHTTP的使用Get/Post
  • 【Windows日志】记录系统事件的日志
  • 物联网开发学习笔记——目录索引
  • Prometheus:优秀和强大的监控报警工具
  • Appium
  • 自动驾驶学习笔记(五)——绕行距离调试
  • 【Android】VirtualDisplay创建流程及原理