日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信

發(fā)布時間:2024/4/30 windows 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

##

Netty實戰(zhàn) IM即時通訊系統(tǒng)(六)實戰(zhàn): 客戶端和服務(wù)端雙向通信

零、 目錄

  • IM系統(tǒng)簡介
    • Netty 簡介
    • Netty 環(huán)境配置
    • 服務(wù)端啟動流程
    • 實戰(zhàn): 客戶端和服務(wù)端雙向通信
    • 數(shù)據(jù)傳輸載體ByteBuf介紹
    • 客戶端與服務(wù)端通信協(xié)議編解碼
    • 實現(xiàn)客戶端登錄
    • 實現(xiàn)客戶端與服務(wù)端收發(fā)消息
    • pipeline與channelHandler
    • 構(gòu)建客戶端與服務(wù)端pipeline
    • 拆包粘包理論與解決方案
    • channelHandler的生命周期
    • 使用channelHandler的熱插拔實現(xiàn)客戶端身份校驗
    • 客戶端互聊原理與實現(xiàn)
    • 群聊的發(fā)起與通知
    • 群聊的成員管理(加入與退出,獲取成員列表)
    • 群聊消息的收發(fā)及Netty性能優(yōu)化
    • 心跳與空閑檢測
    • 總結(jié)
    • 擴展

    ###六、 實戰(zhàn): 客戶端和服務(wù)端雙向通信

  • 本節(jié)我們要實現(xiàn)的功能是客戶端連接成功后,向服務(wù)端寫出一段數(shù)據(jù) , 服務(wù)端收到數(shù)據(jù)后打印 , 并向客戶端回復一段數(shù)據(jù) 。

  • 我們先做一個代碼框架 , 然后在框架上面做修改

    public class Test_07_客戶端和服務(wù)端雙向通信 {public static void main(String[] args) {Test_07_Server.start(8000);Test_07_Client.start("127.0.0.1" , 8000 ,5);}}class Test_07_Client{public static void start(String IP , int port ,int maxRetry){NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {}});connect(bootstrap , IP , port , maxRetry);}private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry , int... retryIndex) {bootstrap.connect(IP , port).addListener(future ->{int[] finalRetryIndex;if(future.isSuccess()) {System.out.println("連接成功");}else if(maxRetry ==0) {System.out.println("達到最大重試此時,放棄重試");}else {// 初始化 重試計數(shù)if(retryIndex.length == 0) {finalRetryIndex = new int[]{0};}else {finalRetryIndex = retryIndex;}// 計算時間間隔int delay = 1 << finalRetryIndex[0];// 執(zhí)行重試System.out.println(new Date() +" 連接失敗,剩余重試次數(shù):"+ maxRetry + ","+delay+"秒后執(zhí)行重試");bootstrap.config().group().schedule(()->{connect(bootstrap , IP, port , maxRetry -1 , finalRetryIndex[0]+1);}, delay, TimeUnit.SECONDS);}});}}class Test_07_Server{public static void start(int port){NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}});bind(serverBootstrap, port);}private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println("服務(wù)端:端口【"+port+"】綁定成功!");}else {System.out.println("服務(wù)端:端口【"+port+"】綁定失敗,嘗試綁定【"+(port+1)+"】!");bind(serverBootstrap, port+1);}});}}
  • 客戶端發(fā)送數(shù)據(jù)到服務(wù)端

  • 在《客戶端啟動流程》這一小節(jié) , 我們提到 客戶端相關(guān)的數(shù)據(jù)讀寫邏輯是通過BootStrap的handler()方法指定

    bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {}});
  • 現(xiàn)在我們在initChannel()中給客戶端添加一個邏輯處理器 , 這個處理器的作用就是負責向服務(wù)端寫數(shù)據(jù)

    bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加業(yè)務(wù)處理邏輯 可以添加自定義的業(yè)務(wù)處理邏輯也可以添加 Netty自帶的簡單通用的處理邏輯ch.pipeline().addLast(new Test_07_ClientHandler());}});
  • ch.pipeline()方法返回的是和這條連接相關(guān)的邏輯處理鏈 , 采用了責任鏈處理模式 , 這里不理解沒關(guān)系 , 后面會講到。

  • 然后再調(diào)用addLast()方法添加一個邏輯處理器 , 這個邏輯處理器為的就是在客戶端建立連接成功之后向服務(wù)端寫數(shù)據(jù) , 下面是這個邏輯處理器的代碼:

    class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(new Date() + " 客戶端寫出數(shù)據(jù)...");// 1. 獲取數(shù)據(jù)ByteBuf buffer = getByteBuf(ctx);// 2. 寫數(shù)據(jù)ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = "你好,奧特曼!".getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到 bufbuf.writeBytes(bs);return buf;}}
  • 這個邏輯處理器繼承自ChannelInboundHandlerAdapter ,然后覆蓋了channelActive()方法 , 這個方法會在客戶端連接建立成功之后被調(diào)用
  • 客戶端連接建立成功之后 , 調(diào)用channelActive() , 在這個方法里面 , 我們編寫向服務(wù)端寫數(shù)據(jù)的邏輯
  • 向服務(wù)端寫數(shù)據(jù)分為兩步 , 首先我們要獲取一個netty對二進制數(shù)據(jù)抽象的二進制ByteBuf , 上面代碼中ctx.alloc() 獲取一個ByteBuf的內(nèi)存管理器 , 這個內(nèi)存管理器的作用就是分配一個ByteBuf , 然后我們把字符串的二進制數(shù)據(jù)填充到ByteBuf , 這樣我們就獲取到了Netty需要的一個數(shù)據(jù)格式, 最后我們調(diào)用ctx.channel().writeAndFlush()把數(shù)據(jù)寫到服務(wù)端。
  • 以上就是 向服務(wù)端寫數(shù)據(jù)的邏輯 , 和傳統(tǒng)的socket 編程不同的是 , Netty 里面的數(shù)據(jù)是以ByteBuf為單位的 , 所有需要寫出的數(shù)據(jù)必須塞到一個ByteBuf里 , 需要讀取的數(shù)據(jù)也是如此。
  • 服務(wù)端讀取客戶端數(shù)據(jù)

  • 服務(wù)端的數(shù)據(jù)處理邏輯 是通過ServerBootStrap 的childHandler()方法指定

    serverBootStrtap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// TODO Auto-generated method stub}})
  • 現(xiàn)在 , 我們在initChannel() 中 給服務(wù)端添加一個邏輯處理器 , 這個處理器 的作用就是負責客戶端讀數(shù)據(jù)

    serverBootStrtap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}})
  • 這個方法里的邏輯和客戶端類似 , 獲取服務(wù)端關(guān)于這條連接的邏輯處理鏈pipeline , 然后添加一個邏輯處理器 , 負責讀取客戶端發(fā)來的數(shù)據(jù)

    class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));}}
  • 服務(wù)端的邏輯處理器同樣是繼承自ChannelInboundHandlerAdapter , 與客戶端不同的是 , 這里覆蓋的方法是 channelRead() ,這個方法在接收到數(shù)據(jù)之后會被回調(diào)
  • 這里的msg 值的是Netty里面數(shù)據(jù)讀寫的載體 , 為什么不直接是ByteBuf , 而需要我們強轉(zhuǎn)一下 , 我們后面會分析道 , 這里我們強轉(zhuǎn)之后 , 然后調(diào)用buteBuf.toString() 就能夠拿到我們客戶端發(fā)過來的字符串數(shù)據(jù)。
  • 運行測試

  • 完整代碼

    import java.nio.charset.Charset;import java.util.Date;import java.util.concurrent.TimeUnit;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Test_07_客戶端和服務(wù)端雙向通信 {public static void main(String[] args) throws Exception {Test_07_Server.start(8000);Test_07_Client.start("127.0.0.1", 8000, 5);}}class Test_07_Client {public static void start(String IP, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加業(yè)務(wù)處理邏輯 可以添加自定義的業(yè)務(wù)處理邏輯也可以添加 Netty自帶的簡單通用的處理邏輯ch.pipeline().addLast(new Test_07_ClientHandler());}});connect(bootstrap, IP, port, maxRetry);}private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {bootstrap.connect(IP, port).addListener(future -> {int[] finalRetryIndex;if (future.isSuccess()) {System.out.println("客戶端連接【"+IP+":"+port+"】成功");} else if (maxRetry == 0) {System.out.println("達到最大重試此時,放棄重試");} else {// 初始化 重試計數(shù)if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 計算時間間隔int delay = 1 << finalRetryIndex[0];// 執(zhí)行重試System.out.println(new Date() + " 連接失敗,剩余重試次數(shù):" + maxRetry + "," + delay + "秒后執(zhí)行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}class Test_07_Server {public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}});bind(serverBootstrap, port);}private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println("服務(wù)端:端口【"+port+"】綁定成功!");}else {System.out.println("服務(wù)端:端口【"+port+"】綁定失敗,嘗試綁定【"+(port+1)+"】!");bind(serverBootstrap, port+1);}});}}class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "你好,奧特曼!";System.out.println(new Date() + " 客戶端寫出數(shù)據(jù):"+content);// 1. 獲取數(shù)據(jù)ByteBuf buffer = getByteBuf(ctx , content);// 2. 寫數(shù)據(jù)ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到 bufbuf.writeBytes(bs);return buf;}}class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));}}
  • 運行結(jié)果:

  • 服務(wù)端回復數(shù)據(jù)給客戶端

  • 服務(wù)端向客戶端寫數(shù)據(jù)的邏輯與客戶端向服務(wù)端寫數(shù)據(jù)的邏輯一樣 , 先創(chuàng)建一個ByteBuf , 然后填充二進制數(shù)據(jù) , 最后調(diào)用writeAndFlush()方法寫出去

    class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));// 向客戶端回復數(shù)據(jù)String content = "你好,田先森!";System.out.println(new Date() +":服務(wù)端寫出數(shù)據(jù)-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {// 獲取 二進制抽象 ByteBufByteBuf byteBuf = cxt.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到buf中byteBuf.writeBytes(bs);return byteBuf;}}
  • 現(xiàn)在輪到客戶端了 , 客戶端讀取數(shù)據(jù)的邏輯和服務(wù)端讀數(shù)據(jù)的邏輯一樣 , 同樣是覆蓋channelRead() 方法

    class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println(new Date()+": 客戶端讀到數(shù)據(jù) ->"+ byteBuf.toString(Charset.forName("UTF-8")));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "你好,奧特曼!";System.out.println(new Date() + " 客戶端寫出數(shù)據(jù):"+content);// 1. 獲取數(shù)據(jù)ByteBuf buffer = getByteBuf(ctx , content);// 2. 寫數(shù)據(jù)ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到 bufbuf.writeBytes(bs);return buf;}}
  • 現(xiàn)在 客戶端和服務(wù)端就實現(xiàn)了雙向通信

  • 完整代碼:

    import java.nio.charset.Charset;import java.util.Date;import java.util.concurrent.TimeUnit;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Test_07_客戶端和服務(wù)端雙向通信 {public static void main(String[] args) throws Exception {Test_07_Server.start(8000);Test_07_Client.start("127.0.0.1", 8000, 5);}}class Test_07_Client {public static void start(String IP, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加業(yè)務(wù)處理邏輯 可以添加自定義的業(yè)務(wù)處理邏輯也可以添加 Netty自帶的簡單通用的處理邏輯ch.pipeline().addLast(new Test_07_ClientHandler());}});connect(bootstrap, IP, port, maxRetry);}private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {bootstrap.connect(IP, port).addListener(future -> {int[] finalRetryIndex;if (future.isSuccess()) {System.out.println("客戶端連接【"+IP+":"+port+"】成功");} else if (maxRetry == 0) {System.out.println("達到最大重試此時,放棄重試");} else {// 初始化 重試計數(shù)if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 計算時間間隔int delay = 1 << finalRetryIndex[0];// 執(zhí)行重試System.out.println(new Date() + " 連接失敗,剩余重試次數(shù):" + maxRetry + "," + delay + "秒后執(zhí)行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}class Test_07_Server {public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}});bind(serverBootstrap, port);}private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println("服務(wù)端:端口【"+port+"】綁定成功!");}else {System.out.println("服務(wù)端:端口【"+port+"】綁定失敗,嘗試綁定【"+(port+1)+"】!");bind(serverBootstrap, port+1);}});}}class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println(new Date()+": 客戶端讀到數(shù)據(jù) ->"+ byteBuf.toString(Charset.forName("UTF-8")));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "你好,奧特曼!";System.out.println(new Date() + " 客戶端寫出數(shù)據(jù):"+content);// 1. 獲取數(shù)據(jù)ByteBuf buffer = getByteBuf(ctx , content);// 2. 寫數(shù)據(jù)ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到 bufbuf.writeBytes(bs);return buf;}}class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));// 向客戶端回復數(shù)據(jù)String content = "你好,田先森!";System.out.println(new Date() +":服務(wù)端寫出數(shù)據(jù)-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {// 獲取 二進制抽象 ByteBufByteBuf byteBuf = cxt.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到buf中byteBuf.writeBytes(bs);return byteBuf;}}
  • 執(zhí)行結(jié)果

  • 總結(jié)

  • 本小節(jié)中 , 我們了解到客戶端和服務(wù)端的邏輯處理均是在啟動的時候 , 通過給邏輯處理鏈pipeline添加邏輯處理器 , 來編寫數(shù)據(jù)的處理邏輯 , pipeline的邏輯我們會在后面分析
  • 接下來我們學到了 在客戶端連接成功之后會回調(diào)邏輯處理器的channelActive()方法 , 而不管是服務(wù)端還是客戶端 , 收到數(shù)據(jù)之后都會調(diào)用channelRead方法
  • 寫數(shù)據(jù)用writeAndFlush() 方法 客戶端與服務(wù)端交互的二進制數(shù)據(jù)載體為ByteBuf , ByteBuf 通過連接的內(nèi)存管理器創(chuàng)建 , 字節(jié)數(shù)據(jù)填充到ByteBuf 之后才能寫到對端 , 接下來一小節(jié) , 我們就重點來分析ByteBuf
  • 思考: 如何實現(xiàn)在新連接介入的時候 , 服務(wù)端主動向客戶端推送消息 , 客戶端回復服務(wù)端消息?

  • 解答: 在服務(wù)器端的邏輯處理其中也實現(xiàn) channelActive() 在有新的連接接入時 會回調(diào)此方法

    class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "是不是你連我了?";System.out.println(new Date() +":服務(wù)端寫出數(shù)據(jù)-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));// 向客戶端回復數(shù)據(jù)String content = "你好,田先森!";System.out.println(new Date() +":服務(wù)端寫出數(shù)據(jù)-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {// 獲取 二進制抽象 ByteBufByteBuf byteBuf = cxt.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到buf中byteBuf.writeBytes(bs);return byteBuf;}}
  • 總結(jié)

    以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。