如何用C#实现SAGA分布式事务

当我们进行分布式系统开发的时候,需要保证不同分布式系统节点之间的数据的一致性,同时对于不同节点的事务处理也需要保证原子性、一致性、持久性和隔离性。SAGA是IBM公司出品的分布式事务解决方案,主要通过补偿机制来保证事务的一致性,因此最近比较火热。下面我们就来详细讲解如何使用C#实现SAGA分布式事务。

一、什么是SAGA分布式事务?

SAGA是分布式事务的一种解决方案,通过基于局部事务的补偿机制来保证分布式事务的一致性。SAGA模式中,将复杂的分布式事务分为一系列局部事务,这些局部事务通过相互协调来实现分布式事务的最终一致性。

二、SAGA分布式事务的实现流程

SAGA分布式事务的实现流程主要包括以下几个步骤:

  1. 业务流程发起方发起分布式事务;
  2. 业务流程发起方调用Saga Orchestrator,下发事务指令;
  3. 相应的业务服务通过实现CompensatableAction接口,实现事务执行中的“原操作”和“补偿操作”;
  4. 一旦事务中的一个步骤发生异常,Saga Orchestrator会自动回滚并执行相应的补偿操作;
  5. 如果整个事务成功执行地话,Saga Orchestrator会执行最后的完成操作。

三、SAGA分布式事务实现过程示例

接下来,我们将基于C#,使用.NET Core框架,来演示如何使用SAGA分布式事务。

在这个演示中,我们将使用EF Core示例,并且将基于NATS同时还演示了一个异步的方式实现SAGA分布式事务。

示例:

using Microsoft.EntityFrameworkCore;
using NATS.Client;
using System;
using System.Threading.Tasks;

// 定义IAccountService接口,具有创建账户和支付操作
public interface IAccountService
{
    public Task CreateAccountAsync(Account account);
    public Task DeductAsync(DeductRequest request);
}

// 定义一个NATS发送者
public interface IPublisher
{
    public Task PublishAsync(string subject, string msg);
}

// AccountService是一个具体的实现IAccountService接口的类,包括CreateAccountAsync和DeductAsync方法的具体实现
public class AccountService : IAccountService
{
    private readonly AccountDbContext _context;
    private readonly IPublisher _publisher;
    public AccountService(AccountDbContext context, IPublisher publisher)
    {
        _context = context;
        _publisher = publisher;
    }

    public async Task CreateAccountAsync(Account account)
    {
        // 创建账户
        await _context.Accounts.AddAsync(account);
        await _context.SaveChangesAsync();
    }

    public async Task DeductAsync(DeductRequest request)
    {
        // 扣款流程
        var account = await _context.Accounts.FirstOrDefaultAsync(a => a.Id == request.AccountId);
        if (account == null)
        {
            throw new InvalidOperationException("Account not found");
        }

        if (account.Balance < request.Amount)
        {
            throw new InvalidOperationException("Insufficient balance");
        }

        account.Balance = account.Balance - request.Amount;
        _context.Accounts.Update(account);
        await _context.SaveChangesAsync();

        // 向下一步流程的处理器发送消息
        await _publisher.PublishAsync("order.create", $"{{\"OrderId\": {request.OrderId}}}");
    }

}

// AccountDbContext 是使用EF Core定义的数据库上下文
public class AccountDbContext : DbContext
{
    public DbSet<Account> Accounts { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder.UseSqlServer(@"Server=localhost;Database=Account;User ID=sa;Password=123456;");
    }
}


// NATSPublisher 是一个实现了IPublisher接口的NATS消息发送者类
public class NATSPublisher : IPublisher
{
    private readonly IConnection _nats;
    public NATSPublisher()
    {
        _nats = new ConnectionFactory().CreateConnection();
    }

    public async Task PublishAsync(string subject, string msg)
    {
        await Task.Run(() => _nats.Publish(subject, System.Text.Encoding.UTF8.GetBytes(msg)));
    }
}


// 定义具体的补偿操作类,并且继承CompensatableAction接口
class DeductCompensationAction : CompensatableAction
{
    private readonly IAccountService _accountService;
    private readonly DeductRequest _deductRequest;

    public DeductCompensationAction(IAccountService accountService, DeductRequest deductRequest)
    {
        _accountService = accountService;
        _deductRequest = deductRequest;
    }

    public async Task Compensate()
    {
        // 撤销扣款操作
        await _accountService.DeductAsync(new DeductRequest()
        {
            AccountId = _deductRequest.AccountId,
            Amount = -_deductRequest.Amount,
            OrderId = _deductRequest.OrderId
        });
    }

    public async Task Perform()
    {
        // 执行扣款操作
        await _accountService.DeductAsync(_deductRequest);
    }
}

// 定义DeductRequest类,用于记录并提交扣款请求
class DeductRequest
{
    public int AccountId { get; set; }
    public decimal Amount { get; set; }
    public int OrderId { get; set; }
}


