日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > windows >内容正文

windows

Netty实战 IM即时通讯系统(十二)构建客户端与服务端pipeline

發(fā)布時間:2024/4/30 windows 82 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty实战 IM即时通讯系统(十二)构建客户端与服务端pipeline 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Netty實戰(zhàn) IM即時通訊系統(tǒng)(十二)構(gòu)建客戶端與服務(wù)端pipeline

零、 目錄

  • IM系統(tǒng)簡介
    • Netty 簡介
    • Netty 環(huán)境配置
    • 服務(wù)端啟動流程
    • 客戶端啟動流程
    • 實戰(zhàn): 客戶端和服務(wù)端雙向通信
    • 數(shù)據(jù)傳輸載體ByteBuf介紹
    • 客戶端與服務(wù)端通信協(xié)議編解碼
    • 實現(xiàn)客戶端登錄
    • 實現(xiàn)客戶端與服務(wù)端收發(fā)消息
    • pipeline與channelHandler
    • 構(gòu)建客戶端與服務(wù)端pipeline
    • 拆包粘包理論與解決方案
    • channelHandler的生命周期
    • 使用channelHandler的熱插拔實現(xiàn)客戶端身份校驗
    • 客戶端互聊原理與實現(xiàn)
    • 群聊的發(fā)起與通知
    • 群聊的成員管理(加入與退出,獲取成員列表)
    • 群聊消息的收發(fā)及Netty性能優(yōu)化
    • 心跳與空閑檢測
    • 總結(jié)
    • 擴展

    一、 ChannelInboundHandlerAdapter 與 ChannelOutboundHandlerAdapter

  • 首先是ChannelInboundHandlerAdapter , 這個適配器只用用于實現(xiàn)其接口ChannelInboundHandler 的所有方法,這樣我們在編寫自己的handler時就不需要實現(xiàn)handler里的每一個方法,而只需要實現(xiàn)我們關(guān)心的方法 , 默認(rèn)情況下 , 對于ChannelInboundHandlerAdapter , 我們比較關(guān)心的是他的channelRead()

    ChannelInboundHandlerAdapter.java@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}
  • 他的作用就是接受上一個handler的輸出 , 這里的msg 就是上一個handler的輸出 。 大家也可以看出 , 默認(rèn)情況下adapter會通過fireChannelRead() 方法直接把本handler的處理結(jié)果傳遞給下一個handler 。
  • 與ChannelinboundHandlerAdapter相似的是ChannelOutboundHandlerAdapter , 他的核心方法是 write() 方法

    ChannelOutboundHandlerAdapter.java@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);}
  • 默認(rèn)情況下 , 這個adapter也會把對象傳遞給下一個outbound節(jié)點 , 需要注意的是他的傳播順序與inbound相反 。
  • 我們往pipeline中添加的第一個handler中的channelRead方法中 , msg對象其實就是ByteBuf , 服務(wù)端在接收數(shù)據(jù)之后 , 應(yīng)該首先把這個ByteBuf解碼 , 然后把解碼之后的結(jié)果傳遞給下一個handler :

    @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf requestByteBuf = (ByteBuf) msg;// 解碼Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);// 解碼后的對象傳遞到下一個 handler 處理ctx.fireChannelRead(packet)}
  • 在開始解碼之前我們來了解另外一個特殊的handler

  • 二 、 ByteToMessageDecoder

  • 通常情況下 , 無論是我們在客戶端還是在服務(wù)端 , 當(dāng)我們接收到數(shù)據(jù)之后 , 首先要做的就是事情就是把二進制數(shù)據(jù)轉(zhuǎn)換成我們所需要的java 對象 , 所以Netty 很貼心的幫我們寫了一個父類 ,來專門做這個事情 , 我們來看一下如何使用這個類來實現(xiàn)服務(wù)端二進制數(shù)據(jù)解碼:

    public class PacketDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {out.add(PacketCodeC.INSTANCE.decode(in));}}
  • 我們繼承了 ByteToMessageDecoder 這個類之后 , 我們只需要實現(xiàn)decode()方法 , 這里的in在傳遞進來的時候就已經(jīng)是ByteBuf 對象了 , 所以不再需要我們進行強轉(zhuǎn) , 第三個參數(shù)是List類型 , 我們通過往這個List里添加解碼之后的結(jié)果對象 , 就可以自動實現(xiàn)結(jié)果往下一個handler進行傳遞 , 這樣我們就實現(xiàn)了解碼邏輯的handler
  • 值的注意的一點是 , 對于Netty 里面的ByteBuf , 我們使用的4.1.6Final版本 , 默認(rèn)情況下用的是堆外內(nèi)存 , 在ByteBuf這一小節(jié)中 , 我們提到 , 堆外內(nèi)存需要我們手動釋放 , 在我們前面的小節(jié)的解碼例子中 , 其實我們已經(jīng)漏掉了這個操作 , 這一點是非常致命的 , 隨著程序運行越來越久 , 內(nèi)存泄露的問題就慢慢暴露出來了 , 而這里我們使用 ByteToMessageDecoder , Netty會自動進行內(nèi)存釋放 , 我們不用操心太多內(nèi)存管理方面的邏輯 , 關(guān)于如何自動釋放 , 可以參考 ByteToMessageDecoder的實現(xiàn)原理(8-2)。
  • 當(dāng)我們通過解碼器把二進制數(shù)據(jù)轉(zhuǎn)換到j(luò)ava 對象即指令數(shù)據(jù)包之后 , 就可以針對每一種指令數(shù)據(jù)包編寫邏輯了 。

  • 三 、 SimpleChannelInboundHandler

  • 回顧一下我們之前處理java 對象的邏輯

    if (packet instanceof LoginRequestPacket) {// ...} else if (packet instanceof MessageRequestPacket) {// ...} else if ...
  • 我們通過if - else 來進行邏輯處理 , 當(dāng)我們需要處理的指令越來越多的時候 , 代碼就會顯得越來越臃腫 ,這個時候我們可以通過給pipeline 添加多個handler(集成 ChannelInboundHandlerAdapter) 來解決多if-else 的問題

    XXXHandler.javaif (packet instanceof XXXPacket) {// ...處理} else {ctx.fireChannelRead(packet); }
  • 這樣寫的好處是 每次添加一個指令處理器 , 邏輯處理的框架都是一致的
  • 但是大家也注意到了 , 這里我們編寫指令處理器handler的時候 , 依然編寫了一段我們其實不用關(guān)心的if-else 判斷 , 然后還需要手動傳遞無法處理的指令對象至下一個指令處理器 , 這也是一段重復(fù)度極高的代碼 , 因此Netty基于這種考慮抽象出了一個SimpleChannelInboundHandler對象 , 類型判斷和對象傳遞都自動幫我們實現(xiàn)了 , 我們可以只關(guān)注我們需要處理的對象即可
  • 接下來我們看一看 , 如何使用 SimpleChannelInboundHandler 簡化我們的指令處理邏輯

    LoginRequestHandler.javapublic class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {// 登錄邏輯}}
  • SimpleChannleInboundHandler 從字面意思可以看出 , 使用它非常簡單 , 我們在集成這個類的時候 , 給他傳遞一個泛型 , 然后在 ChannelRead0()方法里面 , 我們不在用通過if-else來判斷當(dāng)前對象是否是本handler可以處理的對象 , 也不用強轉(zhuǎn) , 不用往下傳遞本handler處理不了的對象 , 這一切都已經(jīng)交給父類SimpleChannelHandler來實現(xiàn)了 , 我們只需要專注于我們要處理的業(yè)務(wù)邏輯即可。
  • 四 、 MessageToByteEncoder

  • 在前面的幾個小節(jié) , 我們已經(jīng)實現(xiàn)了登錄和消息處理邏輯 , 處理完請求之后 , 我們都會給客戶端一個響應(yīng) , 在寫響應(yīng)之前 , 我們需要把響應(yīng)對象編碼成ByteBuf , 結(jié)合本小節(jié)的內(nèi)容 , 最后的邏輯框架如下:

    public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {LoginResponsePacket loginResponsePacket = login(loginRequestPacket);ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);ctx.channel().writeAndFlush(responseByteBuf);}}public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {MessageResponsePacket messageResponsePacket = receiveMessage(messageRequestPacket);ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), messageRequestPacket);ctx.channel().writeAndFlush(responseByteBuf);}}
  • 我們注意到 , 我們處理完每一種指令之后的邏輯都是相似的 , 都需要進行解碼 , 然后調(diào)用 writeAndFlush() 將數(shù)據(jù)寫到對端 , 這個編碼的過程其實也是重復(fù)的邏輯 , 而且在編碼的過程中國 , 我們還需要手動去創(chuàng)建一個ByteBuf :

    PacketCodeC.javapublic ByteBuf encode(ByteBufAllocator byteBufAllocator, Packet packet) {// 1. 創(chuàng)建 ByteBuf 對象ByteBuf byteBuf = byteBufAllocator.ioBuffer();// 2. 序列化 java 對象// 3. 實際編碼過程return byteBuf;}
  • 而Netty 提供了一個特殊的channelHandler 來專門處理這種邏輯 , 我們不需要每一次將響應(yīng)寫到對端的時候調(diào)用一次編碼邏輯進行編碼 , 也不需要自行創(chuàng)建ByteBuf , 這個類叫做 MessageToByteEncoder , 從字面意思可以看出 , 他的功能就是將對象轉(zhuǎn)化到二進制數(shù)據(jù) 。

  • 我們來看一下如何使用 MessageToByteEncoder 來實現(xiàn)編碼邏輯

    public class PacketEncoder extends MessageToByteEncoder<Packet> {@Overrideprotected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {PacketCodeC.INSTANCE.encode(out, packet);}}
  • PacketEncoder 集成自 MessageToByteEncoder , 泛型參數(shù) Packet 表示這個類的作用是 將Packet 類型對象到二進制的轉(zhuǎn)化 。

  • 這里我們只需要實現(xiàn)encode() 方法 , 我們注意到 , 這個方法的第二個參數(shù)是java對象 , 而第三個參數(shù)是ByteBuf 對象 , 我們在這個方法里面要做的事情就是把java對象里面的字段寫到ByteBuf , 我們不在需要自行去分配ByteBuf , 因此大家注意到 , PacketCodeC 的 encode() 方法 的定義也改了 , 下面是更改前后的對比:

    PacketCodeC.java// 更改前的定義public ByteBuf encode(ByteBufAllocator byteBufAllocator, Packet packet) {// 1. 創(chuàng)建 ByteBuf 對象ByteBuf byteBuf = byteBufAllocator.ioBuffer();// 2. 序列化 java 對象// 3. 實際編碼過程return byteBuf;}// 更改后的定義public void encode(ByteBuf byteBuf, Packet packet) {// 1. 序列化 java 對象// 2. 實際編碼過程}
  • 我們可以看到 , PacketCodeC 不在需要手動創(chuàng)建ByteBuf對象 , 不在需要把創(chuàng)建完ByteBuf 的進行返回 , 當(dāng)我們向pipeline 中添加了這個編碼器之后 , 我們在指令處理完畢之后就只需要writeAndFlush java 對象即可

    public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {ctx.channel().writeAndFlush(login(loginRequestPacket));}}public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageRequestPacket) {ctx.channel().writeAndFlush(receiveMessage(messageRequestPacket));}}
  • 通過我們前面的分析 , 可以看到 , Netty 為了讓我們邏輯更加清晰簡潔 , 幫我們做了很多工作 嗎能直接用Netty 自帶的handler 來解決問題 , 不要重復(fù)制造輪子 , 在下面的小節(jié)中 , 我們會繼續(xù)探討Netty還有哪些開箱即用的handler

  • 分析完服務(wù)端的pipeline 與 handler 組成結(jié)構(gòu) , 相信你們也不難自行分析出客戶端handler 的結(jié)構(gòu)了 , 最后我們來看一下服務(wù)端和客戶端完整的pipeline 與handler結(jié)構(gòu)

  • 五、 構(gòu)建服務(wù)端和客戶端 pipeline 與handler

  • 對應(yīng)我們的代碼

    服務(wù)端serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new PacketDecoder());ch.pipeline().addLast(new LoginRequestHandler());ch.pipeline().addLast(new MessageRequestHandler());ch.pipeline().addLast(new PacketEncoder());}});客戶端bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new PacketDecoder());ch.pipeline().addLast(new LoginResponseHandler());ch.pipeline().addLast(new MessageResponseHandler());ch.pipeline().addLast(new PacketEncoder());}});
  • 六 、 完整代碼

    七 、 總結(jié)

  • 在本小節(jié)中我們通過學(xué)習(xí)Netty 內(nèi)置的channelHandler 來逐步構(gòu)建我們服務(wù)端的pipeline , 通過內(nèi)置的channelHandler 可以減少很多重復(fù)的邏輯
  • 基于 ByteToMessageDecoder , 我們可以實現(xiàn)自定義解碼 , 而不用關(guān)心ByteBuf 的強轉(zhuǎn)和解碼結(jié)果的傳遞。
  • 基于 SimpleChannelInboundHandler , 我們可以實現(xiàn)每一種指令的處理 , 不在需要強轉(zhuǎn) , 不再需要冗長的if-else , 不在需要手動傳遞對象
  • 基于 MessageToByteEncoder , 我們可以實現(xiàn)自定義編碼 , 而不用關(guān)心ByteBuf 的創(chuàng)建 , 不用每次向?qū)Χ藢憯?shù)據(jù)時都調(diào)用編碼邏輯
  • 八 、 思考

  • 在 LoginRequestHandler 以及 MessageRequestHandler 的 channelRead0() 方法中,第二個參數(shù)對象(XXXRequestPacket)是從哪里傳遞過來的?
  • 答: channelRead0(ChannelHandlerContext ctx, MessageRequestPacket msg) 的msg是由父類SimpleChannelInboundHandler的channelRead() 方法判斷是需要類型后, 強轉(zhuǎn)類型后傳遞進來的

    I imsg = (I) msg;channelRead0(ctx, imsg);
  • 如何判斷 ByteBuf 是使用堆外內(nèi)存?
  • 答: ByteBuf.isDirect()
  • pipeline添加decode和encode的時候,有嚴(yán)格的順序限制吧?
  • 答: ouBound類和inBound類之間的順序一般沒有限制,通常情況下,同一種類型的 handler 的添加順序需要注意
  • 總結(jié)

    以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(十二)构建客户端与服务端pipeline的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。