Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件
場景
MQTT
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基于發布/訂閱(publish/subscribe)模式的"輕量級"通訊協議,該協議構建于TCP/IP協議上,由IBM在1999年發布。MQTT最大優點在于,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。
MQTT是一個基于客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易于實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。
MQTTnet
MQTTnet 是一個基于 MQTT 通信的高性能 .NET 開源庫,它同時支持 MQTT 服務器端和客戶端。
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。
實現
打開VS,新建mqtt的服務端項目
選擇將解決方案和項目放在同一個目錄下
然后選擇新建控制臺應用程序,并且選擇目標框架為.NET Core3.1
然后在此解決方案下右擊新建項目
新建Winform項目,作為Mqtt的客戶端
然后在解決方案上右擊選擇-管理解決方案的Nuget 程序包
然后在瀏覽中搜索MQTTnet,右邊勾選需要添加依賴的項目,這里客戶端和服務端都需要添加。然后下面選擇指定版本,這里是2.4.0,點擊安裝
安裝過程中會提示預覽更改,點擊確定,以及會提示接受協議。
安裝成功后,就可以在項目下看到依賴項
然后編寫服務端的代碼,打開Program.cs,修改如下
using System; using MQTTnet; using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Protocol; using MQTTnet.Core.Server; using System.Text; using System.Threading;namespace MqttnetServer {class Program{//MQtt服務端private static MqttServer mqttServer = null;static void Main(string[] args){//MQTTnet 提供了一個靜態類 MqttNetTrace 來對消息進行跟蹤//MqttNetTrace 的事件 TraceMessagePublished 用于跟蹤服務端和客戶端應用的日志消息,比如啟動、停止、心跳、消息訂閱和發布等MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;//啟動服務端new Thread(StartMqttServer).Start();while (true){//獲取輸入字符var inputString = Console.ReadLine().ToLower().Trim();//exit則停止服務if (inputString == "exit"){mqttServer.StopAsync();Console.WriteLine("MQTT服務已停止!");break;}//clients則輸出所有客戶端else if (inputString == "clients"){foreach (var item in mqttServer.GetConnectedClients()){Console.WriteLine($"客戶端標識:{item.ClientId},協議版本:{item.ProtocolVersion}");}}else{Console.WriteLine($"命令[{inputString}]無效!");}}}//啟動服務端private static void StartMqttServer(){if (mqttServer == null){try{//在 MqttServerOptions 選項中,你可以使用 ConnectionValidator 來對客戶端連接進行驗證。//比如客戶端ID標識 ClientId,用戶名 Username 和密碼 Password 等。var options = new MqttServerOptions{ConnectionValidator = p =>{if (p.ClientId == "c001"){if (p.Username != "u001" || p.Password != "p001"){return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;}}return MqttConnectReturnCode.ConnectionAccepted;}};//創建服務端最簡單的方式是采用 MqttServerFactory 對象的 CreateMqttServer 方法來實現,該方法需要一個//MqttServerOptions 參數。mqttServer = new MqttServerFactory().CreateMqttServer(options) as MqttServer;//服務端支持 ClientConnected、ClientDisconnected 和 ApplicationMessageReceived 事件,//分別用來檢查客戶端連接、客戶端斷開以及接收客戶端發來的消息。//ApplicationMessageReceived 的事件參數包含了客戶端ID標識 ClientId 和 MQTT 應用消息 MqttApplicationMessage 對象,//通過該對象可以獲取主題 Topic、QoS QualityOfServiceLevel 和消息內容 Payload 等信息mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;//ClientConnected 和 ClientDisconnected 事件的事件參數一個客戶端連接對象 ConnectedMqttClient//通過該對象可以獲取客戶端ID標識 ClientId 和 MQTT 版本 ProtocolVersion。mqttServer.ClientConnected += MqttServer_ClientConnected;mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;}catch (Exception ex){Console.WriteLine(ex.Message); return;}}//創建了一個 IMqttServer 對象后,調用其 StartAsync 方法即可啟動 MQTT 服務mqttServer.StartAsync();Console.WriteLine("MQTT服務啟動成功!");}//ClientConnected 和 ClientDisconnected 事件的事件參數一個客戶端連接對象 ConnectedMqttClient//通過該對象可以獲取客戶端ID標識 ClientId 和 MQTT 版本 ProtocolVersion。private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e){Console.WriteLine($"客戶端[{e.Client.ClientId}]已連接,協議版本:{e.Client.ProtocolVersion}");}//ClientConnected 和 ClientDisconnected 事件的事件參數一個客戶端連接對象 ConnectedMqttClient//通過該對象可以獲取客戶端ID標識 ClientId 和 MQTT 版本 ProtocolVersion。private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e){Console.WriteLine($"客戶端[{e.Client.ClientId}]已斷開連接!");}//ApplicationMessageReceived 的事件參數包含了客戶端ID標識 ClientId 和 MQTT 應用消息 MqttApplicationMessage 對象,//通過該對象可以獲取主題 Topic、QoS QualityOfServiceLevel 和消息內容 Payload 等信息private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e){Console.WriteLine($"客戶端[{e.ClientId}]>> 主題:{e.ApplicationMessage.Topic} 負荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}");}//事件參數 MqttNetTraceMessagePublishedEventArgs 包含了線程ID ThreadId、來源 Source、日志級別 Level、日志消息 Message、異常信息 Exception 等。//MqttNetTrace 類還提供了4個不同消息等級的靜態方法,Verbose、Information、Warning 和 Error,//用于給出不同級別的日志消息,該消息將會在 TraceMessagePublished 事件中輸出,//你可以使用 e.Level 進行過慮。private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e){Console.WriteLine($">> 線程ID:{e.ThreadId} 來源:{e.Source} 跟蹤級別:{e.Level} 消息: {e.Message}"); if (e.Exception != null) { Console.WriteLine(e.Exception); }}} }然后啟動服務端
然后修改客戶端的窗體頁面,添加如下控件布局
連接按鈕的點擊事件代碼為
??????? //連接到Mqtt服務器private void button_Connect_Click(object sender, EventArgs e){if (string.IsNullOrEmpty(textServerAddress.Text)){MessageBox.Show("服務器地址不能為空");}else {Task.Run(async () => {await ConnectMqttServerAsync();});}}訂閱并保存到文件按鈕的點擊事件為
?????? //訂閱主題按鈕點擊事件private void btnSubscribe_Click(object sender, EventArgs e){string topic = txtSubTopic.Text.Trim();if (string.IsNullOrEmpty(topic)){MessageBox.Show("訂閱主題不能為空!");return;}if (!mqttClient.IsConnected){MessageBox.Show("MQTT客戶端尚未連接!");return;}//客戶端連接到服務端之后,可以使用 SubscribeAsync 異步方法訂閱消息,//該方法可以傳入一個可枚舉或可變參數的主題過濾器 TopicFilter 參數,主題過濾器包含主題名和 QoS 等級。mqttClient.SubscribeAsync(new List<TopicFilter> {new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)});txtReceiveMessage.AppendText($"已訂閱[{topic}]主題" + Environment.NewLine);}取消訂閱按鈕的點擊事件為
??????? //取消訂閱按鈕點擊事件private void btn_cancle_sub_Click(object sender, EventArgs e){string topic = txtSubTopic.Text.Trim();mqttClient.UnsubscribeAsync(new List<String> {topic});}發布按鈕的點擊事件為
??????? //發布主題private void button2_Click_1(object sender, EventArgs e){string topic = txtPubTopic.Text.Trim();if (string.IsNullOrEmpty(topic)){MessageBox.Show("發布主題不能為空!"); return;}string inputString = txtSendMessage.Text.Trim();//發布消息前需要先構建一個消息對象 MqttApplicationMessage,//最直接的方法是使用其實構造函數,傳入主題、內容、Qos 等參數。var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);//得到 MqttApplicationMessage 消息對象后,//通過客戶端對象調用其 PublishAsync 異步方法進行消息發布。mqttClient.PublishAsync(appMsg);}完整客戶端示例代碼為
using MQTTnet; using MQTTnet.Core; using MQTTnet.Core.Client; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; using System; using System.Collections.Generic; using System.IO; using System.Text; using System.Threading.Tasks; using System.Windows.Forms;namespace MqttnetClient {public partial class Form1 : Form{//MQTTk客戶端private MqttClient mqttClient = null;public Form1(){InitializeComponent();}private async Task ConnectMqttServerAsync(){if (mqttClient == null){//使用 MQTTnet 創建 MQTT 也非常簡單//只需要使用 MqttClientFactory 對象的 CreateMqttClient 方法即可mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient;//客戶端支持 Connected、Disconnected 和 ApplicationMessageReceived 事件,//用來處理客戶端與服務端連接、客戶端從服務端斷開以及客戶端收到消息的事情。mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;mqttClient.Connected += MqttClient_Connected;mqttClient.Disconnected += MqttClient_Disconnected;}try{//調用ConnectAsync方法時需要傳遞一個 MqttClientTcpOptions 對象//選項包含了客戶端ID標識 ClientId、服務端地址(可以使用IP地址或域名)Server、端口號 Port、//用戶名 UserName、密碼 Password 等信息。var options = new MqttClientTcpOptions { };if (!string.IsNullOrEmpty(textPort.Text)){options = new MqttClientTcpOptions{Server = textServerAddress.Text,ClientId = Guid.NewGuid().ToString().Substring(0, 5),UserName = textusername.Text,Password = textpassword.Text,Port = int.Parse(textPort.Text),CleanSession = true};}else {options = new MqttClientTcpOptions{Server = textServerAddress.Text,ClientId = Guid.NewGuid().ToString().Substring(0, 5),UserName = textusername.Text,Password = textpassword.Text,CleanSession = true};}//創建客戶端對象后,調用其異步方法 ConnectAsync 來連接到服務端。await mqttClient.ConnectAsync(options);}catch (Exception ex){Invoke((new Action(() =>{txtReceiveMessage.AppendText($"連接到MQTT服務器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);})));}}private void MqttClient_Connected(object sender, EventArgs e){Invoke((new Action(() =>{txtReceiveMessage.AppendText("已連接到MQTT服務器!" + Environment.NewLine);})));}private void MqttClient_Disconnected(object sender, EventArgs e){Invoke((new Action(() =>{txtReceiveMessage.AppendText("已斷開MQTT連接!" + Environment.NewLine);})));}private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e){Invoke((new Action(() =>{txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");//將收到的消息追加到相對路徑record.txt文件if (!File.Exists("record.txt")){File.Create("record.txt");}else{File.AppendAllText("record.txt", "\r\n");File.AppendAllText("record.txt", Encoding.Default.GetString(e.ApplicationMessage.Payload));}})));}//訂閱主題按鈕點擊事件private void btnSubscribe_Click(object sender, EventArgs e){string topic = txtSubTopic.Text.Trim();if (string.IsNullOrEmpty(topic)){MessageBox.Show("訂閱主題不能為空!");return;}if (!mqttClient.IsConnected){MessageBox.Show("MQTT客戶端尚未連接!");return;}//客戶端連接到服務端之后,可以使用 SubscribeAsync 異步方法訂閱消息,//該方法可以傳入一個可枚舉或可變參數的主題過濾器 TopicFilter 參數,主題過濾器包含主題名和 QoS 等級。mqttClient.SubscribeAsync(new List<TopicFilter> {new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)});txtReceiveMessage.AppendText($"已訂閱[{topic}]主題" + Environment.NewLine);}//發布主題private void button2_Click_1(object sender, EventArgs e){string topic = txtPubTopic.Text.Trim();if (string.IsNullOrEmpty(topic)){MessageBox.Show("發布主題不能為空!"); return;}string inputString = txtSendMessage.Text.Trim();//發布消息前需要先構建一個消息對象 MqttApplicationMessage,//最直接的方法是使用其實構造函數,傳入主題、內容、Qos 等參數。var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);//得到 MqttApplicationMessage 消息對象后,//通過客戶端對象調用其 PublishAsync 異步方法進行消息發布。mqttClient.PublishAsync(appMsg);}//連接到Mqtt服務器private void button_Connect_Click(object sender, EventArgs e){if (string.IsNullOrEmpty(textServerAddress.Text)){MessageBox.Show("服務器地址不能為空");}else {Task.Run(async () => {await ConnectMqttServerAsync();});}}//取消訂閱按鈕點擊事件private void btn_cancle_sub_Click(object sender, EventArgs e){string topic = txtSubTopic.Text.Trim();mqttClient.UnsubscribeAsync(new List<String> {topic});}} }運行客戶端
這里服務器地址就是本機地址,端口或者用戶名和密碼根據自己需要
然后就可以進行主題的訂閱和發布了
然后在客戶端bin目錄下找到record.txt,可以看到追加到文件成功。
示例代碼下載
https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/19431291
參考文章:
https://www.cnblogs.com/kuige/articles/7724786.html
總結
以上是生活随笔為你收集整理的Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VS中使用NuGet安装依赖时提示:无法
- 下一篇: Node-RED订阅MQTT主题并调试数