日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

用emqx做mqtt客户端

發(fā)布時間:2024/3/12 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 用emqx做mqtt客户端 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

最近項目中有一個需求,要用mqtt協(xié)議接收路側(cè)設(shè)備的數(shù)據(jù)到云平臺上,所以,研究了一下mqtt客戶端的制作方法。

mqtt協(xié)議是一個發(fā)布訂閱模式的協(xié)議。

這篇文章主要記錄下我搭建mqttbroker和寫mqtt客戶端的過程,是記錄,不是教程,無意教程。

一、下載安裝emqx

emqx是一個mqtt的broker軟件,這個軟件是比較好用的一個broker軟件,以前用過mosquitto軟件做mqtt的broker,但是mosquitto沒有emqx易用,所以就放棄了。

從emqx的官網(wǎng)上下載得到emqx-4.4.1-otp24.1.5-3-el7-amd64.zip,unzip解壓出來一個emqx的文件夾。

進(jìn)入到emqx/bin下執(zhí)行 emqx start,就將emqx啟動起來了。

[root@localhost bin]# ./emqx start WARNING: There seem to be missing dynamic libs from the OS. Using libs from /root/emqx/dynlibs EMQ X Broker 4.4.1 is started successfully! [root@localhost bin]#

查看emqx的運行狀態(tài)用bin下的emqx_ctl命令。

[root@localhost bin]# ./emqx_ctl status Node 'emqx@127.0.0.1' 4.4.1 is started [root@localhost bin]#

經(jīng)過以上的下載、解壓、啟動、查看狀態(tài)四個步驟,就將emqx軟件正常運行起來了。

二、mqtt客戶端的編寫

編寫mqtt客戶端,是需要eclips的paho庫的,這個庫是一個mqtt客戶端的編程接口庫。

在基于maven的java項目中,要增加一個dependency。

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version> </dependency>

接下來就是寫client的代碼了。

下面的代碼中有發(fā)送消息的代碼,也有接收消息的代碼,接收消息的代碼就是那個OnMessageCallback,有了這個Callback類就可以接收mqtt消息了。

import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.nio.charset.StandardCharsets;public class App {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "哈哈哈,我又來了。";int qos = 2;String broker = "tcp://123.56.182.37:1883"; //這里是broker的地址String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 連接選項MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// 保留會話connOpts.setCleanSession(true);// 設(shè)置回調(diào),這個回調(diào)類是用來接收mqtt消息的,如果只是想要發(fā)布mqtt消息,那么就不需要這個回調(diào)了。// 當(dāng)然,如果只是想接收mqtt消息,不想發(fā)送,那么就只設(shè)置這個回調(diào),下面的發(fā)送代碼也就不需要了client.setCallback(new OnMessageCallback());// 建立連接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 訂閱client.subscribe(subTopic);// 消息發(fā)布所需參數(shù)long i = 0;while (i < 10e10) {MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));message.setQos(qos);client.publish(pubTopic, message);Thread.sleep(10000);i++;System.out.println("Message published, i = " + i);}client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} } import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// 連接丟失后,一般在這里面進(jìn)行重連System.out.println("連接斷開,可以做重連");}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息會執(zhí)行到這里面System.out.println("接收消息主題:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息內(nèi)容:" + new String(message.getPayload(), "utf-8"));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());} }

以上兩步就可以完成一個具備基本功能的mqtt協(xié)議客戶端了,這個客戶端既可以發(fā)布消息,也可以接收消息。當(dāng)然,如果只想要一個方面的功能,也可以適當(dāng)裁剪以適應(yīng)自己的需要。

參考資料:
1、https://www.emqx.io/docs/zh/v4.4/getting-started/install.html

總結(jié)

以上是生活随笔為你收集整理的用emqx做mqtt客户端的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。