日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

Netty实战 IM即时通讯系统(十一)pipeline与channelHandler

發布時間:2024/4/30 windows 65 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty实战 IM即时通讯系统(十一)pipeline与channelHandler 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Netty實戰 IM即時通訊系統(十一)pipeline與channelHandler

零、 目錄

  • IM系統簡介
    • Netty 簡介
    • Netty 環境配置
    • 服務端啟動流程
    • 客戶端啟動流程
    • 實戰: 客戶端和服務端雙向通信
    • 數據傳輸載體ByteBuf介紹
    • 客戶端與服務端通信協議編解碼
    • 實現客戶端登錄
    • 實現客戶端與服務端收發消息
    • pipeline與channelHandler
    • 構建客戶端與服務端pipeline
    • 拆包粘包理論與解決方案
    • channelHandler的生命周期
    • 使用channelHandler的熱插拔實現客戶端身份校驗
    • 客戶端互聊原理與實現
    • 群聊的發起與通知
    • 群聊的成員管理(加入與退出,獲取成員列表)
    • 群聊消息的收發及Netty性能優化
    • 心跳與空閑檢測
    • 總結
    • 擴展

    一、 簡介

  • 這一小節中 , 我們來學習Netty 中一大核心組件: pipeline 與 channelHandler
  • 上一小節最后 , 我們提出: 如何避免switch-case 泛濫? , 我們注意到, 不管是服務端還是客戶端 , 處理流程大致分為以下步驟
  • 我們把這三類邏輯都寫在一個類里面, 客戶端寫在 ClientHandler , 服務端寫在 ServerHandler , 如果要做功能的擴展 (比如 , 我們要校驗魔數 , 或者其他特殊邏輯) , 只能在一個類里面去修改 , 這個類就會變得越來越臃腫。
  • 另外 , 我們注意到, 每次發指令數據包都要是都調用編碼器編碼成byteBuf , 對于這類場景的編碼優化, 我們能想到的辦法自然是模塊化處理 , 不同的邏輯放置到單獨的類中來處理 , 最后將這些邏輯串聯起來 , 形成一個完整的邏輯處理鏈 。
  • Netty中的pipeline 與channelHandler 正是來解決這個問題的: 他通過責任鏈設計模式來組織代碼邏輯 , 并且能夠支持邏輯的動態添加和刪除 , Netty能夠支持各類協議的支持和擴展 , 比如: HTTP , WebSocket ,Redis 靠的就是pipeline與channelHanler。
  • 二、 pipeline 與channelHandler 的構成

  • 無論是從服務端來看 , 還是從客戶端來看 , 在Netty整個框架中, 一條連接對應著一個channel ,這個channel 所有的處理邏輯都在一個叫做ChannelPipeline的對象里面 , ChannelPipeline 是一個雙向鏈表結構 , 他和Channel之間是一對一的關系
  • ChannelPipeline 里面每個節點都是一個ChannelHandlerContext 對象 , 這個對象能夠拿到和Channel相關的上下文信息 , 然后這個對象抱著一個重要的對象 , 那就是邏輯處理器 ChannelHandler
  • 接下啦我們來看一下ChannelHandler有哪些分類
  • 三、 ChannelHandler 的分類

  • 可以考到ChannelHandler 有兩大子接口
  • 第一個子接口是ChannelInboundHandler , 從字面意思也可以猜到,他是處理讀數據的邏輯 , 比如: 我們在一段讀到一段數據 , 首先要解析這段數據 , 然后對這些數據做一些邏輯處理 , 最終把響應寫到對端 , 在開始組裝響應之前的邏輯 , 都可以放在ChannelInboundHandler 中處理 , 他的一個最重要的方法就是 channelRead , 讀者可以將ChannelInboundHandler中的邏輯處理過程與TCP的七層協議的解析連接起來 , 收到的數據一層一層從物理層傳送到我們的應用層 。
  • 第二個子接口 ChannelOutboundHandler 是處理寫數據的邏輯 , 他是定義我們一段在組裝完響應之后吧數據寫到對端的邏輯 ,比如我們封裝好一個response對象 , 接下來我們可能對這個response做一些其他的特殊的邏輯, 然后 , 在編碼成byteBuf , 最終寫到對端 , 它里面最核心的一個方法就是write() , 讀者可以將ChannelOutboundHandler的邏輯處理過程與TCP 的七層協議的封裝過程聯系起來 , 我們在應用層組裝響應之后 , 通過層層協議的封裝 , 直到最底層的物理層。
  • 這兩個子接口分別有對應的默認的實現 , ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter , 他們分別實現了兩大接口的所有功能 , 默認情況下回吧讀寫事件傳播到下一個handler。
  • 說了這么多理論, 其實還是比較抽象的 , 下面我們就用一個具體的demo 來學習一下這兩個handler的事件傳播機制。
  • 四、 ChannelInboundHanndler 的事件傳播

  • 關于ChannelInboundHandler , 我們拿 ChannelRead() 為例子 , 來體驗一下inbound時間的傳播

  • 我們在服務端的pipeline 添加三個ChannelInboundHandler

    Test_12_Server.javaserverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_11_InboundHandlerA());ch.pipeline().addLast(new Test_11_InboundHandlerB());ch.pipeline().addLast(new Test_11_InboundHandlerC());}});
  • 每個inBoundHandler 繼承自ChannelInboundHandlerAdapter , 然后實現了 channelRead() 方法

    /*** 2019年1月29日* @author outman* 服務端處理 A*/class Test_11_InboundHandlerA extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buffer = (ByteBuf)msg;System.out.println("Test_11_InboundHandlerA --> " + buffer.toString(Charset.forName("UTF-8")));super.channelRead(ctx, msg);}}/*** 2019年1月29日* @author outman* 服務端處理 B*/class Test_11_InboundHandlerB extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buffer = (ByteBuf)msg;System.out.println("Test_11_InboundHandlerB --> " + buffer.toString(Charset.forName("UTF-8")));super.channelRead(ctx, msg);}}/*** 2019年1月29日* @author outman* 服務端處理 C*/class Test_11_InboundHandlerC extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buffer = (ByteBuf)msg;System.out.println("Test_11_InboundHandlerC --> " + buffer.toString(Charset.forName("UTF-8")));super.channelRead(ctx, msg);}}
  • 在channelRead() 方法里面 , 我們當前handler 的信息 , 然后調用父類的channelRead() 方法 , 而這里父類的channelRead()方法會自定調用下一個inboundHandler的channelRead() , 并且會把處理完畢的對象傳遞給下一個inboundHandler , 我們例子中傳遞的對象都是同一個msg 。
  • 我們通過 addLast()方法來為pipeline添加 inboundHandler , 當然 , 除了這個方法還有其他的方法 , 感興趣的同學可以去瀏覽一下pipeline的api ,這里我們添加的順序為 A --> B --> c , 然后我們來看一下控制臺輸出
  • 五、ChannelOutboundHandler 的時間傳播

  • 關于ChannelOutboundHandler , 我們拿write()為例子,來體驗一下outbound事件的傳播。

  • 我們繼續在服務端的pipeline添加三個ChannelOutboundHandler

    Test_12_Server.javaserverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// inbound 服務端讀數據邏輯ch.pipeline().addLast(new Test_11_InboundHandlerA());ch.pipeline().addLast(new Test_11_InboundHandlerB());ch.pipeline().addLast(new Test_11_InboundHandlerC());// outbound 服務端系數據邏輯ch.pipeline().addLast(new Test_11_OutboundHandlerA());ch.pipeline().addLast(new Test_11_OutboundHandlerB());ch.pipeline().addLast(new Test_11_OutboundHandlerC());}});
  • 每個outboundHandler都繼承自ChannelOutboundHandlerAdapter , 然后實現了write()方法

    /*** 2019年2月14日* @author outman** 服務端寫數據邏輯 A*/class Test_12_OutboundHandlerA extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("Test_12_OutboundHandlerA --> " + msg);super.write(ctx, msg, promise);}}/*** 2019年2月14日* @author outman** 服務端寫數據邏輯 B*/class Test_12_OutboundHandlerB extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("Test_12_OutboundHandlerB --> " + msg);super.write(ctx, msg, promise);}}/*** 2019年2月14日* @author outman** 服務端寫數據邏輯 C*/class Test_12_OutboundHandlerC extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("Test_12_OutboundHandlerC --> " + msg);super.write(ctx, msg, promise);}}
  • 在write()方法里面 , 我們打印當前handler的信息,然后調用父類write()方法,而這里父類的write()方法會自動調用到下一個outboundHandler的write()方法,并且把當前outboundHandler里處理完畢的對象傳遞到下一個outboundHandler.
  • 我們通過addLast()方法添加outboundHandler的順序為 A -> B -> C , 最后我們來看一下控制臺的輸出
  • 可以看到outboundHandler的執行順序與我們添加的順序相反 , 最后,我們在來看一下pipeline的結構和執行順序。

  • pipeline 的結構 不管我們定義的是那種類型的handler, 最終他們都以雙向鏈表的形式連接,這里實際鏈表的節點是ChannelHandlerContext , 這里為了讓結構清晰突出,可以直接把節點看做ChannelHandlerContext
  • pipeline 的執行順序 雖然兩種類型的handler在一個雙向鏈表里, 但是這兩類handler 的分工是不一樣的,inboundHandler的事件通常只會傳播到下一個channelHandler,outboundHandler的事件通常通常只會傳播到下一個outboundHandler , 兩者互不干擾。
  • 六 、 總結

  • 通過我們前面編寫客戶端、服務端處理邏輯引出了pipeline和channelHandler的概念
  • channelHandler分為inbound和outbound兩種類型的接口 , 分別是處理數據讀和數據寫的邏輯
  • 兩種類型的handler均有相應的默認實現,默認會把事件傳遞到下一個 , 這里的傳遞事件其實說白了就是把本handler的處理結果傳遞給下一個handler繼續處理
  • inboundHandler的執行順序與我們實際的添加順序相同 ,而outboundHandler 相反。
  • 七 、 思考

  • 參考本文中的例子 , 如果我們往pipeline里面添加handler的順序不變,要在控制臺打印出inboundA -> inboundB -> outboundB -> outboundA , 該如何實現?
  • 如何在每個handler里面打印上一個handler 的處理結束的時間點呢?
  • 答: 可以將上一個handler處理結束的時間放在channel的attr中
  • 如何讓outboundHandler按照添加順序執行?
  • 答: pipeline有andLast() 和andFrist()方法 ,inboundHandler在執行過程中正向遍歷鏈表 使用andLast()方法先進先出可以順序執行 , outboundHandler在執行過程中逆向遍歷鏈表, 使用andFrist()方法 先進后出可以順序執行 。
  • OutBoundHandler一直沒有被執行到,pipeline也已添加,有可能是什么原因呢?
  • 答: ctx.channel().writeAndFlush(msg); 進行寫事件才會觸發out調用鏈 。 所以需要將inboundHandlerC 中執行寫邏輯 才會執行outboundHandler.
  • 總結

    以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(十一)pipeline与channelHandler的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。