如何用C#实现SAGA分布式事务

当我们进行分布式系统开发的时候,需要保证不同分布式系统节点之间的数据的一致性,同时对于不同节点的事务处理也需要保证原子性、一致性、持久性和隔离性。SAGA是IBM公司出品的分布式事务解决方案,主要通过补偿机制来保证事务的一致性,因此最近比较火热。下面我们就来详细讲解如何使用C#实现SAGA分布式事务。

一、什么是SAGA分布式事务?

SAGA是分布式事务的一种解决方案,通过基于局部事务的补偿机制来保证分布式事务的一致性。SAGA模式中,将复杂的分布式事务分为一系列局部事务,这些局部事务通过相互协调来实现分布式事务的最终一致性。

二、SAGA分布式事务的实现流程

SAGA分布式事务的实现流程主要包括以下几个步骤:

  1. 业务流程发起方发起分布式事务;
  2. 业务流程发起方调用Saga Orchestrator,下发事务指令;
  3. 相应的业务服务通过实现CompensatableAction接口,实现事务执行中的“原操作”和“补偿操作”;
  4. 一旦事务中的一个步骤发生异常,Saga Orchestrator会自动回滚并执行相应的补偿操作;
  5. 如果整个事务成功执行地话,Saga Orchestrator会执行最后的完成操作。

三、SAGA分布式事务实现过程示例

接下来,我们将基于C#,使用.NET Core框架,来演示如何使用SAGA分布式事务。

在这个演示中,我们将使用EF Core示例,并且将基于NATS同时还演示了一个异步的方式实现SAGA分布式事务。

示例:

using Microsoft.EntityFrameworkCore;
using NATS.Client;
using System;
using System.Threading.Tasks;

// 定义IAccountService接口,具有创建账户和支付操作
public interface IAccountService
{
    public Task CreateAccountAsync(Account account);
    public Task DeductAsync(DeductRequest request);
}

// 定义一个NATS发送者
public interface IPublisher
{
    public Task PublishAsync(string subject, string msg);
}

// AccountService是一个具体的实现IAccountService接口的类,包括CreateAccountAsync和DeductAsync方法的具体实现
public class AccountService : IAccountService
{
    private readonly AccountDbContext _context;
    private readonly IPublisher _publisher;
    public AccountService(AccountDbContext context, IPublisher publisher)
    {
        _context = context;
        _publisher = publisher;
    }

    public async Task CreateAccountAsync(Account account)
    {
        // 创建账户
        await _context.Accounts.AddAsync(account);
        await _context.SaveChangesAsync();
    }

    public async Task DeductAsync(DeductRequest request)
    {
        // 扣款流程
        var account = await _context.Accounts.FirstOrDefaultAsync(a => a.Id == request.AccountId);
        if (account == null)
        {
            throw new InvalidOperationException("Account not found");
        }

        if (account.Balance < request.Amount)
        {
            throw new InvalidOperationException("Insufficient balance");
        }

        account.Balance = account.Balance - request.Amount;
        _context.Accounts.Update(account);
        await _context.SaveChangesAsync();

        // 向下一步流程的处理器发送消息
        await _publisher.PublishAsync("order.create", $"{{\"OrderId\": {request.OrderId}}}");
    }

}

// AccountDbContext 是使用EF Core定义的数据库上下文
public class AccountDbContext : DbContext
{
    public DbSet<Account> Accounts { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder.UseSqlServer(@"Server=localhost;Database=Account;User ID=sa;Password=123456;");
    }
}


// NATSPublisher 是一个实现了IPublisher接口的NATS消息发送者类
public class NATSPublisher : IPublisher
{
    private readonly IConnection _nats;
    public NATSPublisher()
    {
        _nats = new ConnectionFactory().CreateConnection();
    }

    public async Task PublishAsync(string subject, string msg)
    {
        await Task.Run(() => _nats.Publish(subject, System.Text.Encoding.UTF8.GetBytes(msg)));
    }
}


// 定义具体的补偿操作类,并且继承CompensatableAction接口
class DeductCompensationAction : CompensatableAction
{
    private readonly IAccountService _accountService;
    private readonly DeductRequest _deductRequest;

    public DeductCompensationAction(IAccountService accountService, DeductRequest deductRequest)
    {
        _accountService = accountService;
        _deductRequest = deductRequest;
    }

    public async Task Compensate()
    {
        // 撤销扣款操作
        await _accountService.DeductAsync(new DeductRequest()
        {
            AccountId = _deductRequest.AccountId,
            Amount = -_deductRequest.Amount,
            OrderId = _deductRequest.OrderId
        });
    }

    public async Task Perform()
    {
        // 执行扣款操作
        await _accountService.DeductAsync(_deductRequest);
    }
}

