RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
直连型交换机背后的路由算法很简单——消息会传送给绑定键与消息的路由键完全匹配的那个队列。 我们用直连交换机取代了只会无脑广播的扇形交换机,并且具备了选择性接收消息的能力。
这种配置下,我们可以看到有两个队列Q1、Q2绑定到了直连交换机X上。第一个队列用的是橘色(orange)绑定键,第二个有两个绑定键,其中一个绑定键是黑色(black),另一个绑定键是绿色(green)。在此设置中,发布到交换机的带有橘色(orange)路由键的消息会被路由给队列Q1。带有黑色(black)或绿色(green)路由键的消息会被路由给Q2。其他的消息则会被丢弃。
Fanout Exchange(扇型交换机):当一个Msg发送到扇形交换机X上时,则扇形交换机X会将消息分别发送给所有绑定到X上的消息队列。扇形交换机将消息路由给绑定到自身的所有消息队列,也就是说路由键在扇形交换机里没有作用,故消息队列绑定扇形交换机时,路由键可为空。这个模式类似于广播。
路由键和绑定键命名
消息路由键—发送到主题交换机的消息所携带的路由键(routing_key)不能随意命名——它必须是一个用点号分隔的词列表。当中的词可以是任何单词,不过一般都会指定一些跟消息有关的特征作为这些单词。列举几个有效的路由键的例子:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。只要不超过255个字节,词的长度由你来定。
绑定键(binding key)也得使用相同的格式。主题交换机背后的逻辑跟直连交换机比较相似——一条携带特定路由键(routing key)的消息会被投送给所有绑定键(binding key)与之相匹配的队列。尽管如此,仍然有两条与绑定键相关的特殊情况:
*(星号) 能够替代一个单词。
#(井号) 能够替代零个或多个单词。
头交换机类似与主题交换机,但是却和主题交换机有着很大的不同。主题交换机使用路由键来进行消息的路由,而头交换机使用消息属性来进行消息的分发,通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。 在头交换机里有一个特别的参数”x-match”,当”x-match”的值为“any”时,只需要消息头的任意一个值匹配成功即可,当”x-match”值为“all”时,要求消息头的所有值都需相等才可匹配成功。
Messaging that just works — RabbitMQ 下载
下载
安装时需要先安装Erlang
安装Erlang
安装完成后,再安装Rabbitmq
基本就是下一步这样就可以了。
在服务中可以看到
rabbitmq-plugins list
这里可以看到安装的插件
我们用下列命令安装 rabbitmq_management 插件,这款插件是可以可视化的方式查看 RabbitMQ 服务器实例的状态,以及操控 RabbitMQ 服务器。
rabbitmq-plugins enable rabbitmq_management
在浏览器中输入:http://localhost:15672 可以看到一个登录界面:
注意:在windows下安装后rabbitmqctl 命令会出错
复制 C:\Windows\System32\config\systemprofile.erlang.cookie 到 C:\Users\xxx.erlang.cookie
rabbitmqctl list_users 查看注册用户
创建一个用户,rabbit1,密码123456
rabbitmqctl add_user rabbit1 123456
有5个 tag 可供选择,分别是:administrator ,monitoring,policymaker,management 和 none,其实这里的 tag 代表的是权限,administrator 是最高权限,none 表示不能访问,这里 administrator 和 none 的组合,权限应该是向高看齐,忽略 none,用的是 administrator 的权限。
rabbitmqctl set_user_tags rabbit1 administrator
一个消费者消费一个生产者生产的信息
一个生产者生产信息,多个消费者进行消费,但是一条消息只能消费一次
生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息
age.png?auth_key=1759179615-83y7kMTpBUYrRjDxKHBtqP-0-01eefda806e89718d709853ee90c7c67)
生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列
生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者
安装RabbitMQ.Client 在nuget中
发送
C#private void btnSend_Click(object sender, EventArgs e)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672//默认端口
};
using (var conn = factory.CreateConnection())
using (var channel = conn.CreateModel())
{
channel.QueueDeclare(
queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
string msg = "hello world " + DateTime.Now.ToString("yyyyMMddHHmmss");
var body = System.Text.Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(
exchange: "",
routingKey: "hello",
basicProperties: null,
body: body
);
}
}
接收
C#IConnection conn;
IModel channel;
EventingBasicConsumer consumer;
public Form1()
{
InitializeComponent();
}
private void Form1_Load(object sender, EventArgs e)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672//默认端口
};
conn = factory.CreateConnection();
channel = conn.CreateModel();
channel.QueueDeclare(
queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
consumer = new EventingBasicConsumer(channel);
consumer.Received += (obj, e) =>
{
var body = e.Body;
var msg = System.Text.Encoding.UTF8.GetString(body.ToArray());
this.Invoke(() =>
{
txtReceive.AppendText(msg);
txtReceive.AppendText(System.Environment.NewLine);
});
};
channel.BasicConsume(
queue: "hello",
autoAck: true,
consumer: consumer
);
}
private void btnClose_Click(object sender, EventArgs e)
{
conn.Close();
}
注意
C#channel.ExchangeDelete("test");
C#private void btnSend_Click(object sender, EventArgs e)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672//默认端口
};
using (var conn = factory.CreateConnection())
using (var channel = conn.CreateModel())
{
//目前交换器类型有这几种:direct,topic,headers和fanout
channel.ExchangeDeclare("test", "fanout"); //只需定义exchange
channel.QueueDeclare(
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
string msg = "hello world " + DateTime.Now.ToString("yyyyMMddHHmmss");
var body = System.Text.Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(
exchange: "test",
routingKey: "",
basicProperties: null,
body: body
);
}
}
接收
C#private void Form1_Load(object sender, EventArgs e)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672//默认端口
};
conn = factory.CreateConnection();
channel = conn.CreateModel();
channel.ExchangeDeclare("test", "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(
queue: queueName,
exchange: "test",
routingKey:"",
arguments:null
);
consumer = new EventingBasicConsumer(channel);
consumer.Received += (obj, e) =>
{
var body = e.Body;
var msg = System.Text.Encoding.UTF8.GetString(body.ToArray());
this.Invoke(() =>
{
txtReceive.AppendText(msg);
txtReceive.AppendText(System.Environment.NewLine);
});
};
channel.BasicConsume(
queue: queueName,
autoAck: true,
consumer: consumer
);
}
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!