如何用C#实现SAGA分布式事务

当我们进行分布式系统开发的时候,需要保证不同分布式系统节点之间的数据的一致性,同时对于不同节点的事务处理也需要保证原子性、一致性、持久性和隔离性。SAGA是IBM公司出品的分布式事务解决方案,主要通过补偿机制来保证事务的一致性,因此最近比较火热。下面我们就来详细讲解如何使用C#实现SAGA分布式事务。

一、什么是SAGA分布式事务?

SAGA是分布式事务的一种解决方案,通过基于局部事务的补偿机制来保证分布式事务的一致性。SAGA模式中,将复杂的分布式事务分为一系列局部事务,这些局部事务通过相互协调来实现分布式事务的最终一致性。

二、SAGA分布式事务的实现流程

SAGA分布式事务的实现流程主要包括以下几个步骤:

  1. 业务流程发起方发起分布式事务;
  2. 业务流程发起方调用Saga Orchestrator,下发事务指令;
  3. 相应的业务服务通过实现CompensatableAction接口,实现事务执行中的“原操作”和“补偿操作”;
  4. 一旦事务中的一个步骤发生异常,Saga Orchestrator会自动回滚并执行相应的补偿操作;
  5. 如果整个事务成功执行地话,Saga Orchestrator会执行最后的完成操作。

三、SAGA分布式事务实现过程示例

接下来,我们将基于C#,使用.NET Core框架,来演示如何使用SAGA分布式事务。

在这个演示中,我们将使用EF Core示例,并且将基于NATS同时还演示了一个异步的方式实现SAGA分布式事务。

示例:

using Microsoft.EntityFrameworkCore;
using NATS.Client;
using System;
using System.Threading.Tasks;

// 定义IAccountService接口,具有创建账户和支付操作
public interface IAccountService
{
    public Task CreateAccountAsync(Account account);
    public Task DeductAsync(DeductRequest request);
}

// 定义一个NATS发送者
public interface IPublisher
{
    public Task PublishAsync(string subject, string msg);
}

// AccountService是一个具体的实现IAccountService接口的类,包括CreateAccountAsync和DeductAsync方法的具体实现
public class AccountService : IAccountService
{
    private readonly AccountDbContext _context;
    private readonly IPublisher _publisher;
    public AccountService(AccountDbContext context, IPublisher publisher)
    {
        _context = context;
        _publisher = publisher;
    }

    public async Task CreateAccountAsync(Account account)
    {
        // 创建账户
        await _context.Accounts.AddAsync(account);
        await _context.SaveChangesAsync();
    }

    public async Task DeductAsync(DeductRequest request)
    {
        // 扣款流程
        var account = await _context.Accounts.FirstOrDefaultAsync(a => a.Id == request.AccountId);
        if (account == null)
        {
            throw new InvalidOperationException("Account not found");
        }

        if (account.Balance < request.Amount)
        {
            throw new InvalidOperationException("Insufficient balance");
        }

        account.Balance = account.Balance - request.Amount;
        _context.Accounts.Update(account);
        await _context.SaveChangesAsync();

        // 向下一步流程的处理器发送消息
        await _publisher.PublishAsync("order.create", $"{{\"OrderId\": {request.OrderId}}}");
    }

}

// AccountDbContext 是使用EF Core定义的数据库上下文
public class AccountDbContext : DbContext
{
    public DbSet<Account> Accounts { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder.UseSqlServer(@"Server=localhost;Database=Account;User ID=sa;Password=123456;");
    }
}


// NATSPublisher 是一个实现了IPublisher接口的NATS消息发送者类
public class NATSPublisher : IPublisher
{
    private readonly IConnection _nats;
    public NATSPublisher()
    {
        _nats = new ConnectionFactory().CreateConnection();
    }

    public async Task PublishAsync(string subject, string msg)
    {
        await Task.Run(() => _nats.Publish(subject, System.Text.Encoding.UTF8.GetBytes(msg)));
    }
}


// 定义具体的补偿操作类,并且继承CompensatableAction接口
class DeductCompensationAction : CompensatableAction
{
    private readonly IAccountService _accountService;
    private readonly DeductRequest _deductRequest;

    public DeductCompensationAction(IAccountService accountService, DeductRequest deductRequest)
    {
        _accountService = accountService;
        _deductRequest = deductRequest;
    }

    public async Task Compensate()
    {
        // 撤销扣款操作
        await _accountService.DeductAsync(new DeductRequest()
        {
            AccountId = _deductRequest.AccountId,
            Amount = -_deductRequest.Amount,
            OrderId = _deductRequest.OrderId
        });
    }

    public async Task Perform()
    {
        // 执行扣款操作
        await _accountService.DeductAsync(_deductRequest);
    }
}

// 定义DeductRequest类,用于记录并提交扣款请求
class DeductRequest
{
    public int AccountId { get; set; }
    public decimal Amount { get; set; }
    public int OrderId { get; set; }
}


