MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,广泛应用于物联网领域。本文将详细介绍如何使用C#中的MQTTnet库来实现MQTT客户端,包括连接、订阅、发布等功能,并提供完整的示例代码。
MQTTnet是一个基于C#实现的高性能MQTT客户端和服务器库,支持MQTT v3.1.1和v5.0协议版本。它提供了丰富的功能,包括同步和异步操作、消息拦截、日志记录等,适用于各种开发场景。
在开始之前,确保您的项目已安装MQTTnet库。您可以通过NuGet包管理器安装:
BashInstall-Package MQTTnet
或者使用Dotnet CLI:
Bashdotnet add package MQTTnet

首先,需要创建一个MQTT客户端实例:
C#using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
// 创建MQTT客户端工厂
var factory = new MqttFactory();
// 创建客户端
var mqttClient = factory.CreateMqttClient();
连接到MQTT服务器需要指定服务器的地址、端口以及可选的认证信息:
C#static async Task Main(string[] args)
{
// 创建MQTT客户端工厂
var factory = new MqttFactory();
// 创建客户端
var mqttClient = factory.CreateMqttClient();
// 配置客户端选项
var options = new MqttClientOptionsBuilder()
.WithClientId("ClientID") // 设置客户端ID
.WithTcpServer("broker.hivemq.com", 1883) // 设置服务器地址和端口
//.WithCredentials("username", "password") // 如果需要认证
.WithCleanSession()
.Build();
// 连接到服务器
await mqttClient.ConnectAsync(options, CancellationToken.None);
Console.WriteLine("已连接到MQTT服务器");
}

要接收特定主题的消息,需要先订阅该主题:
C#// 订阅主题 - 使用新的方式
var subscribeOptions = factory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic("test/topic");
})
.Build();
await mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
Console.WriteLine("已订阅主题:test/topic");
向某个主题发布消息:
C#var message = new MqttApplicationMessageBuilder()
.WithTopic("test/topic")
.WithPayload("Hello MQTT")
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
.WithRetainFlag(false)
.Build();
await mqttClient.PublishAsync(message, CancellationToken.None);
Console.WriteLine("消息已发布");
处理接收到的消息,需要订阅ApplicationMessageReceivedHandler事件:
C#// 设置消息接收处理程序
mqttClient.ApplicationMessageReceivedAsync += e =>
{
// 使用 PayloadSegment 替代 Payload
var payload = e.ApplicationMessage.PayloadSegment.ToArray();
Console.WriteLine($"收到消息:主题={e.ApplicationMessage.Topic}, 内容={Encoding.UTF8.GetString(payload)}");
return Task.CompletedTask;
};
下面是一个完整的示例,展示了如何创建一个MQTT客户端,连接服务器,订阅主题,接收和发布消息:
C#using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
namespace MQTTnetClientExample
{
class Program
{
static async Task Main(string[] args)
{
try
{
// 创建MQTT客户端
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
// 设置消息接收处理程序
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"收到消息:主题={e.ApplicationMessage.Topic}, 内容={Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
return Task.CompletedTask;
};
// 设置连接状态改变处理程序
mqttClient.DisconnectedAsync += async e =>
{
Console.WriteLine("与服务器断开连接!");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(mqttClient.Options, CancellationToken.None);
}
catch
{
Console.WriteLine("重新连接失败!");
}
};
// 配置客户端选项
var options = new MqttClientOptionsBuilder()
.WithClientId($"ClientID_{Guid.NewGuid()}") // 使用随机ClientID
.WithTcpServer("broker.hivemq.com", 1883)
.WithCleanSession()
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.Build();
// 连接到服务器
await mqttClient.ConnectAsync(options, CancellationToken.None);
Console.WriteLine("已连接到MQTT服务器");
// 订阅主题
var subscribeOptions = factory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic("test/topic")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce);
})
.Build();
await mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);
Console.WriteLine("已订阅主题:test/topic");
// 启动一个循环发送消息的任务
_ = Task.Run(async () =>
{
while (true)
{
try
{
if (mqttClient.IsConnected)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic("test/topic")
.WithPayload($"Hello MQTT - {DateTime.Now}")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(false)
.Build();
await mqttClient.PublishAsync(message, CancellationToken.None);
Console.WriteLine($"消息已发布 - {DateTime.Now}");
}
}
catch (Exception ex)
{
Console.WriteLine($"发送消息时出错: {ex.Message}");
}
await Task.Delay(5000); // 每5秒发送一次消息
}
});
Console.WriteLine("按下 'q' 键退出程序...");
while (Console.ReadKey().Key != ConsoleKey.Q)
{
await Task.Delay(100);
}
// 断开连接
if (mqttClient.IsConnected)
{
await mqttClient.DisconnectAsync(new MqttClientDisconnectOptions(), CancellationToken.None);
}
}
catch (Exception ex)
{
Console.WriteLine($"发生错误: {ex.Message}");
Console.WriteLine(ex.StackTrace);
}
}
}
}

MQTTnet提供了强大的功能,使得在C#中实现MQTT客户端变得简单高效。通过本文的介绍和示例代码,您可以快速搭建起MQTT客户端应用,实现连接、订阅、发布和接收消息等功能。根据实际需求,您还可以进一步探索MQTTnet提供的高级特性,如消息拦截、SSL/TLS加密、协议版本配置等。
希望本文能对您的MQTT开发有所帮助!
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!