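芝法酱 Study Notes (1.3): Multi-Data-Source Transactions with SpringBoot + MyBatis-Plus + Atomikos
1. Preface
1.1 Business requirements
When we covered registration and login earlier, we skipped over an important technical point: transactions across multiple data sources.
In our business scenario, the monitor service may need to operate on the monitoring-center database and the enterprise-center databases at the same time, and we want those operations to run in a single transaction. In addition, the enterprise center has several databases, so we need a mechanism that switches databases automatically.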
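1.2 Choosing a multi-data-source transaction technology
Switching between several databases can be done by using AbstractRoutingDataSource as the data source, but supporting transactions on top of that is not so simple. For multi-data-source transactions, the first thing that comes to mind is distributed transactions, which Alibaba's Seata framework handles well. A distributed transaction is one whose work may be spread across different servers: every server must finish its part before anything is committed, and if any server fails, all of them roll back together.
Our requirement, however, is only that a single server operates on several data sources. Bringing in Seata for that would add a transaction-coordinator server and clearly raise operating costs. So we will use Atomikos to implement multi-data-source transactions.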
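To make the "everyone commits, or everyone rolls back" idea concrete, here is a minimal plain-Java sketch of a two-phase commit. It is only an illustration of the principle: Participant, RecordingParticipant and TwoPhaseCoordinator are hypothetical names invented for this note, not part of Atomikos, Seata or JTA.

```java
import java.util.List;

/** Hypothetical participant: one database taking part in a global transaction. */
interface Participant {
    boolean prepare();   // phase 1: vote yes (true) or no (false)
    void commit();       // phase 2, if everybody voted yes
    void rollback();     // phase 2, if anybody voted no
}

/** Fake participant that only records its final state. */
final class RecordingParticipant implements Participant {
    private final boolean voteYes;
    String state = "new";

    RecordingParticipant(boolean voteYes) {
        this.voteYes = voteYes;
    }

    @Override
    public boolean prepare() {
        state = voteYes ? "prepared" : "refused";
        return voteYes;
    }

    @Override
    public void commit() {
        state = "committed";
    }

    @Override
    public void rollback() {
        state = "rolled back";
    }
}

final class TwoPhaseCoordinator {
    /** Returns true if all participants committed, false if all were rolled back. */
    static boolean run(List<Participant> participants) {
        for (Participant p : participants) {
            if (!p.prepare()) {
                // one "no" vote is enough to roll everybody back
                participants.forEach(Participant::rollback);
                return false;
            }
        }
        participants.forEach(Participant::commit);
        return true;
    }
}
```

With one participant voting no, both end up in the "rolled back" state, which is the behaviour we want across the monitor database and the enterprise databases.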
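1.3 About Atomikos
Atomikos is a lightweight distributed transaction manager for Java that conforms to the XA and JTA (Java Transaction API) specifications. Some posts say Atomikos performs poorly because it takes row locks. But let us ask ourselves honestly: does our business really update the same row concurrently?
Later major chapters may also cover the Seata approach. For the current stage of study, though, we will go with Atomikos.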
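2. The challenges
If we hook up Atomikos hastily, we find that inside a method annotated with @Transactional, a dynamic data source built on AbstractRoutingDataSource cannot switch databases. To find out why, we have to read the source code.
First, we configure an AbstractRoutingDataSource the way most online tutorials do, then single-step through it in the debugger to see why switching fails.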
@Slf4j
@Configuration
public class DataSourceConfig {

    @Bean
    public DataSource busyDataSource() {
        MultiDataSource multiDataSource = new MultiDataSource();
        multiDataSource.init();
        return multiDataSource;
    }

    public class MultiDataSource extends AbstractRoutingDataSource {

        public static ThreadLocal<String> curKey = new ThreadLocal<>();

        public void init() {
            HikariDataSource dataSourceMonitor = DataSourceBuilder.create()
                    .type(HikariDataSource.class)
                    .driverClassName("com.mysql.cj.jdbc.Driver")
                    .url("jdbc:mysql://192.168.0.64:3306/study2024-class007-monitor?useUnicode=true&characterEncoding=utf-8")
                    .username("dbMgr")
                    .password("???@7")
                    .build();
            HikariDataSource dataSource1 = DataSourceBuilder.create()
                    .type(HikariDataSource.class)
                    .driverClassName("com.mysql.cj.jdbc.Driver")
                    .url("jdbc:mysql://192.168.0.64:3306/study2024-class007-busy001?useUnicode=true&characterEncoding=utf-8")
                    .username("dbMgr")
                    .password("???@7")
                    .build();
            HikariDataSource dataSource2 = DataSourceBuilder.create()
                    .type(HikariDataSource.class)
                    .driverClassName("com.mysql.cj.jdbc.Driver")
                    .url("jdbc:mysql://192.168.0.64:3306/study2024-class007-busy002?useUnicode=true&characterEncoding=utf-8")
                    .username("dbMgr")
                    .password("???@7")
                    .build();
            Map<Object, Object> targetDataSources = new HashMap<>();
            targetDataSources.put("monitor", dataSourceMonitor);
            targetDataSources.put("busy001", dataSource1);
            targetDataSources.put("busy002", dataSource2);
            setTargetDataSources(targetDataSources);
            setDefaultTargetDataSource(dataSourceMonitor);
            curKey.set("monitor");
        }

        @Override
        protected Object determineCurrentLookupKey() {
            return curKey.get();
        }

        public static void changeDb(String pKey) {
            curKey.set(pKey);
        }
    }
}
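For readers who have not used AbstractRoutingDataSource, the lookup idea behind the configuration above can be mimicked in a few lines of plain Java. This is a simplified stand-in, not Spring's code: RoutingSketch is a hypothetical class, the "data sources" are just strings, and the fallback to the default target for a missing or unknown key is my reading of the default lenient behaviour.

```java
import java.util.Map;

/** Plain-Java stand-in for the routing idea; targets are just names here. */
final class RoutingSketch {
    // per-thread lookup key, like curKey in the configuration above
    private static final ThreadLocal<String> CUR_KEY = new ThreadLocal<>();

    private final Map<String, String> targets;
    private final String defaultTarget;

    RoutingSketch(Map<String, String> targets, String defaultTarget) {
        this.targets = targets;
        this.defaultTarget = defaultTarget;
    }

    static void changeDb(String key) {
        CUR_KEY.set(key);
    }

    static void clear() {
        CUR_KEY.remove();
    }

    /** Resolves the target for the current thread, falling back to the default. */
    String currentTarget() {
        String key = CUR_KEY.get();
        String target = (key == null) ? null : targets.get(key);
        return target != null ? target : defaultTarget;
    }
}
```

Every call to currentTarget() re-reads the thread-local key, so outside a transaction, switching works as expected. The interesting question is why it stops working inside one.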
We create a table named test_table and write a simple piece of test logic:
@RequiredArgsConstructor
@Service
public class TestCurdServiceImpl implements ITestCurdService {

    private final IGenMonitorTestDbService mGenMonitorTestDbService;
    private final IGenBusyTestDbService mGenBusyTestDbService;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void mpSave(TestEntityDto pTestDto) {
        GenMonitorTestEntity genMonitorTestEntity = DbDtoEntityUtil.createFromDto(pTestDto, GenMonitorTestEntity.class);
        GenBusyTestEntity genBusyTestEntity = DbDtoEntityUtil.createFromDto(pTestDto, GenBusyTestEntity.class);
        genBusyTestEntity.setEnpId(0L);
        genBusyTestEntity.setEnpCode("sys");
        genMonitorTestEntity.setEnpId(0L);
        genMonitorTestEntity.setEnpCode("sys");
        DataSourceConfig.MultiDataSource.changeDb("monitor");
        mGenMonitorTestDbService.save(genMonitorTestEntity);
        DataSourceConfig.MultiDataSource.changeDb("busy001");
        mGenBusyTestDbService.save(genBusyTestEntity);
        DataSourceConfig.MultiDataSource.changeDb("busy002");
        mGenBusyTestDbService.save(genBusyTestEntity);
    }
}
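In SpringBoot, @EnableTransactionManagement, through @Import(TransactionManagementConfigurationSelector.class) and a series of further steps, ends up configuring a TransactionInterceptor bean in ProxyTransactionManagementConfiguration. Its base class TransactionAspectSupport is the core of Spring's AOP implementation of transactions, and the method that intercepts our @Transactional methods is invokeWithinTransaction. Tracing further, we find that doBegin in DataSourceTransactionManager obtains a connection from the data source and turns off auto-commit. The connection it obtains is stored in a ConnectionHolder.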
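The overall shape of that interception can be sketched as "begin, call the business method, then commit or roll back". The sketch below is a heavily simplified plain-Java illustration of that shape only. TxAroundSketch and its event log are hypothetical, and the real TransactionAspectSupport does far more (propagation, rollback rules, and so on).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Simplified picture of what an around-advice for transactions does. */
final class TxAroundSketch {
    // records what happened, in order, so the flow can be inspected
    final List<String> events = new ArrayList<>();

    <T> T invokeWithinTransaction(Supplier<T> businessMethod) {
        // corresponds roughly to doBegin: fetch a connection, disable auto-commit
        events.add("begin");
        try {
            T result = businessMethod.get();
            events.add("commit");
            return result;
        } catch (RuntimeException e) {
            events.add("rollback");
            throw e;
        }
    }
}
```

The point to notice is that "begin" happens before the business method runs, so the connection is already chosen before our code gets a chance to switch the routing key.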
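Every MyBatis statement is ultimately executed through prepareStatement in MyBatis's SimpleExecutor, which calls transaction.getConnection() before each statement. When the executor is instantiated, its transaction is a SpringManagedTransaction, whose getConnection looks like this: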
public Connection getConnection() throws SQLException {
    if (this.connection == null) {
        openConnection();
    }
    return this.connection;
}
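As we can see, once a connection has been obtained, it is never obtained again. A connection was already obtained in doBegin when the transaction started, so SQL executed inside the @Transactional method never fetches another one, whatever the current routing key is.
We also find that SimpleExecutor is instantiated from the information held in MyBatis's Configuration class, which decides which transaction implementation is used (through its TransactionFactory).
So if we write our own Transaction that handles the multi-data-source case, the problem is solved, isn't it?
There is another problem. Creating beans the traditional way with the @Bean annotation cannot create beans dynamically and in bulk from configuration. Yet we want this code to live in the company's shared repository and be referenced by business code. For that we need Spring's programmatic container assembly.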
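The effect of that cached field is easy to reproduce without any framework. In the sketch below, CachedConnectionSketch is a hypothetical stand-in for a transaction object that caches its connection, and a "connection" is just a string that remembers which database it was opened against.

```java
import java.util.function.Supplier;

/** Mimics a transaction object that caches the first connection it obtains. */
final class CachedConnectionSketch {
    // stands in for "ask the routing data source for a connection"
    private final Supplier<String> connectionSource;
    // cached exactly like the connection field in the snippet above
    private String connection;

    CachedConnectionSketch(Supplier<String> connectionSource) {
        this.connectionSource = connectionSource;
    }

    String getConnection() {
        if (connection == null) {
            connection = connectionSource.get();
        }
        return connection;
    }
}
```

If the routing key is "monitor" on the first call and is changed to "busy001" afterwards, the second call still returns the monitor connection, because the source is never consulted again. That is the failure we see in the debugger.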
3. Implementation
3.1 pom dependency
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-spring-boot3-starter</artifactId>
    <version>${atomikos.version}</version>
</dependency>
<properties>
    <atomikos.version>6.0.0</atomikos.version>
</properties>
3.2 MultiDataSourceTransaction
As mentioned above, database switching fails because of how the Transaction is configured. So we write a Transaction implementation dedicated to AbstractRoutingDataSource:
@Slf4j
public class MultiDataSourceTransaction implements Transaction {

    private ZfRoutingDataSource mZfRoutingDataSource;
    // one connection record per routing key used within this transaction
    private Map<String, MultiDataSourceConnectionInfo> connectionMap;

    public MultiDataSourceTransaction(DataSource pDataSource) {
        if (pDataSource instanceof ZfRoutingDataSource zfRoutingDataSource) {
            mZfRoutingDataSource = zfRoutingDataSource;
            connectionMap = new ConcurrentHashMap<String, MultiDataSourceConnectionInfo>();
        } else {
            throw new ServiceException("The DataSource passed in must be a ZfRoutingDataSource");
        }
    }

    @Override
    public Connection getConnection() throws SQLException {
        // Note: ConcurrentHashMap rejects null keys, so a routing key must be set before this is called.
        String curKey = mZfRoutingDataSource.curKey();
        MultiDataSourceConnectionInfo multiDataSourceConnectionInfo = connectionMap.get(curKey);
        if (null == multiDataSourceConnectionInfo) {
            // first use of this key in the transaction: open and remember a connection
            multiDataSourceConnectionInfo = new MultiDataSourceConnectionInfo();
            DataSource targetDataSource = mZfRoutingDataSource.getTargetDataSource();
            Connection connection = DataSourceUtils.getConnection(targetDataSource);
            multiDataSourceConnectionInfo.setDataSource(targetDataSource);
            multiDataSourceConnectionInfo.setConnection(connection);
            multiDataSourceConnectionInfo.setAutoCommit(connection.getAutoCommit());
            multiDataSourceConnectionInfo.setConnectionTransactional(
                    DataSourceUtils.isConnectionTransactional(connection, targetDataSource));
            connectionMap.put(curKey, multiDataSourceConnectionInfo);
        }
        return multiDataSourceConnectionInfo.getConnection();
    }

    @Override
    public void commit() throws SQLException {
        for (MultiDataSourceConnectionInfo multiDataSourceConnectionInfo : connectionMap.values()) {
            // only commit connections that Spring is not already managing
            if (!multiDataSourceConnectionInfo.isConnectionTransactional()
                    && !multiDataSourceConnectionInfo.isAutoCommit()) {
                Connection targetConnection = multiDataSourceConnectionInfo.getConnection();
                targetConnection.commit();
            }
        }
    }

    @Override
    public void rollback() throws SQLException {
        for (MultiDataSourceConnectionInfo multiDataSourceConnectionInfo : connectionMap.values()) {
            if (!multiDataSourceConnectionInfo.isConnectionTransactional()
                    && !multiDataSourceConnectionInfo.isAutoCommit()) {
                Connection targetConnection = multiDataSourceConnectionInfo.getConnection();
                targetConnection.rollback();
            }
        }
    }

    @Override
    public void close() throws SQLException {
        for (MultiDataSourceConnectionInfo multiDataSourceConnectionInfo : connectionMap.values()) {
            DataSourceUtils.releaseConnection(
                    multiDataSourceConnectionInfo.getConnection(),
                    multiDataSourceConnectionInfo.getDataSource());
        }
        connectionMap.clear();
    }

    @Override
    public Integer getTimeout() throws SQLException {
        var holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(mZfRoutingDataSource);
        if (holder != null && holder.hasTimeout()) {
            return holder.getTimeToLiveInSeconds();
        }
        return null;
    }
}
public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {

    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
        return new MultiDataSourceTransaction(dataSource);
    }
}
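Stripped of all Spring and JDBC details, the core idea of MultiDataSourceTransaction is "cache one connection per routing key instead of one connection per transaction". A minimal plain-Java sketch of just that idea follows. PerKeyConnectionSketch is a hypothetical name, and connections are again plain strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** Caches one "connection" per routing key within a single transaction. */
final class PerKeyConnectionSketch {
    private final Supplier<String> currentKey;      // reads the current routing key
    private final Function<String, String> open;    // opens a connection for a key
    private final Map<String, String> connections = new LinkedHashMap<>();

    PerKeyConnectionSketch(Supplier<String> currentKey, Function<String, String> open) {
        this.currentKey = currentKey;
        this.open = open;
    }

    String getConnection() {
        // the key is consulted on every call, the connection is opened at most once per key
        return connections.computeIfAbsent(currentKey.get(), open);
    }

    int openConnectionCount() {
        return connections.size();
    }
}
```

Switching the key from "monitor" to "busy001" and back yields two distinct connections, and going back to "monitor" reuses the first one rather than opening a third.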
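3.3 Code structure of the mappers
First, let us pin down the requirement. We have two kinds of databases: the monitoring-center database, which has a single instance, and the enterprise-center databases, of which there are several instances.
During development we can keep the monitoring-center mappers and the enterprise-center mappers in separate packages and configure them separately.
That way the monitoring center can be configured as a single data source and the enterprise center as a multi-data source: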
@MapperScans(value = {
    @MapperScan(value = {"indi.zhifa.study2024.nbr.monitor.gen.monitor.**.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory_monitor"),
    @MapperScan(value = {"indi.zhifa.study2024.nbr.monitor.gen.busy.**.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory_busy"),
})
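3.4 RoutingDataSource
For the multi-data-source case, we take into account that some projects may have several distinct groups of multiple data sources, for example the familiar order center, coupon center, and product-definition center. So we write our own memo (a small registry) to keep track of them.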
public class ZfRoutingDataSource extends AbstractRoutingDataSource {

    ThreadLocal<String> curDbKey = new ThreadLocal<String>();

    @Override
    protected Object determineCurrentLookupKey() {
        return curDbKey.get();
    }

    public String curKey() {
        return curDbKey.get();
    }

    public void set(String pKey) {
        curDbKey.set(pKey);
    }

    public void clear() {
        curDbKey.remove();
    }

    public DataSource getTargetDataSource() {
        return determineTargetDataSource();
    }
}
public class RoutingDataSourceMemo {

    private Map<String, ZfRoutingDataSource> mRoutingDataSource;

    public RoutingDataSourceMemo() {
        mRoutingDataSource = new ConcurrentHashMap<String, ZfRoutingDataSource>();
    }

    public ZfRoutingDataSource createRoutingDataSource(String pKey, Map<Object, Object> pDataSourceMap, DataSource pPrimaryDataSource) {
        ZfRoutingDataSource zfRoutingDataSource = new ZfRoutingDataSource();
        zfRoutingDataSource.setTargetDataSources(pDataSourceMap);
        zfRoutingDataSource.setDefaultTargetDataSource(pPrimaryDataSource);
        zfRoutingDataSource.initialize();
        mRoutingDataSource.put(pKey, zfRoutingDataSource);
        return zfRoutingDataSource;
    }

    public ZfRoutingDataSource get(String pKey) {
        return Optional.<ZfRoutingDataSource>ofNullable(mRoutingDataSource.get(pKey))
                .orElseThrow(() -> new ServiceException("No data source found for key " + pKey));
    }
}
@Configuration
public class RoutingDataSourceMemoConfigure {

    @Bean
    RoutingDataSourceMemo routingDataSourceMemo() {
        return new RoutingDataSourceMemo();
    }
}
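One practical concern with a ThreadLocal routing key such as curDbKey is forgetting to reset it, which matters when threads are pooled. A possible convenience, which is not part of the original code, is to wrap set/clear in a try-with-resources scope. The sketch below shows the pattern in plain Java. DbKeyScope is a hypothetical helper with its own ThreadLocal, so it would need to be adapted to call ZfRoutingDataSource.set and clear in a real project.

```java
/** Sets a routing key for the current thread and restores the previous one on close. */
final class DbKeyScope implements AutoCloseable {
    private static final ThreadLocal<String> CUR = new ThreadLocal<>();

    private final String previous;

    private DbKeyScope(String key) {
        this.previous = CUR.get();
        CUR.set(key);
    }

    static DbKeyScope use(String key) {
        return new DbKeyScope(key);
    }

    static String current() {
        return CUR.get();
    }

    @Override
    public void close() {
        // restore the outer key, or remove the entry if there was none
        if (previous == null) {
            CUR.remove();
        } else {
            CUR.set(previous);
        }
    }
}
```

Used as try (DbKeyScope s = DbKeyScope.use("busy001")) { ... }, the key is guaranteed to be restored even if the block throws, and scopes can be nested.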
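3.5 Bean assembly
3.5.1 Introduction to the common interfaces
ImportBeanDefinitionRegistrar
This interface is generally used together with @Import.
It is generally used to import beans, and it declares two methods: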
void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry,BeanNameGenerator importBeanNameGenerator);
void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry);
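An implementation can read the annotations on the importing class and, based on their configuration, register in bulk the definitions of the PostProcessors that will create the beans.
A class implementing this interface may also implement the following four Aware interfaces to obtain key objects of the Spring container: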
- EnvironmentAware
- BeanFactoryAware
- BeanClassLoaderAware
- ResourceLoaderAware
BeanDefinitionRegistryPostProcessor
This interface is generally used to register the actual beans in bulk. It also has two methods:
void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException;
@Override
default void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException;
The first method registers bean definitions with the container; the second can register bean instances directly with the container.
FactoryBean
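This interface is used to defer the instantiation of a bean. Because creating many beans depends on other beans having been created first, the bean can simply be built when it is first needed. Note that a FactoryBean is itself a bean managed by the container; what other beans receive is the object returned by its getObject() method.
Its core method is: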
T getObject() throws Exception;
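The "build it when first needed" idea can be illustrated without Spring. In the sketch below, LazyFactorySketch is a hypothetical class that creates its product on the first getObject() call and then reuses it. In a real container the caching of singleton products is handled by Spring rather than by the FactoryBean itself, so treat this purely as a picture of the timing.

```java
import java.util.function.Supplier;

/** Creates its product lazily on first request and reuses it afterwards. */
final class LazyFactorySketch<T> {
    private final Supplier<T> creator;
    private T instance;
    private int creations;

    LazyFactorySketch(Supplier<T> creator) {
        this.creator = creator;
    }

    T getObject() {
        if (instance == null) {
            // nothing is built until somebody actually asks for the object
            instance = creator.get();
            creations++;
        }
        return instance;
    }

    int creations() {
        return creations;
    }
}
```

Registering such a factory is cheap. The expensive work, for us opening XA data sources and building a SqlSessionFactory, is postponed until the object is requested.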
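3.5.2 MultiDataSourceConfigurerRegister
Now we start writing the code for this chapter's requirement. This Register registers with the container a PostProcessor that bulk-registers the data sources and the SqlSessionFactory beans.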
@Slf4j
public class MultiDataSourceConfigurerRegister implements ImportBeanDefinitionRegistrar, EnvironmentAware, ResourceLoaderAware, BeanFactoryAware {

    private Environment mEnvironment;
    private ResourceLoader mResourceLoader;
    private BeanFactory mBeanFactory;

    public MultiDataSourceConfigurerRegister() {
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
                                        BeanDefinitionRegistry registry,
                                        BeanNameGenerator importBeanNameGenerator) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MultiDataSourceConfigurer.class);
        builder.addConstructorArgValue(mBeanFactory);
        builder.addConstructorArgValue(mResourceLoader);
        builder.addConstructorArgValue(mEnvironment);
        builder.addConstructorArgReference("routingDataSourceMemo");
        registry.registerBeanDefinition(MultiDataSourceConfigurer.class.getName(), builder.getBeanDefinition());
    }

    @Override
    public void setEnvironment(Environment pEnvironment) {
        mEnvironment = pEnvironment;
    }

    @Override
    public void setResourceLoader(ResourceLoader pResourceLoader) {
        mResourceLoader = pResourceLoader;
    }

    @Override
    public void setBeanFactory(BeanFactory pBeanFactory) throws BeansException {
        mBeanFactory = pBeanFactory;
    }
}
3.5.3 MultiDataSourceConfigurer
This class registers the FactoryBeans for the DataSource and SqlSessionFactory beans.
@Slf4j
public class MultiDataSourceConfigurer implements BeanDefinitionRegistryPostProcessor, InitializingBean, ApplicationContextAware {

    private BeanFactory mBeanFactory;
    private ResourceLoader mResourceLoader;
    private Environment mEnvironment;
    private ApplicationContext mApplicationContext;

    static final String DATASOURCE_PREFIX = "datasource_";
    static final String SQL_SESSION_FACTORY_PREFIX = "sqlSessionFactory_";
    static final String SQL_SESSION_TEMPLATE_PREFIX = "sqlSessionTemplate_";

    RoutingDataSourceMemo mRoutingDataSourceMemo;

    public MultiDataSourceConfigurer(BeanFactory pBeanFactory,
                                     ResourceLoader pResourceLoader,
                                     Environment pEnvironment,
                                     RoutingDataSourceMemo pRoutingDataSourceMemo) {
        mBeanFactory = pBeanFactory;
        mResourceLoader = pResourceLoader;
        mEnvironment = pEnvironment;
        mRoutingDataSourceMemo = pRoutingDataSourceMemo;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        String profile = mEnvironment.getProperty("spring.profiles.active");
        String url = mEnvironment.getProperty("spring.datasource.url");
        String userName = mEnvironment.getProperty("spring.datasource.username");
        String password = mEnvironment.getProperty("spring.datasource.password");
        // bootstrap connection used only to read the data source configuration table
        SingleConnectionDataSource singleConnectionDataSource = new SingleConnectionDataSource(url, userName, password, false);
        List<Map<String, Object>> res;
        try {
            JdbcTemplate jdbcTemplate = new JdbcTemplate(singleConnectionDataSource);
            res = jdbcTemplate.queryForList("select * from sys_db where profile = ?", profile);
        } finally {
            // release the bootstrap connection once the rows are loaded
            singleConnectionDataSource.destroy();
        }
        List<SysDbEntity> sysDbEntityList = res.stream()
                .map(m -> BeanUtil.toBean(m, SysDbEntity.class))
                .collect(Collectors.toList());
        // one group of rows per data source group
        Map<String, List<SysDbEntity>> sysDbEntityListMap = sysDbEntityList.stream()
                .collect(Collectors.groupingBy(SysDbEntity::getDatasourceGroup));
        for (Map.Entry<String, List<SysDbEntity>> entry : sysDbEntityListMap.entrySet()) {
            String datasourceGroup = entry.getKey();
            List<SysDbEntity> list = entry.getValue();
            SysDbEntity first = list.get(0);
            switch (first.getDatasourceType()) {
                // 0: single data source
                case 0 -> {
                    AbstractBeanDefinition singleDataSourceBeanDef = genSingleDataSourceFactorBean(first);
                    registry.registerBeanDefinition(DATASOURCE_PREFIX + datasourceGroup, singleDataSourceBeanDef);
                    AbstractBeanDefinition singleSqlSessionFactorBean = genSqlSessionFactorBean(EDataSourceType.SINGLE, datasourceGroup);
                    registry.registerBeanDefinition(SQL_SESSION_FACTORY_PREFIX + datasourceGroup, singleSqlSessionFactorBean);
                }
                // 1: routing (multi) data source
                case 1 -> {
                    AbstractBeanDefinition multiDataSourceBeanDef = genMultiDataSourceFactorBean(datasourceGroup, list, mRoutingDataSourceMemo);
                    registry.registerBeanDefinition(DATASOURCE_PREFIX + datasourceGroup, multiDataSourceBeanDef);
                    AbstractBeanDefinition multiSqlSessionFactorBean = genSqlSessionFactorBean(EDataSourceType.MULTI, datasourceGroup);
                    registry.registerBeanDefinition(SQL_SESSION_FACTORY_PREFIX + datasourceGroup, multiSqlSessionFactorBean);
                }
            }
        }
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        mApplicationContext = applicationContext;
    }

    public AbstractBeanDefinition genSingleDataSourceFactorBean(SysDbEntity pSysDbEntity) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SingleDataSourceFactorBean.class);
        builder.addConstructorArgValue(pSysDbEntity);
        return builder.getBeanDefinition();
    }

    public AbstractBeanDefinition genMultiDataSourceFactorBean(String pKey, List<SysDbEntity> pSysDbEntityList, RoutingDataSourceMemo pRoutingDataSourceMemo) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MultiDataSourceFactorBean.class);
        builder.addConstructorArgValue(pKey);
        builder.addConstructorArgValue(pSysDbEntityList);
        builder.addConstructorArgValue(pRoutingDataSourceMemo);
        return builder.getBeanDefinition();
    }

    public AbstractBeanDefinition genSqlSessionFactorBean(EDataSourceType pDataSourceType, String pDataSourceName) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SqlSessionFactorBean.class);
        builder.addConstructorArgValue(pDataSourceType);
        builder.addConstructorArgReference(DATASOURCE_PREFIX + pDataSourceName);
        ObjectProvider<MybatisPlusProperties> mybatisPlusPropertiesProvider = mBeanFactory.getBeanProvider(MybatisPlusProperties.class);
        builder.addConstructorArgValue(mybatisPlusPropertiesProvider);
        builder.addConstructorArgValue(mResourceLoader);
        builder.addConstructorArgValue(mApplicationContext);
        ObjectProvider<Interceptor> interceptorsProvider = mBeanFactory.getBeanProvider(Interceptor.class);
        builder.addConstructorArgValue(interceptorsProvider);
        ObjectProvider<TypeHandler> typeHandlerProvider = mBeanFactory.getBeanProvider(TypeHandler.class);
        builder.addConstructorArgValue(typeHandlerProvider);
        ObjectProvider<LanguageDriver> languageDriverProvider = mBeanFactory.getBeanProvider(LanguageDriver.class);
        builder.addConstructorArgValue(languageDriverProvider);
        ObjectProvider<DatabaseIdProvider> databaseIdProviderProvider = mBeanFactory.getBeanProvider(DatabaseIdProvider.class);
        builder.addConstructorArgValue(databaseIdProviderProvider);
        ResolvableType configurationCustomizerTargetType = ResolvableType.forClassWithGenerics(List.class, ConfigurationCustomizer.class);
        ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizerListProvider = mBeanFactory.getBeanProvider(configurationCustomizerTargetType);
        builder.addConstructorArgValue(configurationCustomizerListProvider);
        ResolvableType sqlSessionFactoryBeanCustomizerTargetType = ResolvableType.forClassWithGenerics(List.class, SqlSessionFactoryBeanCustomizer.class);
        ObjectProvider<List<SqlSessionFactoryBeanCustomizer>> sqlSessionFactoryBeanCustomizerListProvider = mBeanFactory.getBeanProvider(sqlSessionFactoryBeanCustomizerTargetType);
        builder.addConstructorArgValue(sqlSessionFactoryBeanCustomizerListProvider);
        ResolvableType mybatisPlusPropertiesCustomizerTargetType = ResolvableType.forClassWithGenerics(List.class, MybatisPlusPropertiesCustomizer.class);
        ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mybatisPlusPropertiesCustomizerProvider = mBeanFactory.getBeanProvider(mybatisPlusPropertiesCustomizerTargetType);
        builder.addConstructorArgValue(mybatisPlusPropertiesCustomizerProvider);
        // explicitly add a dependency on the configuration class
        //builder.addDependsOn("mybatisPlusInterceptor");
        return builder.getBeanDefinition();
    }
}
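The bean names produced by the configurer are what the sqlSessionFactoryRef values in @MapperScan must match, so it helps to see the naming rule in isolation. The following plain-Java sketch reproduces only the grouping and naming step. DbRow and BeanNamePlanSketch are hypothetical, the rows are made-up sample data, and a TreeMap is used purely to make the output order predictable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical, trimmed-down view of a sys_db row. */
record DbRow(String dbKey, String datasourceGroup, int datasourceType) {}

final class BeanNamePlanSketch {
    static final String DATASOURCE_PREFIX = "datasource_";
    static final String SQL_SESSION_FACTORY_PREFIX = "sqlSessionFactory_";

    /** Lists the bean names that would be registered, two per data source group. */
    static List<String> plannedBeanNames(List<DbRow> rows) {
        Map<String, List<DbRow>> byGroup = rows.stream()
                .collect(Collectors.groupingBy(DbRow::datasourceGroup, TreeMap::new, Collectors.toList()));
        List<String> names = new ArrayList<>();
        for (String group : byGroup.keySet()) {
            names.add(DATASOURCE_PREFIX + group);
            names.add(SQL_SESSION_FACTORY_PREFIX + group);
        }
        return names;
    }
}
```

For one "monitor" row and two "busy" rows, this plans datasource_busy, sqlSessionFactory_busy, datasource_monitor and sqlSessionFactory_monitor, which lines up with the sqlSessionFactory_monitor and sqlSessionFactory_busy references used in the mapper scan.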
3.5.4 FactoryBeans for the DataSource
public class SingleDataSourceFactorBean implements FactoryBean<DataSource> {

    private final SysDbEntity mSysDbEntity;

    public SingleDataSourceFactorBean(SysDbEntity pSysDbEntity) {
        mSysDbEntity = pSysDbEntity;
    }

    @Override
    public DataSource getObject() throws Exception {
        MysqlXADataSource dataSource = DataSourceBuilder.create()
                .type(MysqlXADataSource.class)
                .url(mSysDbEntity.getDbUrl())
                .username(mSysDbEntity.getDbUser())
                .password(mSysDbEntity.getDbPasswd())
                .build();
        // wrap the XA data source so that Atomikos can enlist it in JTA transactions
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName(mSysDbEntity.getDbKey());
        // class name of the XA data source built above
        atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        atomikosDataSourceBean.setXaDataSource(dataSource);
        Optional.ofNullable(mSysDbEntity.getMinIdle()).ifPresent(atomikosDataSourceBean::setMinPoolSize);
        Optional.ofNullable(mSysDbEntity.getMaxPoolSize()).ifPresent(atomikosDataSourceBean::setMaxPoolSize);
        return atomikosDataSourceBean;
    }

    @Override
    public Class<?> getObjectType() {
        return DataSource.class;
    }
}
public class MultiDataSourceFactorBean implements FactoryBean<DataSource> {

    private final String mKey;
    private final List<SysDbEntity> mSysDbEntityList;
    private final RoutingDataSourceMemo mRoutingDataSourceMemo;

    public MultiDataSourceFactorBean(String pKey, List<SysDbEntity> pSysDbEntityList, RoutingDataSourceMemo pRoutingDataSourceMemo) {
        mKey = pKey;
        mSysDbEntityList = pSysDbEntityList;
        mRoutingDataSourceMemo = pRoutingDataSourceMemo;
    }

    @Override
    public DataSource getObject() throws Exception {
        Map<Object, Object> dataSourceMap = new HashMap<>();
        DataSource primaryDataSource = null;
        for (SysDbEntity sysDbEntity : mSysDbEntityList) {
            MysqlXADataSource dataSource = DataSourceBuilder.create()
                    .type(MysqlXADataSource.class)
                    .url(sysDbEntity.getDbUrl())
                    .username(sysDbEntity.getDbUser())
                    .password(sysDbEntity.getDbPasswd())
                    .build();
            AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
            atomikosDataSourceBean.setUniqueResourceName(sysDbEntity.getDbKey());
            // class name of the XA data source built above
            atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
            atomikosDataSourceBean.setXaDataSource(dataSource);
            Optional.ofNullable(sysDbEntity.getMinIdle()).ifPresent(atomikosDataSourceBean::setMinPoolSize);
            Optional.ofNullable(sysDbEntity.getMaxPoolSize()).ifPresent(atomikosDataSourceBean::setMaxPoolSize);
            // the routing key of each target is its dbKey
            dataSourceMap.put(sysDbEntity.getDbKey(), atomikosDataSourceBean);
            if (sysDbEntity.getPrimary()) {
                primaryDataSource = atomikosDataSourceBean;
            }
        }
        ZfRoutingDataSource dataSourceRouter = mRoutingDataSourceMemo.createRoutingDataSource(mKey, dataSourceMap, primaryDataSource);
        return dataSourceRouter;
    }

    @Override
    public Class<?> getObjectType() {
        return DataSource.class;
    }
}
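One detail worth guarding against in MultiDataSourceFactorBean is a group in which no row is marked as primary (or the flag is null), which would leave the routing data source without a default target. A possible defensive helper, shown here as a plain-Java sketch under that assumption, picks the primary entry explicitly and fails fast otherwise. DbEntry and PrimaryPickSketch are hypothetical names, and the original code does not contain this check.

```java
import java.util.List;

/** Hypothetical, trimmed-down view of a sys_db row with its primary flag. */
record DbEntry(String dbKey, Boolean primary) {}

final class PrimaryPickSketch {
    /** Returns the key of the first entry flagged as primary, or fails with a clear message. */
    static String primaryKey(List<DbEntry> entries) {
        return entries.stream()
                // Boolean.TRUE.equals also copes with a null flag
                .filter(e -> Boolean.TRUE.equals(e.primary()))
                .map(DbEntry::dbKey)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no primary data source configured"));
    }
}
```

Failing at startup with a readable message is usually easier to diagnose than a routing data source that silently has no default.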
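3.5.5 FactoryBean for the SqlSessionFactory
This class deserves special attention. MyBatis-Plus implements its extensions to MyBatis through the SqlSessionFactory, so simply creating a new MybatisSqlSessionFactoryBean and calling getObject() would leave all MyBatis-Plus features unusable. The code below is copied and adapted from MyBatis-Plus's MybatisPlusAutoConfiguration.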
public class SqlSessionFactorBean implements FactoryBean<SqlSessionFactory> {

    private final DataSource mDataSource;
    private final ObjectProvider<MybatisPlusProperties> mPropertiesProvider;
    private final ResourceLoader mResourceLoader;
    private final ApplicationContext mApplicationContext;
    private final EDataSourceType mDataSourceType;

    /* variables used by MyBatis-Plus */
    private final ObjectProvider<Interceptor> mInterceptorsProvider;
    private final ObjectProvider<TypeHandler> mTypeHandlersProvider;
    private final ObjectProvider<LanguageDriver> mLanguageDriversProvider;
    private final ObjectProvider<DatabaseIdProvider> mDatabaseIdProviderProvider;
    private final ObjectProvider<List<ConfigurationCustomizer>> mConfigurationCustomizersProvider;
    private final ObjectProvider<List<SqlSessionFactoryBeanCustomizer>> mSqlSessionFactoryBeanCustomizersProvider;
    private final ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mMybatisPlusPropertiesCustomizersProvider;

    private Interceptor[] mInterceptors;
    private TypeHandler[] mTypeHandlers;
    private LanguageDriver[] mLanguageDrivers;
    private DatabaseIdProvider mDatabaseIdProvider;
    private List<ConfigurationCustomizer> mConfigurationCustomizers;
    private List<SqlSessionFactoryBeanCustomizer> mSqlSessionFactoryBeanCustomizers;
    private List<MybatisPlusPropertiesCustomizer> mMybatisPlusPropertiesCustomizers;
    private MybatisPlusProperties mMybatisPlusProperties;

    public SqlSessionFactorBean(EDataSourceType pDataSourceType,
                                DataSource pDataSource,
                                ObjectProvider<MybatisPlusProperties> propertiesProvider,
                                ResourceLoader resourceLoader,
                                ApplicationContext applicationContext,
                                ObjectProvider<Interceptor> interceptorsProvider,
                                ObjectProvider<TypeHandler> typeHandlersProvider,
                                ObjectProvider<LanguageDriver> languageDriversProvider,
                                ObjectProvider<DatabaseIdProvider> databaseIdProvider,
                                ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider,
                                ObjectProvider<List<SqlSessionFactoryBeanCustomizer>> sqlSessionFactoryBeanCustomizers,
                                ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mybatisPlusPropertiesCustomizerProvider) {
        mDataSourceType = pDataSourceType;
        mDataSource = pDataSource;
        this.mPropertiesProvider = propertiesProvider;
        this.mInterceptorsProvider = interceptorsProvider;
        this.mTypeHandlersProvider = typeHandlersProvider;
        this.mLanguageDriversProvider = languageDriversProvider;
        this.mResourceLoader = resourceLoader;
        this.mDatabaseIdProviderProvider = databaseIdProvider;
        this.mConfigurationCustomizersProvider = configurationCustomizersProvider;
        this.mSqlSessionFactoryBeanCustomizersProvider = sqlSessionFactoryBeanCustomizers;
        this.mMybatisPlusPropertiesCustomizersProvider = mybatisPlusPropertiesCustomizerProvider;
        this.mApplicationContext = applicationContext;
    }

    @Override
    public SqlSessionFactory getObject() throws Exception {
        getMpBeans();
        switch (mDataSourceType) {
            case MULTI -> {
                MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
                MultiDataSourceTransactionFactory transactionFactory = new MultiDataSourceTransactionFactory();
                sqlSessionFactory(bean, mDataSource);
                bean.setTransactionFactory(transactionFactory);
                return bean.getObject();
            }
            case SINGLE -> {
                MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
                sqlSessionFactory(bean, mDataSource);
                return bean.getObject();
            }
        }
        return null;
    }

    @Override
    public Class<?> getObjectType() {
        return null;
    }

    protected void getMpBeans() {
        mInterceptors = mInterceptorsProvider.stream().toArray(Interceptor[]::new);
        mTypeHandlers = mTypeHandlersProvider.stream().toArray(TypeHandler[]::new);
        mLanguageDrivers = mLanguageDriversProvider.stream().toArray(LanguageDriver[]::new);
        mDatabaseIdProvider = mDatabaseIdProviderProvider.getIfAvailable();
        mConfigurationCustomizers = mConfigurationCustomizersProvider.getIfAvailable();
        mSqlSessionFactoryBeanCustomizers = mSqlSessionFactoryBeanCustomizersProvider.getIfAvailable();
        mMybatisPlusPropertiesCustomizers = mMybatisPlusPropertiesCustomizersProvider.getIfAvailable();
        mMybatisPlusProperties = mPropertiesProvider.getIfAvailable();
    }

    public MybatisSqlSessionFactoryBean sqlSessionFactory(MybatisSqlSessionFactoryBean pMybatisSqlSessionFactoryBean, DataSource dataSource) throws Exception {
        MybatisSqlSessionFactoryBean factory = pMybatisSqlSessionFactoryBean;
        factory.setDataSource(dataSource);
        factory.setVfs(SpringBootVFS.class);
        if (StringUtils.hasText(this.mMybatisPlusProperties.getConfigLocation())) {
            factory.setConfigLocation(this.mResourceLoader.getResource(this.mMybatisPlusProperties.getConfigLocation()));
        }
        applyConfiguration(factory);
        if (this.mMybatisPlusProperties.getConfigurationProperties() != null) {
            factory.setConfigurationProperties(this.mMybatisPlusProperties.getConfigurationProperties());
        }
        if (!ObjectUtils.isEmpty(this.mInterceptors)) {
            factory.setPlugins(this.mInterceptors);
        }
        if (this.mDatabaseIdProvider != null) {
            factory.setDatabaseIdProvider(this.mDatabaseIdProvider);
        }
        if (StringUtils.hasLength(this.mMybatisPlusProperties.getTypeAliasesPackage())) {
            factory.setTypeAliasesPackage(this.mMybatisPlusProperties.getTypeAliasesPackage());
        }
        if (this.mMybatisPlusProperties.getTypeAliasesSuperType() != null) {
            factory.setTypeAliasesSuperType(this.mMybatisPlusProperties.getTypeAliasesSuperType());
        }
        if (StringUtils.hasLength(this.mMybatisPlusProperties.getTypeHandlersPackage())) {
            factory.setTypeHandlersPackage(this.mMybatisPlusProperties.getTypeHandlersPackage());
        }
        if (!ObjectUtils.isEmpty(this.mTypeHandlers)) {
            factory.setTypeHandlers(this.mTypeHandlers);
        }
        if (!ObjectUtils.isEmpty(this.mMybatisPlusProperties.resolveMapperLocations())) {
            factory.setMapperLocations(this.mMybatisPlusProperties.resolveMapperLocations());
        }
        Class<? extends LanguageDriver> defaultLanguageDriver = this.mMybatisPlusProperties.getDefaultScriptingLanguageDriver();
        if (!ObjectUtils.isEmpty(this.mLanguageDrivers)) {
            factory.setScriptingLanguageDrivers(this.mLanguageDrivers);
        }
        Optional.ofNullable(defaultLanguageDriver).ifPresent(factory::setDefaultScriptingLanguageDriver);
        applySqlSessionFactoryBeanCustomizers(factory);
        GlobalConfig globalConfig = this.mMybatisPlusProperties.getGlobalConfig();
        this.getBeanThen(MetaObjectHandler.class, globalConfig::setMetaObjectHandler);
        this.getBeanThen(AnnotationHandler.class, globalConfig::setAnnotationHandler);
        this.getBeanThen(PostInitTableInfoHandler.class, globalConfig::setPostInitTableInfoHandler);
        this.getBeansThen(IKeyGenerator.class, i -> globalConfig.getDbConfig().setKeyGenerators(i));
        this.getBeanThen(ISqlInjector.class, globalConfig::setSqlInjector);
        this.getBeanThen(IdentifierGenerator.class, globalConfig::setIdentifierGenerator);
        factory.setGlobalConfig(globalConfig);
        return factory;
    }

    private <T> void getBeanThen(Class<T> clazz, Consumer<T> consumer) {
        if (this.mApplicationContext.getBeanNamesForType(clazz, false, false).length > 0) {
            consumer.accept(this.mApplicationContext.getBean(clazz));
        }
    }

    private <T> void getBeansThen(Class<T> clazz, Consumer<List<T>> consumer) {
        if (this.mApplicationContext.getBeanNamesForType(clazz, false, false).length > 0) {
            final Map<String, T> beansOfType = this.mApplicationContext.getBeansOfType(clazz);
            List<T> clazzList = new ArrayList<>();
            beansOfType.forEach((k, v) -> clazzList.add(v));
            consumer.accept(clazzList);
        }
    }

    private void applyConfiguration(MybatisSqlSessionFactoryBean factory) {
        MybatisPlusProperties.CoreConfiguration coreConfiguration = this.mMybatisPlusProperties.getConfiguration();
        MybatisConfiguration configuration = null;
        if (coreConfiguration != null || !StringUtils.hasText(this.mMybatisPlusProperties.getConfigLocation())) {
            configuration = new MybatisConfiguration();
        }
        if (configuration != null && coreConfiguration != null) {
            coreConfiguration.applyTo(configuration);
        }
        if (configuration != null && !CollectionUtils.isEmpty(this.mConfigurationCustomizers)) {
            for (ConfigurationCustomizer customizer : this.mConfigurationCustomizers) {
                customizer.customize(configuration);
            }
        }
        factory.setConfiguration(configuration);
    }

    private void applySqlSessionFactoryBeanCustomizers(MybatisSqlSessionFactoryBean factory) {
        if (!CollectionUtils.isEmpty(this.mSqlSessionFactoryBeanCustomizers)) {
            for (SqlSessionFactoryBeanCustomizer customizer : this.mSqlSessionFactoryBeanCustomizers) {
                customizer.customize(factory);
            }
        }
    }
}
4. Usage
Add the following annotations to the main class and multi-data-source transactions are ready to use:
@EnableZfMultiDataSource
@MapperScans(value = {
    @MapperScan(value = {"indi.zhifa.study2024.nbr.monitor.gen.monitor.**.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory_monitor"),
    @MapperScan(value = {"indi.zhifa.study2024.nbr.monitor.gen.busy.**.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory_busy"),
})
For the complete code, see my Gitee repository.