// 定义DeductRequest类,用于记录并提交扣款请求
class DeductRequest
{
    public int AccountId { get; set; }
    public decimal Amount { get; set; }
    public int OrderId { get; set; }
}


// Account类对应于账户
public class Account
{
    public int Id { get; set; }
    public string Name { get; set; }
    public decimal Balance { get; set; }
}

// 下面是使用SAGA模式的业务流程类
public class CreateAccountAndDeductProcess
{
    private readonly IAccountService _accountService;
    private readonly IPublisher _publisher;
    private readonly DeductRequest _deductRequest;

    public CreateAccountAndDeductProcess(IAccountService accountService, IPublisher publisher, DeductRequest deductRequest)
    {
        _accountService = accountService;
        _publisher = publisher;
        _deductRequest = deductRequest;
    }

    // 使用SagaStep特性来进行流程定义
    [SagaStep]
    public async Task CreateAccount()
    {
        await _accountService.CreateAccountAsync(new Account()
        {
            Name = $"Account_{Guid.NewGuid()}",
            Balance = _deductRequest.Amount
        });

        // 发布第二步流程的消息
        await _publisher.PublishAsync("deduct.account", $"{{\"AccountId\": {1},\"OrderId\" : {_deductRequest.OrderId}, \"Amount\" : {_deductRequest.Amount}}}");
    }

    [SagaStep(IsCompensable = true)]
    public async Task DeductAccount()
    {
        // 发布扣款消息
        await _accountService.DeductAsync(_deductRequest);
    }

    // 定义回滚操作
    [CompensationAction]
    public async Task RollbackDeductAccount()
    {
        var compensationAction = new DeductCompensationAction(_accountService, _deductRequest);
        await compensationAction.Compensate();
    }
}

// SagaOrchestrator类提供了流程的发起和处理能力
public class SagaOrchestrator
{
    private readonly ISubscription _subscription;
    private readonly IAccountService _accountService;

    public SagaOrchestrator(ISubscription subscription, IAccountService accountService)
    {
        _subscription = subscription;
        _accountService = accountService;
    }

    public async Task Run()
    {
        Console.WriteLine("Saga Orchestrator started");

        while (true)
        {
            Message msg = _subscription.NextMessage();
            var subject = msg.Subject;
            var data = System.Text.Encoding.UTF8.GetString(msg.Data);

            Console.WriteLine($"{subject}: {data}");

            switch (subject)
            {
                // 开始订单流程
                case "order.create":
                    await CreateAccountAndDeductProcessRunner(data);
                    break;
                // 建立账户并扣款
                case "deduct.account":
                    await DeductAccountProcessRunner(data);
                    break;
                default:
                    break;
            }

            await Task.Delay(50);
        }
    }

    private async Task CreateAccountAndDeductProcessRunner(string data)
    {
        Console.WriteLine("Create Account And Deduct Process started");

        var deductRequest = Newtonsoft.Json.JsonConvert.DeserializeObject<DeductRequest>(data);

        var process = new CreateAccountAndDeductProcess(_accountService, new NATSPublisher(), deductRequest);
        var sagaBuilder = new SagaBuilder<CreateAccountAndDeductProcess>(process);

        await sagaBuilder
            .Invoke(x => x.CreateAccount)
            .Invoke(x => x.DeductAccount, x => x.RollbackDeductAccount)
            .Build()
            .RunAsync();
    }

    private async Task DeductAccountProcessRunner(string data)
    {
        Console.WriteLine("Deduct Account Process started");

        var deductRequest = Newtonsoft.Json.JsonConvert.DeserializeObject<DeductRequest>(data);

        var process = new CreateAccountAndDeductProcess(_accountService, new NATSPublisher(), deductRequest);
        var sagaBuilder = new SagaBuilder<CreateAccountAndDeductProcess>(process);

        await sagaBuilder
            .Invoke(x => x.CreateAccount)
            .Invoke(x => x.DeductAccount, x => x.RollbackDeductAccount)
            .Build()
            .RunAsync();
    }
}

// 最后,我们来演示如何运行上述代码

static async Task Main(string[] args)
{
    var factory = new ConnectionFactory();
    // 建立一个连接
    IConnection nats = await Task.Run(() => factory.CreateConnection());
    // 订阅主题
    ISubscription subscription = await Task.Run(() =>
        nats.SubscribeAsync(">", (sender, args) =>
        {
            Console.WriteLine("*************************");
            Console.WriteLine(args.Message.Data);
        }));

    // 创建一个DbContext,并且初始化数据库
    using (var db = new AccountDbContext())
    {
        await db.Database.EnsureDeletedAsync();
        await db.Database.EnsureCreatedAsync();
    }

    // 初始化NATS消息发送
    var publisher = new NATSPublisher();

    // 初始化业务service
    var accountService = new AccountService(new AccountDbContext(), publisher);

    // 运行saga orchestrator
    var so = new SagaOrchestrator(subscription, accountService);
    await so.Run();
}