// Account类对应于账户
public class Account
{
    public int Id { get; set; }
    public string Name { get; set; }
    public decimal Balance { get; set; }
}

// 下面是使用SAGA模式的业务流程类
public class CreateAccountAndDeductProcess
{
    private readonly IAccountService _accountService;
    private readonly IPublisher _publisher;
    private readonly DeductRequest _deductRequest;

    public CreateAccountAndDeductProcess(IAccountService accountService, IPublisher publisher, DeductRequest deductRequest)
    {
        _accountService = accountService;
        _publisher = publisher;
        _deductRequest = deductRequest;
    }

    // 使用SagaStep特性来进行流程定义
    [SagaStep]
    public async Task CreateAccount()
    {
        await _accountService.CreateAccountAsync(new Account()
        {
            Name = $"Account_{Guid.NewGuid()}",
            Balance = _deductRequest.Amount
        });

        // 发布第二步流程的消息
        await _publisher.PublishAsync("deduct.account", $"{{\"AccountId\": {1},\"OrderId\" : {_deductRequest.OrderId}, \"Amount\" : {_deductRequest.Amount}}}");
    }

    [SagaStep(IsCompensable = true)]
    public async Task DeductAccount()
    {
        // 发布扣款消息
        await _accountService.DeductAsync(_deductRequest);
    }

    // 定义回滚操作
    [CompensationAction]
    public async Task RollbackDeductAccount()
    {
        var compensationAction = new DeductCompensationAction(_accountService, _deductRequest);
        await compensationAction.Compensate();
    }
}

// SagaOrchestrator类提供了流程的发起和处理能力
public class SagaOrchestrator
{
    private readonly ISubscription _subscription;
    private readonly IAccountService _accountService;

    public SagaOrchestrator(ISubscription subscription, IAccountService accountService)
    {
        _subscription = subscription;
        _accountService = accountService;
    }

    public async Task Run()
    {
        Console.WriteLine("Saga Orchestrator started");

        while (true)
        {
            Message msg = _subscription.NextMessage();
            var subject = msg.Subject;
            var data = System.Text.Encoding.UTF8.GetString(msg.Data);

            Console.WriteLine($"{subject}: {data}");

            switch (subject)
            {
                // 开始订单流程
                case "order.create":
                    await CreateAccountAndDeductProcessRunner(data);
                    break;
                // 建立账户并扣款
                case "deduct.account":
                    await DeductAccountProcessRunner(data);
                    break;
                default:
                    break;
            }

            await Task.Delay(50);
        }
    }

    private async Task CreateAccountAndDeductProcessRunner(string data)
    {
        Console.WriteLine("Create Account And Deduct Process started");

        var deductRequest = Newtonsoft.Json.JsonConvert.DeserializeObject<DeductRequest>(data);

        var process = new CreateAccountAndDeductProcess(_accountService, new NATSPublisher(), deductRequest);
        var sagaBuilder = new SagaBuilder<CreateAccountAndDeductProcess>(process);

        await sagaBuilder
            .Invoke(x => x.CreateAccount)
            .Invoke(x => x.DeductAccount, x => x.RollbackDeductAccount)
            .Build()
            .RunAsync();
    }

    private async Task DeductAccountProcessRunner(string data)
    {
        Console.WriteLine("Deduct Account Process started");

        var deductRequest = Newtonsoft.Json.JsonConvert.DeserializeObject<DeductRequest>(data);

        var process = new CreateAccountAndDeductProcess(_accountService, new NATSPublisher(), deductRequest);
        var sagaBuilder = new SagaBuilder<CreateAccountAndDeductProcess>(process);

        await sagaBuilder
            .Invoke(x => x.CreateAccount)
            .Invoke(x => x.DeductAccount, x => x.RollbackDeductAccount)
            .Build()
            .RunAsync();
    }
}

// 最后,我们来演示如何运行上述代码

static async Task Main(string[] args)
{
    var factory = new ConnectionFactory();
    // 建立一个连接
    IConnection nats = await Task.Run(() => factory.CreateConnection());
    // 订阅主题
    ISubscription subscription = await Task.Run(() =>
        nats.SubscribeAsync(">", (sender, args) =>
        {
            Console.WriteLine("*************************");
            Console.WriteLine(args.Message.Data);
        }));

    // 创建一个DbContext,并且初始化数据库
    using (var db = new AccountDbContext())
    {
        await db.Database.EnsureDeletedAsync();
        await db.Database.EnsureCreatedAsync();
    }

    // 初始化NATS消息发送
    var publisher = new NATSPublisher();

    // 初始化业务service
    var accountService = new AccountService(new AccountDbContext(), publisher);

    // 运行saga orchestrator
    var so = new SagaOrchestrator(subscription, accountService);
    await so.Run();
}

