日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > c/c++ >内容正文

c/c++

MQTT断线重连及订阅消息恢复

發(fā)布時間:2023/12/18 c/c++ 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MQTT断线重连及订阅消息恢复 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

MQTT斷線重連及訂閱消息恢復(fù)

注意注意,MQTT重連后需要重新訂閱主題才能重新接收到消息

我這里使用的是

//設(shè)置斷開后重新連接 options.setAutomaticReconnect(true); @Overridepublic void connectionLost(Throwable throwable) {log.error("連接斷開,下面做重連...");long reconnectTimes = 1;while (true) {try {if (mqttClient.isConnected()) {log.warn("mqtt reconnect success end");break;}if(reconnectTimes == 10){//當(dāng)重連次數(shù)達(dá)到10次時,就拋出異常,不在重連log.warn("mqtt reconnect error");return;}log.warn("mqtt reconnect times = {} try again...", reconnectTimes++);mqttClient.reconnect();} catch (MqttException e) {log.error("", e);}try {Thread.sleep(1000);} catch (InterruptedException e1) { // e1.printStackTrace();}}}

看MQTT的connec的源碼發(fā)現(xiàn)了一段代碼使我找到了解決方案
MqttAsyncClient 的 connect()方法

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException {......//省略......comms.setNetworkModules(createNetworkModules(serverURI, options));comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));// Insert our own callback to iterate through the URIs till the connect// succeedsMqttToken userToken = new MqttToken(getClientId());ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,userToken, userContext, callback, reconnecting);userToken.setActionCallback(connectActionListener);userToken.setUserContext(this);// If we are using the MqttCallbackExtended, set it on the// connectActionListenerif (this.mqttCallback instanceof MqttCallbackExtended) {connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);}comms.setNetworkModuleIndex(0);connectActionListener.connect();return userToken;}

MqttReconnectCallback 是實現(xiàn)MqttCallbackExtended接口的
發(fā)現(xiàn)comms中有設(shè)置重連的回調(diào)對象
comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));
但是怎么把這個回調(diào)由我們來主動放進(jìn)去呢?繼續(xù)往下看源碼可以發(fā)現(xiàn)
MqttReconnectCallback對象只是在連接丟失connectionLost的時候進(jìn)行循環(huán)連接
點擊startReconnectCycle()最終又會回到
MqttAsyncClient 的 connect()方法

