日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > c/c++ >内容正文

c/c++

MQTT协议笔记之mqtt.io项目TCP协议支持

發(fā)布時(shí)間:2024/4/13 c/c++ 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MQTT协议笔记之mqtt.io项目TCP协议支持 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前言

MQTT定義了物聯(lián)網(wǎng)傳輸協(xié)議,其標(biāo)準(zhǔn)傾向于原始TCP實(shí)現(xiàn)。構(gòu)建于TCP的上層協(xié)議堆棧,諸如HTTP等,在空間上多了一些處理路徑,稍微耗費(fèi)了CPU和內(nèi)存,雖看似微乎其微,但對(duì)很多處理能力不足的嵌入式設(shè)備而言,選擇原始的TCP卻是最好的選擇。

但單純TCP不是所有物件聯(lián)網(wǎng)的最佳選擇,提供構(gòu)建與TCP基礎(chǔ)之上的傳統(tǒng)的HTTP通信支持,尤其是瀏覽器、性能富裕的桌面涉及領(lǐng)域,還是企業(yè)最 可信賴、最可控的傳輸方式之一。支持多種多樣的連接通道,讓目前所有一切皆可聯(lián)網(wǎng),除了原始TCP Socket,還要支持構(gòu)建于其之上的HTTP、HTML5 Websocket,就很有必要。

mqtt.io,Pub/Sub中間件,也可以稱之為推送服務(wù)器,涵蓋所有主流桌面系統(tǒng)、瀏覽器平臺(tái),并且傾斜 于移動(dòng)互聯(lián)網(wǎng),以及物聯(lián)網(wǎng)的廣闊適應(yīng)天地。使用一句英文概括可能更為合適:"Make everything connect”,讓所有物件都可連接。其業(yè)務(wù)目標(biāo),可用下圖概括:

mqtt.io致力于做下一代支持所有主流桌面平臺(tái)、所有主流瀏覽器、所有可聯(lián)網(wǎng)物件都可以聯(lián)網(wǎng)的PUB/SUB消息推送系統(tǒng)。

構(gòu)建此系統(tǒng),在于降低傳統(tǒng)企業(yè)各自分散的推送系統(tǒng),統(tǒng)一運(yùn)營(yíng),統(tǒng)一管理,節(jié)省人員、運(yùn)維開支。