// Account类对应于账户
public class Account
{
    public int Id { get; set; }
    public string Name { get; set; }
    public decimal Balance { get; set; }
}

// 下面是使用SAGA模式的业务流程类
public class CreateAccountAndDeductProcess
{
    private readonly IAccountService _accountService;
    private readonly IPublisher _publisher;
    private readonly DeductRequest _deductRequest;

    public CreateAccountAndDeductProcess(IAccountService accountService, IPublisher publisher, DeductRequest deductRequest)
    {
        _accountService = accountService;
        _publisher = publisher;
        _deductRequest = deductRequest;
    }

    // 使用SagaStep特性来进行流程定义
    [SagaStep]
    public async Task CreateAccount()
    {
        await _accountService.CreateAccountAsync(new Account()
        {
            Name = $"Account_{Guid.NewGuid()}",
            Balance = _deductRequest.Amount
        });

        // 发布第二步流程的消息
        await _publisher.PublishAsync("deduct.account", $"{{\"AccountId\": {1},\"OrderId\" : {_deductRequest.OrderId}, \"Amount\" : {_deductRequest.Amount}}}");
    }

    [SagaStep(IsCompensable = true)]
    public async Task DeductAccount()
    {
        // 发布扣款消息
        await _accountService.DeductAsync(_deductRequest);
    }

    // 定义回滚操作
    [CompensationAction]
    public async Task RollbackDeductAccount()
    {
        var compensationAction = new DeductCompensationAction(_accountService, _deductRequest);
        await compensationAction.Compensate();
    }
}

// SagaOrchestrator类提供了流程的发起和处理能力
public class SagaOrchestrator
{
    private readonly ISubscription _subscription;
    private readonly IAccountService _accountService;

    public SagaOrchestrator(ISubscription subscription, IAccountService accountService)
    {
        _subscription = subscription;
        _accountService = accountService;
    }

    public async Task Run()
    {
        Console.WriteLine("Saga Orchestrator started");

        while (true)
        {
            Message msg = _subscription.NextMessage();
            var subject = msg.Subject;
            var data = System.Text.Encoding.UTF8.GetString(msg.Data);

            Console.WriteLine($"{subject}: {data}");

            switch (subject)
            {
                // 开始订单流程
                case "order.create":
                    await CreateAccountAndDeductProcessRunner(data);
                    break;
                // 建立账户并扣款
                case "deduct.account":
                    await DeductAccountProcessRunner(data);
                    break;
                default:
                    break;
            }

            await Task.Delay(50);
        }
    }

    private async Task CreateAccountAndDeductProcessRunner(string data)
    {
        Console.WriteLine("Create Account And Deduct Process started");

        var deductRequest = Newtonsoft.Json.JsonConvert.DeserializeObject<DeductRequest>(data);

        var process = new CreateAccountAndDeductProcess(_accountService, new NATSPublisher(), deductRequest);
        var sagaBuilder = new SagaBuilder<CreateAccountAndDeductProcess>(process);

        await sagaBuilder
            .Invoke(x => x.CreateAccount)
            .Invoke(x => x.DeductAccount, x => x.RollbackDeductAccount)
            .Build()
            .RunAsync();
    }

    private async Task DeductAccountProcessRunner(string data)
    {
        Console.WriteLine("Deduct Account Process started");

        var deductRequest = Newtonsoft.Json.JsonConvert.DeserializeObject<DeductRequest>(data);

        var process = new CreateAccountAndDeductProcess(_accountService, new NATSPublisher(), deductRequest);
        var sagaBuilder = new SagaBuilder<CreateAccountAndDeductProcess>(process);

        await sagaBuilder
            .Invoke(x => x.CreateAccount)
            .Invoke(x => x.DeductAccount, x => x.RollbackDeductAccount)
            .Build()
            .RunAsync();
    }
}

// 最后,我们来演示如何运行上述代码

static async Task Main(string[] args)
{
    var factory = new ConnectionFactory();
    // 建立一个连接
    IConnection nats = await Task.Run(() => factory.CreateConnection());
    // 订阅主题
    ISubscription subscription = await Task.Run(() =>
        nats.SubscribeAsync(">", (sender, args) =>
        {
            Console.WriteLine("*************************");
            Console.WriteLine(args.Message.Data);
        }));

    // 创建一个DbContext,并且初始化数据库
    using (var db = new AccountDbContext())
    {
        await db.Database.EnsureDeletedAsync();
        await db.Database.EnsureCreatedAsync();
    }

    // 初始化NATS消息发送
    var publisher = new NATSPublisher();

    // 初始化业务service
    var accountService = new AccountService(new AccountDbContext(), publisher);

    // 运行saga orchestrator
    var so = new SagaOrchestrator(subscription, accountService);
    await so.Run();
}

