日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > c/c++ >内容正文

c/c++

MQTT从入门到放弃

發(fā)布時(shí)間:2023/12/10 c/c++ 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MQTT从入门到放弃 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

MQTT,是一種基于發(fā)布/訂閱模式的"輕量級(jí)"通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,屬于應(yīng)用層協(xié)議,。

基于TCP協(xié)議、發(fā)布/訂閱協(xié)議,屬于應(yīng)用層協(xié)議。使用C/S架構(gòu),本質(zhì)是一個(gè)消息轉(zhuǎn)發(fā)協(xié)議。所有的客戶端往服務(wù)器發(fā)送消息,然后服務(wù)端根據(jù)過濾規(guī)則,把消息再轉(zhuǎn)發(fā)給符合條件的客戶端。消息的傳輸是有序的、可靠的、雙向的。

一、概述

1.1 參考文檔

  • 官方文檔 (推薦) http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
  • 官方文檔(中文) http://mqtt.p2hp.com/mqtt311

1.2 MQTT優(yōu)點(diǎn)

MQTT最大優(yōu)點(diǎn)在于,可以以 極少的代碼和有限的帶寬 ,為遠(yuǎn)程連接設(shè)備提過實(shí)時(shí)可靠的消息服務(wù),作為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用

  • 保持長(zhǎng)連接,具有一定實(shí)時(shí)性
  • 適應(yīng)高延時(shí),偶爾斷網(wǎng)
  • 支持高并發(fā)
  • 單次數(shù)據(jù)量小
  • 傳輸可靠
  • 提供不同QoS(服務(wù)優(yōu)先級(jí))
  • 設(shè)置遺囑消息

1.3 MQTT應(yīng)用領(lǐng)域

MQTT是基于二進(jìn)制消息的發(fā)布/訂閱編程模式的消息協(xié)議,非常適合

需要 低功耗網(wǎng)絡(luò)帶寬有限 的IoT場(chǎng)景

比如: 遙感數(shù)據(jù)、汽車、 智能家居、智慧城市、醫(yī)療醫(yī)護(hù)、智慧農(nóng)業(yè) …

二、MQTT協(xié)議原理


實(shí)現(xiàn)MQTT協(xié)議需要客戶端和服務(wù)器端通訊完成,在通訊過程中,MQTT協(xié)議中有三種身份:發(fā)布者(Publish)、代理(Broker)(服務(wù)器)、訂閱者(Subscribe)。

注意:消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務(wù)器,消息發(fā)布者可以同時(shí)是訂閱者。

2.1 MQTT客戶端

一個(gè)使用MQTT協(xié)議的應(yīng)用程序或者設(shè)備,它總是建立到服務(wù)器的網(wǎng)絡(luò)連接??蛻舳丝梢?#xff1a;

  • 發(fā)布其他客戶端可能會(huì)訂閱的信息
  • 訂閱其它客戶端發(fā)布的消息
  • 退訂或刪除應(yīng)用程序的消息
  • 斷開與服務(wù)器連接

2.2 MQTT服務(wù)端

QTT服務(wù)器以稱為“消息代理”(Broker),可以是一個(gè)應(yīng)用程序或一臺(tái)設(shè)備。它是位于消息發(fā)布者和訂閱者之間,它可以:

  • 接受來自客戶的網(wǎng)絡(luò)連接
  • 接受客戶發(fā)布的應(yīng)用信息
  • 處理來自客戶端的訂閱和退訂請(qǐng)求
  • 向訂閱的客戶轉(zhuǎn)發(fā)應(yīng)用程序消息

2.3 消息結(jié)構(gòu)

每條MQTT命令消息的消息頭都包含一個(gè)固定的報(bào)頭,有些消息會(huì)攜帶一個(gè)可變報(bào)文頭和一個(gè)負(fù)荷。消息格式如下:

固定報(bào)文頭 | 可變報(bào)文頭 | 負(fù)載

2.3.1 固定報(bào)文頭

存在于所有MQTT數(shù)據(jù)包中,表示數(shù)據(jù)包類型及數(shù)據(jù)包的分組類標(biāo)識(shí)。

MQTT固定報(bào)文頭最少有兩個(gè)字節(jié),第一字節(jié)包含消息類型(Message Type)和QoS級(jí)別等標(biāo)志位。第二字節(jié)開始是剩余長(zhǎng)度字段,該長(zhǎng)度是后面的可變報(bào)文頭加消息負(fù)載的總長(zhǎng)度,該字段最多允許四個(gè)字節(jié)。

2.3.2 可變報(bào)文頭

存在于部分MQTT數(shù)據(jù)包中,數(shù)據(jù)包類型決定了可變頭是否存在及其具體內(nèi)容。

可變報(bào)文頭主要包含協(xié)議名、協(xié)議版本、連接標(biāo)志(Connect Flags)、心跳間隔時(shí)間(Keep Alive timer)、連接返回碼(Connect Return Code)、主題名(Topic Name)等。

2.3.3 負(fù)載

Payload直譯為負(fù)荷,消息的內(nèi)容。存在于部分MQTT數(shù)據(jù)包中,表示客戶端收到的具體內(nèi)容。


2.4 MQTT特點(diǎn)

2.4.1 MQTT的消息類型

