Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信
##
Netty實戰(zhàn) IM即時通訊系統(tǒng)(六)實戰(zhàn): 客戶端和服務(wù)端雙向通信零、 目錄
- Netty 簡介
- Netty 環(huán)境配置
- 服務(wù)端啟動流程
- 實戰(zhàn): 客戶端和服務(wù)端雙向通信
- 數(shù)據(jù)傳輸載體ByteBuf介紹
- 客戶端與服務(wù)端通信協(xié)議編解碼
- 實現(xiàn)客戶端登錄
- 實現(xiàn)客戶端與服務(wù)端收發(fā)消息
- pipeline與channelHandler
- 構(gòu)建客戶端與服務(wù)端pipeline
- 拆包粘包理論與解決方案
- channelHandler的生命周期
- 使用channelHandler的熱插拔實現(xiàn)客戶端身份校驗
- 客戶端互聊原理與實現(xiàn)
- 群聊的發(fā)起與通知
- 群聊的成員管理(加入與退出,獲取成員列表)
- 群聊消息的收發(fā)及Netty性能優(yōu)化
- 心跳與空閑檢測
- 總結(jié)
- 擴展
###六、 實戰(zhàn): 客戶端和服務(wù)端雙向通信
本節(jié)我們要實現(xiàn)的功能是客戶端連接成功后,向服務(wù)端寫出一段數(shù)據(jù) , 服務(wù)端收到數(shù)據(jù)后打印 , 并向客戶端回復一段數(shù)據(jù) 。
我們先做一個代碼框架 , 然后在框架上面做修改
public class Test_07_客戶端和服務(wù)端雙向通信 {public static void main(String[] args) {Test_07_Server.start(8000);Test_07_Client.start("127.0.0.1" , 8000 ,5);}}class Test_07_Client{public static void start(String IP , int port ,int maxRetry){NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {}});connect(bootstrap , IP , port , maxRetry);}private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry , int... retryIndex) {bootstrap.connect(IP , port).addListener(future ->{int[] finalRetryIndex;if(future.isSuccess()) {System.out.println("連接成功");}else if(maxRetry ==0) {System.out.println("達到最大重試此時,放棄重試");}else {// 初始化 重試計數(shù)if(retryIndex.length == 0) {finalRetryIndex = new int[]{0};}else {finalRetryIndex = retryIndex;}// 計算時間間隔int delay = 1 << finalRetryIndex[0];// 執(zhí)行重試System.out.println(new Date() +" 連接失敗,剩余重試次數(shù):"+ maxRetry + ","+delay+"秒后執(zhí)行重試");bootstrap.config().group().schedule(()->{connect(bootstrap , IP, port , maxRetry -1 , finalRetryIndex[0]+1);}, delay, TimeUnit.SECONDS);}});}}class Test_07_Server{public static void start(int port){NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}});bind(serverBootstrap, port);}private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println("服務(wù)端:端口【"+port+"】綁定成功!");}else {System.out.println("服務(wù)端:端口【"+port+"】綁定失敗,嘗試綁定【"+(port+1)+"】!");bind(serverBootstrap, port+1);}});}}客戶端發(fā)送數(shù)據(jù)到服務(wù)端
在《客戶端啟動流程》這一小節(jié) , 我們提到 客戶端相關(guān)的數(shù)據(jù)讀寫邏輯是通過BootStrap的handler()方法指定
bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {}});現(xiàn)在我們在initChannel()中給客戶端添加一個邏輯處理器 , 這個處理器的作用就是負責向服務(wù)端寫數(shù)據(jù)
bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加業(yè)務(wù)處理邏輯 可以添加自定義的業(yè)務(wù)處理邏輯也可以添加 Netty自帶的簡單通用的處理邏輯ch.pipeline().addLast(new Test_07_ClientHandler());}});ch.pipeline()方法返回的是和這條連接相關(guān)的邏輯處理鏈 , 采用了責任鏈處理模式 , 這里不理解沒關(guān)系 , 后面會講到。
然后再調(diào)用addLast()方法添加一個邏輯處理器 , 這個邏輯處理器為的就是在客戶端建立連接成功之后向服務(wù)端寫數(shù)據(jù) , 下面是這個邏輯處理器的代碼:
class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(new Date() + " 客戶端寫出數(shù)據(jù)...");// 1. 獲取數(shù)據(jù)ByteBuf buffer = getByteBuf(ctx);// 2. 寫數(shù)據(jù)ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = "你好,奧特曼!".getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到 bufbuf.writeBytes(bs);return buf;}}服務(wù)端讀取客戶端數(shù)據(jù)
服務(wù)端的數(shù)據(jù)處理邏輯 是通過ServerBootStrap 的childHandler()方法指定
serverBootStrtap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// TODO Auto-generated method stub}})現(xiàn)在 , 我們在initChannel() 中 給服務(wù)端添加一個邏輯處理器 , 這個處理器 的作用就是負責客戶端讀數(shù)據(jù)
serverBootStrtap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}})這個方法里的邏輯和客戶端類似 , 獲取服務(wù)端關(guān)于這條連接的邏輯處理鏈pipeline , 然后添加一個邏輯處理器 , 負責讀取客戶端發(fā)來的數(shù)據(jù)
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));}}運行測試
完整代碼
import java.nio.charset.Charset;import java.util.Date;import java.util.concurrent.TimeUnit;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Test_07_客戶端和服務(wù)端雙向通信 {public static void main(String[] args) throws Exception {Test_07_Server.start(8000);Test_07_Client.start("127.0.0.1", 8000, 5);}}class Test_07_Client {public static void start(String IP, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加業(yè)務(wù)處理邏輯 可以添加自定義的業(yè)務(wù)處理邏輯也可以添加 Netty自帶的簡單通用的處理邏輯ch.pipeline().addLast(new Test_07_ClientHandler());}});connect(bootstrap, IP, port, maxRetry);}private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {bootstrap.connect(IP, port).addListener(future -> {int[] finalRetryIndex;if (future.isSuccess()) {System.out.println("客戶端連接【"+IP+":"+port+"】成功");} else if (maxRetry == 0) {System.out.println("達到最大重試此時,放棄重試");} else {// 初始化 重試計數(shù)if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 計算時間間隔int delay = 1 << finalRetryIndex[0];// 執(zhí)行重試System.out.println(new Date() + " 連接失敗,剩余重試次數(shù):" + maxRetry + "," + delay + "秒后執(zhí)行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}class Test_07_Server {public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}});bind(serverBootstrap, port);}private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println("服務(wù)端:端口【"+port+"】綁定成功!");}else {System.out.println("服務(wù)端:端口【"+port+"】綁定失敗,嘗試綁定【"+(port+1)+"】!");bind(serverBootstrap, port+1);}});}}class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "你好,奧特曼!";System.out.println(new Date() + " 客戶端寫出數(shù)據(jù):"+content);// 1. 獲取數(shù)據(jù)ByteBuf buffer = getByteBuf(ctx , content);// 2. 寫數(shù)據(jù)ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到 bufbuf.writeBytes(bs);return buf;}}class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));}}運行結(jié)果:
服務(wù)端回復數(shù)據(jù)給客戶端
服務(wù)端向客戶端寫數(shù)據(jù)的邏輯與客戶端向服務(wù)端寫數(shù)據(jù)的邏輯一樣 , 先創(chuàng)建一個ByteBuf , 然后填充二進制數(shù)據(jù) , 最后調(diào)用writeAndFlush()方法寫出去
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));// 向客戶端回復數(shù)據(jù)String content = "你好,田先森!";System.out.println(new Date() +":服務(wù)端寫出數(shù)據(jù)-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {// 獲取 二進制抽象 ByteBufByteBuf byteBuf = cxt.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到buf中byteBuf.writeBytes(bs);return byteBuf;}}現(xiàn)在輪到客戶端了 , 客戶端讀取數(shù)據(jù)的邏輯和服務(wù)端讀數(shù)據(jù)的邏輯一樣 , 同樣是覆蓋channelRead() 方法
class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println(new Date()+": 客戶端讀到數(shù)據(jù) ->"+ byteBuf.toString(Charset.forName("UTF-8")));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "你好,奧特曼!";System.out.println(new Date() + " 客戶端寫出數(shù)據(jù):"+content);// 1. 獲取數(shù)據(jù)ByteBuf buffer = getByteBuf(ctx , content);// 2. 寫數(shù)據(jù)ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到 bufbuf.writeBytes(bs);return buf;}}現(xiàn)在 客戶端和服務(wù)端就實現(xiàn)了雙向通信
完整代碼:
import java.nio.charset.Charset;import java.util.Date;import java.util.concurrent.TimeUnit;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Test_07_客戶端和服務(wù)端雙向通信 {public static void main(String[] args) throws Exception {Test_07_Server.start(8000);Test_07_Client.start("127.0.0.1", 8000, 5);}}class Test_07_Client {public static void start(String IP, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加業(yè)務(wù)處理邏輯 可以添加自定義的業(yè)務(wù)處理邏輯也可以添加 Netty自帶的簡單通用的處理邏輯ch.pipeline().addLast(new Test_07_ClientHandler());}});connect(bootstrap, IP, port, maxRetry);}private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {bootstrap.connect(IP, port).addListener(future -> {int[] finalRetryIndex;if (future.isSuccess()) {System.out.println("客戶端連接【"+IP+":"+port+"】成功");} else if (maxRetry == 0) {System.out.println("達到最大重試此時,放棄重試");} else {// 初始化 重試計數(shù)if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 計算時間間隔int delay = 1 << finalRetryIndex[0];// 執(zhí)行重試System.out.println(new Date() + " 連接失敗,剩余重試次數(shù):" + maxRetry + "," + delay + "秒后執(zhí)行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}class Test_07_Server {public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}});bind(serverBootstrap, port);}private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println("服務(wù)端:端口【"+port+"】綁定成功!");}else {System.out.println("服務(wù)端:端口【"+port+"】綁定失敗,嘗試綁定【"+(port+1)+"】!");bind(serverBootstrap, port+1);}});}}class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println(new Date()+": 客戶端讀到數(shù)據(jù) ->"+ byteBuf.toString(Charset.forName("UTF-8")));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "你好,奧特曼!";System.out.println(new Date() + " 客戶端寫出數(shù)據(jù):"+content);// 1. 獲取數(shù)據(jù)ByteBuf buffer = getByteBuf(ctx , content);// 2. 寫數(shù)據(jù)ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到 bufbuf.writeBytes(bs);return buf;}}class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));// 向客戶端回復數(shù)據(jù)String content = "你好,田先森!";System.out.println(new Date() +":服務(wù)端寫出數(shù)據(jù)-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {// 獲取 二進制抽象 ByteBufByteBuf byteBuf = cxt.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到buf中byteBuf.writeBytes(bs);return byteBuf;}}執(zhí)行結(jié)果
總結(jié)
思考: 如何實現(xiàn)在新連接介入的時候 , 服務(wù)端主動向客戶端推送消息 , 客戶端回復服務(wù)端消息?
解答: 在服務(wù)器端的邏輯處理其中也實現(xiàn) channelActive() 在有新的連接接入時 會回調(diào)此方法
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "是不是你連我了?";System.out.println(new Date() +":服務(wù)端寫出數(shù)據(jù)-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù)->"+ buf.toString(Charset.forName("UTF-8")));// 向客戶端回復數(shù)據(jù)String content = "你好,田先森!";System.out.println(new Date() +":服務(wù)端寫出數(shù)據(jù)-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {// 獲取 二進制抽象 ByteBufByteBuf byteBuf = cxt.alloc().buffer();// 準備數(shù)據(jù)byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數(shù)據(jù)填充到buf中byteBuf.writeBytes(bs);return byteBuf;}}總結(jié)
以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty实战 IM即时通讯系统(五)客
- 下一篇: Netty实战 IM即时通讯系统(七)数