当前位置: 首页 > news >正文

【BUG事务内消息发送】事务内消息发送,事务还未结束,消息发送已被消费,查无数据怎么解决?

问题描述

在一个事务内完成插入操作,通过MQ异步通知其他微服务进行事件处理。
由于是在事务内发送,其他服务消费消息,查询数据时还不存在如何解决呢?


解决方案

通过spring-tx包的TransactionSynchronizationManager事务管理器解决。

public abstract class TransactionSynchronizationManager {private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =new NamedThreadLocal<>("Transaction synchronizations");/*** Return if transaction synchronization is active for the current thread.* Can be called before register to avoid unnecessary instance creation.* @see #registerSynchronization*/		public static boolean isSynchronizationActive() {return (synchronizations.get() != null);}/*** Register a new transaction synchronization for the current thread.* Typically called by resource management code.* <p>Note that synchronizations can implement the* {@link org.springframework.core.Ordered} interface.* They will be executed in an order according to their order value (if any).* @param synchronization the synchronization object to register* @throws IllegalStateException if transaction synchronization is not active* @see org.springframework.core.Ordered*/public static void registerSynchronization(TransactionSynchronization synchronization)throws IllegalStateException {Assert.notNull(synchronization, "TransactionSynchronization must not be null");Set<TransactionSynchronization> synchs = synchronizations.get();if (synchs == null) {throw new IllegalStateException("Transaction synchronization is not active");}synchs.add(synchronization);}}

Rocketmq方法封装,通过TransactionSynchronizationManager.isSynchronizationActive()判断当前方法的调用是否在事务内。
如果是,则注册一个事务同步适配器,在事务提交后发送消息。
否则直接发送。

    /*** 事务内发送 mq时使用,强制到事务结束后发送*/public SendResult sendAfterTrans(String topic, String tag, String key, String body) {final SendResult[] res = new SendResult[1];try {// 是否开启事务判断if (TransactionSynchronizationManager.isSynchronizationActive()) {log.debug("Mysql事务内Mq消息发送  延迟到事务提交后 waiting……");TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCommit() {log.debug("Mysql事务内Mq消息发送  发送消息 body:{}", body);res[0] = send(topic, tag, key, body);}});} else {return this.send(topic, tag, key, body);}} catch (Exception e) {e.printStackTrace();}return res[0];}

在这里插入图片描述

http://www.lryc.cn/news/144095.html

相关文章:

  • 数据分析作业四-基于用户及物品数据进行内容推荐
  • 在腾讯云服务器OpenCLoudOS系统中安装svn(有图详解)
  • C语言日常刷题5
  • 【LeetCode-中等题】73. 矩阵置零
  • 本地部署 FastGPT
  • 软件工程(十八) 行为型设计模式(四)
  • Socket通信与WebSocket协议
  • 新KG视点 | Jeff Pan、陈矫彦等——大语言模型与知识图谱的机遇与挑战
  • 详解过滤器Filter和拦截器Interceptor的区别和联系
  • List常用的操作
  • Android studio APK切换多个摄像头(Camera2)
  • ChatGPT 对教育的影响,AI 如何颠覆传统教育
  • Spring(九)声明式事务
  • java中用HSSFWorkbook生成xls格式的excel(亲测)
  • 做平面设计一般电脑可以吗 优漫动游
  • 设计模式备忘录+命令模式实现Word撤销恢复操作
  • Linux centos7 bash编程小训练
  • 创作2周年纪念日-特别篇
  • 【UE5】用法简介-使用MAWI高精度树林资产的地形材质与添加风雪效果
  • 兼容AD210 车规级高精度隔离放大器:ISO EM210
  • R语言常用数组函数
  • 前端开发之Element Plus的分页组件el-pagination显示英文转变为中文
  • 基于Java+SpringBoot+Vue前后端分离社区医院管理系统设计和实现
  • 浅谈单例模式在游戏开发中的应用
  • Stable Diffusion WebUI 整合包
  • 什么是 RESTful API
  • 如何搭建关键字驱动自动化测试框架?
  • WPF实战项目十二(API篇):配置AutoMapper
  • Linux 内核模块加载过程之重定位
  • Flink流批一体计算(19):PyFlink DataStream API之State