固定報(bào)文頭中的第一個(gè)字節(jié)包含連接標(biāo)志(Connect Flags),連接標(biāo)志用來區(qū)分MQTT的消息類型。MQTT協(xié)議擁有14種不同的消息類型(見下表),可簡(jiǎn)單分為連接及終止、發(fā)布和訂閱、QoS 2消息的機(jī)制以及各種確認(rèn)ACK。至于每一個(gè)消息類型會(huì)攜帶什么內(nèi)容,這里不多闡述

2.4.2 服務(wù)質(zhì)量(QOS)

2.4.2.1 QOS分類

服務(wù)質(zhì)量水平(QoS)是一個(gè)消息的發(fā)送者和限定遞送保證用于特定消息的消息的接收器之間的協(xié)議。MQTT 中有 3 個(gè) QoS 級(jí)別:

  • QoS0:發(fā)送就不管了,最多一次;
  • QoS1:發(fā)送之后依賴MQTT規(guī)范,是否啟動(dòng)重傳消息,所以至少一次;
  • QoS2:發(fā)送之后依賴MQTT消息機(jī)制,確保只有一次。

QoS0 代表,Sender 發(fā)送的一條消息,Receiver 最多能收到一次,也就是說 Sender 盡力向 Receiver發(fā)送消息,如果發(fā)送失敗,也就算了;這是完全依賴TCP重傳機(jī)制,如果網(wǎng)絡(luò)不好,TCP的重傳也不是100%可靠,加上MQTT是Publisher 發(fā)出去的消息是依賴代理服務(wù)器完成轉(zhuǎn)發(fā),所以消息最多一次。

QoS1 代表,Sender 發(fā)送的一條消息,Receiver 至少能收到一次,也就是說 Sender 向 Receiver發(fā)送消息,如果發(fā)送之后沒有收到對(duì)應(yīng)的PUBACK,就會(huì)繼續(xù)重試,直到發(fā)送者Sender 接收到 Receiver 發(fā)送的 PUBACK為止,因?yàn)橹貍鞯脑?#xff0c;Receiver 有可能會(huì)收到重復(fù)的消息;

QoS2 代表,Sender 發(fā)送的一條消息,Receiver 確保能收到而且只收到一次,也就是說 Sender 盡力向 Receiver 發(fā)送消息,如果發(fā)送失敗,會(huì)繼續(xù)重試,直到 Receiver 收到消息為止,同時(shí)保證 Receiver 不會(huì)因?yàn)橄⒅貍鞫盏街貜?fù)的消息。(個(gè)人理解這一點(diǎn)有點(diǎn)像TCP三次握手的交互過程)

2.4.2.2 QOS特性

  • QoS 是 MQTT 協(xié)議的一個(gè)關(guān)鍵特性。QoS 使客戶端能夠選擇與其網(wǎng)絡(luò)可靠性和應(yīng)用程序邏輯相匹配的服務(wù)級(jí)別。因?yàn)?MQTT 管理消息的重新傳輸并保證交付(即使底層傳輸不可靠),QoS 使不可靠網(wǎng)絡(luò)中的通信變得更加容易。

  • QoS流,在發(fā)送端和接收端是兩件不同的事情。當(dāng)然發(fā)送端與接收端QoS的等級(jí)也可以不一樣。在發(fā)送端與broker之間,發(fā)送端定義了QoS等級(jí)。當(dāng)broker發(fā)送消息到接收端是,接收端決定了QoS的等級(jí)

  • 發(fā)送(發(fā)布)消息的客戶端和接收消息的客戶端之間的 QoS 定義和級(jí)別是兩件不同的事情。這兩種交互的 QoS 級(jí)別也可以不同。向代理發(fā)送 PUBLISH 消息的客戶端定義消息的 QoS。但是,當(dāng)代理將消息傳遞給接收者(訂閱者)時(shí),代理使用接收者(訂閱者)在訂閱期間定義的 QoS。例如,客戶端 A 是消息的發(fā)送者??蛻舳?B 是消息的接收者。如果客戶端 B 以 QoS 1 訂閱代理并且客戶端 A 以 QoS 2 向代理發(fā)送消息,則代理以 QoS 1 將消息傳遞給客戶端 B(接收者/訂閱者)。

2.4.2.3 QOS應(yīng)用場(chǎng)景

QoS 0

  • 發(fā)送方和接收方之間建立了完全或大部分穩(wěn)定的連接。
  • 不介意偶爾丟失幾條消息。如果數(shù)據(jù)不是那么重要或數(shù)據(jù)間隔很短,則某些消息的丟失是可以接受的
  • 不需要消息隊(duì)列。僅當(dāng)斷開連接的客戶端具有 QoS 1 或 2 和持久會(huì)話時(shí),消息才會(huì)排隊(duì)

QoS 1

  • 您需要獲取每條消息,并且您的用例可以處理重復(fù)項(xiàng)。QoS 級(jí)別 1 是最常用的服務(wù)級(jí)別,因?yàn)樗WC消息至少到達(dá)一次,但允許多次傳遞。當(dāng)然,您的應(yīng)用程序必須容忍重復(fù)并能夠相應(yīng)地處理它們。
  • 無法承受 QoS 2 的開銷。QoS 1 傳遞消息的速度比 QoS 2 快得多。

QoS 2

  • 支付場(chǎng)景。一次接收所有消息對(duì)您的應(yīng)用程序至關(guān)重要。如果重復(fù)交付可能損害應(yīng)用程序用戶或訂閱客戶端,則通常會(huì)出現(xiàn)這種情況。請(qǐng)注意開銷以及 QoS 2 交互需要更多時(shí)間才能完成。

