Spring Boot 多数据源处理事务的思路详解
为什么需要多数据源
在实际应用中,我们可能需要连接多个数据库来完成不同的业务需求,例如:用户数据存在 MySQL 数据库中,订单数据存在 MongoDB 数据库中,而且不同的数据源可能有不同的事务管理机制,为了更好地处理多数据源事务,我们需要进行多数据源处理。
Spring Boot 多数据源处理事务方案
数据源配置
- 针对不同的数据源,需要进行配置,在
application.yml
(或者.properties
)文件中按照以下格式进行配置:
yml
spring:
datasource:
primary:
url: jdbc:mysql://localhost:3306/primary_db?useSSL=false&useUnicode=true&characterEncoding=utf8
driver-class-name: com.mysql.jdbc.Driver
username: root
password:
secondary:
url: jdbc:mysql://localhost:3306/secondary_db?useSSL=false&useUnicode=true&characterEncoding=utf8
driver-class-name: com.mysql.jdbc.Driver
username: root
password:
其中,primary
和 secondary
分别是不同数据源的名称,可以根据实际需要进行修改。这里我们使用了 MySQL 数据库作为示例。
- 在
application.yml
中配置事务管理器:
yml
spring:
jpa:
properties:
hibernate:
current_session_context_class: org.springframework.orm.hibernate5.SpringSessionContext
#事务管理类型嵌套
transaction:
factory_class: org.hibernate.resource.transaction.backend.jdbc.internal.DdlTransactionIsolatorJtaPlatformAdapter
jta:
allow_multiple_instantiations: true
#这里为Hibernate事务管理器的实现类,pool_size参数表示回收超时时间
tm:
quarkus:
factory_class: org.hibernate.engine.transaction.jta.platform.internal.QuarkusJtaPlatform
pool_size: 5
这里我们使用了 Spring Boot 自带的 JPA。
处理多数据源事务
- 定义多数据源事务注解
```java
/*
* 多数据源事务注解
/
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Transactional(transactionManager = "chainedTransactionManager", propagation = Propagation.REQUIRED)
public @interface MultiTransactional {
}
```
- 在需要启用多数据源事务的方法或类上添加
@MultiTransactional
注解。
java
@MultiTransactional
public void process() {
// todo: 处理过程
}
在 @Transactional
中指定了 transactionManager
参数为 chainedTransactionManager
,表示采用级联事务管理器。
配置级联事务管理器
- 继承
ChainedTransactionManager
类,重写doGetTransaction()
方法,实现对多数据源事务的处理。
```java
package com.example.demo.transaction;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import javax.transaction.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/*
* 多数据源事务管理器
/
public class MultiDataSourceTransactionManager extends AbstractPlatformTransactionManager {
private static final long serialVersionUID = 8589808433578176396L;
private BeanFactoryTransactionManagerFactoryBean factoryBeanFactory;// 事务管理工厂
private final Map<Object, Transaction> transactionLookup = new ConcurrentHashMap<Object, Transaction>();//事务对象
private final Map<Object, AtomicInteger> referenceCount = new HashMap<Object, AtomicInteger>();
private final Map<Object, Integer> timeoutMap = new HashMap<Object, Integer>();
public MultiDataSourceTransactionManager() {
super();
}
public void setFactoryBeanFactory(BeanFactoryTransactionManagerFactoryBean factoryBeanFactory){
this.factoryBeanFactory = factoryBeanFactory;
}
@Override
protected Object doGetTransaction() {
Object services = getTransactionFactory();
TransactionStatus transactionStatus = (TransactionStatus) transactionLookup.get(services);
if(transactionStatus == null){
transactionStatus = new DataSourceTransactionStatus(this);
transactionLookup.put(services, (Transaction) transactionStatus);
}
return transactionStatus;
}
protected TransactionFactory getTransactionFactory(){
Object services = factoryBeanFactory.getDataSources();
if(services == null){
throw new IllegalStateException("Must set dataSources before accessing");
}
return (TransactionFactory)services;
}
@Override
protected void doBegin(Object transaction, TransactionDefinition definition)
throws TransactionException {
DataSourceTransactionStatus status = (DataSourceTransactionStatus) transaction;
if(referenceCount.containsKey(status.source())){
AtomicInteger counter = referenceCount.get(status.source());
int count = counter.incrementAndGet();
status.setSavepoint(count);
}else{
try {
MultiDataSourceTransactionImpl multiDataSourceTransaction = new MultiDataSourceTransactionImpl(MultiDataSourceUtils.getEntityManagerFactory(status.source()), definition.getIsolationLevel() , mustRestore(definition.getName()));
multiDataSourceTransaction.setRollbackOnlyOnly(!definition.isReadOnly());
multiDataSourceTransaction.begin();
AtomicInteger counter = new AtomicInteger(1);
referenceCount.put(status.source(), counter);
transactionLookup.put(status.source(), (Transaction) multiDataSourceTransaction);
status.setTransaction(multiDataSourceTransaction);
status.setSavepoint(counter.get());
} catch (NotSupportedException e) {
e.printStackTrace();
throw new TransactionException("不支持事务");
} catch (SystemException e) {
e.printStackTrace();
throw new TransactionException("系统异常");
}
}
}
@Override
protected void doCommit(TransactionStatus status) throws TransactionException {
DataSourceTransactionStatus dataSourceTransactionStatus = (DataSourceTransactionStatus)status;
MultiDataSourceTransaction transaction = (MultiDataSourceTransaction)dataSourceTransactionStatus.getTransaction();
if(dataSourceTransactionStatus.isCleared()){
transaction.setStatus(TransactionStatus.STATUS_COMMITTED);
}else{
if(dataSourceTransactionStatus.isRollbackOnly()){
transaction.setStatus(TransactionStatus.STATUS_ROLLEDBACK);
}else if(!dataSourceTransactionStatus.isCompleted()){
int savepoint = dataSourceTransactionStatus.getSavepoint();
transaction.savepoint(savepoint);
}
transaction.commit();
}
MultiDataSourceUtils.closeEntityManagers(dataSourceTransactionStatus.source());
referenceCount.remove(dataSourceTransactionStatus.source());
dataSourceTransactionStatus.setCompleted();
transactionLookup.remove(dataSourceTransactionStatus.source());
}
@Override
protected void doRollback(TransactionStatus status) throws TransactionException {
DataSourceTransactionStatus dataSourceTransactionStatus = (DataSourceTransactionStatus)status;
MultiDataSourceTransaction transaction = (MultiDataSourceTransaction)dataSourceTransactionStatus.getTransaction();
if(dataSourceTransactionStatus.isCleared()){
transaction.setStatus(TransactionStatus.STATUS_ROLLEDBACK);
}else{
int savepoint = dataSourceTransactionStatus.getSavepoint();
transaction.rollback(null, savepoint);
transaction.setStatus(TransactionStatus.STATUS_ROLLEDBACK);
}
MultiDataSourceUtils.closeEntityManagers(dataSourceTransactionStatus.source());
referenceCount.remove(dataSourceTransactionStatus.source());
dataSourceTransactionStatus.setCompleted();
transactionLookup.remove(dataSourceTransactionStatus.source());
}
@Override
protected void doSetRollbackOnly(TransactionStatus status)
throws TransactionException {
DataSourceTransactionStatus dataSourceTransactionStatus = (DataSourceTransactionStatus)status;
MultiDataSourceTransaction transaction = (MultiDataSourceTransaction)dataSourceTransactionStatus.getTransaction();
// 如果上层的事务已经标记了rollbackOnly,则直接继承
if (getStatus() == TransactionStatus.STATUS_MARKED_ROLLBACK) {
transaction.setRollbackOnly();
} else {
transaction.setRollbackOnlyOnly(true);
}
dataSourceTransactionStatus.setRollbackOnly();
}
@Override
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionStatus dataSourceTransactionStatus = (DataSourceTransactionStatus)transaction;
Transaction tx = transactionLookup.get(dataSourceTransactionStatus.source());
if(tx != null && dataSourceTransactionStatus.isCompleted())
{
MultiDataSourceUtils.closeEntityManagers(dataSourceTransactionStatus.source());
}
transactionLookup.remove(dataSourceTransactionStatus.source());
dataSourceTransactionStatus.setRollbackOnlyOnly(false);
dataSourceTransactionStatus.setCompleted();
}
protected int mustRestore(String name) {
MultiTransactional annotation = AnnotationUtils.findAnnotation(getClass(), MultiTransactional.class);
return annotation.propagation();
}
protected int doGetTimeout(TransactionStatus status) {
DataSourceTransactionStatus dataSourceTransactionStatus = (DataSourceTransactionStatus)status;
Object obj = timeoutMap.get(dataSourceTransactionStatus.source());
if(env.resolvePlaceholders(SystemParameters.TRANSACTION_TIMEOUT).equals(obj.toString())){
return TransactionDefinition.TIMEOUT_DEFAULT;
}else{
return Integer.parseInt(obj.toString());
}
}
}
``
doGetTransaction()
在方法中获取事务对象,并将其放入
transactionLookup` 中。
在 doBegin()
方法中,如果当前数据源已经存在事务,那么将该数据源的 referenceCount 增加,并设置保存点;否则,创建新的 MultiDataSourceTransactionImpl,设置数据源、事务隔离级别、是否支持恢复操作,并开始事务。
在 doCommit()
和 doRollback()
方法中分别对数据源的事务进行提交和回滚。
在 doSetRollbackOnly()
方法中对当前数据源的事务进行标记回滚操作。
在 doCleanupAfterCompletion()
方法中关闭数据源连接,并从 transactionLookup
中删除事务对象。
- 定义级联事务管理器工厂
```java
package com.example.demo.transaction;
import org.springframework.transaction.PlatformTransactionManager;
/
* 多数据源事务管理器工厂
*/
public class ChainedTransactionManagerFactoryBean extends AbstractTransactionManagerFactoryBean {
/
* 数据源
*/
private Object dataSources;
public ChainedTransactionManagerFactoryBean() {
super();
}
public void setDataSources(Object dataSources) {
this.dataSources = dataSources;
}
@Override
protected PlatformTransactionManager createTransactionManager() {
MultiDataSourceTransactionManager chainedTransactionManager = new MultiDataSourceTransactionManager();
chainedTransactionManager.setFactoryBeanFactory(this);
return chainedTransactionManager;
}
public Object getDataSources() {
return dataSources;
}
}
``
createTransactionManager()
在方法中创建一个 MultiDataSourceTransactionManager 对象,并调用
setFactoryBeanFactory()` 方法,设置为本类的实例。
示例
下面是一个使用多数据源事务的示例:
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private UserRepository userRepository;
@MultiTransactional
public void saveOrderAndUser(Order order, User user) {
userRepository.save(user);
orderRepository.save(order);
}
}
在 saveOrderAndUser()
方法上加上了 @MultiTransactional
注解,在该方法中处理用户和订单的保存操作。
总结
通过以上步骤,我们就可以实现 Spring Boot 的多数据源事务处理,对于复杂的业务逻辑,此方案可以提供良好的支持。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Spring Boot 多数据源处理事务的思路详解 - Python技术站