编辑
2025-09-30
C#
00

目录

RabbitMQ
RabbitMQ的交换机类型共有四种
Fanout Exchange(扇型交换机)
Topic Exchange(主题交换机)
Headers Exchange(头交换机)
下载与安装
查看安装信息
用户管理
工作模式
simple (简单模式)
Work queues(工作模式)
Publish/Subscribe(发布订阅模式)
Routing(路由模式)
Topics(主题模式)
开发
简单模式
删除交换机
fanout 广播

RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

image.png

RabbitMQ的交换机类型共有四种

直连型交换机背后的路由算法很简单——消息会传送给绑定键与消息的路由键完全匹配的那个队列。 我们用直连交换机取代了只会无脑广播的扇形交换机,并且具备了选择性接收消息的能力。

image.png

这种配置下,我们可以看到有两个队列Q1、Q2绑定到了直连交换机X上。第一个队列用的是橘色(orange)绑定键,第二个有两个绑定键,其中一个绑定键是黑色(black),另一个绑定键是绿色(green)。在此设置中,发布到交换机的带有橘色(orange)路由键的消息会被路由给队列Q1。带有黑色(black)或绿色(green)路由键的消息会被路由给Q2。其他的消息则会被丢弃。

Fanout Exchange(扇型交换机)

image.png

Fanout Exchange(扇型交换机):当一个Msg发送到扇形交换机X上时,则扇形交换机X会将消息分别发送给所有绑定到X上的消息队列。扇形交换机将消息路由给绑定到自身的所有消息队列,也就是说路由键在扇形交换机里没有作用,故消息队列绑定扇形交换机时,路由键可为空。这个模式类似于广播。

Topic Exchange(主题交换机)

image.png

路由键和绑定键命名

消息路由键—发送到主题交换机的消息所携带的路由键(routing_key)不能随意命名——它必须是一个用点号分隔的词列表。当中的词可以是任何单词,不过一般都会指定一些跟消息有关的特征作为这些单词。列举几个有效的路由键的例子:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。只要不超过255个字节,词的长度由你来定。

绑定键(binding key)也得使用相同的格式。主题交换机背后的逻辑跟直连交换机比较相似——一条携带特定路由键(routing key)的消息会被投送给所有绑定键(binding key)与之相匹配的队列。尽管如此,仍然有两条与绑定键相关的特殊情况:

*(星号) 能够替代一个单词。

#(井号) 能够替代零个或多个单词。

  • Q1针对所有的橘色orange动物。
  • Q2针对每一个有关兔子rabbits和慵懒lazy的动物的消息。

Headers Exchange(头交换机)

头交换机类似与主题交换机,但是却和主题交换机有着很大的不同。主题交换机使用路由键来进行消息的路由,而头交换机使用消息属性来进行消息的分发,通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。 在头交换机里有一个特别的参数”x-match”,当”x-match”的值为“any”时,只需要消息头的任意一个值匹配成功即可,当”x-match”值为“all”时,要求消息头的所有值都需相等才可匹配成功。

下载与安装

Messaging that just works — RabbitMQ 下载

image.png

下载

image.png

安装时需要先安装Erlang

image.png 安装Erlang

image.png

安装完成后,再安装Rabbitmq

image.png

基本就是下一步这样就可以了。

image.png

在服务中可以看到

image.png

查看安装信息

image.png

rabbitmq-plugins list

这里可以看到安装的插件

image.png

我们用下列命令安装 rabbitmq_management 插件,这款插件是可以可视化的方式查看 RabbitMQ 服务器实例的状态,以及操控 RabbitMQ 服务器。

rabbitmq-plugins enable rabbitmq_management

image.png

在浏览器中输入:http://localhost:15672 可以看到一个登录界面:

image.png

image.png

用户管理

注意:在windows下安装后rabbitmqctl 命令会出错

复制 C:\Windows\System32\config\systemprofile.erlang.cookie 到 C:\Users\xxx.erlang.cookie

rabbitmqctl list_users 查看注册用户

image.png

创建一个用户,rabbit1,密码123456

rabbitmqctl add_user rabbit1 123456

