当前位置: 首页 > news >正文

springboot mongodb分片集群事务

前置

mongodb分片集群想要使用事务,需要对应分片没有仲裁节点

在这里插入图片描述

代码

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId><version>2.1.0.RELEASE</version></dependency>

如果是单个mongos

import org.springframework.context.annotation.Bean;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.stereotype.Component;/*** @author kittlen* @date 2024-04-09 17:20* @description*/@Component
public class MongodbConfig {@Beanpublic MongoTransactionManager transactionManager(MongoDbFactory factory) {return new MongoTransactionManager(factory);}
}

使用

@Autowiredprivate MongoTemplate mongoTemplate;@Autowiredprivate MongoTransactionManager mongoTransactionManager;public int dbFunc(){TransactionTemplate transactionTemplate = new TransactionTemplate(mongoTransactionManager);return transactionTemplate.execute(status -> {try {UpdateResult updateResult = mongoTemplate.updateFirst(query, update, collection1);long l = updateResult.getUpsertedId() == null ? updateResult.getModifiedCount() : 1;if (l > 0) {mongoTemplate.insert(saveEntity, collection2);}return 1;} catch (Exception e) {// 如果发生异常,事务将在此处回滚,通过status.setRollbackOnly();或者抛出异常都可回滚status.setRollbackOnly();return 0;}});
}

如果连接是多mongos,则需要重写BaseCluster类

多mongos时使用的是随机获取的方式获取mongosClient,通过记录第一次调用的client使后续事务内的请求都通过同一个client请求,防止出现不同mongos导致事务失败情况

事务记录类


import com.mongodb.connection.Server;import java.util.function.Supplier;/*** @author kittlen* @date 2024-04-29 12:08* @description*/public class MultiServiceTransactionConfig {/*** mongodb多实例事务使用*/private static ThreadLocal<Server> mongoMultiServerTransactionUserService = new ThreadLocal<>();/*** 是否开启多实例事务*/private static ThreadLocal<Boolean> mongoMultiServerTransactionCanUser = new ThreadLocal<>();/*** 获取service** @param supplier 如果该service不存在,则获取新service的方法* @return*/public static Server getService(Supplier<Server> supplier) {Server server = mongoMultiServerTransactionUserService.get();if (server != null) {return server;} else {Server saveServer = supplier.get();mongoMultiServerTransactionUserService.set(saveServer);return saveServer;}}/*** 开启事务记录*/public static void openMultiServerTransaction() {mongoMultiServerTransactionCanUser.set(true);}/*** 是否开启多实例事务** @return*/public static boolean canOpenMultiServerTransaction() {Boolean b = mongoMultiServerTransactionCanUser.get();return Boolean.TRUE.equals(b);}/*** 清除事务配置信息*/public static void clean() {mongoMultiServerTransactionCanUser.remove();mongoMultiServerTransactionUserService.remove();}
}

重写mongodb的类com.mongodb.internal.connection.BaseCluster的selectServer方法

@Overridepublic Server selectServer(final ServerSelector serverSelector) {isTrue("open", !isClosed());try {CountDownLatch currentPhase = phase.get();ClusterDescription curDescription = description;ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector);Server server;if (this instanceof MultiServerCluster) {server = MultiServiceTransactionConfig.canOpenMultiServerTransaction() ? MultiServiceTransactionConfig.getService(() -> selectRandomServer(compositeServerSelector, description)) : selectRandomServer(compositeServerSelector, curDescription);} else {server = selectRandomServer(compositeServerSelector, curDescription);}boolean selectionFailureLogged = false;long startTimeNanos = System.nanoTime();long curTimeNanos = startTimeNanos;long maxWaitTimeNanos = getMaxWaitTimeNanos();while (true) {throwIfIncompatible(curDescription);if (server != null) {return server;}if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) {throw createTimeoutException(serverSelector, curDescription);}if (!selectionFailureLogged) {logServerSelectionFailure(serverSelector, curDescription);selectionFailureLogged = true;}connect();currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), getMinWaitTimeNanos()), NANOSECONDS);curTimeNanos = System.nanoTime();currentPhase = phase.get();curDescription = description;server = selectRandomServer(compositeServerSelector, curDescription);}} catch (InterruptedException e) {throw new MongoInterruptedException(format("Interrupted while waiting for a server that matches %s", serverSelector), e);}}

重点为:

			Server server;if (this instanceof MultiServerCluster) {server = MultiServiceTransactionConfig.canOpenMultiServerTransaction() ? MultiServiceTransactionConfig.getService(() -> selectRandomServer(compositeServerSelector, description)) : selectRandomServer(compositeServerSelector, curDescription);} else {server = selectRandomServer(compositeServerSelector, curDescription);}

使用

			try {TransactionTemplate transactionTemplate = new TransactionTemplate(mongoTransactionManager);MultiServiceTransactionConfig.openMultiServerTransaction();return transactionTemplate.execute(status -> {try {UpdateResult updateResult = mongoTemplate.updateFirst(query, update,ollection1);long l = updateResult.getUpsertedId() == null ? updateResult.getModifiedCount() : 1;if (l > 0) {mongoTemplate.insert(historyDetailsEntity, collection2);}return 1;} catch (Exception e) {// 如果发生异常,事务将在此处回滚,通过status.setRollbackOnly();或者抛出异常都可回滚status.setRollbackOnly();return 0;}});} finally {MultiServiceTransactionConfig.clean();}
http://www.lryc.cn/news/342478.html

相关文章:

  • node报错——解决Error: error:0308010C:digital envelope routines::unsupported——亲测可用
  • golang系统内置函数整理
  • 武汉星起航:五对一服务体系,助力创业者成功进军跨境电商市场
  • C++常用库函数——strcmp、strchr
  • vue3怎么使用vant的IndexBar 索引栏
  • VMware常见问题(技巧)总结
  • VS Code 保存+格式化代码
  • word启动缓慢之Baidu Netdisk Word Addin
  • 获取波形极值与间距并显示
  • 视频素材哪个app好?8个视频素材库免费使用
  • 002 validation自定义校验器
  • SQL-Server数据库--视图
  • Flink 部署模式
  • 第十三节:Vben Admin实战-系统管理之菜单管理
  • 2024------MySQL数据库基础知识点总结
  • 机器学习之基于Jupyter中国环境治理投资数据分析及可视化
  • 【Word】写论文,参考文献涉及的上标、尾注、脚注 怎么用
  • 能将图片转为WebP格式的WebP Server Go
  • 省份数量00
  • Android Native内存泄漏检测方案详解
  • 有限单元法-编程与软件应用(崔济东、沈雪龙)【PDF下载】
  • 蓝桥杯练习系统(算法训练)ALGO-950 逆序数奇偶
  • uniapp踩坑 uni.showToast 和 uni.showLoading
  • BIGRU、CNN-BIGRU、CNN-BIGRU-ATTENTION、TCN-BIGRU、TCN-BIGRU-ATTENTION合集
  • 通过 Java 操作 redis -- 基本通用命令
  • Jenkins集成Kubernetes 部署springboot项目
  • 个股期权是什么期权?个股期权什么时候推出?
  • TCP UDP
  • PCIE协议-1
  • [C++][PCL]pcl安装包预编译包国内源下载地址