Seata Setup
1. Getting to Know Seata
Quick Start | Apache Seata official site
2. Preparing Nacos and Seata
Start Nacos
startup.cmd -m standalone
Username: nacos, password: nacos
Set up the Seata TC (transaction coordinator)
Version 1.4.2 is downloaded here
seata-server-1.4.2
1. Modify the Seata configuration file
registry.conf
Here we use Nacos as both the registry and the configuration center.
registry {
  # file, nacos, eureka, redis, zk, consul, etcd3, sofa
  # registry center: nacos
  type = "nacos"
  # registry center settings
  nacos {
    application = "seata-tc-server"
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

# configuration center
config {
  # file, nacos, apollo, zk, consul, etcd3
  type = "nacos" # also kept on Nacos
  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
}
store.mode=db
#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.181.202:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackFailedUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
server.enableParallelRequestHandle=false
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
# client-server transport settings
transport.serialization=seata
transport.compressor=none
# disable metrics to improve performance
#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
2. Create the database and tables
Table creation SQL
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_status_gmt_modified` (`status`, `gmt_modified`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_status` (`status`),
    KEY `idx_branch_id` (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
    `lock_key`   CHAR(20)    NOT NULL,
    `lock_value` VARCHAR(20) NOT NULL,
    `expire`     BIGINT,
    PRIMARY KEY (`lock_key`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
3. Run
Run from cmd:
seata-server.bat
Windows: .bat
macOS/Linux: .sh
Running it without any arguments uses the default configuration: default port and default settings.
Seata is now registered with Nacos.
3. Integrating Seata into Microservices
For example, import the dependencies in the product service.
1. Import dependencies
This matches the older 2.9.9 release used with this project's Spring Boot version.
<!-- distributed transactions -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <!-- the bundled seata-spring-boot-starter is an older 1.3.0, so exclude it -->
    <exclusions>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- seata starter, version 1.4.2 -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.4.2</version>
</dependency>
Thanks to auto-configuration, the service only needs to be told where the TC server is.
See the official documentation for reference.
2. YAML configuration
seata:
  registry: # registry settings for the TC service; the microservice uses these to look up the TC address from the registry
    # mirror the TC server's own registry.conf:
    # address, namespace, group, application-name, cluster
    type: nacos
    nacos: # TC
      server-addr: 127.0.0.1:8848
      group: DEFAULT_GROUP
      namespace: "" # everything lives in public, so use an empty string
      application: seata-tc-server
      username: "nacos"
      password: "nacos"
  tx-service-group: seata-demo # transaction group, used to resolve the TC cluster name; pick any name
  # grouping transactions: if the order service and the product service form one transaction and should be
  # managed by the same cluster, put them in the same transaction group; this makes fail-fast and
  # cluster failover easier to manage
  service:
    vgroup-mapping: # mapping from transaction group to TC cluster; this is how the group finds its cluster name
      seata-demo: default # with no cluster configured the default is "default"; in Nacos, seata-demo maps to default
Restart the service.
Apply the same configuration to every service that participates in distributed transactions.
Restart those services as well.
4. Implementing Distributed Transactions
1. XA Mode
XA is implemented by the database itself, and MySQL supports it: each branch executes but holds off committing; only once every branch has succeeded do they all commit together.
If any one branch fails, all of them roll back. Because it relies on the database holding resources and blocking until commit, performance is poor.
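To make the two-phase flow concrete, here is a minimal sketch using the standard JTA XAResource API; it is an illustration of the idea, not Seata's internal code, and the two data sources and the coordination method are assumptions.

import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

public class XaSketch {
    // Phase one: every branch prepares but does not commit.
    // Phase two: commit everywhere only if all branches prepared, otherwise roll back everything.
    // (Simplified: real coordinators also handle branches that never reached prepare.)
    void twoPhase(Xid xid, XAResource orderDb, XAResource productDb) throws Exception {
        boolean allPrepared =
                orderDb.prepare(xid) == XAResource.XA_OK
                && productDb.prepare(xid) == XAResource.XA_OK;
        if (allPrepared) {
            orderDb.commit(xid, false);   // second phase: commit both branches
            productDb.commit(xid, false);
        } else {
            orderDb.rollback(xid);        // any failure rolls back every branch
            productDb.rollback(xid);
        }
    }
}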
1. Seata's implementation
Advantages
Strong consistency
Disadvantages
Reduced availability
2. Code implementation
Add the following to every service that needs global transaction management:
seata:
  registry: # registry settings for the TC service; the microservice uses these to look up the TC address from the registry
    # mirror the TC server's own registry.conf:
    # address, namespace, group, application-name, cluster
    type: nacos
    nacos: # TC
      server-addr: 127.0.0.1:8848
      group: DEFAULT_GROUP
      namespace: "" # everything lives in public, so use an empty string
      application: seata-tc-server
      username: "nacos"
      password: "nacos"
  tx-service-group: seata-demo # transaction group, used to resolve the TC cluster name; pick any name
  # grouping transactions: services that belong to one transaction and should be managed by the same
  # cluster go in the same transaction group, which simplifies fail-fast and cluster failover management
  service:
    vgroup-mapping: # mapping from transaction group to TC cluster
      seata-demo: default # with no cluster configured the default is "default"
  data-source-proxy-mode: XA # enable the XA mode of the data-source proxy
Add the @GlobalTransactional annotation:
@GetMapping("/add")@GlobalTransactional(rollbackFor = Exception.class)public R addDingdan(@RequestParam("order") String order) {SpuInfoEntity spuInfoEntity = new SpuInfoEntity();spuInfoEntity.setSpuName(order);spuInfoService.save(spuInfoEntity);CouponHistoryEntity couponHistoryEntity = new CouponHistoryEntity();couponHistoryEntity.setMemberNickName(order);couponFeignService.saveTrySeata(couponHistoryEntity);return R.ok();}
The other service must also be annotated with a transaction, otherwise a deadlock can occur.
@RequestMapping("/save")@Transactional(rollbackFor = Exception.class)public R save(@RequestBody CouponHistoryEntity couponHistory) throws Exception {couponHistoryService.save(couponHistory);CouponHistoryEntity couponHistory1 = new CouponHistoryEntity();BeanUtils.copyProperties(couponHistory,couponHistory1);couponHistory1.setId(1011L);couponHistoryService.save(couponHistory1);CouponHistoryEntity couponHistory2 = new CouponHistoryEntity();BeanUtils.copyProperties(couponHistory,couponHistory2);couponHistory2.setId(1012L);couponHistoryService.save(couponHistory2);CouponHistoryEntity couponHistory3 = new CouponHistoryEntity();BeanUtils.copyProperties(couponHistory,couponHistory3);couponHistory3.setId(1013L);couponHistoryService.save(couponHistory3);return R.ok();}
2. AT Mode
1. How AT mode works: each branch commits its local transaction immediately and records before/after snapshots in the undo_log table; on global rollback the snapshots are used to restore the data.
2. The dirty-write problem in AT mode
It is addressed by the global lock mechanism: before committing, a branch must acquire the global lock on the rows it modified, preventing other global transactions from writing them in between.
3. Code implementation
The lock_table was already imported at the start into the database used by the Seata server.
Now only the undo_log table is needed: every database belonging to a microservice that participates in distributed transactions must contain an undo_log table.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);
seata:
  data-source-proxy-mode: AT
Restart and you are done; AT mode requires no code changes.
Testing confirms the rollback succeeds; the code is the same as in the XA example above and does not need to change.
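To actually observe the rollback, one way is to throw an exception after the remote call inside the @GlobalTransactional method; a sketch reusing the addDingdan example above, where the endpoint name and exception are illustrative:

@GetMapping("/add/rollback")
@GlobalTransactional(rollbackFor = Exception.class)
public R addWithRollback(@RequestParam("order") String order) {
    SpuInfoEntity spuInfoEntity = new SpuInfoEntity();
    spuInfoEntity.setSpuName(order);
    spuInfoService.save(spuInfoEntity);                    // local branch, undo_log snapshot recorded

    CouponHistoryEntity couponHistoryEntity = new CouponHistoryEntity();
    couponHistoryEntity.setMemberNickName(order);
    couponFeignService.saveTrySeata(couponHistoryEntity);  // remote branch, its own undo_log

    // Throwing here makes the TC roll back both branches using their undo_log records.
    throw new RuntimeException("force rollback for testing");
}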
3. TCC Mode
1. How TCC mode works: Try reserves the resources, Confirm completes the business using the reserved resources, and Cancel releases the reservation as compensation.
2. TCC code implementation
1. First create a freeze table
CREATE TABLE `pms_spu_info_tbl`
(
    `xid`          VARCHAR(128) NOT NULL,
    `user_id`      VARCHAR(255) DEFAULT NULL COMMENT 'user ID',
    `freeze_money` INT UNSIGNED DEFAULT '0' COMMENT 'frozen amount',
    `state`        INT DEFAULT NULL COMMENT 'transaction state: 0 try, 1 confirm, 2 cancel',
    PRIMARY KEY (`xid`) USING BTREE
) ENGINE = InnoDB DEFAULT CHARSET = utf8;
2. Try phase
Record the frozen amount and transaction state in pms_spu_info_tbl
Add the amount
3. Confirm phase
Delete the freeze record from the account_freeze table by xid
4. Cancel phase
Subtract the amount
Update the account_freeze record to the rolled-back state by xid
state = 2
5. How to detect an empty rollback: in cancel, if no freeze record exists for the xid, try never executed; record a cancelled entry and return success (see the sketch below).
6. How to avoid business suspension: in try, if a cancelled record for the xid already exists, cancel has already run, so try must not execute (see the sketch below).
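Both checks boil down to querying the freeze table by xid; a condensed sketch of the two guards, excerpted from the full implementation in the next section (names follow this project's mapper and entity):

// In try (induct): if a record for this xid already exists, cancel has already run
// (an empty rollback happened first), so skip try to avoid business suspension.
if (pmsSpuInfoTblMapper.selectById(xid) != null) {
    return;
}

// In cancel (rollback): if no record exists, try never ran - this is an empty rollback;
// insert a state=2 record so that a late-arriving try will be skipped.
if (pmsSpuInfoTblMapper.selectById(xid) == null) {
    PmsSpuInfoTbl guard = new PmsSpuInfoTbl();
    guard.setXid(xid);
    guard.setState(2);
    guard.setFreezeMoney(0);
    pmsSpuInfoTblMapper.insert(guard);
    return true;
}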
3. Declare the TCC interface
package com.jmj.gulimall.product.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface AccountTCCService {

    @TwoPhaseBusinessAction(name = "induct", commitMethod = "commit", rollbackMethod = "rollback")
    void induct(@BusinessActionContextParameter(paramName = "params") String params,
                @BusinessActionContextParameter(paramName = "money") int money);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);
}
package com.jmj.gulimall.product.tcc;

import com.jmj.gulimall.product.dao.PmsSpuInfoTblMapper;
import com.jmj.gulimall.product.entity.SpuInfoEntity;
import com.jmj.gulimall.product.service.SpuInfoService;
import com.jmj.gulimall.product.vo.spu.PmsSpuInfoTbl;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
public class AccountTCCServiceImpl implements AccountTCCService {

    @Autowired
    private SpuInfoService spuInfoService;

    @Autowired
    private PmsSpuInfoTblMapper pmsSpuInfoTblMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void induct(String params, int money) {
        // 0. get the global transaction id
        String xid = RootContext.getXID();
        // suspension guard: if a record already exists for this xid, cancel ran first, so skip try
        PmsSpuInfoTbl pmsSpuInfoTbl = pmsSpuInfoTblMapper.selectById(xid);
        if (pmsSpuInfoTbl != null) {
            return;
        }
        // 1. adjust the available balance (publishStatus is used as a stand-in balance here)
        SpuInfoEntity byId = spuInfoService.getById(9L);
        byId.setPublishStatus(byId.getPublishStatus() + money);
        spuInfoService.updateById(byId);
        // 2. record the frozen amount and transaction state
        PmsSpuInfoTbl p = new PmsSpuInfoTbl();
        p.setUserId("123");
        p.setFreezeMoney(money);
        p.setState(0);
        p.setXid(xid);
        pmsSpuInfoTblMapper.insert(p);
    }

    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        // 0. get the global transaction id
        String xid = businessActionContext.getXid();
        // 1. delete the freeze record
        return pmsSpuInfoTblMapper.deleteById(xid) == 1;
    }

    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        // 0. get the global transaction id
        String xid = businessActionContext.getXid();
        PmsSpuInfoTbl pmsSpuInfoTbl = pmsSpuInfoTblMapper.selectById(xid);
        if (pmsSpuInfoTbl != null) {
            if (pmsSpuInfoTbl.getState() == 2) {
                // idempotent: already rolled back
                return true;
            }
            Integer freezeMoney = pmsSpuInfoTbl.getFreezeMoney();
            // 1. restore the balance by subtracting the frozen amount
            SpuInfoEntity byId = spuInfoService.getById(9L);
            byId.setPublishStatus(byId.getPublishStatus() - freezeMoney);
            spuInfoService.updateById(byId);
            pmsSpuInfoTbl.setState(2);
            pmsSpuInfoTblMapper.updateById(pmsSpuInfoTbl);
        } else {
            // empty rollback: try never ran, insert a state=2 record so a late try is skipped
            PmsSpuInfoTbl pmsSpuInfoTbl1 = new PmsSpuInfoTbl();
            String params = businessActionContext.getActionContext("params").toString();
            pmsSpuInfoTbl1.setUserId(params);
            pmsSpuInfoTbl1.setState(2);
            pmsSpuInfoTbl1.setFreezeMoney(0);
            pmsSpuInfoTbl1.setXid(xid);
            pmsSpuInfoTblMapper.insert(pmsSpuInfoTbl1);
        }
        return true;
    }
}
@GetMapping("/testTcc")public R testTcc() {accountTCCService.induct("JMJ",12);return R.ok();}
package com.jmj.gulimall.coupon.controller;

import com.jmj.common.utils.R;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

@FeignClient(value = "gulimall-product", path = "/product/spuinfo")
public interface PmsFeignService {

    @GetMapping("/testTcc")
    public R testTcc();
}
@GetMapping("/tet/tcc")@GlobalTransactional(rollbackFor = Exception.class)public R tcc(){CouponEntity couponEntity = new CouponEntity();couponEntity.setCouponImg(UUID.randomUUID().toString());couponEntity.setCouponName(UUID.randomUUID().toString());couponService.save(couponEntity);pmsFeignService.testTcc();if (true){throw new RuntimeException("测试异常");}return R.ok();}
In the end the whole transaction rolls back.
TCC can be mixed with AT mode.
4. Saga Mode
Similar to a workflow
No isolation
5. Seata High Availability
1. Achieving high availability and cross-region disaster recovery
To simulate this locally, copy the seata-server folder.
Change the copy's configuration file to the Hangzhou (HZ) cluster.
Startup command:
seata-server.bat -p 8092
The cluster is now deployed.
If a disaster-recovery failover is ever needed,
Nacos's hot configuration refresh is all it takes.
Create a new shared client configuration:
client.properties
The official site documents the full set of options;
here is my own simplified version.
# mapping from transaction group to TC cluster
service.vgroupMapping.seata-demo=default
service.enableDegrade=false
service.disableGlobalTransaction=false
# transport settings between client and TC server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
# RM (resource manager) settings
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
# TM (transaction manager) settings
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
# undo settings
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
Noting the automatic-refresh annotation here:
/**
 * Coupon information
 *
 * @author jiangmingji
 * @email 123456789@qq.com
 * @date 2024-03-21 21:24:36
 */
@RefreshScope // dynamically pulls configuration from the config center
@RestController
@RequestMapping("/coupon/coupon")
public class CouponController {

    @Autowired
    private CouponService couponService;

    @Value("${coupon.user.name}")
    private String name;

    @Value("${coupon.user.age}")
    private Integer age;
The microservice reads this configuration.
Add the following configuration to the microservice and that's it:
seata:
  data-source-proxy-mode: AT # data-source proxy mode (AT)
  registry: # registry settings for the TC service; the microservice uses these to look up the TC address
    # mirror the TC server's own registry.conf: address, namespace, group, application-name, cluster
    type: nacos
    nacos: # TC
      server-addr: 127.0.0.1:8848
      group: DEFAULT_GROUP
      namespace: "" # everything lives in public, so use an empty string
      application: seata-tc-server
      username: "nacos"
      password: "nacos"
  tx-service-group: seata-demo # transaction group, used to resolve the TC cluster name
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      group: 'SEATA_GROUP'
      dataId: 'client.properties'
      username: 'nacos'
      password: 'nacos'
      namespace: ""
Restart the services.
Modify the configuration in Nacos.
Switch the cluster to Hangzhou (HZ).
The services now successfully register with the seata-server running on port 8092.