Netty-案例 WebSocket与netty实现长连接案例(代码注释详解)
生活随笔
收集整理的這篇文章主要介紹了
Netty-案例 WebSocket与netty实现长连接案例(代码注释详解)
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
Netty
記錄學(xué)習(xí),開(kāi)心學(xué)習(xí),來(lái)源尚硅谷韓順平netty視頻
1 NettyServer
package com.fs.netty.simple;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) throws InterruptedException {//創(chuàng)建boosGroup 和 workerGroup//創(chuàng)建兩個(gè)線程組,bossGroup workerGroup//bossGroup只是處理鏈接請(qǐng)求//workerGroup 真正的與客戶端業(yè)務(wù)處理,會(huì)交給workerGroup完成,自己不做處理//兩個(gè)都是無(wú)線循環(huán)EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();//處理整個(gè)服務(wù)的異常try {//創(chuàng)建服務(wù)器端啟動(dòng)的配置參數(shù)對(duì)象ServerBootstrap bootstrap = new ServerBootstrap();//使用鏈?zhǔn)骄幊虂?lái)進(jìn)行參數(shù)設(shè)置bootstrap.group(bossGroup, workerGroup)//設(shè)置兩個(gè)線程組.channel(NioServerSocketChannel.class)//設(shè)置服務(wù)器的通道實(shí)現(xiàn)使用NioServerSocketChannel.option(ChannelOption.SO_BACKLOG,128)//設(shè)置線程隊(duì)列等待的個(gè)數(shù).childOption(ChannelOption.SO_KEEPALIVE,true)//設(shè)置鏈接保持活動(dòng)鏈接狀態(tài).childHandler(new ChannelInitializer<SocketChannel>() {//創(chuàng)建一個(gè)通道初始化對(duì)象,使用匿名內(nèi)部類(lèi)方式//給pipeline設(shè)置處理器@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//addLast給ChannelPipeline添加我們自定義的handlersocketChannel.pipeline().addLast(new NettyServerHandler());}});//給我們的workerGroup的EventLoop對(duì)應(yīng)的管道設(shè)置處理器//給我們服務(wù)端綁定端口并且同步處理,生成一個(gè)ChannelFuture,啟動(dòng)服務(wù)器ChannelFuture channelFuture = bootstrap.bind(6668).sync();System.out.println("----服務(wù)器is ready");//對(duì)關(guān)閉通道進(jìn)行監(jiān)聽(tīng)channelFuture.channel().closeFuture().sync();}finally {//出現(xiàn)異常,優(yōu)雅的關(guān)閉bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }1.1 NettyServerHandler
package com.fs.netty.simple;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil;//繼承ChannelInboundHandlerAdapter/*** 我們自定義一個(gè)Handler,需要繼承某個(gè)handler適配器(ChannelInboundHandlerAdapter)* 這時(shí)我們自定義的handler才能稱之為一個(gè)handler* 因?yàn)槲覀円袷啬承┮?guī)范,有些方法需要從寫(xiě)的*/ public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 讀取數(shù)據(jù)事件(這里我們可以讀取客戶端發(fā)送的消息)* @param ctx 上下文對(duì)象,包含了管道pipeline(做業(yè)務(wù)邏輯) 通道channel(做數(shù)據(jù)讀寫(xiě)) 地址等* @param msg 客戶端發(fā)送的數(shù)據(jù),默認(rèn)Object*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("服務(wù)器讀取線程:"+Thread.currentThread().getName());System.out.println("server ctx :"+ctx);//將msg轉(zhuǎn)成byteBuf(是netty提供的,不是NIO的byteBuffer,ByteBuf性能更高)ByteBuf byteBuf = (ByteBuf) msg;System.out.println("看看channel與pipeline的關(guān)系");Channel channel = ctx.channel();ChannelPipeline pipeline = ctx.pipeline();//pipeline 本質(zhì)是一個(gè)雙向鏈表,本質(zhì)是出站入站//Debug得知,channle與pipeline是相互包含的關(guān)系,你中有我,我中有你System.out.println("客戶端發(fā)送消息是:"+byteBuf.toString(CharsetUtil.UTF_8));System.out.println("客戶端地址:"+ channel.remoteAddress());}/*** 數(shù)據(jù)讀取完畢* @param ctx 上下文對(duì)象*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//將數(shù)據(jù)寫(xiě)入到緩存,并刷新//一般對(duì)發(fā)送的數(shù)據(jù)進(jìn)行編碼ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端~xixixi",CharsetUtil.UTF_8));}/*** 發(fā)生異常處理,一般需要關(guān)閉通道* @param ctx 上下文對(duì)象*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();} }2 NettyClient
package com.fs.netty.simple;import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) throws InterruptedException {//客戶端只需要一個(gè)事件循環(huán)組NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {//創(chuàng)建客戶端啟動(dòng)對(duì)象//注意客戶端使用的不是 ServerBootStrap 而是BootStarpBootstrap bootstrap = new Bootstrap();//設(shè)置相關(guān)參數(shù)bootstrap.group(eventLoopGroup)//設(shè)置線程組.channel(NioSocketChannel.class)//設(shè)置客戶端通道的實(shí)現(xiàn)類(lèi)(反射處理).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyClientHandler());//加入自定義客戶端處理器}});System.out.println("客戶端 is --- ok");//啟動(dòng)客戶端去鏈接服務(wù)的//關(guān)于ChannelFuture 要分析,涉及到netty的異步模型ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();//給關(guān)閉通道進(jìn)行監(jiān)聽(tīng)channelFuture.channel().closeFuture().sync();}finally {//出現(xiàn)異常,優(yōu)雅的關(guān)閉eventLoopGroup.shutdownGracefully();}} }2.2 NettyClientHandler
package com.fs.netty.simple;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;/* 客戶端處理器*/ public class NettyClientHandler extends ChannelInboundHandlerAdapter {/*** 當(dāng)通道就緒就會(huì)觸發(fā)* @param ctx 上下文對(duì)象,包含了管道pipeline(做業(yè)務(wù)邏輯) 通道channel(做數(shù)據(jù)讀寫(xiě)) 地址等*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client :"+ ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Server:hahaha", CharsetUtil.UTF_8));}/*** 當(dāng)通道有讀取事件的時(shí)候,會(huì)觸發(fā)* @param ctx 上下文對(duì)象* @param msg 消息*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf)msg;System.out.println("服務(wù)器回復(fù)的消息:"+byteBuf.toString(CharsetUtil.UTF_8));System.out.println("服務(wù)器地址:"+ctx.channel().remoteAddress());}/*** 異常觸發(fā)* @param ctx 上下文對(duì)象* @param cause 異常對(duì)象*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }3 用戶自定義普通任務(wù)隊(duì)列 異步執(zhí)行(將任務(wù)提交到taskQueue)
在channelRead方法中將消息存放在taskQueue隊(duì)列中
(NettyServerHandler extends ChannelInboundHandlerAdapter 重寫(xiě)的channelRead()方法)
4 用戶自定義定時(shí)任務(wù) 異步執(zhí)行(將任務(wù)提交到scheduleTaskQueue中)
/*** 讀取數(shù)據(jù)事件(這里我們可以讀取客戶端發(fā)送的消息)* @param ctx 上下文對(duì)象,包含了管道pipeline(做業(yè)務(wù)邏輯) 通道channel(做數(shù)據(jù)讀寫(xiě)) 地址等* @param msg 客戶端發(fā)送的數(shù)據(jù),默認(rèn)Object*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//這里有一個(gè)非常耗時(shí)的時(shí)間業(yè)務(wù),---》異步執(zhí)行,提交到channel、對(duì)應(yīng)的 scheduleTaskQueuectx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {try {Thread.sleep(10*1000);} catch (InterruptedException e) {e.printStackTrace();}ctx.writeAndFlush(Unpooled.copiedBuffer("hello-客戶端-讀完",CharsetUtil.UTF_8));}},5, TimeUnit.SECONDS);//參數(shù):線程,時(shí)間,時(shí)間參數(shù)System.out.println("繼續(xù)執(zhí)行");}5 FutureListener 監(jiān)聽(tīng)事件
例如:我們的服務(wù)端執(zhí)行了 bootstrap.bind(6668).sync(); 得到ChannelFuture
//給我們服務(wù)端綁定端口并且同步處理,生成一個(gè)ChannelFuture,啟動(dòng)服務(wù)器ChannelFuture channelFuture = bootstrap.bind(6668).sync();//使用FutureListener 給channelFuture 注冊(cè)監(jiān)聽(tīng)器,監(jiān)控綁定成功的時(shí)間channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if (channelFuture.isSuccess()){System.out.println("監(jiān)聽(tīng)端口6668成功");} else {System.out.println("監(jiān)聽(tīng)端口6668失敗");}}});6 HTTP案例
6.1 TestHTTPServer
package com.fs.netty.http;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;/* http案例*/ public class TestHTTPServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitialize());//使用自定義的解碼器ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if (channelFuture.isSuccess()){System.out.println("監(jiān)聽(tīng)8080成功");}else {System.out.println("監(jiān)聽(tīng)8080失敗");}}});channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();bossGroup.shutdownGracefully();}} }6.2 TestServerInitialize
package com.fs.netty.http;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec;public class TestServerInitialize extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//向管道加入處理器//得到管道ChannelPipeline pipeline = socketChannel.pipeline();//加入netty提供的httpServerCodec codec 是編解碼器//HttpServerCodec是netty提供的http的編解碼器pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());//增加一個(gè)自己的處理器pipeline.addLast("MyTestServerHandler",new TestServerHandler());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();} }6.3 TestServerHandler
package com.fs.netty.http;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil;/* 自定義handlerSimpleChannelInboundHandler 是ChannelInboundHandlerAdapter的子類(lèi) HttpObject 表示的客戶端和服務(wù)器端相互通信的數(shù)據(jù)被封裝成httpObject*/ public class TestServerHandler extends SimpleChannelInboundHandler<HttpObject> {/*** 讀取客戶端數(shù)據(jù)* @param channelHandlerContext 上下文* @param msg 封裝的消息為HttpObject*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject msg) throws Exception {//判斷msg是不是HttpRequest請(qǐng)求if (msg instanceof HttpRequest){System.out.println("msg 類(lèi)型 = " + msg.getClass());System.out.println("客戶端地址 : "+ channelHandlerContext.channel().remoteAddress());//回復(fù)信息給瀏覽器,要發(fā)送http協(xié)議信息ByteBuf byteBuf = Unpooled.copiedBuffer("Hello-我是服務(wù)器", CharsetUtil.UTF_8);//構(gòu)造一個(gè)http的響應(yīng),即 httpResponse//參數(shù):HTTP版本 HTTP詳情狀態(tài)碼 響應(yīng)內(nèi)容HttpResponse defaultHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);//設(shè)置其他的信息defaultHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");//設(shè)置返回參數(shù)類(lèi)型defaultHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());//設(shè)置返回參數(shù)長(zhǎng)度//將數(shù)據(jù)寫(xiě)入并刷新channelHandlerContext.writeAndFlush(defaultHttpResponse);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();} }執(zhí)行結(jié)果
7 Unpooled對(duì)象
7.1 Unpooled案例1 ByteBuf的使用1
package com.fs.netty.buf;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled;public class NettyByteBuf01 {public static void main(String[] args) {//向創(chuàng)建一個(gè)bytebuf//說(shuō)明:先創(chuàng)建對(duì)象,該對(duì)象包含一個(gè)byte數(shù)組,是一個(gè)byte[10]//在netty的buffer中不需要slip進(jìn)行反轉(zhuǎn),//因?yàn)樗S護(hù)了 readerindex 和 writeindex 和 capacity ,將buffer分成了三個(gè)區(qū)域//0----》readerindex已經(jīng)讀取的區(qū)域//readerindex-----》writeindex 可讀的區(qū)域//writeindex------》capacity 表示可寫(xiě)的區(qū)域ByteBuf buffer = Unpooled.buffer(10);for (int i = 0; i < 10; i++) {buffer.writeByte(i);}//通過(guò)debug就可以看到readerindex和writeindex在不斷的變化System.out.println("capacity:"+buffer.capacity());//輸出for (int i= 0;i<buffer.capacity();i++){ // System.out.println(buffer.getByte(i));//這個(gè)不會(huì)照成readerindex變化,因?yàn)橹付怂饕?/span>System.out.println(buffer.readByte());//這會(huì)照成readerindex變化}} }7.2 Unpooled案例2 ByteBuf的使用2
package com.fs.netty.buf;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil;import java.nio.charset.Charset;public class NettyByteBuf02 {public static void main(String[] args) {//向創(chuàng)建一個(gè)bytebufByteBuf byteBuf = Unpooled.copiedBuffer("hello,world", CharsetUtil.UTF_8);//使用相關(guān)方法if (byteBuf.hasArray()){byte[] content = byteBuf.array();//將 content 轉(zhuǎn)成字符串System.out.println(new String(content, Charset.forName("utf-8")));//byteBuf=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 11, cap: 33)System.out.println("byteBuf="+byteBuf);System.out.println(byteBuf.arrayOffset());//0System.out.println(byteBuf.readerIndex());//0System.out.println(byteBuf.writerIndex());//11System.out.println(byteBuf.capacity());//33//由此可得,copiedBuffer這樣的方式創(chuàng)建的byteBuf的容量不是給定的字符串的大小System.out.println(byteBuf.readableBytes());//11System.out.println(byteBuf.readByte());//這里讀一下 值:104 因?yàn)镠的阿斯克碼是104System.out.println(byteBuf.readableBytes());//10}} }8 Netty實(shí)現(xiàn)群聊系統(tǒng)
8.1服務(wù)端
GroupChatServer
package com.fs.netty.groupchat.server;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;/* 群聊服務(wù)端*/ public class GroupChatServer {private int port;public GroupChatServer(int port){this.port = port;}//編寫(xiě)一個(gè)run方法,處理客戶端的請(qǐng)求public void run() throws InterruptedException {//創(chuàng)建兩個(gè)線程組EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//獲取到pipelineChannelPipeline pipeline = socketChannel.pipeline();//向pipeline加入解碼器pipeline.addLast("decoder",new StringDecoder());//向pipeline加入編碼器pipeline.addLast("encoder",new StringEncoder());//加入自己的業(yè)務(wù)處理類(lèi)pipeline.addLast(new GroupChatServerHandler());}});System.out.println("netty服務(wù)器已啟動(dòng):"+port);ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//監(jiān)聽(tīng)關(guān)閉channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {GroupChatServer groupChatServer = new GroupChatServer(7000);groupChatServer.run();} }GroupChatServerHandler
package com.fs.netty.groupchat.server;import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;import java.text.SimpleDateFormat; import java.util.Date;//創(chuàng)建服務(wù)端處理器,指定發(fā)送數(shù)據(jù)為String,重寫(xiě)channelRead0方法 public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {//定義一個(gè)channle組,管理所有的channel// GlobalEventExecutor.INSTANCE 是全局的事件執(zhí)行器,是一個(gè)單列private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//handlerAdded 表示鏈接建立,一旦鏈接,第一個(gè)執(zhí)行//一旦鏈接,將當(dāng)前的channel加入到 channelGroup@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();//將該客戶加入聊天的信息推送給其他在線的客戶端//channelGroup.writeAndFlush 該方法回將channelGroup中所有的channel便利,并發(fā)送信息channelGroup.writeAndFlush(simpleDateFormat.format(new Date())+"[客戶端]"+channel.remoteAddress()+"加入聊天\n");channelGroup.add(channel);}/*** 表示channel處于活動(dòng)狀態(tài),,提示某某上線*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(ctx.channel().remoteAddress()+":上線了");}/*** 當(dāng)channel處于非活動(dòng)狀態(tài),提示離線了* @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.err.println(ctx.channel().remoteAddress()+":離線了");}/*** 表示斷開(kāi)鏈接觸發(fā),將某某客戶離開(kāi)信息推送給當(dāng)前在線的客戶*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush(simpleDateFormat.format(new Date())+"[客戶端]"+channel.remoteAddress()+"離開(kāi)了\n");//這個(gè)方法執(zhí)行后,當(dāng)前Channel會(huì)自動(dòng)從channelGroup中移除System.out.println("當(dāng)前channelGroup大小:"+channelGroup.size());}/*** 讀取數(shù)據(jù),將數(shù)據(jù)轉(zhuǎn)發(fā)所有在線的人員*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {//獲取當(dāng)前channelChannel channel = channelHandlerContext.channel();//這時(shí)遍歷channelGroup,根據(jù)不同情況回送不同的消息channelGroup.forEach(ch -> {if (ch != channel) {//說(shuō)明不是當(dāng)前的channel、就轉(zhuǎn)發(fā)消息ch.writeAndFlush(simpleDateFormat.format(new Date())+"[客戶]"+channel.remoteAddress() + " 發(fā)送了消息:"+msg+"\n");}else {//顯示下自己發(fā)送的消息ch.writeAndFlush(simpleDateFormat.format(new Date())+"[自己]發(fā)送了消息"+msg+"\n");}});}/*** 異常處理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//關(guān)閉通道ctx.close();} }8.2 客戶端
GroupChatClient
package com.fs.netty.groupchat.client;import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;/* 群聊客戶端*/ public class GroupChatClient {private final String host;private final int port;public GroupChatClient(String host,int port){this.host = host;this.port = port;}public void run() throws InterruptedException {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();try {bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//加入相關(guān)的handlerpipeline.addLast("decoder",new StringDecoder());pipeline.addLast("encoder",new StringEncoder());//加入自己的處理器pipeline.addLast(new GroupChatClientHandler());}});ChannelFuture channelFuture = bootstrap.connect(host, port).sync();Channel channel = channelFuture.channel();System.out.println(channel.localAddress()+"客戶端已經(jīng)準(zhǔn)備好了");//客戶端輸入信息,創(chuàng)建掃描器Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()){String s = scanner.nextLine();//通過(guò)channel發(fā)送服務(wù)器端channel.writeAndFlush(s+"\r\n");}}finally {eventLoopGroup.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {GroupChatClient groupChatClient = new GroupChatClient("127.0.0.1", 7000);groupChatClient.run();} }GroupChatClientHandler
package com.fs.netty.groupchat.client;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;/* 客戶端處理,接受String、*/ public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {/*讀取消息*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println("[服務(wù)端發(fā)送的消息] "+ msg);} }8.3 運(yùn)行效果
9 netty心跳處理(讀寫(xiě)空閑)
HeartbeatNettyServer
package com.fs.netty.heartbeat;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** netty心跳機(jī)制*/ public class HeartbeatNettyServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))//加入日志處理類(lèi).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//加入一個(gè)netty提供的 IdleStateHandler 處理空閑狀態(tài)的處理器//參數(shù)說(shuō)明//1.long readerIdleTime,表示多長(zhǎng)時(shí)間沒(méi)有讀了,就會(huì)發(fā)送一個(gè)心跳檢測(cè)包//2. long writerIdleTime,表示多長(zhǎng)時(shí)間沒(méi)有寫(xiě)了。就會(huì)發(fā)送一個(gè)心跳檢測(cè)包//3. long allIdleTime,表示多長(zhǎng)時(shí)間沒(méi)有讀寫(xiě)了,就會(huì)發(fā)送一個(gè)心跳檢測(cè)包//4 unit 時(shí)間單位//當(dāng)IdleStateEvent觸發(fā)后,就會(huì)傳遞給管道的下一個(gè)handler去處理//通過(guò)調(diào)用(觸發(fā))下一個(gè)handler 的 userEventTinggered。在該方法中去處理,pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));//加入一個(gè)空閑檢測(cè)進(jìn)一步處理的handler(自定義)pipeline.addLast(new HeartbeatNettyHandler());}});//啟動(dòng)服務(wù)器ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }HeartbeatNettyHandler
package com.fs.netty.heartbeat;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent;public class HeartbeatNettyHandler extends ChannelInboundHandlerAdapter {/*** @param ctx 上下文* @param evt 事件*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {//將evt 向下轉(zhuǎn)型 成IdleStateEventIdleStateEvent event = (IdleStateEvent) evt;String eventType = null;switch (event.state()) {case READER_IDLE:eventType = "讀空閑";break;case WRITER_IDLE:eventType = "寫(xiě)空閑";break;case ALL_IDLE:eventType = "讀寫(xiě)空閑";break;}System.out.println(ctx.channel().remoteAddress() + "---超時(shí)事件---" + eventType);System.out.println("服務(wù)器做相應(yīng)處理");//如果發(fā)生空閑,我們關(guān)閉通道ctx.channel().close();}} }10 WebSocket案例
案例運(yùn)行結(jié)果
先運(yùn)行服務(wù),后打開(kāi)html
MyServer
MyWebSocketFrameHandler
package com.fs.netty.websocket;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import java.time.LocalDateTime;//自定義的handler //TextWebSocketFrame 表示一個(gè)文本幀 public class MyWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception {System.out.println("服務(wù)器收到的消息:"+msg.text());//回復(fù)客戶端channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("服務(wù)器時(shí)間:"+ LocalDateTime.now()+",收到的消息:"+msg.text()));}/*當(dāng)web客戶端鏈接后就會(huì)觸發(fā)這個(gè)方法*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {//id 表示唯一值,asLongText 是唯一的,asShortText不是唯一的,有可能重復(fù)System.out.println("handlerAdded 被調(diào)用:"+ctx.channel().id().asLongText());System.out.println("handlerAdded 被調(diào)用:"+ctx.channel().id().asShortText());}//當(dāng)鏈接中斷會(huì)調(diào)用。@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("handlerRemoved 被調(diào)用:"+ctx.channel().id().asLongText());}//處理異常@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("異常發(fā)送:"+cause.getMessage());//關(guān)閉通道ctx.close();} }hello.html
<!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><title>Title</title> </head> <body> <script>var socket;//判斷當(dāng)前瀏覽器是否支持websocket編程if (window.WebSocket){socket = new WebSocket("ws://localhost:7000/hello");//相當(dāng)于channelRead0 ev 收到服務(wù)器端回送的消息socket.onmessage = function (ev){var rt = document.getElementById("responseText");rt.value = rt.value+"\n"+ev.data;}//相當(dāng)于連接服務(wù)器開(kāi)啟socket.onopen = function (ev){var rt = document.getElementById("responseText");rt.value = "連接開(kāi)啟"}//連接關(guān)閉socket.onclose = function (ev){var rt = document.getElementById("responseText");rt.value = rt.value+"\n"+"連接關(guān)閉了";}//發(fā)送消息到服務(wù)器function send(message){//判斷socket是否創(chuàng)建好if (!window.socket){return;}//判斷連接是否開(kāi)啟if (socket.readyState == WebSocket.OPEN){//通過(guò)socket 發(fā)送消息socket.send(message)}else {alert("連接沒(méi)有開(kāi)啟")}}}else {alert("您當(dāng)前的瀏覽器不支持webSocket")} </script><form onsubmit="return false"><textarea name="message" style="height: 300px;width: 300px"></textarea><input type="button" value="發(fā)送消息" onclick="send(this.form.message.value)"><textarea id="responseText" style="height: 300px;width: 300px"></textarea><input type="button" value="清空內(nèi)容" onclick="document.getElementById('responseText').value=''"></form> </body> </html>總結(jié)
以上是生活随笔為你收集整理的Netty-案例 WebSocket与netty实现长连接案例(代码注释详解)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Nginx的动态代理,负载均衡,动静分离
- 下一篇: Jenkins-自动化构建、测试和部署-