netty使用
文章目錄
- netty介紹
- netty特點:
- netty組件
- netty編程
- 服務(wù)端
- 客戶端
- Selector Bug問題
- Bootstrap介紹
netty介紹
網(wǎng)絡(luò)通訊框架 mina、netty
應(yīng)用案例:阿里RPC框架:Dubbo
netty官網(wǎng):https://netty.io/
netty特點:
1、是高性能、異步事件驅(qū)動模型,他提供對TCP、UDP、HTTP和文件傳輸?shù)闹С?br /> 2、使用更加高效的socket底層,對epoll空輪訓(xùn)引起CPU飆升問題在netty內(nèi)部進(jìn)行處理,避免的直接使用NIO的陷阱
3、采用了多種編碼、解碼的支持、對TCP的粘包、分包做了自動化處理
netty組件
Bootstrap:
netty的啟動輔助類,是客戶端和服務(wù)端的入口,Bootstrap是建客戶端連接的啟動器,ServerBootStrap是監(jiān)聽服務(wù)端端口的啟動器。
Channel:
常用的是NioServerSocketChannel和NioSocketChannel
NioServerSocketChannel負(fù)責(zé)監(jiān)聽一個tcp的端口,有連接時會獲取一個NioSocketChannel的連接實例
NioSocketChannel負(fù)責(zé)讀寫事件,連接服務(wù)端
EventLoop:
核心組件之一
通過EventLoopGroup(是在啟動輔助類中設(shè)置)生成EventLoop,內(nèi)部是一個無限循環(huán),維護(hù)了一個selector。處理所有的注冊到selector上的IO操作,在這里就維護(hù)了一個線程連接多個連接的工作
ChannelPipeline:
核心組件之一:channelHandler的容器,netty中的IO操作的通道channel,與channelHandler組成責(zé)任鏈,將所有的讀時間、寫事件、連接事件依次通過channelPipeline,處理事件
ChannelHandler
是IO事件處理的真正單元、可以自定義的channelhandler的定義來處理自己的邏輯,完全控制處理方式
channelhandler和ChannelPipeline組成的責(zé)任鏈
channelHandler分為inbound和outbound,對應(yīng)的IO的read和write的執(zhí)行鏈
執(zhí)行鏈底層實現(xiàn)是將channelHandler組裝成雙向鏈表,提供head和tail屬性
read操作的話從channelPipeline的head到tail的過程
netty編程
多客戶端和服務(wù)端通信:echo命令
引入netty依賴
服務(wù)端
public class NettyServer {public static void main(String[] args) {NioEventLoopGroup boss = null;NioEventLoopGroup worker = null;try{//需要eventLoopggrop/*** NioEventLoopGroup是用來處理IO操作以及接收客戶端連接的事件循環(huán)組* 給定兩個事件循環(huán)組* 一個稱之為boss,用來接收連接* 一個稱之為worker,用來處理已經(jīng)接受的連接*/boss = new NioEventLoopGroup(1);worker = new NioEventLoopGroup();//默認(rèn)是CPU個數(shù)乘以2/*** 啟動輔助類:ServerBootSttap* 將配置信息配置在輔助類上,用來啟動或者設(shè)置相關(guān)屬性:TCP* */ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap//指定事件循環(huán)組.group(boss,worker)//主事件循環(huán)組接收的通道的實例類型.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//存放channelHandler的容器ChannelPipeline pipeline = ch.pipeline();//字符串解碼器pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());//自定義channelHandlerpipeline.addLast(new ServerHandler());}});//啟動服務(wù)端,通過同步阻塞方式來啟動服務(wù)端,即調(diào)用sync會阻塞直至服務(wù)端綁定端口后才返回ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();System.out.println("服務(wù)器啟動了");//使用同步方法來管理程序channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {//關(guān)閉線程資源if(boss!=null){boss.shutdownGracefully();}if(worker!=null){worker.shutdownGracefully();}}} } /*** 讀取數(shù)據(jù)的提供了接口:ChannelInboundHandler*/ class ServerHandler extends SimpleChannelInboundHandler<String> {//接收數(shù)據(jù)的發(fā)方法@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println("channelRead0"+msg);}//連接成功觸發(fā)該方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(ctx.channel().remoteAddress()+"連接服務(wù)端成功");}//斷開連接觸發(fā)給方法@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println(ctx.channel().remoteAddress()+" 斷開和服務(wù)端連接");}//接收數(shù)據(jù)@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(ctx.channel().remoteAddress()+":"+msg);ctx.channel().writeAndFlush("echo:" + msg);}}客戶端
public class NettyClient {public static void main(String[] args) throws InterruptedException {//創(chuàng)建事件循環(huán)組NioEventLoopGroup loopGroup = new NioEventLoopGroup();//創(chuàng)建啟動輔助類Bootstrap bootstrap = new Bootstrap();bootstrap.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//獲取pipeline的實例ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());//自定義channelHandler 接收服務(wù)端返回的消息pipeline.addLast(new ClientHandler());}});//客戶端連接服務(wù)端 通過同步方式來保證連接成功Channel channel = bootstrap.connect("127.0.0.1", 9999).sync().channel();//給服務(wù)端通信發(fā)送消息Scanner scanner = new Scanner(System.in);scanner.useDelimiter("\n");while (true) {String msg = scanner.nextLine();if ("exit".equals(msg)) break;//發(fā)送消息給服務(wù)端channel.writeAndFlush(msg);}channel.closeFuture().sync();} } class ClientHandler extends SimpleChannelInboundHandler<String>{//接收消息@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String o) throws Exception {}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);} }Selector Bug問題
epoll特定情況下會發(fā)生空輪訓(xùn),CPU使用100%?
NioEventLoop類下的run()方法
解決方案:對Selector的select操作周期進(jìn)行統(tǒng)計:selectCnt,每完成一次空的select對selectCnt加1,變量selectCnt會隨著連續(xù)的空輪訓(xùn)會逐漸變大,到達(dá)了閾值(默認(rèn)是512),則執(zhí)行rebuildSelector()進(jìn)行重新構(gòu)建
會新建一個selector實例,將舊的selector實例上所有的注冊的有效事件全部重新注冊到新的selector實例上,會將舊的selector調(diào)用close方法關(guān)閉掉
Bootstrap介紹
serverBootstrap//指定事件循環(huán)組.group(boss, worker)//主事件循環(huán)組接收的通道的實例類型.channel(NioServerSocketChannel.class).option(ChannelOption.SO_SNDBUF, 1024) //發(fā)送緩沖區(qū).childOption(ChannelOption.AUTO_READ, true).attr(AttributeKey.valueOf("key"),"1023") //自定義的參數(shù)設(shè)置.childAttr(AttributeKey.valueOf("key1"),"1024") //附帶數(shù)據(jù).childHandler(new ChannelInitializer <NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//存放channelHandler的容器ChannelPipeline pipeline = ch.pipeline();//字符串解碼器pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());//自定義channelHandlerpipeline.addLast(new Server212Handler());}})總結(jié)
- 上一篇: BIO与NIO比较
- 下一篇: mybatis动态代理