当前位置: 首页 > news >正文

消息中间件之RocketMQ源码分析(二十七)

Broker提交或回滚事务消息

当生产者本地事务处理完成并且Broker回查事务消息后,不管执行Commit还是Rollback,都会根据用户本地事务的执行结果发送一个End_transaction的RPC请求给Broker,Broker端处理该请求的类是EndTransactionProcessor
在这里插入图片描述

第一步,End_Transaction请求校验,主要检查项如下

1.Broker角色检查。Slave Broker不处理事务消息
2.事务消息类型检查。EndTransactionProcessor只处理
Commit或Rollback类型的事务消息,其余消息不处理,这里区分了事务回查
在这里插入图片描述
在这里插入图片描述

第二步,进行Commit或Rollback。

根据生产者请求头中的参数判断,是Commit请求还是Rollback请求,然后分别进行处理
在这里插入图片描述

commitMessage()

提交Half消息/这是事务消息服务接口中的一个方法,根据消息位点查询了Half消息,并将Half消息返回
在这里插入图片描述
在这里插入图片描述

checkPrepareMessage()

Half消息数据校验。校验内容包括发送消息的生产者组与当前执行Commit/Rollback的生产者是否一致,当前Half消息是否与请求Commit/Rollback的消息是否是同一条消息
在这里插入图片描述

endMessageTransaction()

消息对象类型转化,将MessageExt对象转化为MessageExtBrokerInner对象,并且还原消息之前的Topic和ConsumeQueue等信息
在这里插入图片描述

sendFinalMessage()

将还原后的事务消息最终发送到CommitLog中,一旦发送成功,消费者就可以正常拉取消息并消费
在这里插入图片描述

deletePrepareMessage()

在sendFinalMessage()执行成功后,删除Half消息。其实RocketMQ是不能真正删除消息的,其实质是顺序写磁盘,相当于做了一个"假删除"。"假删除"通过putOpMessage()方法将消息保存到TransactionMessageUtil.
buildOpTopic()的Topic中,并且做上标记TransactionalMessageUtil.REMOVETAG,表示消息已删除
在这里插入图片描述

  • 如果消息被标记为已删除,则调用addRemoveTagInTransactionOp()方法,利用标记为已删除的OP消息构造Message消息对象,并且调用存储方法保存消息
    在这里插入图片描述
  • TransactionalMessageUtil.buildOpTopic()方法跟保存Half消息时的逻辑类似
    在这里插入图片描述
  • Half消息保存在名为MixAll.RMQ_SYS_TRANS_HALF_TOPIC的Topic中,执行Commit和Rollback后的消息都保存在MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC
    对象中,以便Broker判断是否需要回查生产者事务的执行状态。
    在这里插入图片描述
    在这里插入图片描述
  • 调用存储层方法,真正地将OP消息保存到了CommitLog中
    在这里插入图片描述

Rollback实现逻辑

Rollback并没有真正删除消息,而是标记Half消息为删除,在Broker回查时机会跳过不检查
在这里插入图片描述

rollbackMessage()

该方法与CommitMessage()方法一样,都是查询Half消息并返回消息对象。

checkPrepareMessage()

消息校验,与Commit调用的是同一个方法

deletePrepareMessage()

删除Half消息,与Commit调用的是同一个方法

http://www.lryc.cn/news/309915.html

相关文章:

  • C习题002:澡堂洗澡
  • 智能双星:遥测终端机与柳林“巡检机器人“,助力智能运维新升级!
  • 算法复习之前缀和【备战蓝桥杯】
  • IDEA基础——Maven配置tomcat
  • 数据结构测试题
  • 【MATLAB】兔子机器人总系统_动力学模型解读(及simulink中的simscape的各模块介绍)
  • Launch学习
  • 蓝桥OJ 2942数字王国之军训排队 DFS剪枝
  • SSL证书
  • 【C++】string 类 ( 上)
  • 《中华人民共和国消防法》(2021年修订版)解读
  • vue+element模仿实现云码自动验证码识别平台官网
  • 蓝桥杯练习系统(算法训练)ALGO-992 士兵杀敌(二)
  • Pycharm下如何生成exe软件
  • KubeSphere平台安装系列之三【Linux多节点部署KubeSphere】(3/3)
  • YOLOv9独家改进|动态蛇形卷积Dynamic Snake Convolution与空间和通道重建卷积SCConv与RepNCSPELAN4融合
  • XSS初级漏洞靶场
  • k8s pv与pvc理解与实践
  • Unity游戏输入系统(新版+旧版)
  • 区块链媒体:链游媒体宣发渠道9个方法分享-华媒舍
  • LeetCode--42
  • 【解决】虚幻导入FBX模型不是一个整体
  • 第四十八回 解珍解宝双越狱 孙立孙新大劫牢-Python模块和包概念与使用
  • 【Spring连载】使用Spring Data访问 MongoDB----对象映射之属性转换器
  • 【axiox】前后端接口通讯数据交互
  • 《Linux C编程实战》笔记:共享内存
  • 【GitHub】修改默认分支
  • 常用Linux 命令汇总
  • 13 双口 RAM IP 核
  • 【高级数据结构】Trie树