当我们进行分布式系统开发的时候,需要保证不同分布式系统节点之间的数据的一致性,同时对于不同节点的事务处理也需要保证原子性、一致性、持久性和隔离性。SAGA是IBM公司出品的分布式事务解决方案,主要通过补偿机制来保证事务的一致性,因此最近比较火热。下面我们就来详细讲解如何使用C#实现SAGA分布式事务。
一、什么是SAGA分布式事务?
SAGA是分布式事务的一种解决方案,通过基于局部事务的补偿机制来保证分布式事务的一致性。SAGA模式中,将复杂的分布式事务分为一系列局部事务,这些局部事务通过相互协调来实现分布式事务的最终一致性。
二、SAGA分布式事务的实现流程
SAGA分布式事务的实现流程主要包括以下几个步骤:
- 业务流程发起方发起分布式事务;
- 业务流程发起方调用Saga Orchestrator,下发事务指令;
- 相应的业务服务通过实现CompensatableAction接口,实现事务执行中的“原操作”和“补偿操作”;
- 一旦事务中的一个步骤发生异常,Saga Orchestrator会自动回滚并执行相应的补偿操作;
- 如果整个事务成功执行地话,Saga Orchestrator会执行最后的完成操作。
三、SAGA分布式事务实现过程示例
接下来,我们将基于C#,使用.NET Core框架,来演示如何使用SAGA分布式事务。
在这个演示中,我们将使用EF Core示例,并且将基于NATS同时还演示了一个异步的方式实现SAGA分布式事务。
示例:
using Microsoft.EntityFrameworkCore;
using NATS.Client;
using System;
using System.Threading.Tasks;
// 定义IAccountService接口,具有创建账户和支付操作
public interface IAccountService
{
public Task CreateAccountAsync(Account account);
public Task DeductAsync(DeductRequest request);
}
// 定义一个NATS发送者
public interface IPublisher
{
public Task PublishAsync(string subject, string msg);
}
// AccountService是一个具体的实现IAccountService接口的类,包括CreateAccountAsync和DeductAsync方法的具体实现
public class AccountService : IAccountService
{
private readonly AccountDbContext _context;
private readonly IPublisher _publisher;
public AccountService(AccountDbContext context, IPublisher publisher)
{
_context = context;
_publisher = publisher;
}
public async Task CreateAccountAsync(Account account)
{
// 创建账户
await _context.Accounts.AddAsync(account);
await _context.SaveChangesAsync();
}
public async Task DeductAsync(DeductRequest request)
{
// 扣款流程
var account = await _context.Accounts.FirstOrDefaultAsync(a => a.Id == request.AccountId);
if (account == null)
{
throw new InvalidOperationException("Account not found");
}
if (account.Balance < request.Amount)
{
throw new InvalidOperationException("Insufficient balance");
}
account.Balance = account.Balance - request.Amount;
_context.Accounts.Update(account);
await _context.SaveChangesAsync();
// 向下一步流程的处理器发送消息
await _publisher.PublishAsync("order.create", $"{{\"OrderId\": {request.OrderId}}}");
}
}
// AccountDbContext 是使用EF Core定义的数据库上下文
public class AccountDbContext : DbContext
{
public DbSet<Account> Accounts { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer(@"Server=localhost;Database=Account;User ID=sa;Password=123456;");
}
}
// NATSPublisher 是一个实现了IPublisher接口的NATS消息发送者类
public class NATSPublisher : IPublisher
{
private readonly IConnection _nats;
public NATSPublisher()
{
_nats = new ConnectionFactory().CreateConnection();
}
public async Task PublishAsync(string subject, string msg)
{
await Task.Run(() => _nats.Publish(subject, System.Text.Encoding.UTF8.GetBytes(msg)));
}
}
// 定义具体的补偿操作类,并且继承CompensatableAction接口
class DeductCompensationAction : CompensatableAction
{
private readonly IAccountService _accountService;
private readonly DeductRequest _deductRequest;
public DeductCompensationAction(IAccountService accountService, DeductRequest deductRequest)
{
_accountService = accountService;
_deductRequest = deductRequest;
}
public async Task Compensate()
{
// 撤销扣款操作
await _accountService.DeductAsync(new DeductRequest()
{
AccountId = _deductRequest.AccountId,
Amount = -_deductRequest.Amount,
OrderId = _deductRequest.OrderId
});
}
public async Task Perform()
{
// 执行扣款操作
await _accountService.DeductAsync(_deductRequest);
}
}
// 定义DeductRequest类,用于记录并提交扣款请求
class DeductRequest
{
public int AccountId { get; set; }
public decimal Amount { get; set; }
public int OrderId { get; set; }
}
// Account类对应于账户
public class Account
{
public int Id { get; set; }
public string Name { get; set; }
public decimal Balance { get; set; }
}
// 下面是使用SAGA模式的业务流程类
public class CreateAccountAndDeductProcess
{
private readonly IAccountService _accountService;
private readonly IPublisher _publisher;
private readonly DeductRequest _deductRequest;
public CreateAccountAndDeductProcess(IAccountService accountService, IPublisher publisher, DeductRequest deductRequest)
{
_accountService = accountService;
_publisher = publisher;
_deductRequest = deductRequest;
}
// 使用SagaStep特性来进行流程定义
[SagaStep]
public async Task CreateAccount()
{
await _accountService.CreateAccountAsync(new Account()
{
Name = $"Account_{Guid.NewGuid()}",
Balance = _deductRequest.Amount
});
// 发布第二步流程的消息
await _publisher.PublishAsync("deduct.account", $"{{\"AccountId\": {1},\"OrderId\" : {_deductRequest.OrderId}, \"Amount\" : {_deductRequest.Amount}}}");
}
[SagaStep(IsCompensable = true)]
public async Task DeductAccount()
{
// 发布扣款消息
await _accountService.DeductAsync(_deductRequest);
}
// 定义回滚操作
[CompensationAction]
public async Task RollbackDeductAccount()
{
var compensationAction = new DeductCompensationAction(_accountService, _deductRequest);
await compensationAction.Compensate();
}
}
// SagaOrchestrator类提供了流程的发起和处理能力
public class SagaOrchestrator
{
private readonly ISubscription _subscription;
private readonly IAccountService _accountService;
public SagaOrchestrator(ISubscription subscription, IAccountService accountService)
{
_subscription = subscription;
_accountService = accountService;
}
public async Task Run()
{
Console.WriteLine("Saga Orchestrator started");
while (true)
{
Message msg = _subscription.NextMessage();
var subject = msg.Subject;
var data = System.Text.Encoding.UTF8.GetString(msg.Data);
Console.WriteLine($"{subject}: {data}");
switch (subject)
{
// 开始订单流程
case "order.create":
await CreateAccountAndDeductProcessRunner(data);
break;
// 建立账户并扣款
case "deduct.account":
await DeductAccountProcessRunner(data);
break;
default:
break;
}
await Task.Delay(50);
}
}
private async Task CreateAccountAndDeductProcessRunner(string data)
{
Console.WriteLine("Create Account And Deduct Process started");
var deductRequest = Newtonsoft.Json.JsonConvert.DeserializeObject<DeductRequest>(data);
var process = new CreateAccountAndDeductProcess(_accountService, new NATSPublisher(), deductRequest);
var sagaBuilder = new SagaBuilder<CreateAccountAndDeductProcess>(process);
await sagaBuilder
.Invoke(x => x.CreateAccount)
.Invoke(x => x.DeductAccount, x => x.RollbackDeductAccount)
.Build()
.RunAsync();
}
private async Task DeductAccountProcessRunner(string data)
{
Console.WriteLine("Deduct Account Process started");
var deductRequest = Newtonsoft.Json.JsonConvert.DeserializeObject<DeductRequest>(data);
var process = new CreateAccountAndDeductProcess(_accountService, new NATSPublisher(), deductRequest);
var sagaBuilder = new SagaBuilder<CreateAccountAndDeductProcess>(process);
await sagaBuilder
.Invoke(x => x.CreateAccount)
.Invoke(x => x.DeductAccount, x => x.RollbackDeductAccount)
.Build()
.RunAsync();
}
}
// 最后,我们来演示如何运行上述代码
static async Task Main(string[] args)
{
var factory = new ConnectionFactory();
// 建立一个连接
IConnection nats = await Task.Run(() => factory.CreateConnection());
// 订阅主题
ISubscription subscription = await Task.Run(() =>
nats.SubscribeAsync(">", (sender, args) =>
{
Console.WriteLine("*************************");
Console.WriteLine(args.Message.Data);
}));
// 创建一个DbContext,并且初始化数据库
using (var db = new AccountDbContext())
{
await db.Database.EnsureDeletedAsync();
await db.Database.EnsureCreatedAsync();
}
// 初始化NATS消息发送
var publisher = new NATSPublisher();
// 初始化业务service
var accountService = new AccountService(new AccountDbContext(), publisher);
// 运行saga orchestrator
var so = new SagaOrchestrator(subscription, accountService);
await so.Run();
}
在上面的代码中,我们演示了一个基于.NET Core的EF Core的SAGA分布式事务的示例。在具体实现中,分别定义了账户服务、账户上下文,NATS发送者,扣款请求、账户及NATSPublisher实现,定义具体的补偿回滚操作类,定义创建账户和扣款的具体流程。最后,我们在Main方法中初始化DbContext和NATS消息发送器,运行saga orchestrator,即可进行分布式事务的运行。
建议在编写此类代码时,需要注意以下几点:
- 需要特别注意所有局部事务之间的依赖关系,避免出现数据依赖错误;
- 注意消息在事件传递过程中的状态,避免出现重复或者错误的消息;
- 要对分布式事务中各种错误进行充分的测试,以确保系统在各种情况下都能够正常运行。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:如何用C#实现SAGA分布式事务 - Python技术站