class MqttReconnectCallback implements MqttCallbackExtended {final boolean automaticReconnect;MqttReconnectCallback(boolean isAutomaticReconnect) {automaticReconnect = isAutomaticReconnect;}public void connectionLost(Throwable cause) {if (automaticReconnect) {// Automatic reconnect is set so make sure comms is in resting// statecomms.setRestingState(true);reconnecting = true;startReconnectCycle();}}public void messageArrived(String topic, MqttMessage message) throws Exception {}public void deliveryComplete(IMqttDeliveryToken token) {}public void connectComplete(boolean reconnect, String serverURI) {}}

也就是如果我們在之前放入client的回調(diào)對象是實現(xiàn)的 MqttCallbackExtended 接口,則MQTT會將我們的回調(diào)對象放入 connectActionListener 中 然后由 connectActionListener實現(xiàn)具體的connect

接下來我們將 MessageCallback 對象改為實現(xiàn) MqttCallbackExtended這個接口,然后實現(xiàn)下面方法

mqttClient.setCallback(new MqttCallbackExtended () {/*** Called when the connection to the server is completed successfully.** @param reconnect If true, the connection was the result of automatic reconnect.* @param serverURI The server URI that the connection was made to.*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {try{//如果監(jiān)測到有,號,說明要訂閱多個主題if(mqttTopic.contains(",")){//多主題String[] mqttTopics = mqttTopic.split(",");mqttClient.subscribe(mqttTopics);}else{//單主題mqttClient.subscribe(mqttTopic);}log.info("----TAG", "connectComplete: 訂閱主題成功");}catch(Exception e){e.printStackTrace();log.info("----TAG", "error: 訂閱主題失敗");}}

然后可能在同一個環(huán)境,比方測試服和本地,創(chuàng)建同ip端口,用戶密碼clientId一樣的客戶端,那么2邊會占用資源,需要加上異常報錯,我的處理方式是連接10次不行就讓他掉線,還需要在報錯的地方加上處理:

//當(dāng)創(chuàng)建客戶端的時候出現(xiàn) 已斷開連接,有可能是在另一個環(huán)境下啟動了該客戶端,直接吧這邊的客戶端關(guān)閉,不然另一邊會無限重連if(e.getMessage().equals("已斷開連接") || e.getMessage().equals("客戶機(jī)未連接")){try {mqttClient.close();} catch (MqttException ex) {ex.printStackTrace();}}

以下是我的開發(fā)完整代碼,使用了多線程方式創(chuàng)建,

package com.t4cloud.t.sensor.entity;import com.t4cloud.t.base.redis.topic.entity.RedisMsg; import com.t4cloud.t.base.utils.RedisTopicUtil; import com.t4cloud.t.sensor.constant.MqttClientManager; import com.t4cloud.t.sensor.entity.vo.SensorMqttMsg; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;//MQTT客戶端線程 @Slf4j public class MqttClientThread extends Thread{//連接地址private String serverURL;//MQTT客戶端登錄用戶名private String mqttUsername;//MQTT客戶端密碼private String mqttPassWord;//MQTT訂閱主題private String mqttTopic;//MQTT的clientprivate String clientId;//產(chǎn)品idprivate String productId;//推送至我們自己的RedisTopIc中channelprivate String channel = "mqtt";//mqtt實體類private MqttClient mqttClient;//構(gòu)造函數(shù)public MqttClientThread(String serverURL,String mqttUsername,String mqttPassWord,String mqttTopic,String clientId,String productId) {this.serverURL = serverURL;this.mqttUsername = mqttUsername;this.mqttPassWord = mqttPassWord;this.mqttTopic = mqttTopic;this.clientId = clientId;this.productId = productId;}//線程方法public void run(){try {// host為主機(jī)名,clientid即連接MQTT的客戶端ID,一般以客戶端唯一標(biāo)識符表示,// MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存,就用usernamemqttClient = new MqttClient(serverURL, clientId, new MemoryPersistence());// 配置參數(shù)信息MqttConnectOptions options = new MqttConnectOptions();// 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄,// 這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接options.setCleanSession(true);// 設(shè)置用戶名options.setUserName(mqttUsername);// 設(shè)置密碼options.setPassword(mqttPassWord.toCharArray());// 設(shè)置超時時間 單位為秒options.setConnectionTimeout(10);// 設(shè)置會話心跳時間 單位為秒 服務(wù)器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線,但這個方法并沒有重連的機(jī)制 // options.setKeepAliveInterval(20);//設(shè)置斷開后重新連接options.setAutomaticReconnect(true);// 連接mqttClient.connect(options);// 訂閱//如果監(jiān)測到有,號,說明要訂閱多個主題if(mqttTopic.contains(",")){//多主題String[] mqttTopics = mqttTopic.split(",");mqttClient.subscribe(mqttTopics);}else{//單主題mqttClient.subscribe(mqttTopic);}// 設(shè)置回調(diào)mqttClient.setCallback(new MqttCallbackExtended () {/*** Called when the connection to the server is completed successfully.** @param reconnect If true, the connection was the result of automatic reconnect.* @param serverURI The server URI that the connection was made to.*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {try{//如果監(jiān)測到有,號,說明要訂閱多個主題if(mqttTopic.contains(",")){//多主題String[] mqttTopics = mqttTopic.split(",");mqttClient.subscribe(mqttTopics);}else{//單主題mqttClient.subscribe(mqttTopic);}log.info("----TAG", "connectComplete: 訂閱主題成功");}catch(Exception e){e.printStackTrace();log.info("----TAG", "error: 訂閱主題失敗");}}@Overridepublic void connectionLost(Throwable throwable) {log.error("連接斷開,下面做重連...");long reconnectTimes = 1;while (true) {try {if (mqttClient.isConnected()) {log.warn("mqtt reconnect success end");break;}if(reconnectTimes == 10){//當(dāng)重連次數(shù)達(dá)到10次時,就拋出異常,不在重連log.warn("mqtt reconnect error");return;}log.warn("mqtt reconnect times = {} try again...", reconnectTimes++);mqttClient.reconnect();} catch (MqttException e) {log.error("", e);}try {Thread.sleep(1000);} catch (InterruptedException e1) { // e1.printStackTrace();}}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {log.info("接收消息主題 : " + topic);log.info("接收消息Qos : " + mqttMessage.getQos());log.info("接收消息內(nèi)容 : " + new String(mqttMessage.getPayload()));//向我們通道中發(fā)送消息RedisMsg redisMsg = new RedisMsg();redisMsg.setChannel(channel);redisMsg.setMsg("推送MQTT消息");SensorMqttMsg mqttMsg = new SensorMqttMsg();mqttMsg.setProductId(productId);mqttMsg.setPayload(new String(mqttMessage.getPayload()));redisMsg.setData(mqttMsg);RedisTopicUtil.sendMessage(channel, redisMsg);}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {//認(rèn)證過程log.info("deliveryComplete.............");}});//放入緩存,根據(jù)clinetId吧mqttClient對象放進(jìn)去MqttClientManager.MQTT_CLIENT_MAP.putIfAbsent(clientId, mqttClient);} catch (Exception e) {e.printStackTrace();//當(dāng)創(chuàng)建客戶端的時候出現(xiàn) 已斷開連接,有可能是在另一個環(huán)境下啟動了該客戶端,直接吧這邊的客戶端關(guān)閉,不然另一邊會無限重連if(e.getMessage().equals("已斷開連接") || e.getMessage().equals("客戶機(jī)未連接")){try {mqttClient.close();} catch (MqttException ex) {ex.printStackTrace();}}}} }

參考原文鏈接:https://blog.csdn.net/csdm_admin/article/details/119935243

總結(jié)

以上是生活随笔為你收集整理的MQTT断线重连及订阅消息恢复的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。