關(guān)于QOS的優(yōu)秀連接:
https://blog.csdn.net/m0_50668851/article/details/112555171
https://blog.csdn.net/qq1623803207/article/details/89518318

2.4.3 遺愿標(biāo)志(Will Flag)

在可變報(bào)文頭的連接標(biāo)志位字段(Connect Flags)里有三個(gè)Will標(biāo)志位:Will Flag、Will QoS和Will Retain Flag,這些Will字段用于監(jiān)控客戶端與服務(wù)器之間的連接狀況。如果設(shè)置了Will Flag,就必須設(shè)置Will QoS和Will Retain標(biāo)志位,消息主體中也必須有Will Topic和Will Message字段。

那遺愿消息是怎么回事呢?
服務(wù)器與客戶端通信時(shí),當(dāng)遇到異常或客戶端心跳超時(shí)的情況,MQTT服務(wù)器會(huì)替客戶端發(fā)布一個(gè)Will消息。當(dāng)然如果服務(wù)器收到來自客戶端的DISCONNECT消息,則不會(huì)觸發(fā)Will消息的發(fā)送。

因此,Will字段可以應(yīng)用于設(shè)備掉線后需要通知用戶的場(chǎng)景。

2.4.4 連接?;钚奶鴻C(jī)制(Keep Alive Timer)

MQTT客戶端可以設(shè)置一個(gè)心跳間隔時(shí)間(Keep Alive Timer),表示在每個(gè)心跳間隔時(shí)間內(nèi)發(fā)送一條消息。如果在這個(gè)時(shí)間周期內(nèi),沒有業(yè)務(wù)數(shù)據(jù)相關(guān)的消息,客戶端會(huì)發(fā)一個(gè)PINGREQ消息,相應(yīng)的,服務(wù)器會(huì)返回一個(gè)PINGRESP消息進(jìn)行確認(rèn)。如果服務(wù)器在一個(gè)半(1.5)心跳間隔時(shí)間周期內(nèi)沒有收到來自客戶端的消息,就會(huì)斷開與客戶端的連接。心跳間隔時(shí)間最大值大約可以設(shè)置為18個(gè)小時(shí),0值意味著客戶端不斷開。

2.4.5 MQTT vs MQ

MQTT:一種通信協(xié)議,類似人類交談中的漢語、英語、俄語中的一種語言規(guī)范

MQ:一種通信通道,也叫消息隊(duì)列,類似人類交談中的用電話、email、微信的一種通信方式

市面上的MQ產(chǎn)品很多,如阿里自研并開源RocketMQ,還有類似RabbitMQ、ActiveMQ,他們不僅支持MQTT協(xié)議,還支持如AMQP、stomp協(xié)議等等,EMQ 使用的協(xié)議是mqtt。

MQ支持協(xié)議
ActiveMQActiveMQ是Apache軟件基金會(huì)的開源產(chǎn)品,支持AMQP協(xié)議、MQTT協(xié)議(和XMPP協(xié)議作用類似)、Openwire協(xié)議和Stomp協(xié)議等多種消息協(xié)議。并且ActiveMQ完整支持JMS API接口規(guī)范。
RabbitMQRabbitMQ基于Erlang語言開發(fā)和運(yùn)行。它與Apache ActiveMQ有很多相同的特性,例如RabbitMQ完整支持多種消息協(xié)議:AMQP、STOMP、MQTT、HTTP,我們使用RabbitMQ時(shí)會(huì)默認(rèn)使用AMQP1.0 協(xié)議。當(dāng)然,RabbitMQ作為Apache ActiveMQ最主要的競(jìng)品之一也有其獨(dú)特的功能特性。例如RabbitMQ支持一套特有的Routing-Exchange消息路由規(guī)則。這套規(guī)則可以按照消息內(nèi)容,自動(dòng)將消息歸類到不同的消息隊(duì)列中。

2.4.6 協(xié)議對(duì)比

下圖是各個(gè)協(xié)議間的對(duì)比:


MQTT協(xié)議(低帶寬)

MQTT (Message Queuing Telemetry Transport ),消息隊(duì)列遙測(cè)傳輸,由IBM開發(fā)的即時(shí)通訊協(xié)議,相比來說比較適合物聯(lián)網(wǎng)場(chǎng)景的通訊協(xié)議。MQTT協(xié)議采用發(fā)布/訂閱模式,所有的物聯(lián)網(wǎng)終端都通過TCP連接到云端,云端通過主題的方式管理各個(gè)設(shè)備關(guān)注的通訊內(nèi)容,負(fù)責(zé)將設(shè)備與設(shè)備之間消息的轉(zhuǎn)發(fā)。

適用范圍:在低帶寬、不可靠的網(wǎng)絡(luò)下提供基于云平臺(tái)的遠(yuǎn)程設(shè)備的數(shù)據(jù)傳輸和監(jiān)控。

MQTT協(xié)議一般適用于設(shè)備數(shù)據(jù)采集到端(Device-》Server,Device-》Gateway),集中星型網(wǎng)絡(luò)架構(gòu)(hub-and-spoke),不適用設(shè)備與設(shè)備之間通信,設(shè)備控制能力弱,另外實(shí)時(shí)性較差,一般都在秒級(jí)。

