当前位置: 首页 > news >正文

RocketMQ 事务消息 原理及使用方法解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年3月24日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • RocketMQ事务消息
  • 适用场景举例
  • 使用示例
    • 发送事务消息
    • 事务回查
    • 事务执行

RocketMQ事务消息

RocketMQ针对事务消息扩展了两个相关的概念:

1. 半消息

半消息(Half Message)是一种特殊的消息类型,处于这个状态的消息暂时不能被Consumer消费。

当一条事务消息被成功投递到Broker上,但Broker没有收到Producer的二次确认时,该事务消息就处于暂时不可消费的状态,这种消息就是半消息


2. 消息状态回查

由于网络抖动、系统宕机等等原因,可能导致ProducerBroker发送的二次确认信息没有送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态。

这个机制主要是用来解决分布式事务中的超时问题。

在这里插入图片描述
上图是RocketMQ官网提供的事务消息流程图,执行步骤如下:

  1. ProducerBroker端发送半消息
  2. Broker发送ACK确认,表示半消息发送成功
  3. Producer执行本地事务
  4. 本地事务完毕,根据事务的状态,ProducerBroker发送二次确认消息,确认该半消息的CommitRollback状态。Broker收到二次确认消息之后:如果是Commit状态,则直接将消息发送到Consumer端执行消费逻辑;如果是Rollback状态,则会直接将其标记为失败,不会发送给Consumer
  5. 针对超时情况,Broker主动向Producer发起消息回查
  6. Producer处理回查消息,返回对应的本地事务执行结果
  7. Broker针对消息回查的结果,执行【步骤4】的操作

适用场景举例

以转账系统为例,假设A要向B转账100元,执行本地事务和发送异步消息的过程应该同时保持成功或失败,即A的账户扣款成功后,就一定要发消息发送出去,最直观的思路可能有两个:

1. 先发消息

这种策略的流程如下:
在这里插入图片描述
存在的问题是: 如果消息发送成功,但后续A扣款失败了,消费端仍然会消费这条消息,进而向B账户里打钱,数据就出现不一致的情况了。


2. 后发消息

在这里插入图片描述
存在的问题是: 如果扣款成功,但是发送消息失败,就会出现A已经扣钱了,但B账户里没有入账的情况,同样也是无法接受的。

出现上述情况的根本原因是本地事务和发送消息这两个操作并不是原子的,因此也就无法做到同时失败或同时成功,所以数据一致性难以保障。


解决上述问题的方法就是上面提到的半消息
在这里插入图片描述
如上图所示,执行本地事务之前先发送一个半消息,此时还不能被消费者消费,只有当本地事务执行完毕并发送二次确认消息之后,半消息才能被Consumer消费。

如此以来就保证了多个系统数据的数据一致性,前提是系统不需要保证数据的强一致性


使用示例

发送事务消息

RocketMQ发送事务消息设计到消息发送、消息回查、消息二次确认等过程,因此这个过程可能会“稍显复杂”

发送事务消息使用的是TransactionMQProducer,一个简单的demo如下:

public class TransactionProducer {public static void main(String[] args) throws MQClientException {TransactionCheckListener transactionCheckListener = new TransactionCheckListener() {@Overridepublic LocalTransactionState checkLocalTransactionState(MessageExt msg) {return null;}};TransactionMQProducer producer = new TransactionMQProducer("GROUP A");producer.setCheckThreadPoolMinSize(2);producer.setCheckThreadPoolMaxSize(2);producer.setCheckRequestHoldMax(2000);producer.setTransactionCheckListener(transactionCheckListener);producer.start();String[] tags = new String[]{"TAG A", "TAG B", "TAG C"};LocalTransactionExecuter transactionExecuter = new LocalTransactionExecuter() {@Overridepublic LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {return null;}};for (int i = 0; i < 10; i++) {Message msg = new Message("TEST", tags[i % tags.length], "KEY " + i, ("HELLO, ROCKETMQ" + i).getBytes());SendResult result = producer.sendMessageInTransaction(msg, transactionExecuter, null);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}producer.shutdown();}
}

事务回查

checkLocalTransaction是事务消息回查监听方法,可以获取本地事务状态,根据事务的状态来确定是否要发送二次确认消息,或者进行事务回滚操作。

消息回查事务的状态由以下几种情况:

  1. LocalTransactionState.ROLLBACK_MESSAGE:事务回滚
  2. LocalTransactionState.COMMIT_MESSAGE:事务提交
  3. LocalTransactionState.UNKNOW:未知状态,此时Broker会定时重新查询Producer消息的状态,直到出现前面两种情况。
public interface TransactionListener {LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

事务执行

executeLocalTransaction方法用于执行本地事务,如果本地事务执行成功则进行事务提交,否则进行事务回滚,如果是UNKNOW状态的话,Broker就会定时回查Producer的消息状态,直到彻底成功或失败。

public interface TransactionListener {LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
}

http://www.lryc.cn/news/42485.html

相关文章:

  • 为什么 ChatGPT 输出时经常会中断,需要输入“继续” 才可以继续输出?
  • PyTorch 之 基于经典网络架构训练图像分类模型
  • Scrapy的callback进入不了回调方法
  • 第二十一天 数据库开发-MySQL
  • 蓝桥杯每日一真题—— [蓝桥杯 2021 省 AB2] 完全平方数(数论,质因数分解)
  • Linux编辑器-vim
  • 5G将在五方面彻底改变制造业
  • http和https的区别?
  • 【Spring Cloud Alibaba】4.创建服务消费者
  • C语言——动态内存管理 malloc、calloc、realloc、free的使用
  • 技术分享——Java8新特性
  • vue基础知识大全
  • 第2篇|文献研读|nature climate change|减缓气候变化和促进热带生物多样性的碳储量走廊
  • 从暴力递归到动态规划(2)小乖,你也在为转移方程而烦恼吗?
  • Leetcode.1638 统计只差一个字符的子串数目
  • KoTime:v2.3.9新增线程管理(线程统计、状态查询等)
  • 直面风口,未来不仅是中文版ChatGPT,还有AGI大时代在等着我们
  • 若依微服务(ruoyi-cloud)保姆版容器编排运行
  • vue2图片预览插件
  • 手写Promise源码的实现思路
  • 【数据结构】-关于树的概念和性质你了解多少??
  • 【前端之旅】NPM必知必会
  • Android SQLite使用事务来确保所有语句都以原子方式执行及保证数据完整性一次执行多条语句示例
  • nodejs+vue校园超市小卖部零食在线购物商城系统
  • Karl Guttag:论相机对焦技术在AR/VR中的沿用
  • ECL@SS学习笔记(3)-概念数据模型
  • 206. 反转链表
  • 文心一言 vs GPT-4 —— 全面横向比较
  • rancher2.6进阶之kubectl安装
  • 图像基本变换