编辑
2025-09-17
C#
00

目录

引言
准备工作
安装RabbitMQ.Client
RabbitMQ基本概念
建立连接
消息发布
直接交换机发布消息
消息消费
队列绑定与消息接收
交换机类型详解
直接交换机(Direct Exchange)
主题交换机(Topic Exchange)
扇出交换机(Fanout Exchange)
高级特性
消息持久化
消息确认
最佳实践
结语

引言

在现代分布式系统中,消息队列扮演着至关重要的角色。RabbitMQ作为开源消息代理软件,凭借其高性能、可靠性和灵活性,成为开发者首选的消息中间件。本文将深入探讨如何在C#中使用RabbitMQ.Client,帮助你构建高效、可靠的消息传递系统。

准备工作

安装RabbitMQ.Client

在开始之前,你需要安装RabbitMQ.Client NuGet包。有两种方式:

  1. 在Visual Studio的NuGet包管理器中搜索并安装"RabbitMQ.Client"
  2. 在包管理器控制台运行命令:
C#
Install-Package RabbitMQ.Client

RabbitMQ基本概念

在深入代码前,让我们了解几个关键概念:

  • 连接(Connection):与RabbitMQ服务器的TCP连接
  • 通道(Channel):连接内的虚拟连接,用于执行大多数操作
  • 交换机(Exchange):接收并路由消息的核心组件
  • 队列(Queue):存储等待被消费的消息缓冲区
  • 绑定(Binding):定义交换机和队列间的关系
  • 路由键(Routing Key):决定消息路由方式的关键标识

建立连接

以下是连接RabbitMQ服务器的标准代码:

C#
using RabbitMQ.Client; namespace AppRabbitMQ { internal class Program { static async Task Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using var connection = await factory.CreateConnectionAsync(); using var channel = await connection.CreateChannelAsync(); Console.ReadKey(); } } }

消息发布

直接交换机发布消息

C#
using System; using System.Text; using System.Threading; using RabbitMQ.Client; namespace AppRabbitMQ { internal class Program { static async Task Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using var connection = await factory.CreateConnectionAsync(); using var channel = await connection.CreateChannelAsync(); string exchangeName = "my_exchange"; string routingKey = "my_routing_key"; string message = "Hello, RabbitMQ!"; // 声明直接交换机 await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct); // 发布消息 var body = Encoding.UTF8.GetBytes(message); await channel.BasicPublishAsync( exchange: exchangeName, routingKey: routingKey, mandatory: true, body: body ); Console.WriteLine($"发送消息: {message}"); Console.ReadKey(); } } }

image.png

image.png

消息消费

队列绑定与消息接收

C#
using System; using System.Security.AccessControl; using System.Text; using System.Threading; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace AppRabbitMQ { internal class Program { static async Task Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using var connection = await factory.CreateConnectionAsync(); using var channel = await connection.CreateChannelAsync(); string queueName = "my_queue"; string exchangeName = "my_exchange"; string routingKey = "my_routing_key"; // 声明队列 await channel.QueueDeclareAsync( queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null ); // 绑定队列到交换机 await channel.QueueBindAsync( queue: queueName, exchange: exchangeName, routingKey: routingKey ); // 创建消费者 var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($"接收到消息: {message}"); return Task.CompletedTask; }; // 开始消费 await channel.BasicConsumeAsync( queue: queueName, autoAck: true, consumer: consumer ); Console.ReadKey(); } } }

image.png

交换机类型详解

直接交换机(Direct Exchange)

精确匹配路由键,适用于单一目的路由。

C#
channel.ExchangeDeclareAsync("direct_exchange", ExchangeType.Direct);

主题交换机(Topic Exchange)

支持通配符路由,灵活性更高。

C#
channel.ExchangeDeclareAsync("topic_exchange", ExchangeType.Topic); // 发布消息到不同主题 channel.BasicPublishAsync("topic_exchange", "usa.news", Encoding.UTF8.GetBytes("US News")); channel.BasicPublishAsync("topic_exchange", "europe.weather", Encoding.UTF8.GetBytes("Europe Weather")); // 绑定队列,使用通配符 channel.QueueBindAsync("news_queue", "topic_exchange", "*.news"); channel.QueueBindAsync("weather_queue", "topic_exchange", "#.weather");

扇出交换机(Fanout Exchange)

广播消息到所有绑定队列,忽略路由键。

C#
channel.ExchangeDeclareAsync("fanout_exchange", ExchangeType.Fanout); // 广播消息 channel.BasicPublishAsync("fanout_exchange", "", Encoding.UTF8.GetBytes("广播消息")); // 多个队列绑定 channel.QueueBindAsync("queue1", "fanout_exchange", ""); channel.QueueBindAsync("queue2", "fanout_exchange", "");

高级特性

消息持久化

确保消息在服务器重启后不丢失:

C#
var properties = new BasicProperties { Persistent = true }; await channel.BasicPublishAsync( exchange: "my_durable_exchange", routingKey: "my_routing_key", basicProperties: properties, body: Encoding.UTF8.GetBytes("持久化消息"), mandatory: false );

消息确认

使用消息确认机制保证消息处理:

C#
channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); // 处理消息... // 手动确认 channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false); return Task.CompletedTask; };

最佳实践

  1. 重用连接,避免频繁创建
  2. 并发操作时,每个线程使用独立通道
  3. 实现异常处理和重试机制
  4. 使用JSON等序列化复杂对象
  5. 添加日志记录和监控

结语

RabbitMQ.Client为C#开发者提供了强大的消息传递能力。通过本指南,你已经掌握了RabbitMQ的基本使用技巧。根据具体业务场景,灵活运用这些技术,你可以构建高性能、可靠的分布式系统。

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!