RabbitMQ .NET消息队列使用详解
RabbitMQ是一个功能强大的消息队列系统,支持多种消息协议。在本文中,我们将介绍如何使用RabbitMQ .NET客户端库在.NET应用程序中使用消息队列,并提供两个示例说明。
环境准备
在开始之前,需要确保已安装了以下环境:
- RabbitMQ服务器
- .NET Core SDK
步骤一:安装RabbitMQ .NET客户端库
在本步骤中,我们将安装RabbitMQ .NET客户端库,用于在.NET应用程序中使用消息队列。
dotnet add package RabbitMQ.Client
在上述命令中,我们使用.NET Core CLI安装了RabbitMQ .NET客户端库。
步骤二:创建生产者
在本步骤中,我们将创建一个生产者,用于将消息发送到队列中。
using System;
using RabbitMQ.Client;
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "Hello World!";
var body = System.Text.Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
在上述代码中,我们创建了一个连接工厂,并使用它创建了一个连接和一个通道。在channel.QueueDeclare
方法中,我们声明了一个名为hello
的队列。在channel.BasicPublish
方法中,我们将消息发送到队列中。
步骤三:创建消费者
在本步骤中,我们将创建一个消费者,用于从队列中接收消息。
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
在上述代码中,我们创建了一个连接工厂,并使用它创建了一个连接和一个通道。在channel.QueueDeclare
方法中,我们声明了一个名为hello
的队列。在consumer.Received
事件中,我们处理接收到的消息。在channel.BasicConsume
方法中,我们开始消费队列中的消息。
示例一:使用RabbitMQ .NET客户端库实现消息订阅与发布
在本例中,我们将使用RabbitMQ .NET客户端库实现消息订阅与发布。具体步骤如下:
- 创建一个发布者并发布消息。
- 创建一个订阅者并接收消息。
1. 创建一个发布者并发布消息
using System;
using RabbitMQ.Client;
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
while (true)
{
string message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
}
在上述代码中,我们创建了一个发布者并发布了一条消息。在channel.ExchangeDeclare
方法中,我们创建了一个名为logs
的交换机,并将其类型设置为fanout
。在channel.BasicPublish
方法中,我们将消息发送到交换机中。
2. 创建一个订阅者并接收消息
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
在上述代码中,我们创建了一个订阅者并接收了一条消息。在channel.ExchangeDeclare
方法中,我们创建了一个名为logs
的交换机,并将其类型设置为fanout
。在channel.QueueDeclare
方法中,我们创建了一个随机的、独占的队列。在channel.QueueBind
方法中,我们将队列绑定到交换机上。在consumer.Received
事件中,我们处理接收到的消息。
示例二:使用RabbitMQ .NET客户端库实现RPC调用
在本例中,我们将使用RabbitMQ .NET客户端库实现RPC调用。具体步骤如下:
- 创建一个RPC客户端并发送请求。
- 创建一个RPC服务器并处理请求。
1. 创建一个RPC客户端并发送请求
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly IBasicProperties props;
public RpcClient()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId)
{
Console.WriteLine(" [.] Got '{0}'", response);
}
};
}
public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
return "done";
}
public void Close()
{
connection.Close();
}
}
class Program
{
static void Main(string[] args)
{
var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
在上述代码中,我们创建了一个RPC客户端并发送了一条请求。在channel.QueueDeclare
方法中,我们创建了一个随机的、独占的队列。在props.CorrelationId
和props.ReplyTo
属性中,我们设置了请求的相关ID和回复队列。在consumer.Received
事件中,我们处理接收到的回复。在channel.BasicPublish
和channel.BasicConsume
方法中,我们发送请求并等待回复。
2. 创建一个RPC服务器并处理请求
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
class RpcServer
{
private readonly IConnection connection;
private readonly IModel channel;
public RpcServer()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.QueueDeclare(queue: "rpc_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
var response = fib(n).ToString();
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
channel.BasicPublish(exchange: "",
routingKey: ea.BasicProperties.ReplyTo,
basicProperties: replyProps,
body: Encoding.UTF8.GetBytes(response));
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
};
channel.BasicConsume(queue: "rpc_queue",
autoAck: false,
consumer: consumer);
}
public void Close()
{
connection.Close();
}
private static int fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}
return fib(n - 1) + fib(n - 2);
}
}
class Program
{
static void Main(string[] args)
{
var rpcServer = new RpcServer();
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
rpcServer.Close();
}
}
在上述代码中,我们创建了一个RPC服务器并处理了一条请求。在channel.QueueDeclare
方法中,我们创建了一个名为rpc_queue
的队列。在consumer.Received
事件中,我们处理接收到的请求,并将回复发送到回复队列中。
总结
本文介绍了如何使用RabbitMQ .NET客户端库在.NET应用程序中使用消息队列,并提供了两个示例说明。RabbitMQ是一个功能强大的消息队列系统,可以用于构建高可用性、高性能的分布式应用程序。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RabbitMQ .NET消息队列使用详解 - Python技术站