Netty 4.x Netty 实现聊天功能
Netty 實現(xiàn)聊天功能
Netty 是一個 Java NIO 客戶端服務(wù)器框架,使用它可以快速簡單地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,比如服務(wù)器和客戶端的協(xié)議。Netty 大大簡化了網(wǎng)絡(luò)程序的開發(fā)過程比如 TCP 和 UDP 的 socket 服務(wù)的開發(fā)。更多關(guān)于 Netty 的知識,可以參閱《Netty 4.x 用戶指南》https://github.com/waylau/netty-4-user-guide
下面,就基于 Netty 快速實現(xiàn)一個聊天小程序。
準(zhǔn)備
- JDK 7+
- Maven 3.2.x
- Netty 4.x
- Eclipse 4.x
服務(wù)端
讓我們從 handler (處理器)的實現(xiàn)開始,handler 是由 Netty 生成用來處理 I/O 事件的。
SimpleChatServerHandler.java
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)Channel incoming = ctx.channel();for (Channel channel : channels) {channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");}channels.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)Channel incoming = ctx.channel();for (Channel channel : channels) {channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n");}channels.remove(ctx.channel());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)Channel incoming = ctx.channel();for (Channel channel : channels) {if (channel != incoming){channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");} else {channel.writeAndFlush("[you]" + s + "\n");}}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)Channel incoming = ctx.channel();System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)Channel incoming = ctx.channel();System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)Channel incoming = ctx.channel();System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常");// 當(dāng)出現(xiàn)異常就關(guān)閉連接cause.printStackTrace();ctx.close();} }1.SimpleChatServerHandler 繼承自?SimpleChannelInboundHandler,這個類實現(xiàn)了ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法。現(xiàn)在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實現(xiàn)接口方法。
2.覆蓋了 handlerAdded() 事件處理方法。每當(dāng)從服務(wù)端收到新的客戶端連接時,客戶端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客戶端 Channel
3.覆蓋了 handlerRemoved() 事件處理方法。每當(dāng)從服務(wù)端收到客戶端斷開時,客戶端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客戶端 Channel
4.覆蓋了 channelRead0() 事件處理方法。每當(dāng)從服務(wù)端讀到客戶端寫入信息時,將信息轉(zhuǎn)發(fā)給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時,需要把 channelRead0() 重命名為messageReceived()
5.覆蓋了 channelActive() 事件處理方法。服務(wù)端監(jiān)聽到客戶端活動
6.覆蓋了 channelInactive() 事件處理方法。服務(wù)端監(jiān)聽到客戶端不活動
7.exceptionCaught() 事件處理方法是當(dāng)出現(xiàn) Throwable 對象才會被調(diào)用,即當(dāng) Netty 由于 IO 錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應(yīng)該被記錄下來并且把關(guān)聯(lián)的 channel 給關(guān)閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實現(xiàn),比如你可能想在關(guān)閉連接之前發(fā)送一個錯誤碼的響應(yīng)消息。
SimpleChatServerInitializer.java
SimpleChatServerInitializer 用來增加多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleChatServerHandler 等。
public class SimpleChatServerInitializer extendsChannelInitializer<SocketChannel> {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("handler", new SimpleChatServerHandler());System.out.println("SimpleChatClient:"+ch.remoteAddress() +"連接上");} }SimpleChatServer.java
編寫一個 main() 方法來啟動服務(wù)端。
public class SimpleChatServer {private int port;public SimpleChatServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap(); // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3).childHandler(new SimpleChatServerInitializer()) //(4).option(ChannelOption.SO_BACKLOG, 128) // (5).childOption(ChannelOption.SO_KEEPALIVE, true); // (6)System.out.println("SimpleChatServer 啟動了");// 綁定端口,開始接收進(jìn)來的連接ChannelFuture f = b.bind(port).sync(); // (7)// 等待服務(wù)器 socket 關(guān)閉 。// 在這個例子中,這不會發(fā)生,但你可以優(yōu)雅地關(guān)閉你的服務(wù)器。f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();System.out.println("SimpleChatServer 關(guān)閉了");}}public static void main(String[] args) throws Exception {int port;if (args.length > 0) {port = Integer.parseInt(args[0]);} else {port = 8080;}new SimpleChatServer(port).run();} }1.NioEventLoopGroup是用來處理I/O操作的多線程事件循環(huán)器,Netty 提供了許多不同的EventLoopGroup的實現(xiàn)用來處理不同的傳輸。在這個例子中我們實現(xiàn)了一個服務(wù)端的應(yīng)用,因此會有2個 NioEventLoopGroup 會被使用。第一個經(jīng)常被叫做‘boss’,用來接收進(jìn)來的連接。第二個經(jīng)常被叫做‘worker’,用來處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。如何知道多少個線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的?Channel上都需要依賴于 EventLoopGroup 的實現(xiàn),并且可以通過構(gòu)造函數(shù)來配置他們的關(guān)系。
2.ServerBootstrap是一個啟動 NIO 服務(wù)的輔助啟動類。你可以在這個服務(wù)中直接使用 Channel,但是這會是一個復(fù)雜的處理過程,在很多情況下你并不需要這樣做。
3.這里我們指定使用NioServerSocketChannel類來舉例說明一個新的 Channel 如何接收進(jìn)來的連接。
4.這里的事件處理類經(jīng)常會被用來處理一個最近的已經(jīng)接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。也許你想通過增加一些處理類比如 SimpleChatServerHandler 來配置一個新的 Channel 或者其對應(yīng)的ChannelPipeline來實現(xiàn)你的網(wǎng)絡(luò)程序。當(dāng)你的程序變的復(fù)雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。
5.你可以設(shè)置這里指定的 Channel 實現(xiàn)的配置參數(shù)。我們正在寫一個TCP/IP 的服務(wù)端,因此我們被允許設(shè)置 socket 的參數(shù)選項比如tcpNoDelay 和 keepAlive。請參考ChannelOption和詳細(xì)的ChannelConfig實現(xiàn)的接口文檔以此可以對ChannelOption 的有一個大概的認(rèn)識。
6.option() 是提供給NioServerSocketChannel用來接收進(jìn)來的連接。childOption() 是提供給由父管道ServerChannel接收到的連接,在這個例子中也是 NioServerSocketChannel。
7.我們繼續(xù),剩下的就是綁定端口然后啟動服務(wù)。這里我們在機(jī)器上綁定了機(jī)器所有網(wǎng)卡上的 8080 端口。當(dāng)然現(xiàn)在你可以多次調(diào)用 bind() 方法(基于不同綁定地址)。
恭喜!你已經(jīng)完成了基于 Netty 聊天服務(wù)端程序。
客戶端
SimpleChatClientHandler.java
客戶端的處理類比較簡單,只需要將讀到的信息打印出來即可
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {System.out.println(s);} }SimpleChatClientInitializer.java
與服務(wù)端類似
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("handler", new SimpleChatClientHandler());} }SimpleChatClient.java
編寫一個 main() 方法來啟動客戶端。
public class SimpleChatClient {public static void main(String[] args) throws Exception{new SimpleChatClient("localhost", 8080).run();}private final String host;private final int port;public SimpleChatClient(String host, int port){this.host = host;this.port = port;}public void run() throws Exception{EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new SimpleChatClientInitializer());Channel channel = bootstrap.connect(host, port).sync().channel();BufferedReader in = new BufferedReader(new InputStreamReader(System.in));while(true){channel.writeAndFlush(in.readLine() + "\r\n");}} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}}}}運行效果
先運行 SimpleChatServer,再可以運行多個 SimpleChatClient,控制臺輸入文本繼續(xù)測試
源碼
見https://github.com/waylau/netty-4-user-guide-demos中 simplechat
參考
Netty 4.x 用戶指南https://github.com/waylau/netty-4-user-guide
總結(jié)
以上是生活随笔為你收集整理的Netty 4.x Netty 实现聊天功能的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CompletableFuture并行异
- 下一篇: https和http的主要区别