日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

基于WebSocket协议实现Broker

發布時間:2024/4/14 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于WebSocket协议实现Broker 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

寫在前面:
前兩篇文字<<基于MQTT協議談談物聯網開發-華佗寫代碼>>,<<基于MQTT協議實現Broker-華佗寫代碼>>主要敘述了MQTT協議的編解碼以及基于MQTT協議的一些常見應用場景,并以一個簡單的消息推送系統作為例子具體闡述了Mqtt Broker部分的實現,之前主要以原生android或者iOS或者服務端代理作為例子,考慮到在移動端開發時,選擇的技術棧有所不同,有的選擇web前端開發.作為例子,這里以之前的消息推送系統為例基于web前端開發,繼續敘述基于WebSocket協議實現Broker.

?

1.WebSocket協議主要特點:

(1)基于http協議握手建立tcp長連接;

(2)相比http,WebSocket協議交換最小化,降低網絡流量;

(3)雙向通信,服務器可以主動推送數據給客戶端;

?

2.Mqtt Broker具體實現(WebSocket部分):

2.1Mqtt Broker架構草圖:

?

2.2Mqtt Broker實現細節:

(1)新增實現websocket server,監聽不同端口;

(2)每個websocket連接,實例化一個mqttclient負責其協議解析,消息發布和訂閱等;

(3)復用之前Mqtt Broker與RabbitMQ通信部分,具體參考上一篇文字;

(4)其他...

?

2.3Mqtt Broker代碼實現(WebSocket部分):

type tcpKeepAliveListener struct { ????*net.TCPListener }
var upgrader = websocket.Upgrader{ ????ReadBufferSize: 1024, ????WriteBufferSize: 1024, } ? //監聽websocket server地址,注冊websocket handler func (mb *MqttBroker) ListenAndServeWeb() { defer mb.wg.Done()http.HandleFunc("/", mb.webHandler)webserver := &http.Server{Addr: mb.webaddr, Handler: nil}var listener net.Listenervar err errorlistener, err = net.Listen("tcp", mb.webaddr)if err != nil {return}U.GetLog().Printf("listen and serve web broker on %s", mb.webaddr)err = webserver.Serve(tcpKeepAliveListener{listener.(*net.TCPListener)})
}

//每一個Websocket連接,實例化一個MqttClient負責其協議解析,以及與rabbitmq的通信
func (mb *MqttBroker) webHandler(w http.ResponseWriter, r *http.Request) { ????upgrader.CheckOrigin = checkSameOrigin ????conn, err := upgrader.Upgrade(w, r, nil) ????if err != nil { ????????U.GetLog().Printf("upgrade error:%v", err) ????????return ????} ????mqttclient, err := NewMqttClient(mb.wg, mb, nil, conn, "web") ????if err != nil { ????????return ????} ????mb.clientMap[mqttclient.GetClientID()] = mqttclient ????mb.wg.Add(1) ????go mqttclient.ServeWeb() }

?

2.4Mqtt Client代碼實現(WebSocket部分):

//定義WebSocket通信消息格式
//Action選項有publish,subscribe,unsubscribe
type WebMessage struct {Action
stringTopic stringPayload string }type MqttClient struct {wg *sync.WaitGroupbroker *MqttBrokertcpconn net.Connwconn *websocket.Conn...needDisConn bool }//通過WebMessage.Action區分消息指令類型 func (mc *MqttClient) ServeWeb() {defer mc.wg.Done()defer mc.commonDefer()if mc.wconn == nil {return}for {if mc.needDisConn {break}_, message, err := mc.wconn.ReadMessage()if err != nil {U.GetLog().Printf("handle message error:%v", err)mc.needDisConn = truecontinue}wm := WebMessage{}err = json.Unmarshal(message, &wm)if err != nil {U.GetLog().Printf("json.Unmarshal(message, &wm) error:%v", err)continue}switch wm.Action {case "subscribe":err = mc.handleWebSubscibe(wm.Topic)case "publish":err = mc.handleWebPublish(wm.Topic, wm.Payload)case "unsubscribe":err = mc.handleWebUnSubscribe(wm.Topic)case "ping":mc.lastheartbeat = 0default:U.GetLog().Printf("unexpected WebMessage Action:%s", wm.Action)continue}if err != nil {U.GetLog().Printf("handle message error:%v", err)}mc.lastheartbeat = 0} }

