日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

漫谈Java IO之 Netty与NIO服务器

發布時間:2023/12/18 java 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 漫谈Java IO之 Netty与NIO服务器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前面介紹了基本的網絡模型以及IO與NIO,那么有了NIO來開發非阻塞服務器,大家就滿足了嗎?有了技術支持,就回去追求效率,因此就產生了很多NIO的框架對NIO進行封裝——這就是大名鼎鼎的Netty。

前幾篇的內容,可以參考:

  • 網絡IO的基本知識與概念
  • 普通IO以及BIO服務器
  • NIO的使用與服務器Hello world
  • Netty的使用與服務器Hello world
  • 為什么要使用開源框架?

    這個問題幾乎可以當做廢話,框架肯定要比一些原生的API封裝了更多地功能,重復造輪子在追求效率的情況并不是明智之舉。那么先來說說NIO有什么缺點吧:

  • NIO的類庫和API還是有點復雜,比如Buffer的使用
  • Selector編寫復雜,如果對某個事件注冊后,業務代碼過于耦合
  • 需要了解很多多線程的知識,熟悉網絡編程
  • 面對斷連重連、保丟失、粘包等,處理復雜
  • NIO存在BUG,根據網上言論說是selector空輪訓導致CPU飆升,具體有興趣的可以看看JDK的官網
  • 那么有了這些問題,就急需一些大牛們開發通用框架來方便勞苦大眾了。最致命的NIO框架就是MINA和Netty了,這里不得不說個小插曲:

    先來看看MINA的主要貢獻者:

    再來看看NETYY的主要貢獻者:

    總結起來,有這么幾點:

  • MINA和Netty的主要貢獻者都是同一個人——Trustin lee,韓國Line公司的。
  • MINA于2006年開發,到14、15年左右,基本停止維護
  • Nety開始于2009年,目前仍由蘋果公司的norman maurer在主要維護。
  • Norman Maurer是《Netty in Action》一書的作者
  • 因此,如果讓你選擇你應該知道選擇誰了吧。另外,MINA對底層系統要求功底更深,且國內Netty的氛圍更好,有李林峰等人在大力宣傳(《Netty權威指南》的作者)。

    講了一大堆的廢話之后,總結來說就是——Netty有前途,學它準沒錯。

    Netty介紹

    按照定義來說,Netty是一個異步、事件驅動的用來做高性能、高可靠性的網絡應用框架。主要的優點有:

  • 框架設計優雅,底層模型隨意切換適應不同的網絡協議要求
  • 提供很多標準的協議、安全、編碼解碼的支持
  • 解決了很多NIO不易用的問題
  • 社區更為活躍,在很多開源框架中使用,如Dubbo、RocketMQ、Spark等
  • 主要支持的功能或者特性有:

  • 底層核心有:Zero-Copy-Capable Buffer,非常易用的靈拷貝Buffer(這個內容很有意思,稍后專門來說);統一的API;標準可擴展的時間模型
  • 傳輸方面的支持有:管道通信(具體不知道干啥的,還請老司機指教);Http隧道;TCP與UDP
  • 協議方面的支持有:基于原始文本和二進制的協議;解壓縮;大文件傳輸;流媒體傳輸;protobuf編解碼;安全認證;http和websocket
  • 總之提供了很多現成的功能可以直接供開發者使用。

    Netty服務器小例子

    基于Netty的服務器編程可以看做是Reactor模型:

    即包含一個接收連接的線程池(也有可能是單個線程,boss線程池)以及一個處理連接的線程池(worker線程池)。boss負責接收連接,并進行IO監聽;worker負責后續的處理。為了便于理解Netty,直接看看代碼:

    package cn.xingoo.book.netty.chap04;import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;import java.net.InetSocketAddress; import java.nio.charset.Charset;public class NettyNioServer {public void serve(int port) throws InterruptedException {final ByteBuf buffer = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));// 第一步,創建線程池EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try{// 第二步,創建啟動類ServerBootstrap b = new ServerBootstrap();// 第三步,配置各組件b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(buffer.duplicate()).addListener(ChannelFutureListener.CLOSE);}});}});// 第四步,開啟監聽ChannelFuture f = b.bind().sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully().sync();workerGroup.shutdownGracefully().sync();}}public static void main(String[] args) throws InterruptedException {NettyNioServer server = new NettyNioServer();server.serve(5555);} }

    代碼非常少,而且想要換成阻塞IO,只需要替換Channel里面的工廠類即可:

    public class NettyOioServer {public void serve(int port) throws InterruptedException {final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\b", Charset.forName("UTF-8")));EventLoopGroup bossGroup = new OioEventLoopGroup(1);EventLoopGroup workerGroup = new OioEventLoopGroup();try{ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)//配置boss和worker.channel(OioServerSocketChannel.class) // 使用阻塞的SocketChannel....

    概括來說,在Netty中包含下面幾個主要的組件:

    • Bootstrap:netty的組件容器,用于把其他各個部分連接起來;如果是TCP的Server端,則為ServerBootstrap.
    • Channel:代表一個Socket的連接
    • EventLoopGroup:一個Group包含多個EventLoop,可以理解為線程池
    • EventLoop:處理具體的Channel,一個EventLoop可以處理多個Channel
    • ChannelPipeline:每個Channel綁定一個pipeline,在上面注冊處理邏輯handler
    • Handler:具體的對消息或連接的處理,有兩種類型,Inbound和Outbound。分別代表消息接收的處理和消息發送的處理。
    • ChannelFuture:注解回調方法

    了解上面的基本組件后,就看一下幾個重要的內容。

    Netty的Buffer和零拷貝

    在Unix操作系統中,系統底層可以基于mmap實現內核空間和用戶空間的內存映射。但是在Netty中并不是這個意思,它主要來自于下面幾個功能:

  • 通過Composite和slice實現邏輯上的Buffer的組合和拆分,重新維護索引,避免內存拷貝過程。
  • 通過DirectBuffer申請堆外內存,避免用戶空間的拷貝。不過堆外內存的申請和釋放都很麻煩,推薦小心使用。關于堆外內存的一些研究,還可以參考執勤的分享:Java堆外內存之突破JVM枷鎖 以及 Java直接內存與非直接內存性能測試
  • 通過FileRegion包裝FileChannel,直接實現channel到channel的傳輸。
  • 另外,Netty自己封裝實現了ByteBuf,相比于Nio原生的ByteBuffer,API上更易用了;同時支持容量的動態擴容;另外還支持Buffer的池化,高效復用Buffer。

    public class ByteBufTest {public static void main(String[] args) {//創建bytebufByteBuf buf = Unpooled.copiedBuffer("hello".getBytes());System.out.println(buf);// 讀取一個字節buf.readByte();System.out.println(buf);// 讀取一個字節buf.readByte();System.out.println(buf);// 丟棄無用數據buf.discardReadBytes();System.out.println(buf);// 清空buf.clear();System.out.println(buf);// 寫入buf.writeBytes("123".getBytes());System.out.println(buf);buf.markReaderIndex();System.out.println("mark:"+buf);buf.readByte();buf.readByte();System.out.println("read:"+buf);buf.resetReaderIndex();System.out.println("reset:"+buf);} }

    輸出為:

    UnpooledHeapByteBuf(ridx: 0, widx: 5, cap: 5/5) UnpooledHeapByteBuf(ridx: 1, widx: 5, cap: 5/5) UnpooledHeapByteBuf(ridx: 2, widx: 5, cap: 5/5) UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5) UnpooledHeapByteBuf(ridx: 0, widx: 0, cap: 5/5) UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5) mark:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5) read:UnpooledHeapByteBuf(ridx: 2, widx: 3, cap: 5/5) reset:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)

    有興趣的可以看一下上一篇分享的ByteBuffer,對比一下,就能發現在Netty中通過獨立的讀寫索引維護,避免讀寫模式的切換,更加方便了。

    Handler的使用

    前面介紹了Handler包含了Inbound和Outbound兩種,他們統一放在一個雙向鏈表中:

    當接收消息的時候,會從鏈表的表頭開始遍歷,如果是inbound就調用對應的方法;如果發送消息則從鏈表的尾巴開始遍歷。那么上面途中的例子,接收消息就會輸出:

    InboundA --> InboundB --> InboundC

    輸出消息,則會輸出:

    OutboundC --> OutboundB --> OutboundA

    這里有段代碼,可以直接復制下來,試試看:

    package cn.xingoo.book.netty.pipeline;import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.Charset;/*** 注意:** 1 ChannelOutboundHandler要在最后一個Inbound之前**/ public class NettyNioServerHandlerTest {final static ByteBuf buffer = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));public void serve(int port) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try{ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast("1",new InboundA());pipeline.addLast("2",new OutboundA());pipeline.addLast("3",new InboundB());pipeline.addLast("4",new OutboundB());pipeline.addLast("5",new OutboundC());pipeline.addLast("6",new InboundC());}});ChannelFuture f = b.bind().sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully().sync();workerGroup.shutdownGracefully().sync();}}public static void main(String[] args) throws InterruptedException {NettyNioServerHandlerTest server = new NettyNioServerHandlerTest();server.serve(5555);}private static class InboundA extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf)msg;System.out.println("InboundA read"+buf.toString(Charset.forName("UTF-8")));super.channelRead(ctx, msg);}}private static class InboundB extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf)msg;System.out.println("InboundB read"+buf.toString(Charset.forName("UTF-8")));super.channelRead(ctx, msg);// 從pipeline的尾巴開始找outboundctx.channel().writeAndFlush(buffer);}}private static class InboundC extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf)msg;System.out.println("InboundC read"+buf.toString(Charset.forName("UTF-8")));super.channelRead(ctx, msg);// 這樣會從當前的handler向前找outbound//ctx.writeAndFlush(buffer);}}private static class OutboundA extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutboundA write");super.write(ctx, msg, promise);}}private static class OutboundB extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutboundB write");super.write(ctx, msg, promise);}}private static class OutboundC extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutboundC write");super.write(ctx, msg, promise);}} }

    最后有一個TCP粘包的例子,有興趣的也可以自己試一下,代碼就不貼上來了,可以參考最后面的Github連接。

    參考

  • 《Netty實戰》
  • 《Netty權威指南》
  • github代碼鏈接
  • 轉載于:https://www.cnblogs.com/xing901022/p/8678869.html

    總結

    以上是生活随笔為你收集整理的漫谈Java IO之 Netty与NIO服务器的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。