AMQP協(xié)議(互操作性)
AMQP(Advanced Message Queuing Protocol),先進(jìn)消息隊(duì)列協(xié)議,這是OASIS組織提出的,該組織曾提出OSLC(Open Source Lifecyle)標(biāo)準(zhǔn),用于業(yè)務(wù)系統(tǒng)例如PLM,ERP,MES等進(jìn)行數(shù)據(jù)交換。

適用范圍:最早應(yīng)用于金融系統(tǒng)之間的交易消息傳遞,在物聯(lián)網(wǎng)應(yīng)用中,主要適用于移動(dòng)手持設(shè)備與后臺(tái)數(shù)據(jù)中心的通信和分析。

XMPP協(xié)議(即時(shí)通信)

XMPP(Extensible Messaging and Presence Protocol)可擴(kuò)展通訊和表示協(xié)議,XMPP的前身是Jabber,一個(gè)開源形式組織產(chǎn)生的網(wǎng)絡(luò)即時(shí)通信協(xié)議。XMPP目前被IETF國(guó)際標(biāo)準(zhǔn)組織完成了標(biāo)準(zhǔn)化工作。

適用范圍:即時(shí)通信的應(yīng)用程序,還能用在網(wǎng)絡(luò)管理、內(nèi)容供稿、協(xié)同工具、檔案共享、游戲、遠(yuǎn)端系統(tǒng)監(jiān)控等。

JMS (Java Message Service)

Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持
JMS是協(xié)議同時(shí)也是 Java 消息服務(wù)規(guī)范的標(biāo)準(zhǔn)實(shí)現(xiàn),同時(shí)也是 Java 企業(yè)版(JEE)規(guī)范的一部分。

優(yōu)秀連接 https://blog.csdn.net/gyshun/article/details/83036987

2.5 消息持久化

需要滿足以下三個(gè)條件:
1、cleanSession = false
2、clientId不為空
3、客戶端subscribe時(shí)的Qos=1,發(fā)布端publish時(shí)的Qos=1

// 接受離線消息 告訴代理客戶端是否要建立持久會(huì)話 false為建立持久會(huì)話mqttConnectOptions.setCleanSession(Boolean.FALSE);

2.6 ※ 實(shí)現(xiàn)方式!!!

參考鏈接:https://blog.51cto.com/u_15067242/2574302

MQTT客戶端采用的是Spring Intergration和Eclipse.paho的方式實(shí)現(xiàn)的。當(dāng)然,你也可以直接使用Eclipse.paho作為客戶端連接。

2.6.1 Spring Intergration

官方的說法我就不過多的解釋了,比較晦澀,這里我談一下自己的理解。其實(shí)Spring Intergration就類似一個(gè)水電系統(tǒng)??傞l、各樓層的控制、分流、聚合、過濾、沉淀、消毒、排污,這里的每一個(gè)環(huán)節(jié)都類似一個(gè)系統(tǒng)服務(wù),可能是MQTT,可能是Redis,可能是MongoDB,可能是Job,可能是我們系統(tǒng)服務(wù)的任何一個(gè)模塊。那么Spring Intergration扮演的角色就是將這些功能能夠連接起來組成一個(gè)完整的服務(wù)系統(tǒng),實(shí)現(xiàn)企業(yè)系統(tǒng)的集成的解決方案。就像管道一樣各個(gè)模塊連接到起,管道能夠連接到千家萬戶需要很多水表、分頭管、水龍頭,管道開關(guān)等等這些都是Spring Intergration的主要組件。

<!-- mqtt依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>

spring-integration-mqtt內(nèi)部依賴了Eclipse.paho的包,所以不需要在單獨(dú)引入

關(guān)于Spring Intergration的版本問題,其官方文檔:官方文檔連接,值得深看!

這里是對(duì)官方文檔的部分謄寫&理解:

???????從4.1版本開始,編程方式改變適配器訂閱的主題可以省略u(píng)rl,DefaultMqttPahoClientFactory屬性serverURIs可以提供服務(wù)端URI,例如,這將使能連接至HA高可用簇。

???????從4.2.2版本開始,當(dāng)適配器成功訂閱至主題后,發(fā)布MqttSubscribedEvent,當(dāng)連接/訂閱失敗時(shí),發(fā)布MqttConnectionFailedEvent。這些事件可以由實(shí)現(xiàn)ApplicationListener接口的實(shí)體類獲取。
???????新的屬性recoveryInterval控制在故障之后適配器會(huì)嘗試重新連接的時(shí)間間隔,默認(rèn)為10000ms(10s)

???????在4.2.3版本之前,當(dāng)適配器停止后,客戶端總是會(huì)解除訂閱,這是不正確的。 ,因?yàn)槿绻蛻舳薗oS大于0,我們需要保持訂閱以便適配器停止時(shí)到達(dá)的消息在下一次開始時(shí)會(huì)傳送。這也需要設(shè)置客戶端工廠cleanSession屬性為false,默認(rèn)值為true。

???????從4.2.3版本開始,如果cleanSession值為false,適配器不會(huì)解除訂閱(默認(rèn))??梢灾貙懺撔袨?#xff0c;通過設(shè)置工廠屬性consumerCloseAction,可以有以下值:UNSUBSCRIBE_ALWAYS,UNSUBSCRIBE_NEVER以及UNSUBSCRIBE_CLEAN,后者(默認(rèn))會(huì)解除訂閱僅當(dāng)cleanSession屬性值為true?;赝酥?.2.3之前的行為,使用UNSUBSCRIBE_ALWAYS。

