日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty原理和使用

發布時間:2025/3/21 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty原理和使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Netty是一個高性能 事件驅動的異步的非堵塞的IO(NIO)框架,用于建立TCP等底層的連接,基于Netty可以建立高性能的Http服務器。支持HTTP、 WebSocket 、Protobuf、 Binary TCP |和UDP,Netty已經被很多高性能項目作為其Socket底層基礎,如HornetQ Infinispan Vert.x
Play Framework Finangle和 Cassandra。其競爭對手是:Apache MINA和 Grizzly。

   傳統堵塞的IO讀取如下:

InputStream is = new FileInputStream("input.bin");
int byte = is.read(); // 當前線程等待結果到達直至錯誤

   而使用NIO如下:

while (true) {
 selector.select(); // 從多個通道請求事件
 Iterator it = selector.selectedKeys().iterator();
 while (it.hasNext()) {
  SelectorKey key = (SelectionKey) it.next();
  handleKey(key);
  it.remove();
 }

堵塞與非堵塞原理

  傳統硬件的堵塞如下,從內存中讀取數據,然后寫到磁盤,而CPU一直等到磁盤寫完成,磁盤的寫操作是慢的,這段時間CPU被堵塞不能發揮效率。

  使用非堵塞的DMA如下圖:CPU只是發出寫操作這樣的指令,做一些初始化工作,DMA具體執行,從內存中讀取數據,然后寫到磁盤,當完成寫后發出一個中斷事件給CPU。這段時間CPU是空閑的,可以做別的事情。這個原理稱為Zero.copy零拷貝。

  Netty底層基于上述Java NIO的零拷貝原理實現:

比較

  • Tomcat是一個Web服務器,它是采取一個請求一個線程,當有1000客戶端時,會耗費很多內存。通常一個線程將花費 256kb到1mb的stack空間。
  • Node.js是一個線程服務于所有請求,在錯誤處理上有限制
  • Netty是一個線程服務于很多請求,如下圖,當從Java NIO獲得一個Selector事件,將激活通道Channel。

演示

Netty的使用代碼如下:

Channel channel = ...
ChannelFuture cf = channel.write(data);
cf.addListener(
  new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture future) throws Exception {
     if(!future.isSuccess() {
        future.cause().printStacktrace();
        ...
     }
     ...
   }
});
...
cf.sync();

通過引入觀察者監聽,當有數據時,將自動激活監聽者中的代碼運行。

我們使用Netty建立一個服務器代碼:

public?class?EchoServer {

????private?final?int?port;

????public?EchoServer(int?port) {?
????????this.port = port;?
??? }

????public?void?run()?throws?Exception {?
??????? // Configure the server.?
??????? EventLoopGroup bossGroup =?new?NioEventLoopGroup();?
??????? EventLoopGroup workerGroup =?new?NioEventLoopGroup();?
????????try?{?
??????????? ServerBootstrap b =?new?ServerBootstrap();?
??????????? b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)?
?????????????????? .handler(new?LoggingHandler(LogLevel.INFO)).childHandler(newChannelInitializer<SocketChannel>() {?
?????????????????????? @Override?
???????????????????????public?void?initChannel(SocketChannel ch)?throws?Exception {?
?????????????????????????? ch.pipeline().addLast(?
?????????????????????????? // new LoggingHandler(LogLevel.INFO),?
???????????????????????????????????new?EchoServerHandler());?
?????????????????????? }?
?????????????????? });

?

??????????? // Start the server.?
??????????? ChannelFuture f = b.bind(port).sync();

??????????? // Wait until the server socket is closed.?
??????????? f.channel().closeFuture().sync();?
??????? }?finally?{?
??????????? // Shut down all event loops to terminate all threads.?
??????????? bossGroup.shutdownGracefully();?
??????????? workerGroup.shutdownGracefully();?
??????? }?
??? }

???
}

這段代碼調用:在9999端口啟動

new?EchoServer(9999).run();

我們需要完成的代碼是EchoServerHandler

