C#用RabbitMQ实现消息订阅与发布
RabbitMQ是一个开源的消息队列系统,支持多种消息递协议。在C#中使用RabbitMQ实现消息订阅与发布非常简单,本文将详细介绍如何使用C#和RabbitMQ实现消息订阅与发布,并提供两个示例说明。
环境准备
在开始之前,需要确保已安装了以下环境:
- Visual Studio 2017 或以上版本
- RabbitMQ 服务器
安装RabbitMQ.Client
在使用C#和RabbitMQ实现消息订阅与发布之前,需要安装RabbitMQ.Client。可以通过NuGet包管理器安装RabbitMQ.Client,也可以手动下载并安装。
示例一:使用C#和RabbitMQ实现消息订阅与发布
在本例中,我们将使用C#和RabbitMQ实现消息订阅与发布。具体步骤如下:
- 创建一个发布者并发布消息。
- 创建一个订阅者并接收消息。
1. 创建一个发布者并发布消息
using RabbitMQ.Client;
using System;
using System.Text;
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
while (true)
{
Console.Write("Enter message: ");
var message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
}
在上述代码中,我们创建了一个发布者并发布了一条消息。在channel.ExchangeDeclare
方法中,我们创建了一个名为logs
的交换机,并将其类型设置为fanout
。在channel.BasicPublish
方法中,我们将消息发送到交换机中。
2. 创建一个订阅者并接收消息
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
在上述代码中,我们创建了一个订阅者并接收了一条消息。在channel.ExchangeDeclare
方法中,我们创建了一个名为logs
的交换机,并将其类型设置为fanout
。在channel.QueueDeclare
方法中,我们创建了一个随机的、独占的队列。在channel.QueueBind
方法中,我们将队列绑定到交换机上。在consumer.Received
事件中,我们处理接收到的消息。
示例二:使用C#和RabbitMQ实现RPC调用
在本例中,我们将使用C#和RabbitMQ实现RPC调用。具体步骤如下:
- 创建一个RPC客户端并发送请求。
- 创建一个RPC服务器并处理请求。
1. 创建一个RPC客户端并发送请求
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly IBasicProperties props;
public RpcClient()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId)
{
Console.WriteLine(" [.] Got '{0}'", response);
}
};
}
public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
return "done";
}
public void Close()
{
connection.Close();
}
}
class Program
{
static void Main(string[] args)
{
var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
Console.ReadLine();
}
}
在上述代码中,我们创建了一个RPC客户端并发送了一条请求。在channel.QueueDeclare
方法中,我们创建了一个随机的、独占的队列。在props.CorrelationId
和props.ReplyTo
中,我们设置了请求的相关ID和回复队列。在consumer.Received
事件中,我们处理接收到的回复。
2. 创建一个RPC服务器并处理请求
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
class RpcServer
{
private readonly IConnection connection;
private readonly IModel channel;
public RpcServer()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.QueueDeclare(queue: "rpc_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
var response = Fib(n).ToString();
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
channel.BasicPublish(exchange: "",
routingKey: ea.BasicProperties.ReplyTo,
basicProperties: replyProps,
body: Encoding.UTF8.GetBytes(response));
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
};
channel.BasicConsume(queue: "rpc_queue",
autoAck: false,
consumer: consumer);
}
public void Start()
{
Console.WriteLine(" [x] Awaiting RPC requests");
}
public void Close()
{
connection.Close();
}
private static int Fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}
return Fib(n - 1) + Fib(n - 2);
}
}
class Program
{
static void Main(string[] args)
{
var rpcServer = new RpcServer();
rpcServer.Start();
Console.ReadLine();
rpcServer.Close();
}
}
在上述代码中,我们创建了一个RPC服务器并处理了一条请求。在channel.QueueDeclare
方法中,我们创建了一个名为rpc_queue
的队列。在consumer.Received
事件中,我们处理接收到的请求,并将回复发送到回复队列中。在channel.BasicConsume
方法中,我们注册了一个消费者,用于处理接收到的请求。
总结
本文介绍了如何使用C#和RabbitMQ实现消息订阅与发布和RPC调用,并提供了两个示例说明。RabbitMQ是一个功能强大的消息队列系统,可以用于构建高可用性、高性能的分布式应用程序。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:C#用RabbitMQ实现消息订阅与发布 - Python技术站