注意事項(xiàng)

  • mqtt.io是一個(gè)項(xiàng)目名稱,沒有官網(wǎng),http://www.mqtt.io,和這個(gè)項(xiàng)目沒有一毛錢關(guān)系。
  • 項(xiàng)目地址:https://github.com/yongboy/mqtt.io,
  • 項(xiàng)目名稱啟發(fā)于 http://socket.io http://netty.io 等知名framework。
  • 目前只實(shí)現(xiàn)QoS 0基本特性,實(shí)現(xiàn)概覽,后期會(huì)根據(jù)反饋,做出一些調(diào)整
  • 依賴

  • netty 4,目前JAVA IO界明星
  • mqtt-library?二進(jìn)制和MQTT對(duì)象的轉(zhuǎn)換,這種苦活累活都是它來做,真心讓人喜歡。
  • 數(shù)據(jù)流轉(zhuǎn)

    解碼器

    用于轉(zhuǎn)換二進(jìn)制流到JAVA對(duì)象的過程:

    123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 package io.mqtt.handler.coder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import java.io.ByteArrayInputStream; import java.util.List; import org.meqantt.message.Message; import org.meqantt.message.MessageInputStream; public class MqttMessageNewDecoder extends MessageToMessageDecoder<ByteBuf> { @Override public void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { if (buf.readableBytes() < 2) { return; } buf.markReaderIndex(); buf.readByte(); // read away header int msgLength = 0; int multiplier = 1; int digit; int lengthSize = 0; do { lengthSize++; digit = buf.readByte(); msgLength += (digit & 0x7f) * multiplier; multiplier *= 128; if ((digit & 0x80) > 0 && !buf.isReadable()) { buf.resetReaderIndex(); return; } } while ((digit & 0x80) > 0); if (buf.readableBytes() < msgLength) { buf.resetReaderIndex(); return; } byte[] data = new byte[1 + lengthSize + msgLength]; buf.resetReaderIndex(); buf.readBytes(data); MessageInputStream mis = new MessageInputStream( new ByteArrayInputStream(data)); Message msg = mis.readMessage(); mis.close(); out.add(msg); } }
    view rawMqttMessageNewDecoder.java?hosted with ? by?GitHub

    ?

    編碼器

    對(duì)所有要寫入網(wǎng)卡緩沖區(qū)的JAVA對(duì)象轉(zhuǎn)換成二進(jìn)制:

    12345678910111213141516171819202122232425 package io.mqtt.handler.coder; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import java.util.List; import org.meqantt.message.Message; @Sharable public class MqttMessageNewEncoder extends MessageToMessageEncoder<Object> { @Override protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { if (!(msg instanceof Message)) { return; } byte[] data = ((Message) msg).toBytes(); out.add(Unpooled.wrappedBuffer(data)); } }
    view rawMqttMessageNewEncoder.java?hosted with ? by?GitHub

    ?

    借助于mqtt-library項(xiàng)目,編解碼不復(fù)雜。

    MQTT的消息處理

    ?

    1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 package io.mqtt.handler; import io.mqtt.processer.ConnectProcesser; import io.mqtt.processer.DisConnectProcesser; import io.mqtt.processer.PingReqProcesser; import io.mqtt.processer.Processer; import io.mqtt.processer.PublishProcesser; import io.mqtt.processer.SubscribeProcesser; import io.mqtt.processer.UnsubscribeProcesser; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.ReadTimeoutException; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.meqantt.message.ConnAckMessage; import org.meqantt.message.ConnAckMessage.ConnectionStatus; import org.meqantt.message.DisconnectMessage; import org.meqantt.message.Message; import org.meqantt.message.Message.Type; import org.meqantt.message.PingRespMessage; public class MqttMessageHandler extends ChannelInboundHandlerAdapter { private static PingRespMessage PINGRESP = new PingRespMessage(); private static final Map<Message.Type, Processer> processers; static { Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>( 6); map.put(Type.CONNECT, new ConnectProcesser()); map.put(Type.PUBLISH, new PublishProcesser()); map.put(Type.SUBSCRIBE, new SubscribeProcesser()); map.put(Type.UNSUBSCRIBE, new UnsubscribeProcesser()); map.put(Type.PINGREQ, new PingReqProcesser()); map.put(Type.DISCONNECT, new DisConnectProcesser()); processers = Collections.unmodifiableMap(map); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception { try { if (e.getCause() instanceof ReadTimeoutException) { ctx.write(PINGRESP).addListener( ChannelFutureListener.CLOSE_ON_FAILURE); } else { ctx.channel().close(); } } catch (Throwable t) { t.printStackTrace(); ctx.channel().close(); } e.printStackTrace(); } @Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { Message msg = (Message) obj; Processer p = processers.get(msg.getType()); if (p == null) { return; } Message rmsg = p.proc(msg, ctx); if (rmsg == null) { return; } if (rmsg instanceof ConnAckMessage && ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) { ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE); } else if (rmsg instanceof DisconnectMessage) { ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE); } else { ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
    view rawMqttMessageHandler.java?hosted with ? by?GitHub

    ?

    更具體的可以查看項(xiàng)目。

    小結(jié)

    簡(jiǎn)單介紹了一個(gè)簡(jiǎn)單的不能再簡(jiǎn)單的MQTT Server,只具有最基本的QoS 0類型的消息訂閱等。

    后面,對(duì)HTML 5 Websocket,會(huì)在現(xiàn)有基礎(chǔ)代碼之上,不做多大改動(dòng),增加對(duì)MQTT Over WebSocket的支持。

    總結(jié)

    以上是生活随笔為你收集整理的MQTT协议笔记之mqtt.io项目TCP协议支持的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。