Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息
Netty實(shí)戰(zhàn) IM即時(shí)通訊系統(tǒng)(十)實(shí)現(xiàn)客戶端和服務(wù)端收發(fā)消息
零、 目錄
- Netty 簡(jiǎn)介
- Netty 環(huán)境配置
- 服務(wù)端啟動(dòng)流程
- 客戶端啟動(dòng)流程
- 實(shí)戰(zhàn): 客戶端和服務(wù)端雙向通信
- 數(shù)據(jù)傳輸載體ByteBuf介紹
- 客戶端與服務(wù)端通信協(xié)議編解碼
- 實(shí)現(xiàn)客戶端登錄
- 實(shí)現(xiàn)客戶端與服務(wù)端收發(fā)消息
- pipeline與channelHandler
- 構(gòu)建客戶端與服務(wù)端pipeline
- 拆包粘包理論與解決方案
- channelHandler的生命周期
- 使用channelHandler的熱插拔實(shí)現(xiàn)客戶端身份校驗(yàn)
- 客戶端互聊原理與實(shí)現(xiàn)
- 群聊的發(fā)起與通知
- 群聊的成員管理(加入與退出,獲取成員列表)
- 群聊消息的收發(fā)及Netty性能優(yōu)化
- 心跳與空閑檢測(cè)
- 總結(jié)
- 擴(kuò)展
一、 實(shí)現(xiàn)需求
二、 代碼框架
在代碼框架中我們已經(jīng)實(shí)現(xiàn)了 服務(wù)端啟動(dòng) 、 客戶端啟動(dòng) 、 客戶端與服務(wù)端雙向通信 、 客戶端與服務(wù)端通信協(xié)議編解碼 、 客戶端登錄的邏輯 , 接下來(lái)你可以把代碼框架粘貼到你的編輯器中跟我來(lái)一起實(shí)現(xiàn)客戶端與服務(wù)端收發(fā)消息
import java.lang.reflect.Method;import java.util.Arrays;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeUnit;import com.alibaba.fastjson.JSONObject;import com.tj.NIO_test_maven.Test_11_LoginResponsePacket.Code;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import lombok.Data;/*** 2019年1月3日* * @author outman* * 實(shí)現(xiàn)客戶端和服務(wù)端收發(fā)消息**/public class Test_11_實(shí)現(xiàn)客戶端與服務(wù)端收發(fā)消息 {public static void main(String[] args) {// 啟動(dòng)服務(wù)端Test_11_server.start(8000);// 啟動(dòng)客戶端Test_11_client.start("127.0.0.1", 8000, 5);}}/*** 2019年1月3日* * @author outman** 服務(wù)端*/class Test_11_server {/*** @desc 服務(wù)端啟動(dòng)* @param port*/public static void start(int port) {NioEventLoopGroup bossgroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossgroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服務(wù)端處理邏輯ch.pipeline().addLast(new Test_11_serverHandler());}});bind(serverBootstrap, port);}/*** @desc 自動(dòng)綁定遞增并啟動(dòng)服務(wù)端* @param serverBootstrap* @param port*/private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {System.out.println("服務(wù)端:" + new Date() + "綁定端口【" + port + "】成功");} else {System.out.println("服務(wù)端:" + new Date() + "綁定端口【" + port + "】失敗,執(zhí)行遞增綁定");bind(serverBootstrap, port + 1);}});}}/*** 2019年1月3日* * @author outman** 客戶端*/class Test_11_client {/*** 客戶端啟動(dòng)* * @param ip* 連接ip* @param port* 服務(wù)端端口* @param maxRetry* 最大重試次數(shù)*/public static void start(String ip, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客戶端處理邏輯ch.pipeline().addLast(new Test_11_clientHandler());}});// 連接服務(wù)端connect(bootstrap, ip, port, maxRetry);}/*** @desc 連接服務(wù)端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex* 重試計(jì)數(shù)*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重連計(jì)數(shù)if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判斷連接狀態(tài)if (future.isSuccess()) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】成功");} else if (maxRetry <= 0) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗,達(dá)到重連最大次數(shù)放棄重連");} else {// 重連使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗," + delay + "秒后執(zhí)行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}/*** 客戶端處理邏輯* * @author outman*/class Test_11_clientHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時(shí)觸發(fā)*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客戶端:" + new Date() + "開始登陸");// 創(chuàng)建登陸對(duì)象Test_11_LoginRequestPacket loginRequestPacket = new Test_11_LoginRequestPacket();// 隨機(jī)取ID 1~999loginRequestPacket.setUserId((int) (Math.random() * 1000) + 1);loginRequestPacket.setUserName("outman");loginRequestPacket.setPassword("123456");// 編碼ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket);// 寫出數(shù)據(jù)ctx.channel().writeAndFlush(byteBuf);}/*** 有數(shù)據(jù)可讀時(shí)觸發(fā)*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數(shù)據(jù)包解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據(jù)不同的指令選擇對(duì)應(yīng)的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_RESPONSE:Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet;System.out.println("客戶端:" + new Date() + "收到服務(wù)端響應(yīng)【" + loginResponsePacket.getMsg() + "】");break;default:break;}}}/*** 服務(wù)端處理邏輯* * @author outman*/class Test_11_serverHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時(shí)觸發(fā)*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數(shù)據(jù)可讀時(shí)觸發(fā)*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據(jù)指令執(zhí)行對(duì)應(yīng)的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_REQUEST:Test_11_LoginRequestPacket loginRequestPacket = (Test_11_LoginRequestPacket) packet;// 模擬校驗(yàn)成功System.out.println("服務(wù)端:" + new Date() + "【" + loginRequestPacket.getUserName() + "】 登陸成功");// 給服務(wù)端響應(yīng)Test_11_LoginResponsePacket loginResponsePacket = new Test_11_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陸成功!");// 編碼byteBuf = Test_11_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);// 寫出數(shù)據(jù)ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服務(wù)端:" + new Date() + "收到未知的指令【" + packet.getCommand() + "】");break;}}}/*** 數(shù)據(jù)包抽象類* * @author outman*/@Dataabstract class Test_11_Packet {// 協(xié)議版本號(hào)private byte version = 1;// 獲取指定標(biāo)識(shí)public abstract byte getCommand();// 指令集合public interface Command {// 登錄指令public static final byte LOGIN_REQUEST = 1;// 登陸響應(yīng)指令public static final byte LOGIN_RESPONSE = 2;}}/*** 序列化抽象接口* * @author outman*/interface Test_11_Serializer {// 獲取序列化算法標(biāo)識(shí)byte getSerializerAlgorithm();// 序列化算法標(biāo)識(shí)集合interface SerializerAlgorithm {// JSON 序列化算法標(biāo)識(shí)public static final byte JSONSerializerAlgrothm = 1;}// 默認(rèn)的序列化算法public Test_11_Serializer DEFAULT = new Test_11_JSONSerializer();// 序列化byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet);// 反序列化<T> T deSerialize(byte[] bs, Class<T> clazz);}/*** 數(shù)據(jù)包編解碼類* * @author outman*/class Test_11_PacketCodec {// 魔數(shù)private static final int MAGIC_NUMBER = 0x12345678;// 單例public static Test_11_PacketCodec INSTANCE = new Test_11_PacketCodec();// 注冊(cè) 序列化類private Class[] serializerArray = new Class[] { Test_11_JSONSerializer.class };// 注冊(cè)抽象數(shù)據(jù)包類private Class[] packetArray = new Class[] { Test_11_LoginRequestPacket.class, Test_11_LoginResponsePacket.class };// 序列化算法標(biāo)識(shí) 和對(duì)應(yīng)的序列化類映射private static Map<Byte, Class<? super Test_11_Serializer>> serializerMap;// 指令標(biāo)識(shí)和對(duì)應(yīng)的數(shù)據(jù)包抽象類映射private static Map<Byte, Class<? super Test_11_Packet>> packetMap;// 初始化 兩個(gè)映射private Test_11_PacketCodec() {serializerMap = new HashMap<>();Arrays.asList(serializerArray).forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");byte serializerAlgorthm = (byte) method.invoke((Test_11_Serializer) clazz.newInstance());serializerMap.put(serializerAlgorthm, clazz);} catch (Exception e) {e.printStackTrace();}});packetMap = new HashMap<>();Arrays.asList(packetArray).forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");method.setAccessible(true);byte command = (byte) method.invoke((Test_11_Packet) clazz.newInstance());packetMap.put(command, clazz);} catch (Exception e) {e.printStackTrace();}});}// 編碼public ByteBuf enCode(ByteBuf byteBuf, Test_11_Packet packet) {// 序列化數(shù)據(jù)包byte[] bs = Test_11_Serializer.DEFAULT.enSerialize(byteBuf, packet);// 寫入魔數(shù)byteBuf.writeInt(MAGIC_NUMBER);// 寫入?yún)f(xié)議版本號(hào)byteBuf.writeByte(packet.getVersion());// 寫入指令標(biāo)識(shí)byteBuf.writeByte(packet.getCommand());// 寫入序列化算法標(biāo)識(shí)byteBuf.writeByte(Test_11_Serializer.DEFAULT.getSerializerAlgorithm());// 寫入數(shù)據(jù)長(zhǎng)度byteBuf.writeInt(bs.length);// 寫入數(shù)據(jù)byteBuf.writeBytes(bs);return byteBuf;}// 解碼public Test_11_Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過(guò)魔數(shù)校驗(yàn)byteBuf.skipBytes(4);// 跳過(guò)版本號(hào)校驗(yàn)byteBuf.skipBytes(1);// 獲取指令標(biāo)識(shí)byte command = byteBuf.readByte();// 獲取序列化算法標(biāo)識(shí)byte serializerAlgorthm = byteBuf.readByte();// 獲取數(shù)據(jù)長(zhǎng)度int len = byteBuf.readInt();// 獲取數(shù)據(jù)byte[] bs = new byte[len];byteBuf.readBytes(bs);// 獲取對(duì)應(yīng)的序列化算法類Test_11_Serializer serializer = getSerializer(serializerAlgorthm);// 獲取對(duì)應(yīng)的數(shù)據(jù)包類Test_11_Packet packet = getPacket(command);if (serializer != null && packet != null) {// 反序列化數(shù)據(jù)包return serializer.deSerialize(bs, packet.getClass());} else {throw new RuntimeException("沒有找到對(duì)應(yīng)的序列化實(shí)現(xiàn)或數(shù)據(jù)包實(shí)現(xiàn)");}}private static Test_11_Packet getPacket(byte command) throws Exception {if(packetMap.get(command) == null) {throw new RuntimeException("未注冊(cè)的數(shù)據(jù)包類型");}return (Test_11_Packet) packetMap.get(command).newInstance();}private static Test_11_Serializer getSerializer(byte serializerAlgorthm) throws Exception {return (Test_11_Serializer) serializerMap.get(serializerAlgorthm).newInstance();}}/*** 登錄請(qǐng)求數(shù)據(jù)包實(shí)體類* * @author outman*/@Dataclass Test_11_LoginRequestPacket extends Test_11_Packet {private int userId;private String userName;private String password;@Overridepublic byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 登錄響應(yīng)數(shù)據(jù)包實(shí)體類* * @author outman*/@Dataclass Test_11_LoginResponsePacket extends Test_11_Packet {private int code;private String msg;@Overridepublic byte getCommand() {return Command.LOGIN_RESPONSE;}/*** 響應(yīng)碼集合*/interface Code {// 成功的響應(yīng)碼public static final int SUCCESS = 10000;// 失敗的響應(yīng)碼public static final int FAIL = 10001;}}/*** Json序列化實(shí)現(xiàn)類* * @author outman*/class Test_11_JSONSerializer implements Test_11_Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSONSerializerAlgrothm;}@Overridepublic byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet) {return JSONObject.toJSONBytes(packet);}@Overridepublic <T> T deSerialize(byte[] bs, Class<T> clazz) {return JSONObject.parseObject(bs, clazz);}}三、 收發(fā)消息對(duì)象
首先 ,Test_11_packet的指令集合中添加 發(fā)送消息指令為3
/*** 數(shù)據(jù)包抽象類* * @author outman*/@Dataabstract class Test_11_Packet {// 協(xié)議版本號(hào)private byte version = 1;// 獲取指定標(biāo)識(shí)public abstract byte getCommand();// 指令集合public interface Command {// 登錄指令public static final byte LOGIN_REQUEST = 1;// 登陸響應(yīng)指令public static final byte LOGIN_RESPONSE = 2;// 發(fā)送消息指令public static final byte MESSAGE_REQUEST = 3;// 回復(fù)消息指令public static final byte MESSAGE_RESPONSE = 4;}}我們來(lái)定義一下客戶端與服務(wù)端收發(fā)消息對(duì)象 , 我們把客戶端發(fā)送至服務(wù)端的消息對(duì)象定義為Test_11_MessageRequestPacket
/*** 2019年1月3日* @author outman** 發(fā)送消息對(duì)象*/@Dataclass Test_11_MessageRequestPacket extends Test_11_Packet{private String message;@Overridepublic byte getCommand() {return Command.MESSAGE_REQUEST;}}我們把服務(wù)端發(fā)送至客戶端的消息對(duì)象定義為 Test_11_messageResponsePacket
/*** 2019年1月3日* @author outman* 回復(fù)消息對(duì)象*/@Dataclass Test_11_MessageResponsePacket extends Test_11_Packet{private String message;@Overridepublic byte getCommand() {return Command.MESSAGE_RESPONSE;}}四、 判斷登錄是否成功
在前面一小節(jié) , 我們?cè)谖哪┏隽艘坏浪伎碱}: 如何判斷客戶端是否已經(jīng)登錄?
在客戶端啟動(dòng)流程這一章節(jié) , 我們有提到可以給客戶端連接也就是channel 綁定屬性 , 通過(guò)channel.attr(XXX).set(xxx)的方式 , 那么我們是否可以在登錄成功之后 , 給channel綁定一個(gè)登錄成功的標(biāo)志 , 然后在判斷是否登錄成功的時(shí)候取出這個(gè)標(biāo)志? 答案十肯定的。
我們來(lái)定義一下登錄成功的標(biāo)志
/*** 2019年1月21日* @author outman* * 連接 屬性**/interface Test_11_ChannelAttributes {// 連接登錄標(biāo)識(shí)屬性AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");}然后我們?cè)诘卿洺晒χ蠼o連接綁定登錄成功標(biāo)識(shí)
Test_11_clientHandler.java
/*** 有數(shù)據(jù)可讀時(shí)觸發(fā)*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數(shù)據(jù)包解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據(jù)不同的指令選擇對(duì)應(yīng)的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_RESPONSE:Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet;System.out.println("客戶端:" + new Date() + "收到服務(wù)端響應(yīng)【" + loginResponsePacket.getMsg() + "】");// 給 連接綁定登錄成功標(biāo)識(shí)if(Test_11_LoginUtil.isSuccess(loginResponsePacket)) {// 登錄成功Test_11_LoginUtil.markAsLogin(ctx.channel());System.out.println(new Date() + "登錄成功");}else {// 登錄失敗System.out.println(new Date() + "登錄失敗,原因-->"+loginResponsePacket.getMsg());}break;default:break;}}登錄相關(guān)工具類
/*** 2019年1月21日* @author outman* * 登錄相關(guān)工具類**/class Test_11_LoginUtil{/*** @desc 判斷登錄成功* @param loginResponsePacket* @return 是否登錄成功*/public static boolean isSuccess(Test_11_LoginResponsePacket loginResponsePacket) {boolean flag = false;if(loginResponsePacket.getCode() == Test_11_LoginResponsePacket.Code.SUCCESS) {flag = true;}return flag;}/*** @desc 標(biāo)識(shí)連接登錄成功* @param channel* @return*/public static void markAsLogin(Channel channel) {channel.attr(Test_11_ChannelAttributes.LOGIN).set(true);}/*** @desc 判斷是否登錄* @param channel* @return*/public static boolean hasLogin(Channel channel) {boolean flag = false;Boolean attr = channel.attr(Test_11_ChannelAttributes.LOGIN).get();if(attr == null) return flag;return attr;}}如以上代碼所示 , 我們出去出LoginUtils用于設(shè)置登錄成功標(biāo)志位已經(jīng)判斷是否有標(biāo)志位
五、 控制臺(tái)輸入消息并發(fā)送
在客戶端啟動(dòng)這小節(jié)中 , 我們已經(jīng)學(xué)到了客戶端啟動(dòng)流程 , 現(xiàn)在 , 我們?cè)诳蛻舳诉B接上服務(wù)端之后啟動(dòng)控制臺(tái)線程 , 從控制臺(tái)讀取消息然后發(fā)送到服務(wù)端。
Test_11_client.java /*** @desc 連接服務(wù)端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex* 重試計(jì)數(shù)*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重連計(jì)數(shù)if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判斷連接狀態(tài)if (future.isSuccess()) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】成功");// 啟動(dòng)控制臺(tái)線程Channel channel = ((ChannelFuture) future).channel();startConsoleThread(channel);} else if (maxRetry <= 0) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗,達(dá)到重連最大次數(shù)放棄重連");} else {// 重連使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗," + delay + "秒后執(zhí)行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}/*** @desc 啟動(dòng)控制臺(tái)線程* @param channel*/private static void startConsoleThread(Channel channel) {System.out.println("客戶端:啟動(dòng)控制臺(tái)線程");new Thread(() -> {while (Thread.interrupted()) {if (Test_11_LoginUtil.hasLogin(channel)) {System.out.println("輸入消息發(fā)送至服務(wù)端");Scanner sc = new Scanner(System.in);String msg = sc.nextLine();Test_11_MessageRequestPacket messageRequestPacket = new Test_11_MessageRequestPacket();messageRequestPacket.setMessage(msg);ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(channel.alloc().buffer(),messageRequestPacket);channel.writeAndFlush(byteBuf);} else {System.out.println("您還未登錄,請(qǐng)登錄...");}}}).start();}六、 服務(wù)端收到消息處理
Test_11_serverHandler.java/*** 有數(shù)據(jù)可讀時(shí)觸發(fā)*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據(jù)指令執(zhí)行對(duì)應(yīng)的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_REQUEST:Test_11_LoginRequestPacket loginRequestPacket = (Test_11_LoginRequestPacket) packet;// 模擬校驗(yàn)成功System.out.println("服務(wù)端:" + new Date() + "【" + loginRequestPacket.getUserName() + "】 登陸成功");// 給客戶端響應(yīng)Test_11_LoginResponsePacket loginResponsePacket = new Test_11_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陸成功!");// 編碼byteBuf = Test_11_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);// 寫出數(shù)據(jù)ctx.channel().writeAndFlush(byteBuf);break;case Test_11_Packet.Command.MESSAGE_REQUEST :// 處理消息Test_11_MessageRequestPacket messageRequestPacket = (Test_11_MessageRequestPacket)packet;System.out.println("服務(wù)端:"+ new Date() + "收到客戶端消息 --> "+ messageRequestPacket.getMessage());Test_11_MessageResponsePacket messageResponsePacket = new Test_11_MessageResponsePacket();String msg = messageRequestPacket.getMessage();msg = msg.replace("?", "!");msg = msg.replace("?", "!");messageResponsePacket.setMessage("服務(wù)端回復(fù):【"+msg+"】");Test_11_PacketCodec.INSTANCE.enCode(byteBuf, messageResponsePacket);ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服務(wù)端:" + new Date() + "收到未知的指令【" + packet.getCommand() + "】");break;}}七、 客戶端消息處理
Test_11_clientHandler.java@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數(shù)據(jù)包解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據(jù)不同的指令選擇對(duì)應(yīng)的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_RESPONSE:Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet;System.out.println("客戶端:" + new Date() + "收到服務(wù)端響應(yīng)【" + loginResponsePacket.getMsg() + "】");// 給 連接綁定登錄成功標(biāo)識(shí)if (Test_11_LoginUtil.isSuccess(loginResponsePacket)) {// 登錄成功Test_11_LoginUtil.markAsLogin(ctx.channel());System.out.println("客戶端:" + new Date() + "登錄成功");} else {// 登錄失敗System.out.println("客戶端:" + new Date() + "登錄失敗,原因-->" + loginResponsePacket.getMsg());}break;case Test_11_Packet.Command.MESSAGE_RESPONSE :Test_11_MessageResponsePacket messageResponsePacket = (Test_11_MessageResponsePacket)packet;System.out.println("客戶端:"+new Date()+ "收到服務(wù)端消息 --> "+ messageResponsePacket.getMessage());break;default:break;}}八、 執(zhí)行結(jié)果
九、 完整代碼
import java.lang.reflect.Method; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Scanner; import java.util.concurrent.TimeUnit;import com.alibaba.fastjson.JSONObject; import com.tj.NIO_test_maven.Test_11_LoginResponsePacket.Code;import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import lombok.Data;/*** 2019年1月3日* * @author outman* * 實(shí)現(xiàn)客戶端和服務(wù)端收發(fā)消息**/ public class Test_11_實(shí)現(xiàn)客戶端與服務(wù)端收發(fā)消息 {public static void main(String[] args) {// 啟動(dòng)服務(wù)端Test_11_server.start(8000);// 啟動(dòng)客戶端Test_11_client.start("127.0.0.1", 8000, 5);}}/*** 2019年1月3日* * @author outman** 服務(wù)端*/ class Test_11_server {/*** @desc 服務(wù)端啟動(dòng)* @param port*/public static void start(int port) {NioEventLoopGroup bossgroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossgroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服務(wù)端處理邏輯ch.pipeline().addLast(new Test_11_serverHandler());}});bind(serverBootstrap, port);}/*** @desc 自動(dòng)綁定遞增并啟動(dòng)服務(wù)端* @param serverBootstrap* @param port*/private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {System.out.println("服務(wù)端:" + new Date() + "綁定端口【" + port + "】成功");} else {System.out.println("服務(wù)端:" + new Date() + "綁定端口【" + port + "】失敗,執(zhí)行遞增綁定");bind(serverBootstrap, port + 1);}});}}/*** 2019年1月3日* * @author outman** 客戶端*/ class Test_11_client {/*** 客戶端啟動(dòng)* * @param ip* 連接ip* @param port* 服務(wù)端端口* @param maxRetry* 最大重試次數(shù)*/public static void start(String ip, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客戶端處理邏輯ch.pipeline().addLast(new Test_11_clientHandler());}});// 連接服務(wù)端connect(bootstrap, ip, port, maxRetry);}/*** @desc 連接服務(wù)端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex* 重試計(jì)數(shù)*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重連計(jì)數(shù)if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判斷連接狀態(tài)if (future.isSuccess()) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】成功");// 啟動(dòng)控制臺(tái)線程Channel channel = ((ChannelFuture) future).channel();startConsoleThread(channel);} else if (maxRetry <= 0) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗,達(dá)到重連最大次數(shù)放棄重連");} else {// 重連使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗," + delay + "秒后執(zhí)行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}/*** @desc 啟動(dòng)控制臺(tái)線程* @param channel*/private static void startConsoleThread(Channel channel) {System.out.println("客戶端:啟動(dòng)控制臺(tái)線程");new Thread(() -> {while (!Thread.interrupted()) {if (Test_11_LoginUtil.hasLogin(channel)) {System.out.println("輸入消息發(fā)送至服務(wù)端");Scanner sc = new Scanner(System.in);String msg = sc.nextLine();Test_11_MessageRequestPacket messageRequestPacket = new Test_11_MessageRequestPacket();messageRequestPacket.setMessage(msg);ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(channel.alloc().buffer(),messageRequestPacket);channel.writeAndFlush(byteBuf);} else {System.out.println("您還未登錄,請(qǐng)登錄...");}}}).start();} }/*** 客戶端處理邏輯* * @author outman*/ class Test_11_clientHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時(shí)觸發(fā)*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客戶端:" + new Date() + "開始登陸");// 創(chuàng)建登陸對(duì)象Test_11_LoginRequestPacket loginRequestPacket = new Test_11_LoginRequestPacket();// 隨機(jī)取ID 1~999loginRequestPacket.setUserId((int) (Math.random() * 1000) + 1);loginRequestPacket.setUserName("outman");loginRequestPacket.setPassword("123456");// 編碼ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket);// 寫出數(shù)據(jù)ctx.channel().writeAndFlush(byteBuf);}/*** 有數(shù)據(jù)可讀時(shí)觸發(fā)*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數(shù)據(jù)包解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據(jù)不同的指令選擇對(duì)應(yīng)的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_RESPONSE:Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet;System.out.println("客戶端:" + new Date() + "收到服務(wù)端響應(yīng)【" + loginResponsePacket.getMsg() + "】");// 給 連接綁定登錄成功標(biāo)識(shí)if (Test_11_LoginUtil.isSuccess(loginResponsePacket)) {// 登錄成功Test_11_LoginUtil.markAsLogin(ctx.channel());System.out.println("客戶端:" + new Date() + "登錄成功");} else {// 登錄失敗System.out.println("客戶端:" + new Date() + "登錄失敗,原因-->" + loginResponsePacket.getMsg());}break;case Test_11_Packet.Command.MESSAGE_RESPONSE :Test_11_MessageResponsePacket messageResponsePacket = (Test_11_MessageResponsePacket)packet;System.out.println("客戶端:"+new Date()+ "收到服務(wù)端消息 --> "+ messageResponsePacket.getMessage());break;default:break;}}}/*** 服務(wù)端處理邏輯* * @author outman*/ class Test_11_serverHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時(shí)觸發(fā)*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數(shù)據(jù)可讀時(shí)觸發(fā)*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據(jù)指令執(zhí)行對(duì)應(yīng)的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_REQUEST:Test_11_LoginRequestPacket loginRequestPacket = (Test_11_LoginRequestPacket) packet;// 模擬校驗(yàn)成功System.out.println("服務(wù)端:" + new Date() + "【" + loginRequestPacket.getUserName() + "】 登陸成功");// 給客戶端響應(yīng)Test_11_LoginResponsePacket loginResponsePacket = new Test_11_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陸成功!");// 編碼byteBuf = Test_11_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);// 寫出數(shù)據(jù)ctx.channel().writeAndFlush(byteBuf);break;case Test_11_Packet.Command.MESSAGE_REQUEST :// 處理消息Test_11_MessageRequestPacket messageRequestPacket = (Test_11_MessageRequestPacket)packet;System.out.println("服務(wù)端:"+ new Date() + "收到客戶端消息 --> "+ messageRequestPacket.getMessage());Test_11_MessageResponsePacket messageResponsePacket = new Test_11_MessageResponsePacket();String msg = messageRequestPacket.getMessage();msg = msg.replace("?", "!");msg = msg.replace("?", "!");messageResponsePacket.setMessage("服務(wù)端回復(fù):【"+msg+"】");Test_11_PacketCodec.INSTANCE.enCode(byteBuf, messageResponsePacket);ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服務(wù)端:" + new Date() + "收到未知的指令【" + packet.getCommand() + "】");break;}}}/*** 數(shù)據(jù)包抽象類* * @author outman*/ @Data abstract class Test_11_Packet {// 協(xié)議版本號(hào)private byte version = 1;// 獲取指定標(biāo)識(shí)public abstract byte getCommand();// 指令集合public interface Command {// 登錄指令public static final byte LOGIN_REQUEST = 1;// 登陸響應(yīng)指令public static final byte LOGIN_RESPONSE = 2;// 發(fā)送消息指令public static final byte MESSAGE_REQUEST = 3;// 回復(fù)消息指令public static final byte MESSAGE_RESPONSE = 4;} }/*** 序列化抽象接口* * @author outman*/ interface Test_11_Serializer {// 獲取序列化算法標(biāo)識(shí)byte getSerializerAlgorithm();// 序列化算法標(biāo)識(shí)集合interface SerializerAlgorithm {// JSON 序列化算法標(biāo)識(shí)public static final byte JSONSerializerAlgrothm = 1;}// 默認(rèn)的序列化算法public Test_11_Serializer DEFAULT = new Test_11_JSONSerializer();// 序列化byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet);// 反序列化<T> T deSerialize(byte[] bs, Class<T> clazz);}/*** 數(shù)據(jù)包編解碼類* * @author outman*/ class Test_11_PacketCodec {// 魔數(shù)private static final int MAGIC_NUMBER = 0x12345678;// 單例public static Test_11_PacketCodec INSTANCE = new Test_11_PacketCodec();// 注冊(cè) 序列化類private Class[] serializerArray = new Class[] { Test_11_JSONSerializer.class };// 注冊(cè)抽象數(shù)據(jù)包類private Class[] packetArray = new Class[] { Test_11_LoginRequestPacket.class, Test_11_LoginResponsePacket.class ,Test_11_MessageRequestPacket.class ,Test_11_MessageResponsePacket.class};// 序列化算法標(biāo)識(shí) 和對(duì)應(yīng)的序列化類映射private static Map<Byte, Class<? super Test_11_Serializer>> serializerMap;// 指令標(biāo)識(shí)和對(duì)應(yīng)的數(shù)據(jù)包抽象類映射private static Map<Byte, Class<? super Test_11_Packet>> packetMap;// 初始化 兩個(gè)映射private Test_11_PacketCodec() {serializerMap = new HashMap<>();Arrays.asList(serializerArray).forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");byte serializerAlgorthm = (byte) method.invoke((Test_11_Serializer) clazz.newInstance());serializerMap.put(serializerAlgorthm, clazz);} catch (Exception e) {e.printStackTrace();}});packetMap = new HashMap<>();Arrays.asList(packetArray).forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");method.setAccessible(true);byte command = (byte) method.invoke((Test_11_Packet) clazz.newInstance());packetMap.put(command, clazz);} catch (Exception e) {e.printStackTrace();}});}// 編碼public ByteBuf enCode(ByteBuf byteBuf, Test_11_Packet packet) {// 序列化數(shù)據(jù)包byte[] bs = Test_11_Serializer.DEFAULT.enSerialize(byteBuf, packet);// 寫入魔數(shù)byteBuf.writeInt(MAGIC_NUMBER);// 寫入?yún)f(xié)議版本號(hào)byteBuf.writeByte(packet.getVersion());// 寫入指令標(biāo)識(shí)byteBuf.writeByte(packet.getCommand());// 寫入序列化算法標(biāo)識(shí)byteBuf.writeByte(Test_11_Serializer.DEFAULT.getSerializerAlgorithm());// 寫入數(shù)據(jù)長(zhǎng)度byteBuf.writeInt(bs.length);// 寫入數(shù)據(jù)byteBuf.writeBytes(bs);return byteBuf;}// 解碼public Test_11_Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過(guò)魔數(shù)校驗(yàn)byteBuf.skipBytes(4);// 跳過(guò)版本號(hào)校驗(yàn)byteBuf.skipBytes(1);// 獲取指令標(biāo)識(shí)byte command = byteBuf.readByte();// 獲取序列化算法標(biāo)識(shí)byte serializerAlgorthm = byteBuf.readByte();// 獲取數(shù)據(jù)長(zhǎng)度int len = byteBuf.readInt();// 獲取數(shù)據(jù)byte[] bs = new byte[len];byteBuf.readBytes(bs);// 獲取對(duì)應(yīng)的序列化算法類Test_11_Serializer serializer = getSerializer(serializerAlgorthm);// 獲取對(duì)應(yīng)的數(shù)據(jù)包類Test_11_Packet packet = getPacket(command);if (serializer != null && packet != null) {// 反序列化數(shù)據(jù)包return serializer.deSerialize(bs, packet.getClass());} else {throw new RuntimeException("沒有找到對(duì)應(yīng)的序列化實(shí)現(xiàn)或數(shù)據(jù)包實(shí)現(xiàn)");}}private static Test_11_Packet getPacket(byte command) throws Exception {if(packetMap.get(command) == null) {throw new RuntimeException("未注冊(cè)的數(shù)據(jù)包類型-->"+ command);}return (Test_11_Packet) packetMap.get(command).newInstance();}private static Test_11_Serializer getSerializer(byte serializerAlgorthm) throws Exception {return (Test_11_Serializer) serializerMap.get(serializerAlgorthm).newInstance();}}/*** 登錄請(qǐng)求數(shù)據(jù)包實(shí)體類* * @author outman*/ @Data class Test_11_LoginRequestPacket extends Test_11_Packet {private int userId;private String userName;private String password;@Overridepublic byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 登錄響應(yīng)數(shù)據(jù)包實(shí)體類* * @author outman*/ @Data class Test_11_LoginResponsePacket extends Test_11_Packet {private int code;private String msg;@Overridepublic byte getCommand() {return Command.LOGIN_RESPONSE;}/*** 響應(yīng)碼集合*/interface Code {// 成功的響應(yīng)碼public static final int SUCCESS = 10000;// 失敗的響應(yīng)碼public static final int FAIL = 10001;} }/*** Json序列化實(shí)現(xiàn)類* * @author outman*/ class Test_11_JSONSerializer implements Test_11_Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSONSerializerAlgrothm;}@Overridepublic byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet) {return JSONObject.toJSONBytes(packet);}@Overridepublic <T> T deSerialize(byte[] bs, Class<T> clazz) {return JSONObject.parseObject(bs, clazz);}}/*** 2019年1月3日* * @author outman** 發(fā)送消息對(duì)象*/ @Data class Test_11_MessageRequestPacket extends Test_11_Packet {private String message;@Overridepublic byte getCommand() {return Command.MESSAGE_REQUEST;}}/*** 2019年1月3日* * @author outman 回復(fù)消息對(duì)象*/ @Data class Test_11_MessageResponsePacket extends Test_11_Packet {private String message;@Overridepublic byte getCommand() {return Command.MESSAGE_RESPONSE;}}/*** 2019年1月21日* * @author outman* * 連接 屬性**/ interface Test_11_ChannelAttributes {// 連接登錄標(biāo)識(shí)屬性AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login"); }/*** 2019年1月21日* * @author outman* * 登錄相關(guān)工具類**/ class Test_11_LoginUtil {/*** @desc 判斷登錄成功* @param loginResponsePacket* @return 是否登錄成功*/public static boolean isSuccess(Test_11_LoginResponsePacket loginResponsePacket) {boolean flag = false;if (loginResponsePacket.getCode() == Test_11_LoginResponsePacket.Code.SUCCESS) {flag = true;}return flag;}/*** @desc 標(biāo)識(shí)連接登錄成功* @param channel* @return*/public static void markAsLogin(Channel channel) {channel.attr(Test_11_ChannelAttributes.LOGIN).set(true);}/*** @desc 判斷是否登錄* @param channel* @return*/public static boolean hasLogin(Channel channel) {boolean flag = false;Boolean attr = channel.attr(Test_11_ChannelAttributes.LOGIN).get();if (attr != null)flag = attr;return flag;} }十、 總結(jié)
十一 、 思考
總結(jié)
以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 玩转LogBack
- 下一篇: Netty实战 IM即时通讯系统(十一)