有5个 tag 可供选择,分别是:administrator ,monitoring,policymaker,management 和 none,其实这里的 tag 代表的是权限,administrator 是最高权限,none 表示不能访问,这里 administrator 和 none 的组合,权限应该是向高看齐,忽略 none,用的是 administrator 的权限。

rabbitmqctl set_user_tags rabbit1 administrator

工作模式

simple (简单模式)

image.png

一个消费者消费一个生产者生产的信息

Work queues(工作模式)

image.png

一个生产者生产信息,多个消费者进行消费,但是一条消息只能消费一次

Publish/Subscribe(发布订阅模式)

image.png

生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息

Routing(路由模式)

age.png?auth_key=1759179615-83y7kMTpBUYrRjDxKHBtqP-0-01eefda806e89718d709853ee90c7c67)

image.png 生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列

Topics(主题模式)

image.png

生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者

开发

安装RabbitMQ.Client 在nuget中

image.png

简单模式

发送

C#
private void btnSend_Click(object sender, EventArgs e) { var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672//默认端口 }; using (var conn = factory.CreateConnection()) using (var channel = conn.CreateModel()) { channel.QueueDeclare( queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null ); string msg = "hello world " + DateTime.Now.ToString("yyyyMMddHHmmss"); var body = System.Text.Encoding.UTF8.GetBytes(msg); channel.BasicPublish( exchange: "", routingKey: "hello", basicProperties: null, body: body ); } }

接收

C#
IConnection conn; IModel channel; EventingBasicConsumer consumer; public Form1() { InitializeComponent(); } private void Form1_Load(object sender, EventArgs e) { var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672//默认端口 }; conn = factory.CreateConnection(); channel = conn.CreateModel(); channel.QueueDeclare( queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null ); consumer = new EventingBasicConsumer(channel); consumer.Received += (obj, e) => { var body = e.Body; var msg = System.Text.Encoding.UTF8.GetString(body.ToArray()); this.Invoke(() => { txtReceive.AppendText(msg); txtReceive.AppendText(System.Environment.NewLine); }); }; channel.BasicConsume( queue: "hello", autoAck: true, consumer: consumer ); } private void btnClose_Click(object sender, EventArgs e) { conn.Close(); }

image.png

注意

  • connectionfactory:构造一个实例,主要创建连接。
  • iconnection:表示一个基于amqp协议的连接。
  • imodel:表示一个rabbitmq通道,可用于声明一个队列,然后开始消费。
  • eventingbasicconsumer:基于独立事件监听的基础消费者,可以监听并接收消息。
  • 生产者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 开始生产并发布消息
  • 消费者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 创建消费者,5. 绑定通道和消费者,并开始消费

删除交换机

C#
channel.ExchangeDelete("test");

fanout 广播

C#
private void btnSend_Click(object sender, EventArgs e) { var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672//默认端口 }; using (var conn = factory.CreateConnection()) using (var channel = conn.CreateModel()) { //目前交换器类型有这几种:direct,topic,headers和fanout channel.ExchangeDeclare("test", "fanout"); //只需定义exchange channel.QueueDeclare( durable: false, exclusive: false, autoDelete: false, arguments: null ); string msg = "hello world " + DateTime.Now.ToString("yyyyMMddHHmmss"); var body = System.Text.Encoding.UTF8.GetBytes(msg); channel.BasicPublish( exchange: "test", routingKey: "", basicProperties: null, body: body ); } }

接收

C#
private void Form1_Load(object sender, EventArgs e) { var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672//默认端口 }; conn = factory.CreateConnection(); channel = conn.CreateModel(); channel.ExchangeDeclare("test", "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind( queue: queueName, exchange: "test", routingKey:"", arguments:null ); consumer = new EventingBasicConsumer(channel); consumer.Received += (obj, e) => { var body = e.Body; var msg = System.Text.Encoding.UTF8.GetString(body.ToArray()); this.Invoke(() => { txtReceive.AppendText(msg); txtReceive.AppendText(System.Environment.NewLine); }); }; channel.BasicConsume( queue: queueName, autoAck: true, consumer: consumer ); }

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!