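Note: you need to deploy the backend dependencies yourself and change the service addresses in the examples before you can use them.
Before starting this demo, complete "Build Microservices Quickly with SOFAStack" (《使用 SOFAStack 快速构建微服务》). If you have not, you can use the kc-sofastack-demo project in the repository as the baseline for the steps below; this demo adds Seata distributed transactions on top of it. The demo is not limited to SOFA, though: it applies to any Java technology stack.
Add the following dependencies to the pom file of the parent project (kc-sofastack-demo/pom.xml):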
...
<properties>
...
<seata.version>0.6.1</seata.version>
<netty4.version>4.1.24.Final</netty4.version>
</properties>
...
<dependencyManagement>
<dependencies>
...
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>${seata.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-server</artifactId>
<version>${seata.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty4.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
Add the following dependencies to the pom file of the stock-mng project (kc-sofastack-demo/stock-mng/pom.xml):
<dependencies>
....
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
Add the following dependencies to the pom file of the balance-mng-impl project (kc-sofastack-demo/balance-mng/balance-mng-impl/pom.xml):
<dependencies>
....
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-server</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
Add the following Java code below the main method of the BalanceMngApplication and StockMngApplication classes (the snippet shows BalanceMngApplication):
...
import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;
...
public static void main(String[] args) {
SpringApplication.run(BalanceMngApplication.class, args);
}
@Configuration
public static class DataSourceConfig {
@Bean
@Primary
@ConfigurationProperties(prefix = "spring.datasource.hikari")
public DataSource dataSource(DataSourceProperties properties) {
HikariDataSource dataSource = createDataSource(properties, HikariDataSource.class);
if (properties.getName()!=null && properties.getName().length() > 0) {
dataSource.setPoolName(properties.getName());
}
return new DataSourceProxy(dataSource);
}
@SuppressWarnings("unchecked")
protected static <T> T createDataSource(DataSourceProperties properties,
Class<? extends DataSource> type) {
return (T) properties.initializeDataSourceBuilder().type(type).build();
}
@Bean
@Primary
public GlobalTransactionScanner globalTransactionScanner(){
return new GlobalTransactionScanner("kc-balance-mng", "my_test_tx_group");
}
}
Note that the dataSource method above returns a data source wrapped in DataSourceProxy.
Add the @GlobalTransactional annotation to the purchase method of the BookStoreControllerImpl class:
...
import io.seata.spring.annotation.GlobalTransactional;
...
@Override
@GlobalTransactional(timeoutMills = 300000, name = "kc-book-store-tx")
public Success purchase(String body) {
...
}
For simplicity, start the Seata server together with BalanceMngApplication by adding the server startup code to the BalanceMngApplication class:
...
public static void main(String[] args) {
startSeatServer();
SpringApplication.run(BalanceMngApplication.class, args);
}
/**
* The seata server.
*/
static Server server = null;
private static void startSeatServer(){
new Thread(new Runnable() {
public void run() {
server = new Server();
try {
server.main(new String[] {"8091", StoreMode.FILE.name(), "127.0.0.1"});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
}
...
The demo Seata server uses local files for storage. Copy the following two files into the /src/main/resources directory of the balance-mng-bootstrap and stock-mng projects:
File name: file.conf
File contents:
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size, will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
}
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
}
client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
}
## transaction log store
store {
## store mode: file, db
mode = "file"
## file store
file {
dir = "file_store/seata"
}
}
File name: registry.conf
File contents:
registry {
# file, nacos, eureka, redis, zk
type = "file"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:1001/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
file {
name = "file.conf"
}
}
config {
# file, nacos, apollo, zk
type = "file"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
apollo {
app.id = "fescar-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
file {
name = "file.conf"
}
}
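To make the role of the service block in file.conf concrete, here is a minimal sketch of the two-step lookup it describes: the transaction group passed to GlobalTransactionScanner (my_test_tx_group) maps to a cluster name (default), and the cluster name maps to the server address. The class TxGroupLookupSketch, its resolve method, and the flattened property keys are illustrative assumptions, not Seata's actual classes or loader.

```java
import java.util.Properties;

/** Illustrative only: mimics the vgroup -> cluster -> address lookup described by file.conf. */
class TxGroupLookupSketch {

    /** Resolves a transaction group to the server address configured for its cluster. */
    static String resolve(Properties conf, String txGroup) {
        // Step 1: transaction group -> cluster name (service.vgroup_mapping.<group>)
        String cluster = conf.getProperty("service.vgroup_mapping." + txGroup);
        if (cluster == null) {
            throw new IllegalStateException("no vgroup_mapping for transaction group: " + txGroup);
        }
        // Step 2: cluster name -> server address (service.<cluster>.grouplist)
        String address = conf.getProperty("service." + cluster + ".grouplist");
        if (address == null) {
            throw new IllegalStateException("no grouplist for cluster: " + cluster);
        }
        return address;
    }

    public static void main(String[] args) {
        // Same values as the service block of file.conf, flattened into property keys (an assumption of this sketch)
        Properties conf = new Properties();
        conf.setProperty("service.vgroup_mapping.my_test_tx_group", "default");
        conf.setProperty("service.default.grouplist", "127.0.0.1:8091");
        System.out.println(resolve(conf, "my_test_tx_group"));
    }
}
```

The practical consequence is that the group name in GlobalTransactionScanner and the key under vgroup_mapping must match, and the address under grouplist must match the port the Seata server is started on (8091 in this demo).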
Create the undo_log table in both the balance_db and stock_db databases:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
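The idea behind the undo_log table can be illustrated with a small in-memory sketch: each branch update records the row value before and after the change under the key (xid, branch_id), and a rollback restores the before value only if the row still holds the after value. Everything here (the UndoLogSketch class, its methods, the map standing in for a table) is a hypothetical simplification for illustration; it is not how Seata's DataSourceProxy is implemented.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a tiny in-memory model of before/after-image undo logging. */
class UndoLogSketch {

    /** One undo record: the row it touched and its value before and after the update. */
    static final class UndoRecord {
        final String rowKey;
        final BigDecimal before;
        final BigDecimal after;

        UndoRecord(String rowKey, BigDecimal before, BigDecimal after) {
            this.rowKey = rowKey;
            this.before = before;
            this.after = after;
        }
    }

    private final Map<String, BigDecimal> table = new HashMap<>();
    // Keyed by "xid:branchId", mirroring the unique key (xid, branch_id) of undo_log
    private final Map<String, UndoRecord> undoLog = new HashMap<>();

    void put(String rowKey, BigDecimal value) {
        table.put(rowKey, value);
    }

    BigDecimal get(String rowKey) {
        return table.get(rowKey);
    }

    /** Applies an update to an existing row and records its before and after values. */
    void update(String xid, long branchId, String rowKey, BigDecimal newValue) {
        BigDecimal before = table.get(rowKey);
        if (before == null) {
            throw new IllegalArgumentException("row does not exist: " + rowKey);
        }
        undoLog.put(xid + ":" + branchId, new UndoRecord(rowKey, before, newValue));
        table.put(rowKey, newValue);
    }

    /** Global commit: the update stays, so the undo record is simply discarded. */
    void commit(String xid, long branchId) {
        undoLog.remove(xid + ":" + branchId);
    }

    /** Global rollback: restores the before value unless the row was changed by someone else. */
    boolean rollback(String xid, long branchId) {
        UndoRecord record = undoLog.get(xid + ":" + branchId);
        if (record == null) {
            return false; // nothing to undo
        }
        if (table.get(record.rowKey).compareTo(record.after) != 0) {
            return false; // row no longer matches the after value; this sketch refuses to overwrite it
        }
        table.put(record.rowKey, record.before);
        undoLog.remove(xid + ":" + branchId);
        return true;
    }

    public static void main(String[] args) {
        UndoLogSketch db = new UndoLogSketch();
        db.put("alice", new BigDecimal("100.00"));
        db.update("xid-1", 1L, "alice", new BigDecimal("70.00"));
        System.out.println("rolled back: " + db.rollback("xid-1", 1L) + ", balance: " + db.get("alice"));
    }
}
```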
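- Run the main method of the BalanceMngApplication class (this also starts the Seata server)
- Run the main method of the StockMngApplication class
- Open http://localhost:8080/index.html in a browser
See step 1 of the AT mode section above, "Add the Maven dependencies".
- Add the following dependency to the pom file of the balance-mng-facade project (kc-sofastack-demo/balance-mng/balance-mng-facade/pom.xml):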
...
<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</dependency>
</dependencies>
- Add three methods to the BalanceMngFacade interface:
...
@TwoPhaseBusinessAction(name = "minusBalancePrepare", commitMethod = "minusBalanceCommit", rollbackMethod = "minusBalanceRollback")
boolean minusBalancePrepare(BusinessActionContext context,
@BusinessActionContextParameter(paramName = "userName") String userName,
@BusinessActionContextParameter(paramName = "amount") BigDecimal amount);
boolean minusBalanceCommit(BusinessActionContext context);
boolean minusBalanceRollback(BusinessActionContext context);
- In the BalanceMngMapper interface, add the SQL needed by the three methods above:
...
@Update("update balance_tb set balance = balance - #{amount}, freezed = freezed + #{amount} where user_name = #{userName}")
int minusBalancePrepare(@Param("userName") String userName, @Param("amount") BigDecimal amount);
@Update("update balance_tb set freezed = freezed - #{amount} where user_name = #{userName}")
int minusBalanceCommit(@Param("userName") String userName, @Param("amount") BigDecimal amount);
@Update("update balance_tb set balance = balance + #{amount}, freezed = freezed - #{amount} where user_name = #{userName}")
int minusBalanceRollback(@Param("userName") String userName, @Param("amount") BigDecimal amount);
- Alter the balance_tb table to add the freezed (frozen amount) column:
ALTER TABLE balance_tb add column freezed decimal(10,2) default 0.00;
- In the BalanceMngImpl class, implement the three methods added to the BalanceMngFacade interface:
...
private static final Logger LOGGER = LoggerFactory.getLogger(BalanceMngImpl.class);
@Override
public boolean minusBalancePrepare(BusinessActionContext context, String userName, BigDecimal amount) {
LOGGER.info("minus balance prepare begin ...");
LOGGER.info("minus balance prepare SQL: update balance_tb set balance = balance - {}, freezed = freezed + {} where user_name = {}", amount, amount, userName);
int effect = balanceMngMapper.minusBalancePrepare(userName, amount);
LOGGER.info("minus balance prepare end");
return (effect > 0);
}
@Override
public boolean minusBalanceCommit(BusinessActionContext context) {
// Distributed transaction ID (XID)
final String xid = context.getXid();
final String userName = String.valueOf(context.getActionContext("userName"));
final BigDecimal amount = new BigDecimal(String.valueOf(context.getActionContext("amount")));
LOGGER.info("minus balance commit begin ... xid: {}", xid);
LOGGER.info("minus balance commit SQL: update balance_tb set freezed = freezed - {} where user_name = {}", amount, userName);
int effect = balanceMngMapper.minusBalanceCommit(userName, amount);
LOGGER.info("minus balance commit end");
return (effect > 0);
}
@Override
public boolean minusBalanceRollback(BusinessActionContext context) {
// Distributed transaction ID (XID)
final String xid = context.getXid();
final String userName = String.valueOf(context.getActionContext("userName"));
final BigDecimal amount = new BigDecimal(String.valueOf(context.getActionContext("amount")));
LOGGER.info("minus balance rollback begin ... xid: {}", xid);
LOGGER.info("minus balance rollback SQL: update balance_tb set balance = balance + {}, freezed = freezed - {} where user_name = {}", amount, amount, userName);
int effect = balanceMngMapper.minusBalanceRollback(userName, amount);
LOGGER.info("minus balance rollback end");
return (effect > 0);
}
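The three SQL statements can be hard to follow side by side, so here is a minimal in-memory sketch of the same arithmetic on a single balance_tb row: prepare moves the amount from balance to freezed, commit releases it from freezed, and rollback returns it to balance. The class TccBalanceSketch is a hypothetical stand-in for the row and uses no Seata or MyBatis API; like the SQL above, it does not check whether the balance is sufficient.

```java
import java.math.BigDecimal;

/** Illustrative only: one balance_tb row with the same balance/freezed arithmetic as the SQL above. */
class TccBalanceSketch {
    private BigDecimal balance;
    private BigDecimal freezed = BigDecimal.ZERO;

    TccBalanceSketch(BigDecimal balance) {
        this.balance = balance;
    }

    /** Phase one: reserve the amount by moving it from balance to freezed. */
    void prepare(BigDecimal amount) {
        balance = balance.subtract(amount);
        freezed = freezed.add(amount);
    }

    /** Phase two, commit: the reserved amount is spent, so it leaves freezed. */
    void commit(BigDecimal amount) {
        freezed = freezed.subtract(amount);
    }

    /** Phase two, rollback: the reserved amount goes back to balance. */
    void rollback(BigDecimal amount) {
        balance = balance.add(amount);
        freezed = freezed.subtract(amount);
    }

    BigDecimal balance() {
        return balance;
    }

    BigDecimal freezed() {
        return freezed;
    }

    public static void main(String[] args) {
        BigDecimal amount = new BigDecimal("30.00");

        TccBalanceSketch committed = new TccBalanceSketch(new BigDecimal("100.00"));
        committed.prepare(amount);
        committed.commit(amount);
        System.out.println("after commit: balance=" + committed.balance() + ", freezed=" + committed.freezed());

        TccBalanceSketch rolledBack = new TccBalanceSketch(new BigDecimal("100.00"));
        rolledBack.prepare(amount);
        rolledBack.rollback(amount);
        System.out.println("after rollback: balance=" + rolledBack.balance() + ", freezed=" + rolledBack.freezed());
    }
}
```

Either second-phase path leaves freezed at its starting value; only the commit path leaves the balance reduced.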
TCC mode does not need a proxied data source, because it does not parse SQL or generate undo logs. In the BalanceMngApplication class, comment out the dataSource and createDataSource methods:
...
@Configuration
public static class DataSourceConfig {
//@Bean
//@Primary
//@ConfigurationProperties(prefix = "spring.datasource.hikari")
//public DataSource dataSource(DataSourceProperties properties) {
// HikariDataSource dataSource = createDataSource(properties, HikariDataSource.class);
// if (StringUtils.hasText(properties.getName())) {
// dataSource.setPoolName(properties.getName());
// }
// return new DataSourceProxy(dataSource);
//}
//
//@SuppressWarnings("unchecked")
//protected static <T> T createDataSource(DataSourceProperties properties,
// Class<? extends DataSource> type) {
// return (T) properties.initializeDataSourceBuilder().type(type).build();
//}
@Bean
@Primary
public GlobalTransactionScanner globalTransactionScanner(){
return new GlobalTransactionScanner("kc-balance-mng", "my_test_tx_group");
}
}
Change the purchase method of the BookStoreControllerImpl class to call minusBalancePrepare:
@Override
@GlobalTransactional(timeoutMills = 300000, name = "kc-book-store-tx")
public Success purchase(String body) {
JSONObject obj = JSON.parseObject(body);
String userName = obj.getString("userName");
String productCode = obj.getString("productCode");
int count = obj.getInteger("count");
BigDecimal productPrice = stockMngFacade.queryProductPrice(productCode, userName);
if (productPrice == null) {
throw new RuntimeException("product code does not exist");
}
if (count <= 0) {
throw new RuntimeException("purchase count must be greater than zero");
}
LOGGER.info("purchase begin ... XID:" + RootContext.getXID());
stockMngFacade.createOrder(userName, productCode, count);
stockMngFacade.minusStockCount(userName, productCode, count);
/* == Changed here to call the minusBalancePrepare method == */
balanceMngFacade.minusBalancePrepare(null, userName, productPrice.multiply(new BigDecimal(count)));
/* ==== */
LOGGER.info("purchase end");
Success success = new Success();
success.setSuccess("true");
return success;
}
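BalanceMngFacade is an RPC interface. The earlier example referenced it with the @SofaReference annotation, but TCC mode does not currently support intercepting annotation-based references (to be fixed in the next release), so the reference must be declared in XML instead:
- In the stock-mng project, create a spring directory under src/main/resources and create seata-sofarpc-reference.xml in it: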
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:sofa="http://sofastack.io/schema/sofaboot"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://sofastack.io/schema/sofaboot http://sofastack.io/schema/sofaboot.xsd"
default-autowire="byName">
<sofa:reference id="balanceMngFacade" interface="io.sofastack.balance.manage.facade.BalanceMngFacade" unique-id="${service.unique.id}">
<sofa:binding.bolt />
</sofa:reference>
</beans>
- Add the @ImportResource annotation to the StockMngApplication class to load the Spring configuration file above:
...
@SpringBootApplication
@ImportResource("classpath*:spring/*.xml")
public class StockMngApplication {
...
- In the BookStoreControllerImpl class, replace the annotation on the balanceMngFacade reference with @Autowired:
...
//@SofaReference(interfaceType = BalanceMngFacade.class, uniqueId = "${service.unique.id}", binding = @SofaReferenceBinding(bindingType = "bolt"))
@Autowired
private BalanceMngFacade balanceMngFacade;
...
- Run the main method of the BalanceMngApplication class (this also starts the Seata server)
- Run the main method of the StockMngApplication class
- Open http://localhost:8080/index.html in a browser