2.6.2 Eclipse.paho

Eclipse.paho是基于IMqttClient和IMqttAnsycClient接口實(shí)現(xiàn)的MQTT客戶端中間件。其內(nèi)部實(shí)現(xiàn)了完整的消息發(fā)布與訂閱、socket長(zhǎng)連接、心跳機(jī)制、斷線重連以及消息本地緩存等一系列功能。是目前比較主流的MQTT客戶端中間件。

2.7 重連

MQTT有個(gè)自動(dòng)重連功能。有兩種方式可以實(shí)現(xiàn)自動(dòng)重連。

2.7.1 使用Spring Intergration方式

基于 MQTT連接配置類MqttConnectOption類可以設(shè)置自動(dòng)重連。

// 斷開后重連,但這個(gè)方法并沒有重新訂閱的機(jī)制 // 在嘗試重新連接之前,它將首先等待1秒,對(duì)于每次失敗的重新連接嘗試,延遲將加倍,直到達(dá)到2分鐘,此時(shí)延遲將保持在2分鐘。 options.setAutomaticReconnect(true);// 接受離線消息 告訴代理客戶端是否要建立持久會(huì)話 false為建立持久會(huì)話 mqttConnectOptions.setCleanSession(Boolean.FALSE);

若使用了Spring Intergration方式實(shí)現(xiàn)mqtt客戶端,那么只用將setAutomaticReconnect設(shè)置為true,setCleanSession設(shè)置為false即可。

