日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)

發(fā)布時間:2025/3/19 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

場景

Netty的Socket編程詳解-搭建服務端與客戶端并進行數(shù)據(jù)傳輸:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108615023

在此基礎上要實現(xiàn)多個客戶端之間通信,實現(xiàn)類似群聊或者聊天室的功能。

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。

實現(xiàn)

在上面實現(xiàn)的服務端與客戶端通信的基礎上,在src下新建com.badao.Char包,包下新建ChatServer類作為聊天室的服務端。

package com.badao.Chat;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;public class ChatServer {public static void main(String[] args) throws? Exception{EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try{ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChatServerInitializer());//綁定端口ChannelFuture channelFuture = serverBootstrap.bind(70).sync();channelFuture.channel().closeFuture().sync();}finally {//關閉事件組bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }

在上面中綁定70端口并添加了一個服務端的初始化器ChatServerInitializer

所以新建類ChatServerInitializer

package com.badao.Chat;import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil;public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));pipeline.addLast(new ChatServerHandler());} }

使其繼承ChannelInitializer,并重寫InitChannel方法,在方法中使用Netty自帶的處理器進行編碼的處理并最后添加一個自定義的處理器ChatServerHandler

新建處理器類ChatServerHandler

package com.badao.Chat;import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor;public class ChatServerHandler extends SimpleChannelInboundHandler<String> {private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {Channel channel = ctx.channel();channelGroup.forEach(ch->{if(channel!=ch){ch.writeAndFlush(channel.remoteAddress()+"發(fā)送的消息:"+msg+"\n");}else{ch.writeAndFlush("[自己]:"+msg+"\n");}});}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[服務器]:"+channel.remoteAddress()+"加入\n");channelGroup.add(channel);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[服務器]:"+channel.remoteAddress()+"離開\n");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();System.out.println(channel.remoteAddress()+"上線了\n");System.out.println("當前在線人數(shù):"+channelGroup.size());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();System.out.println(channel.remoteAddress()+"下線了\n");System.out.println("當前在線人數(shù):"+channelGroup.size());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }

使處理器繼承SimpleChannelinboundHandler并重寫channelRead0方法。

在最上面聲明了一個通道組的通過 DefaultChannelGroup(GlobalEventExecutor.INSTANCE)

獲取其單例,只要是建立連接的客戶端都會自動添加進此通道組中。

然后只要是客戶端與服務端發(fā)送消息后就會執(zhí)行該方法。

在此方法中直接遍歷通道組,判斷通道組里面的每一個客戶端是不是當前發(fā)消息的客戶端。

如果是就顯示自己發(fā)送消息,如果不是則獲取遠程地址并顯示發(fā)送消息。

然后就是實現(xiàn)客戶端的上線功能以及在線人數(shù)統(tǒng)計的功能。

在上面的處理器中重寫channelActive方法,此方法會在通道激活即建立連接后調(diào)用

??? @Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();System.out.println(channel.remoteAddress()+"上線了\n");System.out.println("當前在線人數(shù):"+channelGroup.size());}

同理重寫channelInactive方法,此方法會在斷掉連接后調(diào)用

??? @Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();System.out.println(channel.remoteAddress()+"下線了\n");System.out.println("當前在線人數(shù):"+channelGroup.size());}

然后就是實現(xiàn)向所有的客戶端廣播新建客戶端加入聊天室的功能

重寫handlerAdded方法,此方法會在將通道添加到通道組中調(diào)用,所以在此方法中獲取加入到通道組的遠程地址

并使用channelGroup的writeAndFlush方法就能實現(xiàn)向所有建立連接的客戶端發(fā)送消息,新的客戶端剛上線時不用向自己

發(fā)送上線消息,所以在廣播完上線消息后再講此channel添加到channelGroup中。

??? @Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[服務器]:"+channel.remoteAddress()+"加入\n");channelGroup.add(channel);}

同理實現(xiàn)下線提醒需要重寫handlerRemoved方法

??? @Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[服務器]:"+channel.remoteAddress()+"離開\n");}

但是此方法中不用手動從channelGroup中手動去掉channel,因為Netty會自動將其移除掉。

服務端搭建完成之后再搭建客戶端,新建ChatClient類并編寫main方法,在main方法中

package com.badao.Chat;import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel;import java.io.BufferedReader; import java.io.InputStreamReader;public class ChatClient {public static void main(String[] args) throws? Exception {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChatClientInitializer());//綁定端口Channel channel = bootstrap.connect("localhost", 70).channel();BufferedReader br = new BufferedReader(new InputStreamReader(System.in));for(;;){channel.writeAndFlush(br.readLine()+"\r\n");}} finally {//關閉事件組eventLoopGroup.shutdownGracefully();}} }

在客戶端中讀取輸入的內(nèi)容并在一個無限循環(huán)中將輸入的內(nèi)容發(fā)送至服務端。

在Client中建立對服務端的連接同理也要設置一個初始化器ChatClientInitializer

新建初始化器的類ChatClientInitializer

package com.badao.Chat;import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil;public class ChatClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));pipeline.addLast(new ChatClientHandler());} }

使用Netty自帶的處理器對編碼進行處理并添加一個自定義的處理器ChatClientHandler

新建類ChatClientHandler

package com.badao.Chat;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;public class ChatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg);} }

在重寫的channelRead0方法中只需要將收到的消息進行輸出即可。

現(xiàn)在運行服務端的main方法

為了能運行多個客戶端在IDEA中客戶端編輯

?

然后將下面的勾選上

?

然后首先運行一個客戶端

?

那么在服務端中就會輸出上線的客戶端以及在線人數(shù)

再次運行客戶端的main方法,此時服務端會輸出兩個客戶端上線

?

同時在第二個客戶端上線時第一個客戶端會收到加入的提示

?

此時停掉第二個客戶端即將第二個客戶端下線

服務端會提示下線并更新在線人數(shù)

同時在第一個客戶端會收到服務端的推送

?

再運行第二個客戶端,并在控制臺輸入消息,回車發(fā)送

?

此時第一個客戶端就會收到第二個客戶端發(fā)送的消息。

?

然后第一個客戶端再輸入一個消息并回車

?

那么第二個客戶端也能收到消息

?

示例代碼下載:

https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/12850228

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。