在上面的代码中,我们演示了一个基于.NET Core的EF Core的SAGA分布式事务的示例。在具体实现中,分别定义了账户服务、账户上下文,NATS发送者,扣款请求、账户及NATSPublisher实现,定义具体的补偿回滚操作类,定义创建账户和扣款的具体流程。最后,我们在Main方法中初始化DbContext和NATS消息发送器,运行saga orchestrator,即可进行分布式事务的运行。

建议在编写此类代码时,需要注意以下几点:

  1. 需要特别注意所有局部事务之间的依赖关系,避免出现数据依赖错误;
  2. 注意消息在事件传递过程中的状态,避免出现重复或者错误的消息;
  3. 要对分布式事务中各种错误进行充分的测试,以确保系统在各种情况下都能够正常运行。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:如何用C#实现SAGA分布式事务 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • ASP.NET Core与NLog集成的完整步骤

    NLog 是一个流行的日志记录框架,可以帮助我们记录应用程序的日志。在 ASP.NET Core 中,可以使用 NLog 来记录日志。以下是 ASP.NET Core 与 NLog 集成的完整步骤: 步骤一:安装 NLog 包 在 ASP.NET Core 中,可以使用 NuGet 包管理器来安装 NLog 包。可以在项目的根目录下的命令行中使用以下命令来安…

    C# 2023年5月17日
    00
  • C#实现上位机与欧姆龙PLC通讯(FINS)

    C#实现上位机与欧姆龙PLC通讯(FINS)的完整攻略 背景介绍 欧姆龙PLC是一种常见的现场控制设备,与上位机进行通讯可以实现对PLC控制的监管和控制。而C#作为一种常见的编程语言,也可以用来实现上位机和PLC的通讯。本文将介绍如何使用C#实现上位机和欧姆龙PLC的通讯。 实现步骤 创建C#项目 在Visual Studio中创建一个C#项目。 导入Omr…

    C# 2023年5月15日
    00
  • Unity实现单机游戏每日签到系统

    下面我将详细讲解“Unity实现单机游戏每日签到系统”的完整攻略。本文将分为以下三个部分来进行讲解: 准备工作 实现过程 示例说明 准备工作 在开始实现签到系统之前,您需要准备以下工作: Unity开发环境:Unity是一款跨平台的游戏引擎,您需要提前安装好Unity并且熟悉Unity的基础操作。 程序代码:签到系统的核心是代码实现,您需要根据自己的游戏需求…

    C# 2023年6月1日
    00
  • C#中使用HttpPost调用WebService的方法

    下面是C#中使用HttpPost调用WebService的方法的完整攻略。 步骤一:添加引用和命名空间 在使用HttpPost调用WebService之前,我们需要添加相应的引用和命名空间。在Visual Studio中,右键单击项目名称,选择“添加引用”,添加System.Net和System.Web.Services引用。在代码文件中,我们需要使用以下命…

    C# 2023年5月15日
    00
  • C# Linq的Distinct()方法 – 返回序列中不同的元素

    当我们需要在C#中从一个集合中筛选出不同的元素,Linq的Distinct()方法就非常适用了。在这里,我将为您提供C#Linq的Distinct()方法的完整攻略,包括定义、返回值、语法、使用方法和示例。 定义 Distinct()方法是Linq用于从集合中返回不同元素的方法之一。该方法基于对象的值,比较并取消重复出现的元素。不同于其他返回元素的方法,Di…

    C# 2023年4月19日
    00
  • C#使用正则表达式过滤html标签

    下面是使用C#过滤html标签的完整攻略。 1. 正则表达式 我们知道,HTML标签的特点是以<开头,以>结尾,并且中间可能会有一些属性,例如<div class=”my-class”>。为了过滤掉HTML标签,我们可以使用正则表达式,其中最基础的正则表达式如下: <[^>]+> 这个表达式表示匹配所有以<开头…

    C# 2023年6月7日
    00
  • 用序列化实现List 实例的深复制(推荐)

    使用序列化实现List实例的深复制可以保证复制后的实例与原实例完全独立而不会相互影响。下面是使用序列化实现List实例深复制的详细攻略: 什么是深复制 深复制是指复制对象时,每个对象都会被单独复制一份,这两份对象完全独立而相互没有影响。这与浅复制不同,浅复制只是把对象的引用复制一份,这样两个对象会共用同一个引用,从而相互影响。 使用序列化实现深复制 针对Li…

    C# 2023年5月31日
    00
  • 关于C#中yield return用法的思考

    关于C#中yield return用法的完整攻略如下: 1. 什么是yield return yield return 是C#中的迭代器语法。简单来说,它允许我们一次性返回一组值的序列,而不需要在内存中维护它们的列表。 C# 的 yield 关键字使得我们可以定义一种流式处理的方式。被yield 关键字所标记的方法返回一个IEnumerable 接口对象,使…

    C# 2023年6月6日
    00
合作推广
合作推广
分享本页
返回顶部