原因:

  • 使用 automaticReconnect 為 true 表示斷線自動(dòng)重連,但僅僅只是重新連接,并不訂閱主題;
  • 前文說到,從4.2.3版本開始,如果cleanSession值為false,適配器不會(huì)解除訂閱(默認(rèn))。
    因此,只要保證這兩點(diǎn),mqtt即可斷線重連。
  • 2.7.2 使用Eclipse.paho(mqttv3)方式

  • 同樣的,使用MQTT自帶的 AutomaticReconnect 屬性
  • // 斷開后重連,但這個(gè)方法并沒有重新訂閱的機(jī)制 options.setAutomaticReconnect(true);
  • 方法一:在 connectComplete 回調(diào)函數(shù)重新訂閱,實(shí)現(xiàn)如下:
  • @Override public void connectComplete(boolean b, String s) {// 客戶端連接成功log.info("[MQTT] 連接成功,重新訂閱主題...");try {client.subscribe(topic, QOS);} catch (MqttException e) {e.printStackTrace();} }

    ------------------------------------------------------或者------------------------------------------------------
    方法二:在connectionLost () 回調(diào)函數(shù)中自定義重新連接、重新訂閱

    @Override public void connectionLost(Throwable cause) {// 連接斷開CodeUtils.info("[MQTT] 連接斷開,30S之后嘗試重連...");while(true) {try {Thread.sleep(30000);// 重新連接client.connect(options);// 重新訂閱client.subscribe(topic, QOS);break;} catch (Exception e) {e.printStackTrace();continue;}} }

    2.8 連接、斷開通知(踩坑處)

    坑一博主博人嘗試了使用Spring Intergration方式,再加上實(shí)現(xiàn)接口MqttCallbackExtended /MqttCallback,發(fā)現(xiàn)并沒有在連接斷開、重連、收到消息的時(shí)候進(jìn)入該方法。再看MqttCallbackExtended /MqttCallback這兩個(gè)接口都在package org.eclipse.paho.client.mqttv3包下因此推斷該方式僅限用于mqttv3方式下使用!!!
    坑二在運(yùn)用過程中,會(huì)出現(xiàn)斷開連接第一次重連成功后,一直斷開連接再重連再斷開連接再重連的死循環(huán)中

    • 問題原因:
      創(chuàng)建了相同clientid 的MqttClient。
    • 問題解析:
      因?yàn)閏lientid是MqttClient的唯一標(biāo)識(shí),當(dāng)重新new上一個(gè)clientid相同的MqttClient就會(huì)出現(xiàn)重連時(shí)創(chuàng)建的MqttClient使程序中初始化時(shí)創(chuàng)建的MqttClient斷開連接,斷開連接后就會(huì)回滾到connectionLost方法中,然后此方法中又會(huì)繼續(xù)重連。
    • 解決辦法:
      1.不需要重新new一個(gè)MqttClient,只需要調(diào)用connect()方法就OK了。
      2.在創(chuàng)建clientId的時(shí)候,最后添加上隨機(jī)數(shù),那樣每次都是不同的clientId

    2.8.1 MqttCallback

    官方文檔鏈接 在此,惡靈退散~~~~

    MqttCallback:使應(yīng)用程序能夠在與客戶端相關(guān)的異步事件發(fā)生時(shí)得到通知。實(shí)現(xiàn)此接口的類可以在兩種類型的客戶端上注冊(cè):IMqttClient.setCallback(MqttCallback) 和IMqttAsyncClient.setCallback(MqttCallback)

    public interface MqttCallback {//當(dāng)與服務(wù)器的連接丟失時(shí)調(diào)用此方法。public void connectionLost(Throwable cause);//當(dāng)消息從服務(wù)器到達(dá)時(shí)調(diào)用此方法。public void messageArrived(String topic, MqttMessage message) throws Exception;//當(dāng)消息的傳遞完成并收到所有確認(rèn)時(shí)調(diào)用。public void deliveryComplete(IMqttDeliveryToken token); }

    2.8.2 MqttCallbackExtended

    官方文檔鏈接 在此,萬國(guó)臣服~~~~

    MqttCallbackExtended是paho.mqtt.java客戶端需要監(jiān)控連接狀態(tài)變更事件,以進(jìn)行異常維測(cè)和處理所提供的接口。
    MqttCallbackExtended接口繼承了MqttCallback接口,并在其基礎(chǔ)上,新增方法:

    // 當(dāng)與服務(wù)器的連接成功完成時(shí)調(diào)用該方法。 void connectComplete(boolean reconnect, java.lang.String serverURI);

    參數(shù):

    • reconnect- 如果為真,則連接是自動(dòng)重新連接的結(jié)果。
    • serverURI- 建立連接的服務(wù)器 URI。

    2.9 動(dòng)態(tài)訂閱主題

    其實(shí),我們?cè)谑褂迷?Spring Intergration&采用工廠模式,初始化訂閱者時(shí),已經(jīng)預(yù)先設(shè)置了主題

    @Beanpublic MessageProducer inbound() {// 可以同時(shí)消費(fèi)(訂閱)多個(gè)Topic// Paho客戶端消息驅(qū)動(dòng)通道適配器,主要用來訂閱主題 對(duì)inboundTopics主題進(jìn)行監(jiān)聽MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId,mqttClientFactory(), consumerTopic);adapter.setCompletionTimeout(timeout);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);// 設(shè)置訂閱通道adapter.setOutputChannel(mqttInboundChannel());//adapter.setErrorChannel();return adapter;}

    其中consumerTopic在初始化后就可以訂閱,但是不符合 ‘在使用過程中想訂閱新的主題’ 的使用場(chǎng)景,因此這里借助了MqttPahoMessageDrivenChannelAdapter的添加/刪除主題 的方法

    注意:MessageProducer類有沒有提供“添加主題”的方法,在實(shí)例化bean過程中,真正對(duì)象是MqttPahoMessageDrivenChannelAdapter的實(shí)例對(duì)象,因此可以斷定MessageProducer是MqttPahoMessageDrivenChannelAdapter的引用類,即父類。
    因此,為了使用MqttPahoMessageDrivenChannelAdapter的addTopic(String topic, int qos)、 removeTopic(String... topic)的方法有以下兩種方式:

  • 創(chuàng)建MessageProducer 的bean對(duì)象后,在實(shí)例化后,使用時(shí)再?gòu)?qiáng)轉(zhuǎn)成MqttPahoMessageDrivenChannelAdapter。
  • package com.ruoyi.zy.mqtt;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController;/** * @author hhh * @since 2022/5/24 10:53 */ @RestController public class MqttController {@Autowiredprivate MessageProducer messageProducer;@PostMapping("/addTopic")public String addTopic(@RequestBody String data){((MqttPahoMessageDrivenChannelAdapter)messageProducer).addTopic("addTopicName",1);((MqttPahoMessageDrivenChannelAdapter)messageProducer).removeTopic("addTopicName");return "test is Ok!";}}
  • 創(chuàng)建bean對(duì)象時(shí),直接創(chuàng)建MqttPahoMessageDrivenChannelAdapter
  • /*** MQTT消息訂閱綁定(消費(fèi)者)*/@Beanpublic MqttPahoMessageDrivenChannelAdapter inbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId,mqttClientFactory(), consumerTopic);adapter.setCompletionTimeout(timeout);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInboundChannel());return adapter;}

    三、MQTT代碼

    3.1 MQTT配置類

    package com.ruoyi.common.config;import com.alibaba.fastjson.JSON; import com.ruoyi.zy.mqtt.MqttConsumer; import com.ruoyi.zy.mqtt.ZyMqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;@Configuration public class MqttConfig {private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);@AutowiredZyMqttCallback zyMqttCallback;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.url}")private String hostUrl;@Value("${mqtt.producerClientId}")private String producerClientId;@Value("${mqtt.producerTopic}")private String producerTopic;//生產(chǎn)者和消費(fèi)者是單獨(dú)連接服務(wù)器會(huì)使用到一個(gè)clientid(客戶端id),// 如果是同一個(gè)clientid的話會(huì)出現(xiàn)Lost connection: 已斷開連接; retrying...@Value("${mqtt.consumerClientId}")private String consumerClientId;@Value("${mqtt.consumerTopic}")private String[] consumerTopic;@Value("${mqtt.timeout}")private int timeout;@Value("${mqtt.keepalive}")private int keepalive;//入站通道名(消費(fèi)者)訂閱的bean名稱public static final String CHANNEL_NAME_IN = "mqttInboundChannel";//出站通道名(生產(chǎn)者)發(fā)布的bean名稱public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";/*** MQTT連接器基本信息選項(xiàng)** @return {@link MqttConnectOptions}*/@Beanpublic MqttConnectOptions getMqttConnectOptions() {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());// 設(shè)置連接地址,支持集群模式mqttConnectOptions.setServerURIs(new String[]{hostUrl});mqttConnectOptions.setKeepAliveInterval(keepalive);// 接受離線消息 告訴代理客戶端是否要建立持久會(huì)話 false為建立持久會(huì)話mqttConnectOptions.setCleanSession(Boolean.FALSE);//設(shè)置重連機(jī)制mqttConnectOptions.setAutomaticReconnect(true);// 設(shè)置遺囑消息MqttMessage mqttMessage = new MqttMessage();mqttMessage.setPayload("bit_plate offline".getBytes());System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!"+JSON.toJSONString(mqttMessage).getBytes());mqttConnectOptions.setWill("topic_offline", JSON.toJSONString(mqttMessage).getBytes(), 1, true);return mqttConnectOptions;}/*** MQTT客戶端. 創(chuàng)建MqttPahoClientFactory,設(shè)置MQTT Broker連接屬性,如果使用SSL驗(yàn)證,也在這里設(shè)置。** @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}*/@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}/*******************************生產(chǎn)者*******************************************//*** MQTT信息通道(生產(chǎn)者)*/@Bean(name = CHANNEL_NAME_OUT)public MessageChannel mqttOutboundChannel() {//使用點(diǎn)對(duì)點(diǎn)模型,消息管道類型DirectChannelreturn new DirectChannel();}/*** MQTT消息處理器(生產(chǎn)者)* <p>* ServiceActivator注解表明:當(dāng)前方法用于處理MQTT消息,outputChannel參數(shù)指定了用于生產(chǎn)消息的channel。*/@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(producerClientId, mqttClientFactory());// 如果設(shè)置成true,即異步,發(fā)送消息時(shí)將不會(huì)阻塞。messageHandler.setAsync(true);messageHandler.setDefaultTopic(producerTopic);return messageHandler;}/*******************************消費(fèi)者*******************************************//*** MQTT信息通道(消費(fèi)者)*/@Bean(name = CHANNEL_NAME_IN)public MessageChannel mqttInboundChannel() {return new DirectChannel();}/*** MQTT消息訂閱綁定(消費(fèi)者)*/@Beanpublic MessageProducer inbound() {//管道適配器。因?yàn)橥獠繀f(xié)議有無數(shù)種,消息適配器則用于連接不同協(xié)議的外部系統(tǒng)。從外部系統(tǒng)讀入數(shù)據(jù)并對(duì)數(shù)據(jù)進(jìn)行處理最終// 與Spring Integration 內(nèi)部的消息系統(tǒng)適配。例如將要進(jìn)行Mqtt集成,那么就需要一個(gè)Mqtt的管道適配器,// 可以同時(shí)消費(fèi)(訂閱)多個(gè)Topic// Paho客戶端消息驅(qū)動(dòng)通道適配器,主要用來訂閱主題 對(duì)inboundTopics主題進(jìn)行監(jiān)聽MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId,mqttClientFactory(), consumerTopic);adapter.setCompletionTimeout(timeout);//編解碼器。該方法用與對(duì)消息負(fù)載進(jìn)行編解碼。-----可自定義,但是需要實(shí)現(xiàn)MqttMessageConverter接口!!!adapter.setConverter(new DefaultPahoMessageConverter());// 設(shè)置消息的服務(wù)質(zhì)量// 0:最多一次傳送 (只負(fù)責(zé)傳送,發(fā)送過后就不管數(shù)據(jù)的傳送情況)// 1:至少一次傳送 (確認(rèn)數(shù)據(jù)交付)// 2:正好一次傳送 (保證數(shù)據(jù)交付成功)adapter.setQos(1);// 設(shè)置訂閱通道adapter.setOutputChannel(mqttInboundChannel());//adapter.setErrorChannel();return adapter;}/*** MQTT消息處理器(消費(fèi)者)*/@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_IN)public MessageHandler handler() {return new MqttConsumer();}//如果我要配置多個(gè)client,只要配置多個(gè)通道即可//通道2 // @Bean // public MessageChannel mqttInputChannelTwo() { // return new DirectChannel(); // } // //配置client2,監(jiān)聽的topic:hell2,hello3 // @Bean // public MessageProducer inbound1() { // MqttPahoMessageDrivenChannelAdapter adapter = // new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), // "hello2","hello3"); // adapter.setCompletionTimeout(timeout); // adapter.setConverter(new DefaultPahoMessageConverter()); // adapter.setQos(1); // adapter.setOutputChannel(mqttInputChannelTwo()); // return adapter; // } // // //通過通道2獲取數(shù)據(jù) // @Bean // @ServiceActivator(inputChannel = "mqttInputChannelTwo") // public MessageHandler handlerTwo() { // return new MqttConsumer(); // } }

    3.2 MQTT生產(chǎn)者

    package com.ruoyi.zy.mqtt;import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;/*** @description mqtt生產(chǎn)者* @since 2022/5/24 10:16*/ @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttProducer {void sendToMqtt(String data);void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}

    3.2 MQTT消費(fèi)者

    package com.ruoyi.zy.mqtt;import cn.hutool.core.convert.Convert; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ruoyi.common.exception.CustomException; import com.ruoyi.common.utils.HarWarUtils; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.zy.annotion.Zy5gDataField; import com.ruoyi.zy.constants.MqttConstants; import com.ruoyi.zy.constants.MsgIdEnum; import com.ruoyi.zy.dto.body.accept.AcceptBootBody; import com.ruoyi.zy.dto.body.accept.AcceptHeartBody; import com.ruoyi.zy.dto.data.accept.AcceptBootData; import com.ruoyi.zy.service.ZyService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component;import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List;import static cn.hutool.core.util.ReflectUtil.getMethod; import static com.ruoyi.kwt.uav5g.message.Kwt5gData.getFieldsType;/*** @description mqtt消費(fèi)者* @since 2022/5/24 11:13*/ @Component @ConditionalOnProperty(value = "mqtt.enable", havingValue = "true") public class MqttConsumer implements MessageHandler {private Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate ZyService zyService;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {try {String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));//獲取消息內(nèi)容String msgStr = String.valueOf(message.getPayload());logger.info("接收到mqtt消息,主題:{} 消息:{}", topic, msgStr);if (StringUtils.isEmpty(msgStr)) {logger.error("錯(cuò)誤!接收到 mqtt消息,主題:{} 消息為空", topic);return;}JSONObject jsonObj = (JSONObject) JSONObject.parse(msgStr);Integer msg_id = jsonObj.getJSONObject(MqttConstants.HEAD).getInteger(MqttConstants.MSG_ID);if (null == msg_id) {throw new CustomException("mqtt收到消息錯(cuò)誤!");}switch (MsgIdEnum.getByMsgCode(msg_id)) {case BOOT:handleBootBody(jsonObj);break;case HEART:handleHeartBody(jsonObj);break;case LIVE:handleLiveBody(jsonObj);break;case C2:handleC2Body(jsonObj);break;case FLY:handleFlyBody(jsonObj);break;default:throw new CustomException("mqtt收到消息錯(cuò)誤!未識(shí)別對(duì)應(yīng)的 msg_id:{}", msg_id);}} catch (CustomException ce) {logger.error(ce.getMessage());} catch (Exception e) {e.printStackTrace();}}public void handleBootBody(JSONObject jsonObj) {try {AcceptBootBody acceptBootBody = JSON.toJavaObject(jsonObj, AcceptBootBody.class);AcceptBootData data = acceptBootBody.getData();Field[] fields = data.getClass().getDeclaredFields();for (Field field : fields) {Zy5gDataField ann = field.getAnnotation(Zy5gDataField.class);if (ann == null) {continue;}//json字段解析Class fieldType = field.getType();Object originalObj = jsonObj.getJSONObject(MqttConstants.DATA).get(field.getName());Object newObj = bodyParseObjValue(fieldType, originalObj, ann.ratio());field.setAccessible(true);field.set(data, newObj);}//處理了* / 倍率之后的正確數(shù)值zyService.handleBootBody(acceptBootBody);} catch (Exception e) {e.printStackTrace();}}private static Object bodyParseObjValue(Class<?> type, Object originalObj, float ratio) throws Exception {//數(shù)組if (originalObj instanceof List) {List bodyList = Convert.toList(type.newInstance().getClass(), originalObj);List destList = new ArrayList<>();for (int k = 0; k < bodyList.size(); k++) {Object test = type.newInstance();Field[] fields = type.getDeclaredFields();for (Field field : fields) {Zy5gDataField fieldAnnotation = field.getAnnotation(Zy5gDataField.class);if (fieldAnnotation == null) {continue;}Class<?> fieldType = getFieldsType(field);String methodName = field.getName().substring(0, 1).toUpperCase() + field.getName().substring(1);String getMethodName = "get" + methodName;Method m = getMethod(type, getMethodName);Object obj = m.invoke(bodyList.get(k));Object newObj = harWarJsonParseObjValueBasic(fieldType, obj, fieldAnnotation.ratio());field.set(test, newObj);}destList.add(test);}return destList;}//基類(僅數(shù)字相關(guān))Object newObj = harWarJsonParseObjValueBasic(type, originalObj, ratio);//String等其余非基本類型返回原值return newObj;}/*** 解析json字段*/private static Object harWarJsonParseObjValueBasic(Class<?> type, Object originalObj, float ratio) throws Exception {if (HarWarUtils.isEmpty(originalObj) || ratio == 1) {return originalObj;}if (ratio == 0) {throw new CustomException(type + "參數(shù)的倍率ratio不能為0!");}if (type.equals(int.class) || type.equals(Integer.class)) {originalObj = (Integer) originalObj / ratio;} else if (type.equals(float.class) || type.equals(Float.class)) {originalObj = Float.parseFloat(originalObj.toString()) / ratio;} else if (type.equals(double.class) || type.equals(Double.class)) {originalObj = Double.parseDouble(originalObj.toString()) / ratio;} else if (type.equals(long.class) || type.equals(Long.class)) {originalObj = Long.parseLong(originalObj.toString()) / ratio;} else if (String.class.isAssignableFrom(type)) {originalObj = originalObj.toString();} else {throw new Exception("不支持的數(shù)據(jù)類型,type=" + type);}return originalObj;}public void handleHeartBody(JSONObject jsonObj) {AcceptHeartBody acceptHeartBody = JSON.toJavaObject(jsonObj, AcceptHeartBody.class);zyService.handleHeartBody(acceptHeartBody);}public void handleLiveBody(JSONObject jsonObj) {}public void handleC2Body(JSONObject jsonObj) {}public void handleFlyBody(JSONObject jsonObj) {}}

    總結(jié)

    以上是生活随笔為你收集整理的MQTT从入门到放弃的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。