在上面的代码中,我们演示了一个基于.NET Core的EF Core的SAGA分布式事务的示例。在具体实现中,分别定义了账户服务、账户上下文,NATS发送者,扣款请求、账户及NATSPublisher实现,定义具体的补偿回滚操作类,定义创建账户和扣款的具体流程。最后,我们在Main方法中初始化DbContext和NATS消息发送器,运行saga orchestrator,即可进行分布式事务的运行。

建议在编写此类代码时,需要注意以下几点:

  1. 需要特别注意所有局部事务之间的依赖关系,避免出现数据依赖错误;
  2. 注意消息在事件传递过程中的状态,避免出现重复或者错误的消息;
  3. 要对分布式事务中各种错误进行充分的测试,以确保系统在各种情况下都能够正常运行。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:如何用C#实现SAGA分布式事务 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • WCF实现双向通信

    下面是关于“WCF实现双向通信”的完整攻略,包含两个示例。 1. 什么是WCF双向通信 WCF双向通信是一种WCF通信模式,它允许客户端和服务端之间进行双向通信。在WCF双向通信中,客户端和服务端都可以发送和接收消息,这使得WCF双向通信非常适合实现实时通信。 2. 示例1:创建WCF服务 以下是一个示例,演示如何创建WCF服务: using System;…

    C# 2023年5月15日
    00
  • 关于C# TabPage如何隐藏的问题

    下面是关于C# TabPage如何隐藏的完整攻略: 关于TabPage TabPage是C#中Windows Form中的一种控件,用于创建选项卡界面。一个选项卡界面可以包含多个选项卡页(TabPage)。 隐藏TabPage 隐藏一个TabPage非常简单,只需要设置它的Visible属性即可。如果设置为false,TabPage将不会在界面上显示。示例如…

    C# 2023年6月6日
    00
  • Unity制作游戏自定义按键详解

    Unity制作游戏自定义按键详解 在 Unity 中,通过自定义按键来更好地控制游戏角色或执行一些特殊动作是很常见的需求。在这篇文章中,我们将详细讲解如何使用 Unity 的 Input Manager 来自定义按键。 1. Input Manager 输入管理器(Input Manager)是 Unity 中一个非常常用的工具,它可以让我们很方便地管理游戏…

    C# 2023年5月15日
    00
  • C#多线程之Thread中Thread.Join()函数用法分析

    当我们使用C#中的多线程编程时,有时候需要在主线程中等待子线程完成后再继续执行。Thread.Join()函数就是用来实现这一功能的。本文将全面介绍Thread.Join()函数的用法及示例说明。 什么是Thread.Join()函数 Thread.Join()函数是Thread类中的一个方法,它的作用是阻塞主线程,直到当前线程执行完成。当调用Thread.…

    C# 2023年6月7日
    00
  • C#字节数组(byte[])和字符串相互转换方式

    关于C#字节数组(byte[])和字符串相互转换方式的攻略,下面是详细讲解: 1. 字符串转字节数组 在C#中,可以使用Encoding类中的GetBytes方法将一个字符串转换为字节数组,示例如下: string str = "hello world"; byte[] strBytes = Encoding.UTF8.GetBytes(…

    C# 2023年6月7日
    00
  • 深入c# Func委托的详解

    深入c# Func委托的详解 什么是Func委托 Func委托是一个通用泛型委托,可以接受1至16个输入参数,并返回一个返回值。因为Func是一个泛型委托,所以可以用来创建适合各种输入和返回类型的委托。 Func是一个系统内建的委托类型,在System命名空间中定义,其语法如下: public delegate TResult Func<in T, o…

    C# 2023年6月1日
    00
  • C++中#include头文件的示例详解

    下面是关于”C++中#include头文件的示例详解”的攻略: 什么是头文件? 在C++中,头文件是一种用于包含函数和变量定义的文件。这些文件通常包含函数和变量定义,使得在源代码中我们可以使用这些定义。通常情况下,头文件扩展名为.h。实际上,C++编译器不知道如何处理头文件,它只知道如何处理源代码文件,而头文件的作用就是将需要在源代码中用到的函数和变量定义放…

    C# 2023年6月6日
    00
  • C#编程中最容易犯的7种编写错误分享

    下面我将为你详细讲解“C#编程中最容易犯的7种编写错误分享”的完整攻略: 1. 变量使用错误 在C#编程中最常见的错误之一就是变量使用错误。可能会出现以下情况:- 变量未初始化,导致出现未知的值- 变量名与其他变量名冲突,造成混淆- 变量没有按照规定使用,造成计算错误 为避免这些问题,我们需要遵循以下准则:- 变量使用前必须初始化- 使用有意义的变量名- 识…

    C# 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部