public?class?EchoServerHandler?extends?ChannelInboundHandlerAdapter {

????private?static?final?Logger?logger?= Logger.getLogger(EchoServerHandler.class.getName());

?

??? @Override?
????public?void?channelRead(ChannelHandlerContext ctx, Object msg)?throws?Exception {?
??????? ctx.write(msg);?
??? }

??? @Override?
????public?void?channelReadComplete(ChannelHandlerContext ctx)?throws?Exception {?
??????? ctx.flush();?
??? }

??? @Override?
????public?void?exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {?
??????? // Close the connection when an exception is raised.?
????????logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);?
??????? ctx.close();?
??? }?
}

原理

   一個Netty服務器的原理如下:

  圖中每次請求的讀取是通過UpStream來實現,然后激活我們的服務邏輯如EchoServerHandler,而服務器向外寫數據,也就是響應是通過DownStream實現的。每個通道Channel包含一對UpStream和DownStream,以及我們的handlers(EchoServerHandler),如下圖,這些都是通過channel pipeline封裝起來的,數據流在管道里流動,每個Socket對應一個ChannelPipeline。

?

   CHANNELPIPELINE是關鍵,它類似Unix的管道,有以下作用:

  • 為每個Channel 保留 ChannelHandlers ,如EchoServerHandler
  • 所有的事件都要通過它
  • 不斷地修改:類似unix的SH管道: echo "Netty is shit...." | sed -e 's/is /is the /'
  • 一個Channel對應一個 ChannelPipeline
  • 包含協議編碼解碼 安全驗證SSL/TLS和應用邏輯

?

客戶端代碼

  前面我們演示了服務器端代碼,下面是客戶端代碼:

public?class?EchoClient {?
????private?final?String host;?
????private?final?int?port;?
????private?final?int?firstMessageSize;

????public?EchoClient(String host,?int?port,?int?firstMessageSize) {?
????????this.host = host;?
????????this.port = port;?
????????this.firstMessageSize = firstMessageSize;?
??? }

?

????public?void?run()?throws?Exception {?
??????? // Configure the client.?
??????? EventLoopGroup group =?new?NioEventLoopGroup();?
????????try?{?
??????????? Bootstrap b =?new?Bootstrap();?
??????? ?? b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,?true).handler(new?ChannelInitializer<SocketChannel>() {?
??????????????? @Override?
????????????????public?void?initChannel(SocketChannel ch)?throws?Exception {?
?????????????????? ch.pipeline().addLast(?
?????????????????? // new LoggingHandler(LogLevel.INFO),?
???????????????????????????new?EchoClientHandler(firstMessageSize));?
??????????????? }?
??????????? });

??????????? // Start the client.?
??????????? ChannelFuture f = b.connect(host, port).sync();

??????????? // Wait until the connection is closed.?
??????????? f.channel().closeFuture().sync();?
??????? }?finally?{?
??????????? // Shut down the event loop to terminate all threads.?
??????????? group.shutdownGracefully();?
??????? }?
??? }?
}

客戶端的應用邏輯EchoClientHandler

public?class?EchoClientHandler?extends?ChannelInboundHandlerAdapter {

????private?static?final?Logger?logger?= Logger.getLogger(EchoClientHandler.class.getName());

????private?final?ByteBuf firstMessage;

??? /**?
??? ?* Creates a client-side handler.?
??? ?*/?
????public?EchoClientHandler(int?firstMessageSize) {?
????????if?(firstMessageSize <= 0) {?
????????????throw?new?IllegalArgumentException("firstMessageSize: " + firstMessageSize);?
??????? }?
??????? firstMessage = Unpooled.buffer(firstMessageSize);?
????????for?(int?i = 0; i < firstMessage.capacity(); i++) {?
??????????? firstMessage.writeByte((byte) i);?
??????? }?
??? }

??? @Override?
????public?void?channelActive(ChannelHandlerContext ctx) {?
??????? ctx.writeAndFlush(firstMessage);?
??????? System.out.print("active");?
??? }

??? @Override?
????public?void?channelRead(ChannelHandlerContext ctx, Object msg)?throws?Exception {?
??????? ctx.write(msg);?
??????? System.out.print("read");?
??? }

??? @Override?
????public?void?channelReadComplete(ChannelHandlerContext ctx)?throws?Exception {?
??????? ctx.flush();?
??????? System.out.print("readok");?
??? }

??? @Override?
????public?void?exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {?
??????? // Close the connection when an exception is raised.?
????????logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);?
??????? ctx.close();?
??? }

}

?

from:?https://www.jdon.com/concurrent/netty.html

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的Netty原理和使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。