在上面的代码中,我们演示了一个基于.NET Core的EF Core的SAGA分布式事务的示例。在具体实现中,分别定义了账户服务、账户上下文,NATS发送者,扣款请求、账户及NATSPublisher实现,定义具体的补偿回滚操作类,定义创建账户和扣款的具体流程。最后,我们在Main方法中初始化DbContext和NATS消息发送器,运行saga orchestrator,即可进行分布式事务的运行。

建议在编写此类代码时,需要注意以下几点:

  1. 需要特别注意所有局部事务之间的依赖关系,避免出现数据依赖错误;
  2. 注意消息在事件传递过程中的状态,避免出现重复或者错误的消息;
  3. 要对分布式事务中各种错误进行充分的测试,以确保系统在各种情况下都能够正常运行。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:如何用C#实现SAGA分布式事务 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • 解读在C#中winform程序响应键盘事件的详解

    当一个winform程序运行时,用户可能会进行键盘输入操作。C#提供了键盘事件处理,使得我们能够简单地响应这些事件。在本文中,我们将学习如何在C#中处理键盘事件。 键盘事件 在C#中处理键盘事件,需要使用WindowsForms库提供的KeyPress, KeyUp和KeyDown事件。这些事件都继承自Control.KeyPressEventHandler…

    C# 2023年6月6日
    00
  • 常用类之TCP连接类-socket编程

    下面是关于“常用类之TCP连接类-socket编程”的完整攻略。 1. TCP连接类介绍 在进行socket网络编程时,我们需要使用到TCP连接类,该类被封装成了Python的socket库。它是一种基于客户机/服务器模式的、可靠的、面向连接的、传输层通信协议,它在应用层和TCP/IP协议族的传输层之间进行数据传输。使用TCP连接类,我们可以轻松实现实时通信…

    C# 2023年6月7日
    00
  • asp.net StreamReader 创建文件的实例代码

    首先我们来介绍一下如何使用 StreamReader 创建文件的实例。 StreamReader 是一个用于读取文本文件的类,它可以直接创建一个文件的实例,并对文件进行读取操作。在使用 StreamReader 创建文件的实例时,需要指定一个文件的路径,来表示要读取的文件的位置。在指定文件路径时,我们可以使用相对路径或绝对路径。相对路径是相对于当前程序运行的…

    C# 2023年6月3日
    00
  • C#随机生成不重复字符串的两个不错方法

    C#随机生成不重复字符串的两个不错方法 在C#中,我们通常使用Random类来生成随机字符串。但是,如何保证生成的字符串不重复呢?下面介绍两种不错的方法。 方法一:GUID 我们知道,在C#中,可以使用Guid.NewGuid()方法生成全局唯一的GUID字符串。因此,可以将Guid转换成字符串来作为随机字符串。 示例代码: string GenerateU…

    C# 2023年6月8日
    00
  • C#11新特性使用案例详解

    C#11新特性使用案例详解 C#语言在11版的时候增加了一些新特性,这些新特性可以让我们在编写代码时更加方便,提高代码的可读性和性能。接下来我们来详细讲解一下这些新特性的使用案例。 新特性列表 以下是C#11中新增加的新特性: 针对null的操作符 ?, ?? 和 ?. 元组的方法和参数 局部函数的放宽限制 外来机构的类定义 用括号来括起不支持的字面表达式类…

    C# 2023年5月14日
    00
  • asp.net 关于字符串内范围截取的一点方法总结

    下面是关于”asp.net 关于字符串内范围截取的一点方法总结”的完整攻略: 标题 背景介绍 在asp.net的开发中,我们经常需要对字符串进行截取操作。这个过程中涉及到字符串的长度、起始位置、截取长度等多个参数的填写。本文将对这些参数的关系进行总结,并介绍一些常见的截取操作方法,帮助读者更好地掌握字符串截取操作。 方法总结 Substring方法 stri…

    C# 2023年6月1日
    00
  • 如何在 .NET Core WebApi 中处理 MultipartFormDataContent

    最近在对某个后端服务做 .NET Core 升级时,里面使用了多处处理 MultipartFormDataContent 相关内容的代码。这些地方从 .NET Framework 迁移到 .NET Core 之后的代码改动较大,由于本身没有测试覆盖,导致在部署 QA 环境后引发了一些问题。这里做一个技术复盘。 什么是 MultipartFormDataCon…

    C# 2023年4月24日
    00
  • C# 实现窗口无边框,可拖动效果

    下面我为你详细讲解实现C#窗口无边框、可拖动效果的完整攻略。具体步骤如下: 步骤一:关闭窗口边框 为了实现窗口无边框效果,需要先关闭窗口的边框。在C#中,我们可以通过窗口的FormBorderStyle属性来实现此功能,具体方式如下: this.FormBorderStyle = FormBorderStyle.None; 步骤二:设置窗口背景透明 由于我们…

    C# 2023年6月7日
    00
合作推广
合作推广
分享本页
返回顶部