用emqx做mqtt客户端
最近項目中有一個需求,要用mqtt協(xié)議接收路側(cè)設(shè)備的數(shù)據(jù)到云平臺上,所以,研究了一下mqtt客戶端的制作方法。
mqtt協(xié)議是一個發(fā)布訂閱模式的協(xié)議。
這篇文章主要記錄下我搭建mqttbroker和寫mqtt客戶端的過程,是記錄,不是教程,無意教程。
一、下載安裝emqx
emqx是一個mqtt的broker軟件,這個軟件是比較好用的一個broker軟件,以前用過mosquitto軟件做mqtt的broker,但是mosquitto沒有emqx易用,所以就放棄了。
從emqx的官網(wǎng)上下載得到emqx-4.4.1-otp24.1.5-3-el7-amd64.zip,unzip解壓出來一個emqx的文件夾。
進(jìn)入到emqx/bin下執(zhí)行 emqx start,就將emqx啟動起來了。
[root@localhost bin]# ./emqx start WARNING: There seem to be missing dynamic libs from the OS. Using libs from /root/emqx/dynlibs EMQ X Broker 4.4.1 is started successfully! [root@localhost bin]#查看emqx的運行狀態(tài)用bin下的emqx_ctl命令。
[root@localhost bin]# ./emqx_ctl status Node 'emqx@127.0.0.1' 4.4.1 is started [root@localhost bin]#經(jīng)過以上的下載、解壓、啟動、查看狀態(tài)四個步驟,就將emqx軟件正常運行起來了。
二、mqtt客戶端的編寫
編寫mqtt客戶端,是需要eclips的paho庫的,這個庫是一個mqtt客戶端的編程接口庫。
在基于maven的java項目中,要增加一個dependency。
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version> </dependency>接下來就是寫client的代碼了。
下面的代碼中有發(fā)送消息的代碼,也有接收消息的代碼,接收消息的代碼就是那個OnMessageCallback,有了這個Callback類就可以接收mqtt消息了。
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.nio.charset.StandardCharsets;public class App {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "哈哈哈,我又來了。";int qos = 2;String broker = "tcp://123.56.182.37:1883"; //這里是broker的地址String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 連接選項MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// 保留會話connOpts.setCleanSession(true);// 設(shè)置回調(diào),這個回調(diào)類是用來接收mqtt消息的,如果只是想要發(fā)布mqtt消息,那么就不需要這個回調(diào)了。// 當(dāng)然,如果只是想接收mqtt消息,不想發(fā)送,那么就只設(shè)置這個回調(diào),下面的發(fā)送代碼也就不需要了client.setCallback(new OnMessageCallback());// 建立連接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 訂閱client.subscribe(subTopic);// 消息發(fā)布所需參數(shù)long i = 0;while (i < 10e10) {MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));message.setQos(qos);client.publish(pubTopic, message);Thread.sleep(10000);i++;System.out.println("Message published, i = " + i);}client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} } import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// 連接丟失后,一般在這里面進(jìn)行重連System.out.println("連接斷開,可以做重連");}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息會執(zhí)行到這里面System.out.println("接收消息主題:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息內(nèi)容:" + new String(message.getPayload(), "utf-8"));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());} }以上兩步就可以完成一個具備基本功能的mqtt協(xié)議客戶端了,這個客戶端既可以發(fā)布消息,也可以接收消息。當(dāng)然,如果只想要一個方面的功能,也可以適當(dāng)裁剪以適應(yīng)自己的需要。
參考資料:
1、https://www.emqx.io/docs/zh/v4.4/getting-started/install.html
總結(jié)
以上是生活随笔為你收集整理的用emqx做mqtt客户端的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 前端03——css
- 下一篇: NBUT 1119 Patchouli'