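Seata Source Code, Part 7: Transaction Processing in Seata TCC Mode (I)
Outline
1.Configuration of the Seata TCC distributed transaction sample
2.Startup analysis of the service provider in the Seata TCC sample
3.Source code for scanning the @TwoPhaseBusinessAction annotation
4.Analysis of the distributed transaction entry point in the Seata TCC sample
5.Source code entry point for TCC core annotation scanning and proxy creation
6.The TCC dynamic proxy interceptor TccActionInterceptor
7.The action interception handler ActionInterceptorHandler
8.Source code for branch registration, commit, and rollback in Seata TCC distributed transactions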
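1.Configuration of the Seata TCC distributed transaction sample
(1)The demo project under the tcc module of seata-samples
(2)Configuration files of the demo project
(3)How to run the demo project
(1)The demo project under the tcc module of seata-samples
The dubbo-tcc-sample module demonstrates how a distributed transaction is committed and rolled back in TCC mode. In this demo, one distributed transaction involves two TCC participants: TccActionOne and TccActionTwo. If the distributed transaction commits, both participants commit; if it rolls back, both roll back.
Both TCC participants are remote Dubbo services. One application acts as the service provider: it implements the two TCC participants and publishes them as Dubbo services. The other application acts as the transaction initiator: it subscribes to the Dubbo services, then orchestrates the TCC participants by invoking the remote Dubbo services.
The TccActionOne interface is defined as follows: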
public interface TccActionOne {
    @TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") int a);

    public boolean commit(BusinessActionContext actionContext);

    public boolean rollback(BusinessActionContext actionContext);
}
The TccActionTwo interface is defined as follows:
public interface TccActionTwo {
    @TwoPhaseBusinessAction(name = "DubboTccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext,
        @BusinessActionContextParameter(paramName = "b") String b,
        @BusinessActionContextParameter(paramName = "c", index = 1) List list);

    public boolean commit(BusinessActionContext actionContext);

    public boolean rollback(BusinessActionContext actionContext);
}
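The prepare() parameters marked with @BusinessActionContextParameter are later read back in commit() and rollback() through actionContext.getActionContext("a") and similar calls. The following is a minimal sketch of how such annotated arguments could be collected into a context map by reflection. It is not Seata's code: CtxParam, SketchAction, and ContextParamCollector are hypothetical names, and the handling of index (taking one element out of a List argument) is an assumption about what the attribute means.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for @BusinessActionContextParameter (not the Seata annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface CtxParam {
    String paramName();
    int index() default -1;
}

// Hypothetical participant interface shaped like TccActionTwo.
interface SketchAction {
    boolean prepare(@CtxParam(paramName = "b") String b,
                    @CtxParam(paramName = "c", index = 1) List<String> list);
}

class ContextParamCollector {
    // Copies every annotated argument into a map keyed by paramName.
    static Map<String, Object> collect(Method method, Object[] args) {
        Map<String, Object> context = new HashMap<>();
        Annotation[][] annotations = method.getParameterAnnotations();
        for (int i = 0; i < annotations.length; i++) {
            for (Annotation annotation : annotations[i]) {
                if (annotation instanceof CtxParam) {
                    CtxParam param = (CtxParam) annotation;
                    Object value = args[i];
                    // Assumption: a non-negative index selects one element of a List argument.
                    if (param.index() >= 0 && value instanceof List) {
                        value = ((List<?>) value).get(param.index());
                    }
                    context.put(param.paramName(), value);
                }
            }
        }
        return context;
    }

    // Collects the arguments of a sample prepare("two", ["c1", "c2"]) call.
    static Map<String, Object> demo() {
        try {
            Method prepare = SketchAction.class.getMethod("prepare", String.class, List.class);
            return collect(prepare, new Object[]{"two", Arrays.asList("c1", "c2")});
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the second-phase methods would only need the map, not the original arguments, which is why commit() and rollback() in the demo take nothing but the context object.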
(2)Configuration files of the demo project
I.seata-tcc.xml
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"
       default-autowire="byName">
    <!-- fescar bean scanner -->
    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="tcc-sample"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>
    <bean id="tccActionOneImpl" class="io.seata.samples.tcc.dubbo.action.impl.TccActionOneImpl"/>
    <bean id="tccActionTwoImpl" class="io.seata.samples.tcc.dubbo.action.impl.TccActionTwoImpl"/>
    <bean id="tccTransactionService" class="io.seata.samples.tcc.dubbo.service.TccTransactionService"/>
</beans>
II.seata-dubbo-provider.xml
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"
       default-autowire="byName">
    <dubbo:application name="tcc-sample">
        <dubbo:parameter key="qos.enable" value="false"/>
    </dubbo:application>
    <!-- Services are exposed through the ZooKeeper registry; ZooKeeper must be started first -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    <dubbo:protocol name="dubbo" port="-1"/>
    <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
    <!-- Publish the first TCC participant as a service -->
    <dubbo:service interface="io.seata.samples.tcc.dubbo.action.TccActionOne" ref="tccActionOneImpl"/>
    <!-- Publish the second TCC participant as a service -->
    <dubbo:service interface="io.seata.samples.tcc.dubbo.action.TccActionTwo" ref="tccActionTwoImpl"/>
</beans>
III.seata-dubbo-reference.xml
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"
       default-autowire="byName">
    <dubbo:application name="tcc-sample-reference">
        <dubbo:parameter key="qos.enable" value="false"/>
    </dubbo:application>
    <!-- Uses the ZooKeeper registry; ZooKeeper must be started first -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    <dubbo:protocol name="dubbo" port="-1"/>
    <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
    <!-- Subscribe to the first TCC participant service -->
    <dubbo:reference id="tccActionOne" interface="io.seata.samples.tcc.dubbo.action.TccActionOne" check="false" lazy="true"/>
    <!-- Subscribe to the second TCC participant service -->
    <dubbo:reference id="tccActionTwo" interface="io.seata.samples.tcc.dubbo.action.TccActionTwo" check="false" lazy="true"/>
</beans>
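(3)How to run the demo project
I.Start Seata Server
II.Start the Dubbo service application
Run TccProviderStarter. This application implements the two TCC participants and publishes them as Dubbo services.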
public class TccProviderStarter extends AbstractStarter {
    public static void main(String[] args) throws Exception {
        new TccProviderStarter().start0(args);
    }

    @Override
    protected void start0(String[] args) throws Exception {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
            new String[]{"spring/seata-tcc.xml", "spring/seata-dubbo-provider.xml"}
        );
        new ApplicationKeeper().keep();
    }
}

public class TccActionOneImpl implements TccActionOne {
    @Override
    public boolean prepare(BusinessActionContext actionContext, int a) {
        String xid = actionContext.getXid();
        System.out.println("TccActionOne prepare, xid:" + xid + ", a:" + a);
        return true;
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {
        String xid = actionContext.getXid();
        System.out.println("TccActionOne commit, xid:" + xid + ", a:" + actionContext.getActionContext("a"));
        ResultHolder.setActionOneResult(xid, "T");
        return true;
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        String xid = actionContext.getXid();
        System.out.println("TccActionOne rollback, xid:" + xid + ", a:" + actionContext.getActionContext("a"));
        ResultHolder.setActionOneResult(xid, "R");
        return true;
    }
}

public class TccActionTwoImpl implements TccActionTwo {
    @Override
    public boolean prepare(BusinessActionContext actionContext, String b, List list) {
        String xid = actionContext.getXid();
        System.out.println("TccActionTwo prepare, xid:" + xid + ", b:" + b + ", c:" + list.get(1));
        return true;
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {
        String xid = actionContext.getXid();
        System.out.println("TccActionTwo commit, xid:" + xid + ", b:" + actionContext.getActionContext("b") + ", c:" + actionContext.getActionContext("c"));
        ResultHolder.setActionTwoResult(xid, "T");
        return true;
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        String xid = actionContext.getXid();
        System.out.println("TccActionTwo rollback, xid:" + xid + ", b:" + actionContext.getActionContext("b") + ", c:" + actionContext.getActionContext("c"));
        ResultHolder.setActionTwoResult(xid, "R");
        return true;
    }
}
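III.Start the transaction application
Run TccConsumerStarter. This application subscribes to the Dubbo services, starts distributed transactions, and invokes the two TCC participants above. It demonstrates both a TCC commit scenario and a TCC rollback scenario.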
public class TccConsumerStarter extends AbstractStarter {
    static TccTransactionService tccTransactionService = null;

    public static void main(String[] args) throws Exception {
        new TccConsumerStarter().start0(args);
    }

    @Override
    protected void start0(String[] args) throws Exception {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
            new String[]{"spring/seata-tcc.xml", "spring/seata-dubbo-reference.xml"}
        );
        tccTransactionService = (TccTransactionService) applicationContext.getBean("tccTransactionService");
        //Distributed transaction commit demo
        transactionCommitDemo();
        //Distributed transaction rollback demo
        transactionRollbackDemo();
    }

    private static void transactionCommitDemo() throws InterruptedException {
        String txId = tccTransactionService.doTransactionCommit();
        System.out.println(txId);
        Assert.isTrue(StringUtils.isNotEmpty(txId), "failed to begin the transaction");
        System.out.println("transaction commit demo finish.");
    }

    private static void transactionRollbackDemo() throws InterruptedException {
        Map map = new HashMap(16);
        try {
            tccTransactionService.doTransactionRollback(map);
            Assert.isTrue(false, "the distributed transaction was not rolled back");
        } catch (Throwable t) {
            Assert.isTrue(true, "the distributed transaction was rolled back on exception");
        }
        String txId = (String) map.get("xid");
        System.out.println(txId);
        System.out.println("transaction rollback demo finish.");
    }
}

public class TccTransactionService {
    private TccActionOne tccActionOne;
    private TccActionTwo tccActionTwo;

    //Commit the distributed transaction
    @GlobalTransactional
    public String doTransactionCommit() {
        //First TCC participant
        boolean result = tccActionOne.prepare(null, 1);
        if (!result) {
            throw new RuntimeException("TccActionOne failed.");
        }
        List list = new ArrayList();
        list.add("c1");
        list.add("c2");
        //Second TCC participant
        result = tccActionTwo.prepare(null, "two", list);
        if (!result) {
            throw new RuntimeException("TccActionTwo failed.");
        }
        return RootContext.getXID();
    }

    //Roll back the distributed transaction
    @GlobalTransactional
    public String doTransactionRollback(Map map) {
        //First TCC participant
        boolean result = tccActionOne.prepare(null, 1);
        if (!result) {
            throw new RuntimeException("TccActionOne failed.");
        }
        List list = new ArrayList();
        list.add("c1");
        list.add("c2");
        //Second TCC participant
        result = tccActionTwo.prepare(null, "two", list);
        if (!result) {
            throw new RuntimeException("TccActionTwo failed.");
        }
        map.put("xid", RootContext.getXID());
        throw new RuntimeException("transacton rollback");
    }

    public void setTccActionOne(TccActionOne tccActionOne) {
        this.tccActionOne = tccActionOne;
    }

    public void setTccActionTwo(TccActionTwo tccActionTwo) {
        this.tccActionTwo = tccActionTwo;
    }
}
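The two demo scenarios come down to one rule: once phase one (prepare) has run on every participant, either all of them are committed or all of them are rolled back. The sketch below simulates that rule in plain Java without Seata, Dubbo, or a transaction coordinator. SketchParticipant, LoggingParticipant, and TwoPhaseFlowSketch are hypothetical names, and the in-process loop is a deliberate simplification of what Seata Server drives remotely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant contract mirroring prepare/commit/rollback.
interface SketchParticipant {
    boolean prepare();
    void commit();
    void rollback();
}

// Records every call so the outcome of a run can be inspected.
class LoggingParticipant implements SketchParticipant {
    private final String name;
    private final List<String> log;

    LoggingParticipant(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public boolean prepare() {
        log.add(name + ":prepare");
        return true;
    }

    @Override
    public void commit() {
        log.add(name + ":commit");
    }

    @Override
    public void rollback() {
        log.add(name + ":rollback");
    }
}

class TwoPhaseFlowSketch {
    // Runs phase one on both participants, then commits all or rolls back all.
    static List<String> run(boolean failAfterPrepare) {
        List<String> log = new ArrayList<>();
        List<SketchParticipant> attempted = new ArrayList<>();
        SketchParticipant[] participants = {
            new LoggingParticipant("one", log),
            new LoggingParticipant("two", log)
        };
        try {
            for (SketchParticipant participant : participants) {
                attempted.add(participant);
                if (!participant.prepare()) {
                    throw new IllegalStateException("prepare failed");
                }
            }
            if (failAfterPrepare) {
                // Plays the role of the exception thrown in doTransactionRollback().
                throw new IllegalStateException("business failure");
            }
        } catch (RuntimeException e) {
            // Phase two, rollback branch: undo every participant that was attempted.
            for (SketchParticipant participant : attempted) {
                participant.rollback();
            }
            return log;
        }
        // Phase two, commit branch: confirm every participant.
        for (SketchParticipant participant : attempted) {
            participant.commit();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [one:prepare, two:prepare, one:commit, two:commit]
        System.out.println(run(true));  // [one:prepare, two:prepare, one:rollback, two:rollback]
    }
}
```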
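2.Startup analysis of the service provider in the Seata TCC sample
The interfaces annotated with @TwoPhaseBusinessAction are published as Dubbo services.
3.Source code for scanning the @TwoPhaseBusinessAction annotation
(1)The wrapIfNecessary() method of the global transaction scanner scans Spring beans
(2)The isTccAutoProxy() method of TCCBeanParserUtils decides whether to create a TCC dynamic proxy
(1)The wrapIfNecessary() method of the global transaction scanner scans Spring beans
After GlobalTransactionScanner, the global transaction annotation scanner, has called initClient() to initialize the Seata client, its wrapIfNecessary() method scans Spring beans for methods annotated with @TwoPhaseBusinessAction.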
//AbstractAutoProxyCreator: Spring's automatic dynamic proxy creator
//ConfigurationChangeListener: listener for configuration change events
//InitializingBean: initialization callback for a Spring bean
//ApplicationContextAware: gives the bean access to the Spring container
//DisposableBean: destruction callback for a Spring bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
    ...

    //Callback method of the InitializingBean interface
    //Spring calls afterPropertiesSet() once the properties of this bean have been set
    @Override
    public void afterPropertiesSet() {
        //Whether global transactions are disabled; false by default
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);
            return;
        }
        //A CAS operation ensures that initClient() runs only once
        if (initialized.compareAndSet(false, true)) {
            //initClient() initializes the Seata client, for example by opening a long-lived connection to Seata Server
            //seata-tcc.xml in the tcc module of seata-samples declares the GlobalTransactionScanner bean,
            //so initClient() is triggered through this callback while that bean is being initialized
            initClient();
        }
    }

    //initClient() is the core method that initializes the Seata client
    private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
            LOGGER.warn("...", DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }

        //The Seata client has two key components:
        //the TM (Transaction Manager), which manages global transactions, and
        //the RM (Resource Manager), which manages the resources of branch transactions

        //init TM
        //TMClient.init() initializes the client-side TM
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }

        //init RM
        //RMClient.init() initializes the client-side RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }

        //Register a hook that runs when the Spring container is destroyed and releases TM and RM resources
        registerSpringShutdownHook();
    }

    //The following will be scanned, and added corresponding interceptor:
    //TM:
    //@see io.seata.spring.annotation.GlobalTransactional // TM annotation
    //Corresponding interceptor:
    //@see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler
    //GlobalLock:
    //@see io.seata.spring.annotation.GlobalLock // GlobalLock annotation
    //Corresponding interceptor:
    //@see io.seata.spring.annotation.GlobalTransactionalInterceptor# handleGlobalLock(MethodInvocation, GlobalLock) // GlobalLock handler
    //TCC mode:
    //@see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface
    //@see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method
    //@see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser
    //Corresponding interceptor:
    //@see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode

    //Because GlobalTransactionScanner extends Spring's AbstractAutoProxyCreator,
    //Spring passes each Spring bean to GlobalTransactionScanner.wrapIfNecessary().
    //GlobalTransactionScanner checks whether the bean's class or methods carry the annotations above
    //and decides whether to create a dynamic proxy that intercepts the annotated methods.
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        //do checkers
        if (!doCheckers(bean, beanName)) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy
                //Every bean is passed to wrapIfNecessary() at startup,
                //and this check decides whether the bean needs a TCC dynamic proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //init tcc fence clean task if enable useTccFence
                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
                } else {
                    //Find the target class and the interfaces of the bean
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    //existsAnnotation() checks whether the class or its methods carry annotations such as @GlobalTransactional
                    if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (globalTransactionalInterceptor == null) {
                        //Build a GlobalTransactionalInterceptor, the interceptor for the global transaction annotation
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    //The bean is not an AOP proxy yet,
                    //so a dynamic proxy is created for it through Spring's AbstractAutoProxyCreator.
                    //Later calls to the bean's methods then go through the selected interceptor, such as TccActionInterceptor
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    int pos;
                    for (Advisor avr : advisor) {
                        //Find the position based on the advisor's order, and add to advisors by pos
                        pos = findAddSeataAdvisorPosition(advised, avr);
                        advised.addAdvisor(pos, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
    ...
}
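The guard initialized.compareAndSet(false, true) in afterPropertiesSet() is what keeps initClient() from running twice. A minimal sketch of the same idiom, using only java.util.concurrent.atomic, follows. OnceInitializer and its counter are hypothetical and exist only to make the effect observable; the real initClient() sets up the TM and RM clients instead of incrementing a number.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class OnceInitializer {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Counts how often the initialization body actually ran (illustration only).
    private final AtomicInteger initCount = new AtomicInteger();

    // Safe to call from many threads: only the first caller wins the CAS.
    void afterPropertiesSet() {
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    private void initClient() {
        initCount.incrementAndGet();
    }

    int initCount() {
        return initCount.get();
    }

    // Calls afterPropertiesSet() from several threads and reports how often init ran.
    static int demo() {
        OnceInitializer initializer = new OnceInitializer();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(initializer::afterPropertiesSet);
            threads[i].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return initializer.initCount();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1
    }
}
```

Compared with a synchronized block, the CAS never blocks the losing callers; they simply skip initialization and return.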
(2)The isTccAutoProxy() method of TCCBeanParserUtils decides whether to create a TCC dynamic proxy
public class TCCBeanParserUtils {
    private TCCBeanParserUtils() {
    }

    //is auto proxy TCC bean
    public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
        boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
        //get RemotingBean description
        RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
        //is remoting bean
        if (isRemotingBean) {
            if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
                //LocalTCC
                //Create a local TCC proxy
                return isTccProxyTargetBean(remotingDesc);
            } else {
                //sofa:reference / dubbo:reference, factory bean
                return false;
            }
        } else {
            if (remotingDesc == null) {
                //check FactoryBean
                if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                    remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                    return isTccProxyTargetBean(remotingDesc);
                } else {
                    return false;
                }
            } else {
                return isTccProxyTargetBean(remotingDesc);
            }
        }
    }
    ...

    //is TCC proxy-bean/target-bean: LocalTCC , the proxy bean of sofa:reference/dubbo:reference
    public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
        if (remotingDesc == null) {
            return false;
        }
        //check if it is TCC bean
        boolean isTccClazz = false;
        //Get the interface class of the bean
        Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
        //Get all methods declared on that interface
        Method[] methods = tccInterfaceClazz.getMethods();
        TwoPhaseBusinessAction twoPhaseBusinessAction;
        //Iterate over the methods
        for (Method method : methods) {
            //Check whether the method carries the @TwoPhaseBusinessAction annotation
            twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
            if (twoPhaseBusinessAction != null) {
                isTccClazz = true;
                break;
            }
        }
        if (!isTccClazz) {
            return false;
        }
        short protocols = remotingDesc.getProtocol();
        //LocalTCC
        if (Protocols.IN_JVM == protocols) {
            //in jvm TCC bean , AOP
            return true;
        }
        //sofa:reference / dubbo:reference, AOP
        return remotingDesc.isReference();
    }
    ...
}
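The core of isTccProxyTargetBean() is a reflective loop: take the interface class, walk its methods, and stop at the first one that carries @TwoPhaseBusinessAction. The sketch below isolates that loop so it can be run without Seata. TwoPhaseMarker, AnnotatedAction, PlainService, and TccInterfaceDetector are hypothetical names; the protocol and isReference() checks of the original method are intentionally left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @TwoPhaseBusinessAction (not the Seata annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface TwoPhaseMarker {
    String name();
}

// An interface whose try method is annotated, like TccActionOne.
interface AnnotatedAction {
    @TwoPhaseMarker(name = "sketchAction")
    boolean prepare(int a);

    boolean commit();

    boolean rollback();
}

// An ordinary interface without any two-phase method.
interface PlainService {
    String hello();
}

class TccInterfaceDetector {
    // Returns true as soon as one public method of the interface carries the marker.
    static boolean hasTwoPhaseMethod(Class<?> interfaceClass) {
        for (Method method : interfaceClass.getMethods()) {
            if (method.getAnnotation(TwoPhaseMarker.class) != null) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasTwoPhaseMethod(AnnotatedAction.class)); // true
        System.out.println(hasTwoPhaseMethod(PlainService.class));    // false
    }
}
```

The annotation must have RUNTIME retention for getAnnotation() to see it; with the default CLASS retention the loop would always return false.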
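4.Analysis of the distributed transaction entry point in the Seata TCC sample
TccTransactionService is the entry point of the distributed transaction. Both its commit method and its rollback method carry the @GlobalTransactional annotation.
When the application starts, the TccTransactionService bean is therefore scanned by GlobalTransactionScanner, and a dynamic proxy is created for it so that the methods annotated with @GlobalTransactional are intercepted.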
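To make the effect of such a proxy concrete, here is a minimal sketch that uses java.lang.reflect.Proxy instead of Spring AOP. It assumes a hypothetical marker annotation GlobalTxMarker placed on an interface method, and it only records "begin", "commit", and "rollback" events rather than talking to a transaction coordinator. None of these names come from Seata, and Seata's real interceptor does considerably more.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @GlobalTransactional.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface GlobalTxMarker {
}

interface OrderService {
    @GlobalTxMarker
    String placeOrder(boolean fail);

    String ping();
}

class OrderServiceImpl implements OrderService {
    @Override
    public String placeOrder(boolean fail) {
        if (fail) {
            throw new IllegalStateException("business failure");
        }
        return "ok";
    }

    @Override
    public String ping() {
        return "pong";
    }
}

// Wraps annotated calls in begin/commit/rollback events; other calls pass through.
class TxInterceptorSketch implements InvocationHandler {
    private final Object target;
    final List<String> events = new ArrayList<>();

    TxInterceptorSketch(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (!method.isAnnotationPresent(GlobalTxMarker.class)) {
            return invokeTarget(method, args);
        }
        events.add("begin");
        try {
            Object result = invokeTarget(method, args);
            events.add("commit");
            return result;
        } catch (Throwable t) {
            events.add("rollback");
            throw t;
        }
    }

    // Unwraps the reflection wrapper so callers see the original exception.
    private Object invokeTarget(Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    // Runs one successful call, one failing call, and one unannotated call.
    static List<String> demo() {
        TxInterceptorSketch handler = new TxInterceptorSketch(new OrderServiceImpl());
        OrderService proxy = (OrderService) Proxy.newProxyInstance(
            OrderService.class.getClassLoader(), new Class<?>[]{OrderService.class}, handler);
        proxy.placeOrder(false);
        try {
            proxy.placeOrder(true);
        } catch (IllegalStateException expected) {
            // The failing call is expected to end in a rollback event.
        }
        proxy.ping();
        return handler.events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [begin, commit, begin, rollback]
    }
}
```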
The commit method of TccTransactionService calls the two Dubbo services TccActionOne and TccActionTwo in turn. On each of these Dubbo calls, the xid is propagated by ApacheDubboTransactionPropagationFilter.
public class TccTransactionService {
    private TccActionOne tccActionOne;
    private TccActionTwo tccActionTwo;

    //Commit the distributed transaction
    @GlobalTransactional
    public String doTransactionCommit() {
        //First TCC participant
        boolean result = tccActionOne.prepare(null, 1);
        if (!result) {
            throw new RuntimeException("TccActionOne failed.");
        }
        List list = new ArrayList();
        list.add("c1");
        list.add("c2");
        //Second TCC participant
        result = tccActionTwo.prepare(null, "two", list);
        if (!result) {
            throw new RuntimeException("TccActionTwo failed.");
        }
        return RootContext.getXID();
    }

    //Roll back the distributed transaction
    @GlobalTransactional
    public String doTransactionRollback(Map map) {
        //First TCC participant
        boolean result = tccActionOne.prepare(null, 1);
        if (!result) {
            throw new RuntimeException("TccActionOne failed.");
        }
        List list = new ArrayList();
        list.add("c1");
        list.add("c2");
        //Second TCC participant
        result = tccActionTwo.prepare(null, "two", list);
        if (!result) {
            throw new RuntimeException("TccActionTwo failed.");
        }
        map.put("xid", RootContext.getXID());
        throw new RuntimeException("transacton rollback");
    }

    public void setTccActionOne(TccActionOne tccActionOne) {
        this.tccActionOne = tccActionOne;
    }

    public void setTccActionTwo(TccActionTwo tccActionTwo) {
        this.tccActionTwo = tccActionTwo;
    }
}
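The source of ApacheDubboTransactionPropagationFilter is not shown here, so the following is only a sketch of the general idea behind passing an xid along with an RPC call: the caller copies the xid from its thread-local context into the request attachments, and the callee binds it to its own thread for the duration of the call. XidContext, XidPropagationSketch, and the attachment key "TX_XID" are all hypothetical, and a plain Map stands in for the RPC attachments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical thread-local holder, playing the role RootContext has in the demo code.
class XidContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) {
        XID.set(xid);
    }

    static String get() {
        return XID.get();
    }

    static void unbind() {
        XID.remove();
    }
}

class XidPropagationSketch {
    // Hypothetical attachment key; the real key name is not taken from the source.
    static final String KEY = "TX_XID";

    // Caller side: copy the current xid, if any, into the outgoing attachments.
    static Map<String, String> buildAttachments() {
        Map<String, String> attachments = new HashMap<>();
        String xid = XidContext.get();
        if (xid != null) {
            attachments.put(KEY, xid);
        }
        return attachments;
    }

    // Callee side: bind the incoming xid for the duration of the call, then clean up.
    static String handleRequest(Map<String, String> attachments) {
        String xid = attachments.get(KEY);
        if (xid == null) {
            return "handled with xid=" + XidContext.get();
        }
        XidContext.bind(xid);
        try {
            return "handled with xid=" + XidContext.get();
        } finally {
            XidContext.unbind();
        }
    }

    // Simulates a remote call by handling the request on a different thread.
    static String demo() {
        XidContext.bind("xid-1");
        Map<String, String> attachments;
        try {
            attachments = buildAttachments();
        } finally {
            XidContext.unbind();
        }
        String[] result = new String[1];
        Thread provider = new Thread(() -> result[0] = handleRequest(attachments));
        provider.start();
        try {
            provider.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // handled with xid=xid-1
    }
}
```

Unbinding in a finally block matters on the callee side because RPC frameworks reuse worker threads; a leftover xid would otherwise leak into the next unrelated request.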