?

3.WebSocket Client端實現:

3.1實現細節:

(1)建立與WebSocket Server的連接;

(2)初始化WebSocket,注冊相關回調函數;

(3)實現WebSocket斷線重連機制;

(4)封裝類似mqtt基于topic的發布訂閱等接口;

(5)Nodejs端需要browserify相關js文件,Javascript端可以直接調用WebSocket;

(6)其他...

?

3.2具體代碼實現:

var WebSocket = require('ws'); var WEBSOCKET_MQTT_BROKER = 'ws://your_server_ip/ws/'; var ping = {Action: "ping" };var _listeners = {}; var _websocket = null; var _connected = false;_access = function () {console.log('try mqtt.connect');_connect_websocket();setInterval(function () {_reconnect_websocket();if (_websocket != null && _connected) {_websocket.send(JSON.stringify(ping));}}, 3000); }; //websocket初始化,并實現相關回調函數 _init_websocket = function () {if (_websocket == null) {return;}_websocket.onopen = function () {_connected = true;console.log("Connected to WebSocket server.");for (var topic in _listeners) {var sub = {Action: "subscribe",Topic: topic,Payload: ""};_websocket.send(JSON.stringify(sub));}};_websocket.onclose = function () {_connected = false;_websocket = null;console.log("Disconnected");};_websocket.onmessage = function (evt) {console.log('recv data from server: ' + evt.data);var dataObj = JSON.parse(evt.data);_listeners[dataObj.Topic] && _listeners[dataObj.Topic](dataObj.Payload);};_websocket.onerror = function (evt) {_connected = false;_websocket = null;console.log('Error occured: ' + evt);}; };_connect_websocket = function () {if (_connected) {return;}_websocket = new WebSocket(WEBSOCKET_MQTT_BROKER);_init_websocket(); }; //斷線重連,通過定時器實現每三秒斷線重連 _reconnect_websocket = function () {if (_connected) {return;}_websocket = new WebSocket(WEBSOCKET_MQTT_BROKER);_init_websocket(); }; //模擬mqtt發布消息 sendMessage = function (topic, data) {if (!_websocket || !_connected) {var err = new Error('iot client not ready.');console.warn(err);return;}var send_data = JSON.stringify(data);var pub = {Action: "publish",Topic: topic,Payload: send_data};_websocket.send(JSON.stringify(pub)); }; //模擬mqtt訂閱消息,并根據topic注冊回調函數 onMessage = function (topic, callback) {_listeners[topic] = callback;if (!_websocket || !_connected) {console.warn('onMessage, but iot client not ready.');return;}var sub = {Action: "subscribe",Topic: topic,Payload: ""};_websocket.send(JSON.stringify(sub)); }; //模擬mqtt取消訂閱,并根據topic刪除對應回調函數 stopReceiveMessage = function (topic) {delete _listeners[topic];if (!_websocket || !_connected) {console.warn('stopReceiveMessage, but iot client not ready.');return;}var unsub = {Action: "unsubscribe",Topic: topic,Payload: ""};_websocket.send(JSON.stringify(unsub)); };_access();

?

4.WebSocket相關nginx配置:

server { listen 80;server_name your_server_name;...location /ws/ {proxy_redirect off;add_header Access-Control-Allow-Origin *;add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS';add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';proxy_pass http://127.0.0.1:2884/;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}
}

?

出于篇幅考慮,之前兩篇文字敘述過的內容,比如Mqtt Broker其他實現部分以及與RabbitMQ通信部分,都是復用之前的代碼邏輯,這里不再贅述,Mqtt Broker中WebSocket部分相當于使用WebSocket協議做了MQTT協議的翻譯轉換,也有一些成員變量,用到了也不一一具體注釋了,主要通過代碼關鍵路徑敘述實現的一些細節,如有錯誤,懇請指出,轉載也請注明出處!!!

?

未完待續...

轉載于:https://www.cnblogs.com/huatuo/p/9323729.html

總結

以上是生活随笔為你收集整理的基于WebSocket协议实现Broker的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。