日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

Netty实战 IM即时通讯系统(九)实现客户端登录

發布時間:2024/4/30 windows 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty实战 IM即时通讯系统(九)实现客户端登录 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

##

Netty實戰 IM即時通訊系統(九)實現客戶端登錄

零、 目錄

  • IM系統簡介
    • Netty 簡介
    • Netty 環境配置
    • 服務端啟動流程
    • 客戶端啟動流程
    • 實戰: 客戶端和服務端雙向通信
    • 數據傳輸載體ByteBuf介紹
    • 客戶端與服務端通信協議編解碼
    • 實現客戶端登錄
    • 實現客戶端與服務端收發消息
    • pipeline與channelHandler
    • 構建客戶端與服務端pipeline
    • 拆包粘包理論與解決方案
    • channelHandler的生命周期
    • 使用channelHandler的熱插拔實現客戶端身份校驗
    • 客戶端互聊原理與實現
    • 群聊的發起與通知
    • 群聊的成員管理(加入與退出,獲取成員列表)
    • 群聊消息的收發及Netty性能優化
    • 心跳與空閑檢測
    • 總結
    • 擴展

    一、 登錄流程

  • 從上圖中我們可以看到 , 客戶端連接上服務端之后
  • 客戶端會構建一個登錄請求對象 , 然后通過編碼把請求對象編碼為ByteBuf , 寫到服務端
  • 服務端接收到ByteBuf之后 , 首先通過解碼把ByteBuf 解碼為登錄請求響應 , 然后進行校驗
  • 服務端校驗通過之后 , 構造一個登錄響應對象 , 依然經過編碼 , 然后回寫到客戶端
  • 客戶端收到服務端響應之后解碼ByteBuf , 能拿到登錄響應之后 , 判斷是否登錄成功
  • 二、 代碼框架

    /*** 實現客戶端登錄* * @author outman*/public class Test_10_實現客戶端登錄 {public static void main(String[] args) {// 啟動服務端Test_10_server.start(8000);// 啟動客戶端Test_10_client.start("127.0.0.1", 8000, 5);}}/*** 客戶端* * @author outman*/class Test_10_client {/*** 客戶端啟動* * @param ip* 連接ip* @param port* 服務端端口* @param maxRetry* 最大重試次數*/public static void start(String ip, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客戶端處理邏輯}});// 連接服務端connect(bootstrap, ip, port, maxRetry);}/*** @desc 連接服務端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex* 重試計數*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重連計數if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判斷連接狀態if (future.isSuccess()) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】成功");} else if (maxRetry <= 0) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗,達到重連最大次數放棄重連");} else {// 重連使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗," + delay + "秒后執行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}/*** 服務端* * @author outman*/class Test_10_server {/*** @desc 服務端啟動* @param port*/public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服務端處理邏輯}});// 綁定端口bind(serverBootstrap, port);}/*** @desc 自動綁定遞增并啟動服務端* @param serverBootstrap* @param port*/private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {System.out.println("服務端:" + new Date() + "綁定端口【" + port + "】成功");} else {System.out.println("服務端:" + new Date() + "綁定端口【" + port + "】失敗,執行遞增綁定");bind(serverBootstrap, port + 1);}});}}/*** 客戶端處理邏輯* * @author outman*/class Test_10_clientHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {}}/*** 服務端處理邏輯* * @author outman*/class Test_10_serverHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {}}/*** 數據包抽象類* * @author outman*/@Dataabstract class Test_10_Packet {// 協議版本號private byte version = 1;// 獲取指定標識public abstract byte getCommand();// 指令集合public interface Command {// 登錄指令public static final byte LOGIN_REQUEST = 1;// 登陸響應指令public static final byte LOGIN_RESPONSE = 2;}}/*** 序列化抽象接口* * @author outman*/interface Test_10_Serializer {// 獲取序列化算法標識byte getSerializerAlgorithm();// 序列化算法標識集合interface SerializerAlgorithm {// JSON 序列化算法標識public static final byte JSONSerializerAlgrothm = 1;}// 默認的序列化算法public Test_10_Serializer DEFAULT = new Test_10_JSONSerializer();// 序列化byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet);// 反序列化<T>T deSerialize(byte[] bs, Class<T> clazz);}/*** 數據包編解碼類* * @author outman*/class Test_10_PacketCodec {// 魔數private static final int MAGIC_NUMBER = 0x12345678;// 單例public static Test_10_PacketCodec INSTANCE = new Test_10_PacketCodec();// 注冊 序列化類private Class[] serializerArray = new Class[] { Test_10_JSONSerializer.class };// 注冊抽象數據包類private Class[] packetArray = new Class[] { Test_10_LoginRequestPacket.class, Test_10_LoginResponsePacket.class };// 序列化算法標識 和對應的序列化類映射private static Map<Byte, Class<? super Test_10_Serializer>> serializerMap;// 指令標識和對應的數據包抽象類映射private static Map<Byte, Class<? super Test_10_Packet>> packetMap;// 初始化 兩個映射private Test_10_PacketCodec() {serializerMap = new HashMap<>();Arrays.asList(serializerArray).forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");byte serializerAlgorthm = (byte) method.invoke((Test_10_Serializer)clazz.newInstance());serializerMap.put(serializerAlgorthm, clazz);} catch (Exception e) {e.printStackTrace();}});packetMap = new HashMap<>();Arrays.asList(packetArray).forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");method.setAccessible(true);byte command = (byte) method.invoke((Test_10_Packet)clazz.newInstance());packetMap.put(command, clazz);} catch (Exception e) {e.printStackTrace();}});}// 編碼public ByteBuf enCode(ByteBuf byteBuf, Test_10_Packet packet) {// 序列化數據包byte[] bs = Test_10_Serializer.DEFAULT.enSerialize(byteBuf, packet);// 寫入魔數byteBuf.writeInt(MAGIC_NUMBER);// 寫入協議版本號byteBuf.writeByte(packet.getVersion());// 寫入指令標識byteBuf.writeByte(packet.getCommand());// 寫入序列化算法標識byteBuf.writeByte(Test_10_Serializer.DEFAULT.getSerializerAlgorithm());// 寫入數據長度byteBuf.writeInt(bs.length);// 寫入數據byteBuf.writeBytes(bs);return byteBuf;}// 解碼public Test_10_Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過魔數校驗byteBuf.skipBytes(4);// 跳過版本號校驗byteBuf.skipBytes(1);// 獲取指令標識byte command = byteBuf.readByte();// 獲取序列化算法標識byte serializerAlgorthm = byteBuf.readByte();// 獲取數據長度int len = byteBuf.readInt();// 獲取數據byte[] bs = new byte[len];byteBuf.readBytes(bs);// 獲取對應的序列化算法類Test_10_Serializer serializer = getSerializer(serializerAlgorthm);// 獲取對應的數據包類Test_10_Packet packet = getPacket(command);if(serializer != null && packet != null) {//反序列化數據包return serializer.deSerialize(bs, packet.getClass());}else {throw new RuntimeException("沒有找到對應的序列化實現或數據包實現");}}private static Test_10_Packet getPacket(byte command) throws Exception {return (Test_10_Packet) packetMap.get(command).newInstance();}private static Test_10_Serializer getSerializer(byte serializerAlgorthm) throws Exception {return (Test_10_Serializer) serializerMap.get(serializerAlgorthm).newInstance();}}/*** 登錄請求數據包實體類* * @author outman*/@Dataclass Test_10_LoginRequestPacket extends Test_10_Packet {private int userId ;private String userName;private String password;@Overridepublic byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 登錄響應數據包實體類* * @author outman*/@Dataclass Test_10_LoginResponsePacket extends Test_10_Packet {private int code;private String msg;@Overridepublic byte getCommand() {return Command.LOGIN_RESPONSE;}/*** 響應碼集合* */interface Code{// 成功的響應碼public static final int SUCCESS= 10000;// 失敗的響應碼public static final int FAIL = 10001;}}/*** Json序列化實現類* * @author outman*/class Test_10_JSONSerializer implements Test_10_Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSONSerializerAlgrothm;}@Overridepublic byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet) {return JSONObject.toJSONBytes(packet);}@Overridepublic <T>T deSerialize(byte[] bs, Class<T> clazz) {return JSONObject.parseObject(bs, clazz);}}
  • 這個代碼框架中 已經寫好了 服務端、 客戶端啟動連接 。 通信協議 、 數據包編解碼的邏輯 , 剩下的客戶端、服務端業務處理邏輯 , 我們邊學邊寫, 現在你可以把代碼框架粘貼到你的編輯器中
  • 二、 邏輯處理器

  • 接下來我們分別實現一下上述四個過程 , 開始之前 , 我們回顧一下客戶端與服務端的啟動流程 , 客戶端啟動的時候 , 我們會在引導類BootStrap里配置客戶端處理邏輯 , 本小節中我們的客戶端業務處理邏輯叫做Test_10_clientHandler

    /*** 客戶端處理邏輯* * @author outman*/class Test_10_clientHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數據可讀時觸發*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}}
  • 我們在客戶端啟動的時候 , 給客戶端引導類配置這個邏輯處理器 , 這樣Netty中事件相關的回調就會回調我們的Test_10_clientHandler

    bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客戶端處理邏輯ch.pipeline().addLast(new Test_10_clientHandler());}});
  • 同樣 我們給服務端引導類頁配置一個邏輯處理器

    serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服務端處理邏輯ch.pipeline().addLast(new Test_10_serverHandler());}});
  • 接下來我們主要圍繞這兩個Handler來編寫客戶端登錄相關的處理邏輯

  • 三、 客戶端發送登錄請求

  • 客戶端處理登錄請求

  • 我們實現在客戶端連接上服務端之后 , 立即登錄。 在客戶端和服務端連接成功時 , Netty 會回調Test_10_clientHandler 的channelActive(ChannelHandlerContext ctx) 方法 , 我們在這里寫 請求登錄的邏輯(我們事先在 Test_10_LoginRequestPacket 中添加了三個屬性 , Test_10_LoginRequestPacket 類上的@Data 注解是lombok 提供的 ,讓我們不用寫setter/getter)

    /*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客戶端:"+new Date()+"開始登陸");// 創建登陸對象Test_10_LoginRequestPacket loginRequestPacket = new Test_10_LoginRequestPacket();// 隨機取ID 1~999 loginRequestPacket.setUserId((int)(Math.random()*1000)+1);loginRequestPacket.setUserName("outman");loginRequestPacket.setPassword("123456");// 編碼ByteBuf byteBuf = Test_10_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket);// 寫出數據ctx.channel().writeAndFlush(byteBuf);}
  • 寫數據的時候 , 我們通過ctx.channel() 獲取到當前連接(Netty對連接 的抽象為channel , 后面小節會分析) , 然后調用了writeAndFlush() 方法 就能把二進制數據寫到服務端

  • 服務端處理登錄請求

    /*** 服務端處理邏輯* * @author outman*/class Test_10_serverHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解碼Test_10_Packet packet = Test_10_PacketCodec.INSTANCE.deCode(byteBuf);// 根據指令執行對應的處理邏輯switch (packet.getCommand() ) {case Test_10_Packet.Command.LOGIN_REQUEST:Test_10_LoginRequestPacket loginRequestPacket = (Test_10_LoginRequestPacket) packet;// 校驗成功System.out.println("服務端:"+new Date()+"【"+loginRequestPacket.getUserName()+"】 登陸成功");break;default:System.out.println("服務端:"+new Date()+"收到未知的指令【"+packet.getCommand()+"】");break;}}}
  • 我們在服務端引導類 ServerBootstrap 添加了邏輯處理器Test_10_serverHandler 之后 , Netty 在收到數據之后會回調channelRead() , 這里第二個參數msg , 在我們這個場景中 , 可以直接強轉為ByteBuf , 為什么Netty不直接把這個參數類型定義為ByteBuf? , 我們在后面的小節會分析到
  • 拿到ByteBuf 之后 , 首先要做的事情就是解碼 , 解碼出的java數據包對象 , 然后判斷如果是登陸請求數據包, 就進行登錄邏輯的處理這里我們假設所有的登錄請求都是成功的 , 接下來, 我們來告訴客戶端他登陸成功的好消息。
  • 四、 服務端發送登錄響應

  • 服務端發送登錄響應

    /*** 服務端處理邏輯* * @author outman*/class Test_10_serverHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解碼Test_10_Packet packet = Test_10_PacketCodec.INSTANCE.deCode(byteBuf);// 根據指令執行對應的處理邏輯switch (packet.getCommand() ) {case Test_10_Packet.Command.LOGIN_REQUEST:Test_10_LoginRequestPacket loginRequestPacket = (Test_10_LoginRequestPacket) packet;// 模擬校驗成功System.out.println("服務端:"+new Date()+"【"+loginRequestPacket.getUserName()+"】 登陸成功");// 給服務端響應Test_10_LoginResponsePacket loginResponsePacket = new Test_10_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陸成功!");// 編碼byteBuf = Test_10_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);//寫出數據ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服務端:"+new Date()+"收到未知的指令【"+packet.getCommand()+"】");break;}}}
  • 這段邏輯仍然時候服務端邏輯處理器Test_10_serverHandler的channelRead 方法中 , 我們構造一個登錄響應包Test_10_LoginResponsePacket , 然后在校驗成功和失敗時分別設置標志位 , 接下來調用編碼器把java對象編碼成ByteBuf , 然后調用writeAndFlush 把數據包寫給客戶端
  • 客戶端處理登錄響應

    /*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數據包解碼Test_10_Packet packet= Test_10_PacketCodec.INSTANCE.deCode(byteBuf);//根據不同的指令選擇對應的處理邏輯switch (packet.getCommand()) {case Test_10_Packet.Command.LOGIN_RESPONSE:Test_10_LoginResponsePacket loginResponsePacket = (Test_10_LoginResponsePacket) packet;System.out.println("客戶端:"+new Date() +"收到服務端響應【"+loginResponsePacket.getMsg()+"】");break;default:break;}}
  • 客戶端拿到數據之后 , 調用Test_10_PacketCodec 進行解碼操作 , 然后我們打印出服務端的響應內容
  • 執行結果

  • 完整代碼

    /*** 實現客戶端登錄* * @author outman*/public class Test_10_實現客戶端登錄 {public static void main(String[] args) {// 啟動服務端Test_10_server.start(8000);// 啟動客戶端Test_10_client.start("127.0.0.1", 8000, 5);}}/*** 客戶端* * @author outman*/class Test_10_client {/*** 客戶端啟動* * @param ip* 連接ip* @param port* 服務端端口* @param maxRetry* 最大重試次數*/public static void start(String ip, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客戶端處理邏輯ch.pipeline().addLast(new Test_10_clientHandler());}});// 連接服務端connect(bootstrap, ip, port, maxRetry);}/*** @desc 連接服務端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex* 重試計數*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重連計數if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判斷連接狀態if (future.isSuccess()) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】成功");} else if (maxRetry <= 0) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗,達到重連最大次數放棄重連");} else {// 重連使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗," + delay + "秒后執行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}/*** 服務端* * @author outman*/class Test_10_server {/*** @desc 服務端啟動* @param port*/public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服務端處理邏輯ch.pipeline().addLast(new Test_10_serverHandler());}});// 綁定端口bind(serverBootstrap, port);}/*** @desc 自動綁定遞增并啟動服務端* @param serverBootstrap* @param port*/private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {System.out.println("服務端:" + new Date() + "綁定端口【" + port + "】成功");} else {System.out.println("服務端:" + new Date() + "綁定端口【" + port + "】失敗,執行遞增綁定");bind(serverBootstrap, port + 1);}});}}/*** 客戶端處理邏輯* * @author outman*/class Test_10_clientHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客戶端:"+new Date()+"開始登陸");// 創建登陸對象Test_10_LoginRequestPacket loginRequestPacket = new Test_10_LoginRequestPacket();// 隨機取ID 1~999 loginRequestPacket.setUserId((int)(Math.random()*1000)+1);loginRequestPacket.setUserName("outman");loginRequestPacket.setPassword("123456");// 編碼ByteBuf byteBuf = Test_10_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket);// 寫出數據ctx.channel().writeAndFlush(byteBuf);}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數據包解碼Test_10_Packet packet= Test_10_PacketCodec.INSTANCE.deCode(byteBuf);//根據不同的指令選擇對應的處理邏輯switch (packet.getCommand()) {case Test_10_Packet.Command.LOGIN_RESPONSE:Test_10_LoginResponsePacket loginResponsePacket = (Test_10_LoginResponsePacket) packet;System.out.println("客戶端:"+new Date() +"收到服務端響應【"+loginResponsePacket.getMsg()+"】");break;default:break;}}}/*** 服務端處理邏輯* * @author outman*/class Test_10_serverHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解碼Test_10_Packet packet = Test_10_PacketCodec.INSTANCE.deCode(byteBuf);// 根據指令執行對應的處理邏輯switch (packet.getCommand() ) {case Test_10_Packet.Command.LOGIN_REQUEST:Test_10_LoginRequestPacket loginRequestPacket = (Test_10_LoginRequestPacket) packet;// 模擬校驗成功System.out.println("服務端:"+new Date()+"【"+loginRequestPacket.getUserName()+"】 登陸成功");// 給服務端響應Test_10_LoginResponsePacket loginResponsePacket = new Test_10_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陸成功!");// 編碼byteBuf = Test_10_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);//寫出數據ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服務端:"+new Date()+"收到未知的指令【"+packet.getCommand()+"】");break;}}}/*** 數據包抽象類* * @author outman*/@Dataabstract class Test_10_Packet {// 協議版本號private byte version = 1;// 獲取指定標識public abstract byte getCommand();// 指令集合public interface Command {// 登錄指令public static final byte LOGIN_REQUEST = 1;// 登陸響應指令public static final byte LOGIN_RESPONSE = 2;}}/*** 序列化抽象接口* * @author outman*/interface Test_10_Serializer {// 獲取序列化算法標識byte getSerializerAlgorithm();// 序列化算法標識集合interface SerializerAlgorithm {// JSON 序列化算法標識public static final byte JSONSerializerAlgrothm = 1;}// 默認的序列化算法public Test_10_Serializer DEFAULT = new Test_10_JSONSerializer();// 序列化byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet);// 反序列化<T>T deSerialize(byte[] bs, Class<T> clazz);}/*** 數據包編解碼類* * @author outman*/class Test_10_PacketCodec {// 魔數private static final int MAGIC_NUMBER = 0x12345678;// 單例public static Test_10_PacketCodec INSTANCE = new Test_10_PacketCodec();// 注冊 序列化類private Class[] serializerArray = new Class[] { Test_10_JSONSerializer.class };// 注冊抽象數據包類private Class[] packetArray = new Class[] { Test_10_LoginRequestPacket.class, Test_10_LoginResponsePacket.class };// 序列化算法標識 和對應的序列化類映射private static Map<Byte, Class<? super Test_10_Serializer>> serializerMap;// 指令標識和對應的數據包抽象類映射private static Map<Byte, Class<? super Test_10_Packet>> packetMap;// 初始化 兩個映射private Test_10_PacketCodec() {serializerMap = new HashMap<>();Arrays.asList(serializerArray).forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");byte serializerAlgorthm = (byte) method.invoke((Test_10_Serializer)clazz.newInstance());serializerMap.put(serializerAlgorthm, clazz);} catch (Exception e) {e.printStackTrace();}});packetMap = new HashMap<>();Arrays.asList(packetArray).forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");method.setAccessible(true);byte command = (byte) method.invoke((Test_10_Packet)clazz.newInstance());packetMap.put(command, clazz);} catch (Exception e) {e.printStackTrace();}});}// 編碼public ByteBuf enCode(ByteBuf byteBuf, Test_10_Packet packet) {// 序列化數據包byte[] bs = Test_10_Serializer.DEFAULT.enSerialize(byteBuf, packet);// 寫入魔數byteBuf.writeInt(MAGIC_NUMBER);// 寫入協議版本號byteBuf.writeByte(packet.getVersion());// 寫入指令標識byteBuf.writeByte(packet.getCommand());// 寫入序列化算法標識byteBuf.writeByte(Test_10_Serializer.DEFAULT.getSerializerAlgorithm());// 寫入數據長度byteBuf.writeInt(bs.length);// 寫入數據byteBuf.writeBytes(bs);return byteBuf;}// 解碼public Test_10_Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過魔數校驗byteBuf.skipBytes(4);// 跳過版本號校驗byteBuf.skipBytes(1);// 獲取指令標識byte command = byteBuf.readByte();// 獲取序列化算法標識byte serializerAlgorthm = byteBuf.readByte();// 獲取數據長度int len = byteBuf.readInt();// 獲取數據byte[] bs = new byte[len];byteBuf.readBytes(bs);// 獲取對應的序列化算法類Test_10_Serializer serializer = getSerializer(serializerAlgorthm);// 獲取對應的數據包類Test_10_Packet packet = getPacket(command);if(serializer != null && packet != null) {//反序列化數據包return serializer.deSerialize(bs, packet.getClass());}else {throw new RuntimeException("沒有找到對應的序列化實現或數據包實現");}}private static Test_10_Packet getPacket(byte command) throws Exception {return (Test_10_Packet) packetMap.get(command).newInstance();}private static Test_10_Serializer getSerializer(byte serializerAlgorthm) throws Exception {return (Test_10_Serializer) serializerMap.get(serializerAlgorthm).newInstance();}}/*** 登錄請求數據包實體類* * @author outman*/@Dataclass Test_10_LoginRequestPacket extends Test_10_Packet {private int userId ;private String userName;private String password;@Overridepublic byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 登錄響應數據包實體類* * @author outman*/@Dataclass Test_10_LoginResponsePacket extends Test_10_Packet {private int code;private String msg;@Overridepublic byte getCommand() {return Command.LOGIN_RESPONSE;}/*** 響應碼集合* */interface Code{// 成功的響應碼public static final int SUCCESS= 10000;// 失敗的響應碼public static final int FAIL = 10001;}}/*** Json序列化實現類* * @author outman*/class Test_10_JSONSerializer implements Test_10_Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSONSerializerAlgrothm;}@Overridepublic byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet) {return JSONObject.toJSONBytes(packet);}@Overridepublic <T>T deSerialize(byte[] bs, Class<T> clazz) {return JSONObject.parseObject(bs, clazz);}}
  • 五、 總結

  • 本小節我們梳理了客戶端登錄的基本流程 , 然后結合上一小節的編解碼邏輯 , 我們使用Netty 完成了完整的客戶端登錄流程。
  • 六、 思考

  • 客戶端登錄成功或失敗之后 , 如何把成功或者失敗的標識綁定在客戶端的連接上 ? 服務端又是怎樣有效的避免客戶端重新登錄的?
  • 答: 給channel設置attr自定義屬性 , 可以把登錄標識綁定在連接上
  • 客戶端NioEventLoopGroup不用釋放嗎?
  • 答: 不用 , 程序關閉之后 , 所有的線程都自動關閉了
  • 與50位技術專家面對面20年技術見證,附贈技術全景圖

    總結

    以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(九)实现客户端登录的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。