当前位置: 首页 > news >正文

RocketMq 事务消息原理

Rocketmq 事务消息API使用

使用TransactionMQProducer类。 实现TransactionListener 接口覆盖其方法executeLocalTransaction和checkLocalTransaction 即可。

其中executeLocalTransaction 执行本地方法和checkLocalTransaction 事务状态回查。

玩法

  1. 简历一张本地事务表,字段大概有(rocketmq事务id,业务事务id),
  2. 于executeLoalTransaction,利用数据库事务特性,和业务数据同时持久化到数据库。
  3. checkLocalTransaction. 按rocketmq事务id查询数据库,是否有对应的数据。

为什么需要本地事务表

保证可靠性。当业务事务提交后节点宕机。rocketmq同样也能回查到数据。

流程分析

事务消息源码分析 

实现原理是基于二阶段提交和定时事务状态回查实现的。

二阶段提交分析

涉及相关类

Producer

TransactionMQProducer

DefaultMQProducerImpl

TransactionListener

Broker

SendMessageProcessor

EndTransactionProcessor

分析流程

  1. 入口方法TransactionMQProducer.sendMessageInTransaction 投递事务消息
  2. 调用DefaultMQProducerImpl.sendMessageInTransaction
  3. 为消息头部增加事务消息标志,发送消息。
  4. Broker 入口方法 SendMessageProcessor#sendMessage检查消息头部是否有事务标记,有投递半消息。响应Producer 结果包括事务id
  5. Producer收到消息成功发送结果后,执行本地事务。并通知Broker 本地事务执行结果。
  6. Broker 入口方法EndTransactionProcessor#processRequest 。按收到结果做决定。若是事务提交则投递普通消息,删除半消息。若是事务回滚则删除半消息。

事务消息回查

RocketMQ 通过TramsactionalMessageCheckService 线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事务状态。TransactionalMessageCheckService 的检测频率默认为1分钟,可通过broker.conf文件中设置transactionCheckInterval 来改变默认值,单位为毫秒

public class TransactionalMessageCheckService extends ServiceThread {private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);private BrokerController brokerController;public TransactionalMessageCheckService(BrokerController brokerController) {this.brokerController = brokerController;}//.... 省略代码@Overrideprotected void onWaitEnd() {long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();long begin = System.currentTimeMillis();log.info("Begin to check prepare message, begin time:{}", begin);this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);}}

http://www.lryc.cn/news/96669.html

相关文章:

  • day41-Verify Account Ui(短信验证码小格子输入效果)
  • C. Maximum Set
  • 基于springboot+vue学生宿舍报修公寓管理系统
  • 缓存和数据库一致性问题分析
  • 用Rust生成Ant-Design Table Columns | 京东云技术团队
  • java.lang.ClassNotFoundException: sun.misc.BASE64Decoder
  • Unity进阶--对象池数据场景管理器笔记
  • 【Seata】微服务集成seata
  • 解决react,<img>src使用require方法引入图片不显示问题
  • 从小白到大神之路之学习运维第67天-------Tomcat应用服务 WEB服务
  • 图解SQL基础知识,小白也能看懂的SQL文章
  • 自动驾驶感知系统-毫米波雷达
  • Esp32_Arduino接入腾讯云笔记
  • python简单入门
  • 如何快速从csv文件搭建一个简单的神经网络模型(回归)
  • Pytorch深度学习-----DataLoader的用法
  • macOS Ventura 13.5 (22G74) Boot ISO 原版可引导镜像下载
  • 【机器学习】 奇异值分解 (SVD) 和主成分分析 (PCA)
  • 如何用logging记录python实验结果?
  • C语言假期作业 DAY 03
  • 使用serverless实现从oss下载文件并压缩
  • 从上到下打印二叉树
  • 【推荐】排序模型的调优
  • 负载均衡安装配置详解
  • Java-逻辑控制
  • UE 透明渲染次序
  • 【C++】多态原理剖析,Visual Studio开发人员工具使用查看类结构cl /d1 reportSingleClassLayout
  • vue实现flv格式视频播放
  • iptables安全技术和防火墙
  • 微信小程序开发5