日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

Java MQTT 客户端之 Paho

發(fā)布時間:2023/12/13 综合教程 25 生活家
生活随笔 收集整理的這篇文章主要介紹了 Java MQTT 客户端之 Paho 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Paho 自動重連后訂閱的主題會清空,所以需要實現(xiàn) MqttCallbackExtended 接口,在 connectComplete 方法添加訂閱主題;而不是實現(xiàn) MqttCallback 接口

一、添加引用

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

二、添加配置

mqtt:
  client:
    username: admin
    password: public
    serverURI: tcp://192.168.137.101:1883
    clientId: paho_${random.int[1000,9999]}
    keepAliveInterval: 120
    connectionTimeout: 30
  producer:
    defaultQos: 1
    defaultRetained: true
    defaultTopic: topic/test1
  consumer:
    consumerTopics: topic/test2,topic/test3

三、代碼

3.1.客戶端

@Configuration
public class MqttConfig {
    @Value("${mqtt.client.username}")
    private String username;
    @Value("${mqtt.client.password}")
    private String password;
    @Value("${mqtt.client.serverURI}")
    private String serverURI;
    @Value("${mqtt.client.clientId}")
    private String clientId;
    @Value("${mqtt.client.keepAliveInterval}")
    private int keepAliveInterval;
    @Value("${mqtt.client.connectionTimeout}")
    private int connectionTimeout;

    @Autowired
    private MyMqttCallback myMqttCallback;

    @Bean
    public MqttClient mqttClient() {
        try {
            MqttClientPersistence persistence = mqttClientPersistence();
            MqttClient client = new MqttClient(serverURI, clientId, persistence);

            myMqttCallback.setMqttClient(client);
            client.setCallback(myMqttCallback);

            client.connect(mqttConnectOptions());
//            client.subscribe(subTopic);

            return client;
        } catch (MqttException e) {
            System.out.println(e.getMessage());
            return null;
        }
    }

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(connectionTimeout);
        options.setKeepAliveInterval(keepAliveInterval);

        return options;
    }

    public MqttClientPersistence mqttClientPersistence() {
        return new MemoryPersistence();
    }
}

3.2.訂閱者

@Component
public class MyMqttCallback implements MqttCallbackExtended {

    @Value("${mqtt.consumer.consumerTopics}")
    private String[] consumerTopics;

    @Autowired
    private MqttService mqttService;

    private MqttClient mqttClient;

    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("連接斷開");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        mqttService.message(topic, message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }


    @Override
    public void connectComplete(boolean b, String s) {
        try {
            mqttClient.subscribe(consumerTopics);
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
    }

    public void setMqttClient(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }
}

3.3.發(fā)布者

@Component
public class MqttProducer {

    @Value("${mqtt.producer.defaultQos}")
    private int defaultProducerQos;
    @Value("${mqtt.producer.defaultRetained}")
    private boolean defaultRetained;
    @Value("${mqtt.producer.defaultTopic}")
    private String defaultTopic;

    @Autowired
    private MqttClient mqttClient;

    public void send(String payload) {
        this.send(defaultTopic, payload);
    }

    public void send(String topic, String payload) {
        this.send(topic, defaultProducerQos, payload);
    }

    public void send(String topic, int qos, String payload) {
        this.send(topic, qos, defaultRetained, payload);
    }

    public void send(String topic, int qos, boolean retained, String payload) {
        try {
            mqttClient.publish(topic, payload.getBytes(), qos, retained);
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
    }
}
@RestController
public class MqttController {

    @Autowired
    private MqttProducer mqttProducer;

    @RequestMapping("/send")
    public void send() {

        mqttProducer.send("test content");

    }
}

完整代碼:GitHub

參考

MQTT Client in Java
MQTT Java 客戶端庫
使用paho的MQTT時遇到的重連導(dǎo)致訂閱無法收到問題和解決

?

總結(jié)

以上是生活随笔為你收集整理的Java MQTT 客户端之 Paho的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。