通过Spring框架下的@Transactional注解可以保证单一数据源增删改查的一致性。
但在分布式事务中,每个微服务都可能对应着一个独立的数据源,这个时候@Transactional注解就失效了。所以为了解决在调用多个系统服务时数据的一致性,需要使用分布式事务来解决这个问题。
Seata介绍
Seata是一个由阿里开源的解决开发中的分布式事务问题的一套分布式事务解决方案。Seata的设计目标是对业务无侵入,从对业务无侵入的2PC方案入手,在传统的2PC的基础上演化而来。Seata将一个分布式事务理解成一个包含了若干分支事务的全局事务,在全局事务的协调下保证各个分支事务的数据一致性,要么一起提交,要么一起回滚。
Seata组件
Seata主要有三个重要组件组成
- TC: Transaction Coordinator 事务协调器,管理全局的分支事务的状态,用于全局事务的提交和回滚
- TM: Transaction Manager 事务管理器,用于开启、提交和回滚全局事务
- RM: Resource Manager 资源管理器,用于分支事务的资源管理,向TC注册分支事务,上报分支事务的状态,上报分支事务的状态,接受TC的命令来提交和回滚事务
Seata工作流程
Seata的工作流程
- TM向TC申请开启一个全局事务,全局事务创建成功并生成全局唯一的XID
- XID在微服务调用链路的上下文中传播
- RM向TC注册分支事务,将其纳入XID对应全局事务的管辖
- TM向TC发起针对XID的全局提交或回滚决议
- TC调度XID下管辖的全部分支事务完成提交或回滚请求
Seata整体机制
2PC协议的演变:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和链接资源
二阶段:
提交异步化
回滚通过一阶段的回滚日志进行反向补偿
写隔离
一阶段本地事务提交前,需要确保先拿到全局锁。拿不到全局锁就不能提交本地事务。拿全局锁的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。写隔离的目的是为了防止多个分布式事务脏写的问题。例如一个用户下单购买商品,订单微服务调用库存微服务进行完了增减库存的操作,正在进行创建订单的操作。而此时另一个用户也下单了相同的商品,库存微服务中此时因为已经调用结束了,是没有本地锁的,如果没有写隔离,此时第一个用户的操作遇到异常回滚,但因为此时库存的数据已经被第二个用户更改,可能会导致全局事务回滚失败
读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted)如果应用在特定场景下,必需要求全局的 读已提交 ,可以通过 SELECT FOR UPDATE 语句的代理。SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到全局锁拿到,即读取的相关数据是已提交的,才返回。
Seata实现2PC与传统2PC的差别
- 架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM本质上就是数据库自身,通过XA协议实现,而 Seata的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。
- 两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2持锁的时间,整体提高效率
Seata安装和配置
- 修改file.conf
## transaction log store
store {
## store mode: file、db
mode = "db"
将mode改为db
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driverClassName = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&characterEncoding=utf8&useSSL=false&&serverTimezone=UTC"
user = "root"
password = "123456"
更改数据库配置信息
service {
#vgroup->rgroup
vgroup_mapping.seata_tx_group = "default"
更改事务分组
- 修改registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
这里使用Nacos,填写地址和命名空间,不填为public
- 配置数据库
seata中conf文件夹中,db_store.sql在seata数据库中执行,db_undo_log.sql在业务数据库中执行。
Seata的使用
有订单微服务和库存微服务,订单微服务前端接受到数据后,通过Feign调用库存微服务减少库存,之后在数据库中创建订单。
通过使用Seata来保证创建订单和减少库存的原子性。
- pom文件中添加seata起步依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0</version>
</dependency>
可以将起步依赖中的seata排除,安装与本地安装seata版本相同的seata,避免出现版本问题
- 在application.properties中添加seata相关配置
# 关闭Feign降级
feign.hystrix.enabled=false
logging.level.io.seata=info
# 分组要与registry.conf相同
spring.cloud.alibaba.seata.tx-service-group=tx_group
- 将conf文件夹中配置好的file.conf和registry.conf复制到项目的resources目录下

- 新增一个Configuration
/**
* 使用Seata对数据源进行代理
*/
@Configuration
public class DataSourceProxyConfig {
@Value("classpath:mapper/*.xml")
private String mapperLocations;
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource(){
return new DruidDataSource();
}
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlSessionFactoryBean.getObject();
}
}
- 业务方法中加入@GlobalTransactional注解即可
@GlobalTransactional(name = "create-order",rollbackFor = Exception.class)
测试
@Override
@GlobalTransactional(name = "create-order",rollbackFor = Exception.class)
public String createOrder(Order order) {
Integer goodId = order.getGoodId();
String username = order.getUsername();
//数据验证
if (goodId == null) return "请输入商品id";
if (username == null) return "请输入用户名";
Integer count = gm.queryGoodCount(goodId);
if (count == null) return "商品不存在";
if (!(count > 0)) return "秒杀失败,商品已被抢完";
//通过Feign调用库存微服务更新库存
ps.updateStock(order);
//模拟异常
int i=1/0;
//创建订单
om.insertOrderInfo(order);
return "秒杀成功";
}
经过测试,如果使用@Transactional,如果在更新库存成功后发生异常,库存不会回滚,数据会不一致。
使用@GlobalTransactional注解,则可以成功回滚。