
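Seata Source Code: TCC Mode Analysis 02

Initialization

When Spring Boot starts, auto-configuration registers GlobalTransactionScanner in the IoC container. GlobalTransactionScanner extends AbstractAutoProxyCreator, so during the postProcessAfterInitialization phase the subclass creates the proxy and attaches TccActionInterceptor.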

GlobalTransactionScanner

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // ...
    // Register the RM and decide whether the bean needs a proxy
    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
        // Start the tcc_fence_log cleanup task
        TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
        // Proxy logic: TccActionInterceptor
        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
    }
    // ...
}

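Bean types in TCC mode

TCC mode involves three special kinds of Spring beans:
1. Beans whose interface is annotated with @LocalTCC, such as LocalTccAction in the example;
2. RPC provider ServiceBeans: in Dubbo, service implementation classes annotated with @DubboService, such as StorageTccActionImpl in the example;
3. RPC consumer ReferenceBeans: in Dubbo, beans injected via @DubboReference, such as StorageTccAction in the example.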

Deciding whether a bean needs a proxy

TCCBeanParserUtils

public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
    // dubbo:service and LocalTCC beans are registered as RM resources
    boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
    RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
    if (isRemotingBean) {
        if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
            // LocalTCC: needs to be proxied by TccActionInterceptor
            return isTccProxyTargetBean(remotingDesc); // 1
        } else {
            // dubbo:service (ServiceBean): no proxy needed
            return false; // 2
        }
    } else {
        if (remotingDesc == null) {
            if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                // dubbo:reference (Dubbo ReferenceBean): needs to be proxied by TccActionInterceptor
                remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                return isTccProxyTargetBean(remotingDesc); // 3
            } else {
                return false;
            }
        } else {
            return isTccProxyTargetBean(remotingDesc);
        }
    }
}

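isTccProxyTargetBean decides whether a LocalTCC bean or a ReferenceBean is actually proxied. It returns true, so that TccActionInterceptor intercepts the bean, only when the interface declares at least one method annotated with @TwoPhaseBusinessAction.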

public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
    if (remotingDesc == null) {
        return false;
    }
    boolean isTccClazz = false;
    Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
    Method[] methods = tccInterfaceClazz.getMethods();
    TwoPhaseBusinessAction twoPhaseBusinessAction;
    for (Method method : methods) {
        twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
        if (twoPhaseBusinessAction != null) {
            isTccClazz = true;
            break;
        }
    }
    if (!isTccClazz) {
        return false;
    }
    short protocols = remotingDesc.getProtocol();
    if (Protocols.IN_JVM == protocols) {
        return true; // local tcc
    }
    return remotingDesc.isReference(); // dubbo:reference
}
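
To make the annotation check concrete, here is a minimal, self-contained sketch of the same reflection scan. It is not Seata code: the annotation TwoPhase and the interfaces StockAction and PlainService are hypothetical stand-ins for @TwoPhaseBusinessAction and for real business interfaces.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class TccInterfaceScan {

    // Hypothetical stand-in for Seata's @TwoPhaseBusinessAction.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface TwoPhase {
        String name();
    }

    // Hypothetical TCC interface: the try method carries the annotation.
    interface StockAction {
        @TwoPhase(name = "stockAction")
        boolean prepare(String sku, int count);

        boolean commit(String sku);

        boolean rollback(String sku);
    }

    // Hypothetical ordinary interface with no annotated method.
    interface PlainService {
        String hello(String name);
    }

    // Returns true if at least one public method of the interface is annotated.
    static boolean hasTwoPhaseMethod(Class<?> interfaceClass) {
        for (Method method : interfaceClass.getMethods()) {
            if (method.getAnnotation(TwoPhase.class) != null) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasTwoPhaseMethod(StockAction.class));
        System.out.println(hasTwoPhaseMethod(PlainService.class));
    }
}
```

The annotation needs RUNTIME retention, otherwise getAnnotation would never see it.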

Registering as an RM

Seata finds every method annotated with @TwoPhaseBusinessAction in LocalTCC beans and ServiceBeans, and registers each of those methods with the TC as a separate TCCResource.

TCCBeanParserUtils

protected static boolean parserRemotingServiceInfo(Object bean, String beanName) {
    RemotingParser remotingParser = DefaultRemotingParser.get().isRemoting(bean, beanName);
    if (remotingParser != null) {
        return DefaultRemotingParser.get().parserRemotingServiceInfo(bean, beanName, remotingParser) != null;
    }
    return false;
}

DefaultRemotingParser

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
    if (remotingBeanDesc == null) {
        return null;
    }
    remotingServiceMap.put(beanName, remotingBeanDesc);
    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    if (remotingParser.isService(bean, beanName)) {
        // LocalTCC or ServiceBean
        try {
            Object targetBean = remotingBeanDesc.getTargetBean();
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                // Each method annotated with @TwoPhaseBusinessAction is registered as one resource
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name());
                    tccResource.setTargetBean(targetBean);
                    tccResource.setPrepareMethod(m);
                    tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                    tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),
                        twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                    tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),
                        twoPhaseBusinessAction.rollbackArgsClasses()));
                    tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());
                    tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());
                    tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),
                        twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),
                        twoPhaseBusinessAction.rollbackArgsClasses()));
                    // Register with the TC
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    if (remotingParser.isReference(bean, beanName)) {
        remotingBeanDesc.setReference(true);
    }
    return remotingBeanDesc;
}
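
The core of the registration is resolving the phase-two methods by the names given in the annotation and caching them under the action name. The sketch below illustrates only that idea with plain JDK reflection. TwoPhase, Ctx, Resource, StockAction and StockActionImpl are hypothetical stand-ins, and the local map replaces the real registration call to the TC.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TccResourceRegistry {

    // Hypothetical stand-in for @TwoPhaseBusinessAction.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface TwoPhase {
        String name();
        String commitMethod() default "commit";
        String rollbackMethod() default "rollback";
    }

    // Hypothetical stand-in for BusinessActionContext.
    static final class Ctx {
    }

    // Hypothetical stand-in for TCCResource: one entry per annotated try method.
    static final class Resource {
        final Object targetBean;
        final Method prepare;
        final Method commit;
        final Method rollback;

        Resource(Object targetBean, Method prepare, Method commit, Method rollback) {
            this.targetBean = targetBean;
            this.prepare = prepare;
            this.commit = commit;
            this.rollback = rollback;
        }
    }

    // Hypothetical TCC interface with custom phase-two method names.
    interface StockAction {
        @TwoPhase(name = "stockAction", commitMethod = "confirm", rollbackMethod = "cancel")
        boolean prepare(Ctx ctx, String sku);

        boolean confirm(Ctx ctx);

        boolean cancel(Ctx ctx);
    }

    static final class StockActionImpl implements StockAction {
        @Override public boolean prepare(Ctx ctx, String sku) { return true; }
        @Override public boolean confirm(Ctx ctx) { return true; }
        @Override public boolean cancel(Ctx ctx) { return true; }
    }

    // Local cache keyed by action name (the real code also reports the resource to the TC).
    private final Map<String, Resource> resources = new ConcurrentHashMap<>();

    // Scans the interface and registers one resource per annotated method.
    void register(Object bean, Class<?> iface) {
        for (Method m : iface.getMethods()) {
            TwoPhase action = m.getAnnotation(TwoPhase.class);
            if (action == null) {
                continue;
            }
            try {
                // Assumption of this sketch: phase-two methods take a single Ctx argument.
                Method commit = iface.getMethod(action.commitMethod(), Ctx.class);
                Method rollback = iface.getMethod(action.rollbackMethod(), Ctx.class);
                resources.put(action.name(), new Resource(bean, m, commit, rollback));
            } catch (NoSuchMethodException e) {
                throw new IllegalStateException("phase-two method missing for " + action.name(), e);
            }
        }
    }

    Resource lookup(String actionName) {
        return resources.get(actionName);
    }
}
```

Because the lookup key is the action name, two interfaces that reuse the same name would overwrite each other in this sketch.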

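Phase one (Try)

TccActionInterceptor intercepts every method annotated with @TwoPhaseBusinessAction, and its invoke method carries out the phase-one processing.

Phase one does just three things:
1. Creates the BusinessActionContext;
2. Stores the BusinessActionContext in the context so that the later Commit and Rollback can read the data;
3. Creates (registers) the branch transaction.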

// TccActionInterceptor
private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
        return invocation.proceed();
    }
    Method method = getActionInterfaceMethod(invocation);
    TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
    if (businessAction != null) {
        String xid = RootContext.getXID();
        BranchType previousBranchType = RootContext.getBranchType();
        if (BranchType.TCC != previousBranchType) {
            RootContext.bindBranchType(BranchType.TCC);
        }
        try {
            return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction,
                invocation::proceed);
        } finally {
            if (BranchType.TCC != previousBranchType) {
                RootContext.unbindBranchType();
            }
            MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
        }
    }
    return invocation.proceed();
}

ActionInterceptorHandler

public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                   Callback<Object> targetCallback) throws Throwable {
    Map<String, Object> ret = new HashMap<>(4);
    // TCC name
    String actionName = businessAction.name();
    // Create the BusinessActionContext
    BusinessActionContext actionContext = new BusinessActionContext();
    // Set the global transaction id
    actionContext.setXid(xid);
    // Set the unique action name, taken from the name attribute of @TwoPhaseBusinessAction
    actionContext.setActionName(actionName);
    // Register the branch transaction
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    // Set the branch transaction id
    actionContext.setBranchId(branchId);
    // MDC put branchId
    MDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);
    // Inject the BusinessActionContext into the matching argument slot
    Class<?>[] types = method.getParameterTypes();
    int argIndex = 0;
    for (Class<?> cls : types) {
        if (cls.getName().equals(BusinessActionContext.class.getName())) {
            arguments[argIndex] = actionContext;
            break;
        }
        argIndex++;
    }
    // the final parameters of the try method
    ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
    // Run the business method, i.e. the try method
    ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
    return ret;
}
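
The phase-one flow (build a context, register a branch, inject the context into the try method's arguments, then call the target) can be sketched with a plain JDK dynamic proxy. This is an illustration under stated assumptions, not Seata's interceptor: Ctx, StockAction and registerBranch are hypothetical, and registerBranch only hands out local ids instead of talking to a TC.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicLong;

final class PhaseOneSketch {

    // Hypothetical stand-in for BusinessActionContext.
    static final class Ctx {
        String xid;
        String actionName;
        String branchId;
    }

    // Hypothetical TCC interface; callers pass null for ctx and the proxy fills it in.
    interface StockAction {
        String prepare(Ctx ctx, String sku);
    }

    private static final AtomicLong BRANCH_IDS = new AtomicLong(100);

    // Stand-in for branch registration with the TC: returns increasing local ids.
    static long registerBranch(String xid, String actionName) {
        return BRANCH_IDS.incrementAndGet();
    }

    // Wraps the target so that every call goes through the phase-one steps first.
    static StockAction proxy(StockAction target, String xid, String actionName) {
        InvocationHandler handler = (proxyObj, method, args) -> {
            // Create the context and fill in the transaction identifiers
            Ctx ctx = new Ctx();
            ctx.xid = xid;
            ctx.actionName = actionName;
            // Register the branch and remember its id
            ctx.branchId = String.valueOf(registerBranch(xid, actionName));
            // Inject the context into the first parameter slot declared as Ctx
            Class<?>[] types = method.getParameterTypes();
            for (int i = 0; i < types.length; i++) {
                if (types[i] == Ctx.class) {
                    args[i] = ctx;
                    break;
                }
            }
            try {
                // Run the business try method
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return (StockAction) Proxy.newProxyInstance(
            StockAction.class.getClassLoader(), new Class<?>[] {StockAction.class}, handler);
    }

    public static void main(String[] args) {
        StockAction target = (ctx, sku) -> ctx.xid + " / branch " + ctx.branchId + " / " + sku;
        System.out.println(proxy(target, "xid-1", "stockAction").prepare(null, "sku-9"));
    }
}
```

The caller never constructs the context itself, which is why business code can pass null for that argument.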

BusinessActionContext fields

public class BusinessActionContext implements Serializable {
    private static final long serialVersionUID = 6539226288677737991L;
    // global transaction id
    private String xid;
    // branch transaction id
    private String branchId;
    // @TwoPhaseBusinessAction.name
    private String actionName;
    // actionContext
    private Map<String, Object> actionContext;
}

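The actionContext map stores: the try method name (sys::prepare), the commit method name (sys::commit), the rollback method name (sys::rollback), the actionName (@TwoPhaseBusinessAction.name), whether tccFence is enabled (@TwoPhaseBusinessAction.useTCCFence), and the parameter names and values.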
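
As a rough picture of what such a map looks like, the sketch below assembles one by hand. The keys sys::prepare, sys::commit and sys::rollback come from the description above; the keys "actionName" and "useTCCFence" and the parameter names are assumptions made for illustration, and the class itself is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ActionContextSketch {

    // Builds a context map: business parameters first, then the framework entries.
    static Map<String, Object> build(String prepareMethod, String commitMethod, String rollbackMethod,
                                     String actionName, boolean useTccFence, Map<String, Object> params) {
        Map<String, Object> context = new LinkedHashMap<>(params);
        context.put("sys::prepare", prepareMethod);
        context.put("sys::commit", commitMethod);
        context.put("sys::rollback", rollbackMethod);
        // Assumed key names for the action name and the fence switch
        context.put("actionName", actionName);
        context.put("useTCCFence", useTccFence);
        return context;
    }

    public static void main(String[] args) {
        Map<String, Object> params = new LinkedHashMap<>();
        params.put("sku", "sku-9");
        params.put("count", 2);
        System.out.println(build("prepare", "commit", "rollback", "stockAction", false, params));
    }
}
```

Everything in this map is serialized and sent to the TC with the branch registration, so phase two can rebuild the context without any local state.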

Registering the branch transaction

protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
                                     BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    String xid = actionContext.getXid();
    // Collect the actionContext data
    Map<String, Object> context = fetchActionRequestContext(method, arguments);
    context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());
    // Initialize the BusinessContext
    initBusinessContext(context, method, businessAction);
    // Initialize the framework context
    initFrameworkContext(context);
    // Store the context data
    actionContext.setActionContext(context);
    // init applicationData
    Map<String, Object> applicationContext = new HashMap<>(4);
    applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);
    String applicationContextStr = JSON.toJSONString(applicationContext);
    try {
        // Register the branch transaction
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
            applicationContextStr, null);
        return String.valueOf(branchId);
    } catch (Throwable t) {
        String msg = String.format("TCC branch Register error, xid: %s", xid);
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}

Initializing the BusinessContext

Puts the attributes of the @TwoPhaseBusinessAction annotation into the context.

protected void initBusinessContext(Map<String, Object> context, Method method,
                                   TwoPhaseBusinessAction businessAction) {
    if (method != null) {
        // the phase one method name
        context.put(Constants.PREPARE_METHOD, method.getName());
    }
    if (businessAction != null) {
        // the phase two method name
        context.put(Constants.COMMIT_METHOD, businessAction.commitMethod());
        context.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod());
        context.put(Constants.ACTION_NAME, businessAction.name());
    }
}

Initializing the framework context

Puts the local IP into the context.

protected void initFrameworkContext(Map<String, Object> context) {
    try {
        context.put(Constants.HOST_NAME, NetUtil.getLocalIp());
    } catch (Throwable t) {
        LOGGER.warn("getLocalIP error", t);
    }
}

Registering the branch transaction

The RM registers the branch transaction
AbstractResourceManager

@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    try {
        BranchRegisterRequest request = new BranchRegisterRequest();
        request.setXid(xid);
        request.setLockKey(lockKeys);
        request.setResourceId(resourceId);
        request.setBranchType(branchType);
        request.setApplicationData(applicationData);
        BranchRegisterResponse response =
            (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new RmTransactionException(response.getTransactionExceptionCode(),
                String.format("Response[ %s ]", response.getMsg()));
        }
        return response.getBranchId();
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
    } catch (RuntimeException rex) {
        throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
    }
}

The TC handles the branch registration request

AbstractCore

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    // Step1: look up global_table by xid to get the GlobalSession
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    // store mode = file: the GlobalSession lives in memory, so take the lock before executing
    // store mode = db/redis: no lock is needed
    return SessionHolder.lockAndExecute(globalSession, () -> {
        // Status check: must be Begin
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
            applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        // Step2: acquire the global lock (only needed in AT mode)
        branchSessionLock(globalSession, branchSession);
        try {
            // Step3: save the branch transaction
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch,
                String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                    branchSession.getBranchId()), ex);
        }
        return branchSession.getBranchId();
    });
}

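Phase two: Commit

The TM initiates the global commit

Once Try has finished, the TM sends a GlobalCommitRequest to the TC, and the TC is responsible for committing each branch transaction. This was covered in the AT mode article; refer back to it if needed.

The TC handles the phase-two commit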

public GlobalStatus commit(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // If there are AT branches, release the global locks first: delete from lock_table where xid = ?
            globalSession.closeAndClean();
            // If all branches are AT, the commit can run asynchronously
            if (globalSession.canBeCommittedAsync()) {
                // Async commit: set the global status to AsyncCommitting,
                // update global_table set status = AsyncCommitting where xid = ?
                globalSession.asyncCommit();
                MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                return false;
            } else {
                // TCC
                globalSession.changeGlobalStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });
    if (shouldCommit) {
        // Synchronous commit (TCC)
        boolean success = doGlobalCommit(globalSession, false);
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            return globalSession.getStatus();
        }
    } else {
        // Asynchronous commit (AT)
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting
            ? GlobalStatus.Committed : globalSession.getStatus();
    }
}

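1. doGlobalCommit runs the core global commit logic; if the phase-two commit fails, it is retried until it succeeds.
2. AT mode uses this method as well. When every branch is AT, the commit runs asynchronously and ends by releasing the locks, deleting the branch transactions, deleting undo_log and deleting the global transaction, which completes the commit. This was covered in the earlier article.
3. When AT and TCC are mixed, the AT branches are committed asynchronously when the async task calls doGlobalCommit again, while the TCC branches are still committed synchronously during the first doGlobalCommit call. If any branch commit fails along the way, it is retried asynchronously until it succeeds.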
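
The sync-versus-async decision described in these three points can be reduced to a small model. The sketch below is a simplification under stated assumptions: it only looks at the branch type (the real check may consider more than that), and BranchType here is a local enum rather than Seata's.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitDecisionSketch {

    // Local stand-in for the branch types discussed in this article.
    enum BranchType { AT, TCC }

    // A global transaction can be committed asynchronously only if every branch is AT.
    static boolean canBeCommittedAsync(List<BranchType> branches) {
        for (BranchType type : branches) {
            if (type != BranchType.AT) {
                return false;
            }
        }
        return true;
    }

    // Indexes of the branches handled by the first, synchronous pass (retrying == false):
    // AT branches are skipped there and left to the asynchronous task.
    static List<Integer> synchronousPass(List<BranchType> branches) {
        List<Integer> handled = new ArrayList<>();
        for (int i = 0; i < branches.size(); i++) {
            if (branches.get(i) != BranchType.AT) {
                handled.add(i);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        List<BranchType> mixed = new ArrayList<>();
        mixed.add(BranchType.AT);
        mixed.add(BranchType.TCC);
        mixed.add(BranchType.AT);
        System.out.println(canBeCommittedAsync(mixed));
        System.out.println(synchronousPass(mixed));
    }
}
```

With a mixed list, the first pass touches only the TCC branch; once it is removed, the remaining AT branches satisfy the async condition.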

DefaultCore

public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
            // When AT and TCC coexist, AT branches skip the synchronous commit; only TCC branches commit synchronously
            if (!retrying && branchSession.canBeCommittedAsync()) {
                return CONTINUE;
            }
            try {
                // Step1: send BranchCommitRequest to the RM; an AT RM deletes undo_log, a TCC RM runs the phase-two commit
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        // Step2: delete the branch record from branch_table
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        // Not retryable (implemented for XA)
                        return false;
                    default:
                        if (!retrying) {
                            // Mark the global transaction as commit-retrying; it is retried asynchronously until it succeeds
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            return CONTINUE;
                        } else {
                            return false;
                        }
                }
            } catch (Exception ex) {
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            // A branch failed; keep processing the remaining branches
            return CONTINUE;
        });
        // For a synchronous commit, return false as soon as one branch fails
        if (result != null) {
            return result;
        }
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
            return false;
        }
        if (!retrying) {
            globalSession.setStatus(GlobalStatus.Committed);
        }
    }
    if (success && globalSession.getBranchSessions().isEmpty()) {
        // Step3: delete the global transaction, delete from global_table where xid = ?
        SessionHelper.endCommitted(globalSession, retrying);
    }
    return success;
}

Sending the branch commit request to the RM

AbstractCore

protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,
                                        BranchSession branchSession) throws IOException, TimeoutException {
    BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(
        branchSession.getResourceId(), branchSession.getClientId(), request);
    return response.getBranchStatus();
}

// AbstractNettyRemotingServer
public Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException {
    // Locate the client Channel
    Channel channel = ChannelManager.getChannel(resourceId, clientId);
    if (channel == null) {
        throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
    }
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
    return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}

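Obtaining the client Channel

For LocalTCC or AT mode, the branch is registered and committed by the same service instance, so resourceId + applicationId + ip + port is usually enough to locate the instance for phase-two communication. That instance may have crashed, or crashed and reconnected, so the lookup falls back to a different port on the same ip, and then to a different ip:port under the same applicationId.
For TCC mode where phase two has to reach the ServiceBean provider, the lookup goes straight to Step2-fallback: it looks for an RM registered by another applicationId under the same resourceId. This is how storage-service is found for the phase-two commit, which is why the resourceId (actionName) should preferably be globally unique.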

ChannelManager

public static Channel getChannel(String resourceId, String clientId) {
    Channel resultChannel = null;
    String[] clientIdInfo = readClientId(clientId);
    String targetApplicationId = clientIdInfo[0];
    String targetIP = clientIdInfo[1];
    int targetPort = Integer.parseInt(clientIdInfo[2]);
    ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap =
        RM_CHANNELS.get(resourceId);
    // Step1: find the applicationId-ip-port channels for this resourceId
    if (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) {
        return null;
    }
    ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);
    // Step2: the applicationId the BranchSession registered with
    if (ipMap != null && !ipMap.isEmpty()) {
        // Step3: the ip the BranchSession registered with
        ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
        if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {
            // Step4: the port the BranchSession registered with
            RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
            if (exactRpcContext != null) {
                Channel channel = exactRpcContext.getChannel();
                if (channel.isActive()) {
                    resultChannel = channel;
                }
            }
            // Step4-fallback: the original channel may be closed; try the other ports on the registered ip
            // (resourceId + applicationId + ip)
            if (resultChannel == null) {
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP.entrySet()) {
                    Channel channel = portMapOnTargetIPEntry.getValue().getChannel();
                    if (channel.isActive()) {
                        resultChannel = channel;
                        break;
                    }
                }
            }
        }
        // Step3-fallback: no channel on the registered ip; search by resourceId + applicationId
        if (resultChannel == null) {
            for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap.entrySet()) {
                if (ipMapEntry.getKey().equals(targetIP)) {
                    continue;
                }
                ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
                if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
                    continue;
                }
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
                    Channel channel = portMapOnOtherIPEntry.getValue().getChannel();
                    if (channel.isActive()) {
                        resultChannel = channel;
                        break;
                    }
                }
                if (resultChannel != null) {
                    break;
                }
            }
        }
    }
    // Step2-fallback: no channel under the registered applicationId; pick any Channel under the resourceId
    if (resultChannel == null) {
        resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);
    }
    return resultChannel;
}
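
The fallback order is easier to follow on a stripped-down model. The sketch below keeps the same nesting (applicationId, then ip, then port) for a single resourceId, but uses a hypothetical Conn class with an active flag instead of Netty channels, and sorted maps so the result is deterministic. It illustrates the lookup order only and is not Seata's ChannelManager.

```java
import java.util.Map;
import java.util.TreeMap;

final class ChannelLookupSketch {

    // Hypothetical stand-in for a Netty Channel: an id plus an "active" flag.
    static final class Conn {
        final String id;
        final boolean active;

        Conn(String id, boolean active) {
            this.id = id;
            this.active = active;
        }
    }

    // applicationId -> ip -> port -> connection, for one resourceId.
    private final Map<String, Map<String, Map<Integer, Conn>>> apps = new TreeMap<>();

    void put(String app, String ip, int port, boolean active) {
        apps.computeIfAbsent(app, k -> new TreeMap<>())
            .computeIfAbsent(ip, k -> new TreeMap<>())
            .put(port, new Conn(app + "/" + ip + ":" + port, active));
    }

    private static Conn firstActive(Map<Integer, Conn> ports) {
        for (Conn c : ports.values()) {
            if (c.active) {
                return c;
            }
        }
        return null;
    }

    // Returns the id of the chosen connection, or null if nothing is active.
    String find(String app, String ip, int port) {
        Conn found = null;
        Map<String, Map<Integer, Conn>> ips = apps.get(app);
        if (ips != null) {
            Map<Integer, Conn> ports = ips.get(ip);
            if (ports != null) {
                // Exact match first, then any other active port on the same ip
                Conn exact = ports.get(port);
                found = (exact != null && exact.active) ? exact : firstActive(ports);
            }
            if (found == null) {
                // Same application, different ip
                for (Map.Entry<String, Map<Integer, Conn>> e : ips.entrySet()) {
                    if (e.getKey().equals(ip)) {
                        continue;
                    }
                    found = firstActive(e.getValue());
                    if (found != null) {
                        break;
                    }
                }
            }
        }
        if (found == null) {
            // Any other application registered under the same resourceId
            for (Map.Entry<String, Map<String, Map<Integer, Conn>>> e : apps.entrySet()) {
                if (e.getKey().equals(app)) {
                    continue;
                }
                for (Map<Integer, Conn> ports : e.getValue().values()) {
                    found = firstActive(ports);
                    if (found != null) {
                        break;
                    }
                }
                if (found != null) {
                    break;
                }
            }
        }
        return found == null ? null : found.id;
    }
}
```

When only a provider application has registered the resource, a lookup on behalf of the consumer application skips straight to the last loop, which mirrors the ServiceBean case described in the text.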

The branch commit request

AbstractNettyRemotingServer

......
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());

The RM handles the branch commit

TCCResourceManager

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    // Step1: locate the resource's local commit method in the local cache tccResourceCache
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    try {
        // Step2: deserialize the BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // Step3: resolve the commit method's argument list
        Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
        Object ret;
        boolean result;
        // Step4: invoke the commit method, i.e. the commit method the business code specified
        if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
            // Step4-1: useTCCFence enabled
            try {
                result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                throw e.getCause();
            }
        } else {
            // Step4-2: useTCCFence disabled
            ret = commitMethod.invoke(targetTCCBean, args);
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
        }
        // On success return "phase two committed"; otherwise return "commit failed, retryable"
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        return BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
}
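
The way the RM turns the commit method's return value into a branch status is worth isolating: null counts as success, a result object reports its own flag, a Boolean is used directly, and any exception becomes a retryable failure. A minimal sketch of that mapping follows; Result and Status are hypothetical stand-ins for TwoPhaseResult and BranchStatus, and a Callable replaces the reflective method call.

```java
import java.util.concurrent.Callable;

final class PhaseTwoResultSketch {

    // Local stand-in for the two commit outcomes used in this article.
    enum Status { COMMITTED, COMMIT_FAILED_RETRYABLE }

    // Hypothetical stand-in for TwoPhaseResult.
    static final class Result {
        final boolean success;

        Result(boolean success) {
            this.success = success;
        }
    }

    // Normalizes whatever the business commit method returned into a boolean.
    static boolean isSuccess(Object ret) {
        if (ret == null) {
            return true; // a void or null return counts as success
        }
        if (ret instanceof Result) {
            return ((Result) ret).success;
        }
        return (Boolean) ret; // anything else must be a Boolean, or this throws
    }

    // Runs the commit call and maps every failure, including exceptions, to "retryable".
    static Status commit(Callable<Object> commitCall) {
        try {
            return isSuccess(commitCall.call()) ? Status.COMMITTED : Status.COMMIT_FAILED_RETRYABLE;
        } catch (Throwable t) {
            return Status.COMMIT_FAILED_RETRYABLE;
        }
    }

    public static void main(String[] args) {
        System.out.println(commit(() -> null));
        System.out.println(commit(() -> Boolean.FALSE));
    }
}
```

Because every failure is reported as retryable, a commit method should be idempotent: the TC may call it again for the same branch.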

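Phase two: Rollback

The TM initiates the phase-two rollback request

This works much like AT mode: when the business code in a TCC try method throws an exception, the phase-two rollback is triggered.

The TC handles the phase-two rollback request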

DefaultCore

public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
        globalSession.close();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // Update the global lock status in lock_table to Rollbacking
            // Update the global transaction status in global_table to Rollbacking
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }
    // Run the global rollback
    boolean rollbackSuccess = doGlobalRollback(globalSession, false);
    return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}

DefaultCore

public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // Iterate over the branch transactions
    Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
        BranchStatus currentBranchStatus = branchSession.getStatus();
        if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
            return CONTINUE;
        }
        try {
            // Step1: send BranchRollbackRequest
            BranchStatus branchStatus = branchRollback(globalSession, branchSession);
            switch (branchStatus) {
                case PhaseTwo_Rollbacked:
                    // Step2-1: release the global lock and delete the branch transaction
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                case PhaseTwo_RollbackFailed_Unretryable:
                    // Rollback failed and cannot succeed on retry
                    SessionHelper.endRollbackFailed(globalSession, retrying);
                    return false;
                default:
                    // Step2-2: the RM rollback failed; the global status becomes RollbackRetrying and waits for retry
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    return false;
            }
        } catch (Exception ex) {
            if (!retrying) {
                // If Step1 or Step2 throws, the global status becomes RollbackRetrying and waits for retry
                globalSession.queueToRetryRollback();
            }
            throw new TransactionException(ex);
        }
    });
    // If any branch failed to roll back, return false
    if (result != null) {
        return result;
    }
    // Step3
    // file mode: delete the global transaction directly
    // db/redis mode: doGlobalRollback runs again asynchronously, so nothing is done here;
    //  this guards against network jitter leaving a successfully registered branch with residual rows
    //  in lock_table and branch_table, which would keep the global lock held forever
    if (success) {
        SessionHelper.endRollbacked(globalSession, retrying);
    }
    return success;
}
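
Note that the loop walks getReverseSortedBranches(), so branches are rolled back newest first, and it stops at the first branch that fails. A small sketch of that ordering, with branch names as plain strings and a Predicate standing in for the rollback request to the RM (both are assumptions of this illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class RollbackOrderSketch {

    // Rolls back branches in reverse registration order and stops at the first failure.
    // Returns the branches that are still pending; an empty list means the rollback finished.
    static List<String> rollbackAll(List<String> branches, Predicate<String> rollbackBranch) {
        List<String> remaining = new ArrayList<>(branches);
        for (int i = branches.size() - 1; i >= 0; i--) {
            String branch = branches.get(i);
            if (!rollbackBranch.test(branch)) {
                // The real TC would queue the global transaction for a rollback retry here
                break;
            }
            remaining.remove(branch);
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<String> branches = new ArrayList<>();
        branches.add("b1");
        branches.add("b2");
        branches.add("b3");
        System.out.println(rollbackAll(branches, b -> !b.equals("b2")));
    }
}
```

The pending branches are exactly what a later retry pass has to process, again starting from the newest one.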

SessionHelper

public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {
    // On retry, or in file mode
    if (retryGlobal || !DELAY_HANDLE_SESSION) {
        long beginTime = System.currentTimeMillis();
        GlobalStatus currentStatus = globalSession.getStatus();
        boolean retryBranch =
            currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;
        if (isTimeoutGlobalStatus(currentStatus)) {
            globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
        } else {
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
        }
        // Delete the global transaction from global_table
        globalSession.end();
    }
}

The RM handles the branch rollback

TCCResourceManager

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    // Step1: locate the resource's local rollback method in the local cache tccResourceCache
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    Object targetTCCBean = tccResource.getTargetBean();
    Method rollbackMethod = tccResource.getRollbackMethod();
    try {
        // Step2: deserialize the BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // Step3: resolve the rollback method's argument list
        Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
        Object ret;
        boolean result;
        // Step4: invoke the rollback method
        if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
            try {
                result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
                    args, tccResource.getActionName());
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                throw e.getCause();
            }
        } else {
            ret = rollbackMethod.invoke(targetTCCBean, args);
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
        }
        return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    } catch (Throwable t) {
        return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
}