日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty入门篇-从双向通信开始

發布時間:2023/12/10 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty入门篇-从双向通信开始 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

百度百科描述

Netty是由JBOSS提供的一個java開源框架,現為 Github上的獨立項目。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。

也就是說,Netty 是一個基于NIO的客戶、服務器端的編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty 相當于簡化和流線化了網絡應用的編程開發過程,例如:基于 TCP 和 UDP 的 socket 服務開發。

如上摘錄自百度百科的描述。


Netty 算是目前最為主流的 NIO 框架了,目前我們也在用 NIO。在 Netty 之前還有另外一個 NIO 框架—Mina,Mina 算是早起的作品,Netty 的基礎架構跟Mina非常相似,使用時的思想也差不多,兩者還有一些微妙的關系,類似于log4j 跟 logback,Netty 和 Mina 均出自 Trustin Lee 之手。

關于Mina跟Netty的區別不是本文重點,我們繼續回到Netty上。

需求場景描述

完成對紅酒窖的室內溫度采集及監控功能。由本地應用程序+溫度傳感器定時采集室內溫度上報至服務器,如果溫度 >20 °C 則由服務器下發重啟空調指令,如果本地應用長時間不上傳溫度給服務器,則給戶主手機發送一條預警短信。

需求是瞎編的,但分析還是要分析的,在沒有接觸socker網絡編程之前我們可能會這么做:你本地寫一個定時器,然后將采集到的溫度數據調一下服務器上的某個接口,服務器拿到數據判斷一下,如果過高則返回一個帶有重啟空調的字段,至于本地斷線的情況,在數據庫維護一個時間字段,如果長時間沒有被更新則調用短信發送接口。

這樣分析起來,感覺也沒啥問題,就是覺得怪怪的,也不能說不行,就是本地設備多了對服務器負載是個問題。

咱先不采用這種方式,上邊描述的場景主要是想模擬雙方通信,實現雙全工操作「客戶端發送數據給服務端,服務端下發指令給客戶端」,一提到雙方通信,我們首先先到的就是Socket吧,來吧,簡單回顧一下Socker通信。

服務端

/*** 服務端*/ public class Server {public static void main(String[] args) {InputStreamReader isr;BufferedReader br;OutputStreamWriter osw;BufferedWriter bw;String str;Scanner in = new Scanner(System.in);try {/* 在本機的 8899 端口開放Server */ServerSocket server = new ServerSocket(8899);/* 只要產生連接,socket便可以代表所連接的那個物體,同時這個server.accept()只有產生了連接才會進行下一步操作。*/Socket socket = server.accept();/* 輸出連接者的IP。*/System.out.println(socket.getInetAddress());System.out.println("建立了一個連接!");while (true) {isr = new InputStreamReader(socket.getInputStream());br = new BufferedReader(isr);System.out.println("客戶端回復:" + br.readLine());osw = new OutputStreamWriter(socket.getOutputStream());bw = new BufferedWriter(osw);System.out.print("服務端回復:");str = in.nextLine();bw.write(str + "\n");bw.flush();}} catch (IOException e) {e.printStackTrace();}} }

簡單看一下 Server 端流程,首先創建了一個ServerSocket來監聽 8899 端口,然后調用阻塞方法 accept();獲取新的連接,當獲取到新的連接之后,然后進入了while循環體,從該連接中讀取數據,讀取數據是以字節流的方式。

客戶端

public class Client {public static void main(String[] args) {InputStreamReader isr;BufferedReader br;OutputStreamWriter osw;BufferedWriter bw;String str;Scanner in = new Scanner(System.in);try {Socket socket = new Socket("127.0.0.1", 8899);System.out.println("成功連接服務器");while (true) {osw = new OutputStreamWriter(socket.getOutputStream());bw = new BufferedWriter(osw);System.out.print("客戶端發送:");str = in.nextLine();bw.write(str + "\n");bw.flush();isr = new InputStreamReader(socket.getInputStream());br = new BufferedReader(isr);System.out.println("服務端回復:" + br.readLine());}} catch (IOException e) {e.printStackTrace();}} }

客戶端連接上服務端 8899 端口之后,進入 while 循環體,從連接中讀取數據,如下是效果圖:

上方代碼為了省事直接在主線程操作了,但即便是將代碼移植到子線程中處理,還是存在大量問題,尤其是 while 死循環。

如果將代碼放在子線程完成,那么一個連接需要一個線程來維護,一個線程包含一個死循環,一萬個線程就包含一萬個死循環… 再就是這些 while 循環并不是每一個都能讀出數據來的,所以就會造成資源浪費,消耗性能。

總之,Socket 就是典型的傳統 IO 模型操作,IO 讀寫是面向流的,一次性只能從流中讀取一個或者多個字節,并且讀完之后流無法再讀取,你需要自己緩存數據。 而 NIO 的讀寫是面向 Buffer 的,你可以隨意讀取里面任何一個字節數據,不需要你自己緩存數據,這一切只需要移動讀寫指針即可,關于 IO 與 NIO 的區別請移步:NIO與IO的區別

Netty實現雙向通信

既然是寫netty,自然是要拿netty來實現上方紅酒窖的案例了,注:簡單的demo,這次咱先實現雙向通信,后續再根據這個系列不斷完善,今天就當入個門了。

開發環境

  • IDEA
  • maven
  • netty版本:4.1.6

首先導入如下 Maven 依賴:

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.6.Final</version> </dependency>

服務端:

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyServer {private static NioEventLoopGroup bossGroup = new NioEventLoopGroup();private static NioEventLoopGroup workerGroup = new NioEventLoopGroup();public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup)// 指定Channel.channel(NioServerSocketChannel.class)//服務端可連接隊列數,對應TCP/IP協議listen函數中backlog參數.option(ChannelOption.SO_BACKLOG, 1024)//設置TCP長連接,一般如果兩個小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文.childOption(ChannelOption.SO_KEEPALIVE, true)//將小的數據包包裝成更大的幀進行傳送,提高網絡的負載.childOption(ChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new NettyServerHandler());}});serverBootstrap.bind(8070);}@PreDestroypublic void destory() throws InterruptedException {bossGroup.shutdownGracefully().sync();workerGroup.shutdownGracefully().sync();}}

簡單說一下,首先創建了兩個NioEventLoopGroup對象,我們可以把它看做傳統IO模型中的兩大線程組,bossGroup主要用來負責創建新連接「監聽端口,接收新連接的線程組」,workerGroup主要用于讀取數據以及業務邏輯處理「處理每一條連接的數據讀寫的線程組」,再生動一點就是:一個是對外的銷售員,一個是負責單子落地的工人。

然后我們創建了ServerBootstrap,這個類是用來引導我們進行服務端的啟動工作,接收兩個 NioEventLoopGroup 對象,把干活的兩個安排的明明白白。

通過.channel(NioServerSocketChannel.class)來指定 IO 模型,NioServerSocketChannel.class 表示指定的是 NIO,可供的 IO模型 選擇無非就 NIO,BIO,BIO肯定是不能選擇的了。

通過.childOption()可以給每條連接設置一些TCP底層相關的屬性,比如上面,我們設置了兩種TCP屬性,其中

  • ChannelOption.SO_KEEPALIVE表示是否開啟TCP底層心跳機制,true為開啟
  • ChannelOption.TCP_NODELAY表示是否開啟Nagle算法,true表示關閉,false表示開啟,通俗地說,如果要求高實時性,有數據發送時就馬上發送,就關閉,如果需要減少發送次數減少網絡交互,就開啟。

接著,我們調用childHandler()方法,給這個引導類創建一個ChannelInitializer,這里主要就是定義后續每條連接的數據讀寫,業務處理邏輯,不理解沒關系,在后面我們會詳細分析。ChannelInitializer這個類中,我們注意到有一個泛型參數NioSocketChannel,這個類呢,就是 Netty 對 NIO 類型的連接的抽象,而我們前面NioServerSocketChannel也是對 NIO 類型的連接的抽象,NioServerSocketChannel和NioSocketChannel的概念可以和 BIO 編程模型中的ServerSocket以及Socket兩個概念對應上。還沒完,我們需要在ChannelInitializer 中的initChannel() 方法里面給客戶端添加一個邏輯處理器,這個處理器的作用就是負責向服務端寫數據,也就是代碼中的如下部分:

@Override protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new NettyServerHandler()); }

我們簡單看一下這段代碼,其中ch.pipeline() 返回的是和這條連接相關的邏輯處理鏈,采用了責任鏈模式,類似于之前文章中提到的Spring Security過濾器鏈一樣,這里不理解沒關系,后面再細說。

然后再調用 addLast() 方法 添加一個邏輯處理器,這個邏輯處理器為的就是在客戶端建立連接成功之后,向服務端寫數據,下面是這個邏輯處理器相關的代碼:

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.Charset; import java.util.Date;public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 1. 獲取數據ByteBuf byteBuf = (ByteBuf) msg;System.out.println(new Date() + ": 服務端讀到數據 -> " + byteBuf.toString(Charset.forName("utf-8")));System.out.println(new Date() + ": 服務端寫出數據");// 2. 寫數據ByteBuf out = getByteBuf(ctx);ctx.channel().writeAndFlush(out);}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {byte[] bytes = "我是發送給客戶端的數據:請重啟冰箱!".getBytes(Charset.forName("utf-8"));ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(bytes);return buffer;}}

繼續如上這段代碼,這個邏輯處理器繼承自 ChannelInboundHandlerAdapter,然后覆蓋了 channelRead()方法,這個方法在接收到客戶端發來的數據之后被回調。

這里的 msg 參數指的就是 Netty 里面數據讀寫的載體,然后需要我們強轉一下為ByteBuf類型,然后調用 byteBuf.toString() 就能夠拿到我們客戶端發過來的字符串數據。

ok,至此服務端創建完了,我們再看客戶端。

客戶端

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import java.util.Date; import java.util.concurrent.TimeUnit;public class NettyClient {private static String host = "127.0.0.1";private static int MAX_RETRY = 5;public static void main(String[] args) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap// 1.指定線程模型.group(workerGroup)// 2.指定 IO 類型為 NIO.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)// 3.IO 處理邏輯.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)).addLast(new StringDecoder()).addLast(new StringEncoder()).addLast(new NettyClientHandler());}});// 4.建立連接bootstrap.connect(host, 8070).addListener(future -> {if (future.isSuccess()) {System.out.println("連接成功!");} else {System.err.println("連接失敗!");connect(bootstrap, host, 80, MAX_RETRY);}});}/*** 用于失敗重連*/private static void connect(Bootstrap bootstrap, String host, int port, int retry) {bootstrap.connect(host, port).addListener(future -> {if (future.isSuccess()) {System.out.println("連接成功!");} else if (retry == 0) {System.err.println("重試次數已用完,放棄連接!");} else {// 第幾次重連int order = (MAX_RETRY - retry) + 1;// 本次重連的間隔int delay = 1 << order;System.err.println(new Date() + ": 連接失敗,第" + order + "次重連……");bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);}});}}

我們可以看到客戶端的引導類不再是ServerBootstrap了,而是換成了Bootstrap,這個類負責客戶端以及連接服務端,跟服務端屬性大差不差,channel、option、handle等,然后同樣指定了IO模型,同時還增加了連接監聽 bootstrap.connect(host, 8070).addListener,其中future.isSuccess()屬性值表示了連接結果,如果連接失敗則跳入 connect 進行重連,重連嘗試5次之后不再進行嘗試,這塊我們后面文章再細講「其中包含客戶端、服務端的長連接,斷線重試等」,我們先來看看客戶端指定的業務處理類:NettyClientHandler

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.Charset; import java.util.Date; import java.util.Random;public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println(new Date() + ": 客戶端寫出數據");// 1. 獲取數據ByteBuf buffer = getByteBuf(ctx);// 2. 寫數據ctx.channel().writeAndFlush(buffer);}/*** 數據解析*/private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 1. 獲取二進制抽象 ByteBufByteBuf buffer = ctx.alloc().buffer();Random random = new Random();double value = random.nextDouble() * 14 + 8;String temp = "獲取室內溫度:" + value;// 2. 準備數據,指定字符串的字符集為 utf-8byte[] bytes = temp.getBytes(Charset.forName("utf-8"));// 3. 填充數據到 ByteBufbuffer.writeBytes(bytes);return buffer;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(new Date() + ": 客戶端讀到數據 -> " + msg.toString());}}

這個邏輯處理器同樣繼承自 ChannelInboundHandlerAdapter,不同于服務端,客戶端邏輯處理器這次使用到了 channelActive()方法,這個方法會在客戶端連接建立成功之后被調用,所以我們在這個方法里完成寫數據的操作「讀取室內溫度」。

我們簡單看一下這個channelActive()方法,首先獲取傳遞的 ByteBuf 對象,這個對象怎么來的呢,我們進入 getByteBuf() 方法,我們可以看到通過調用ctx.alloc() 獲取到一個 ByteBuf ,然后我們把字符串的二進制數據填充進了 ByteBuf,這樣我們就獲取到了 Netty 需要的一個數據格式,最后我們調用 ctx.channel().writeAndFlush() 把數據寫到服務端,至此整個客戶端的寫操作就完成了。

接下來就是讀數據量,channelRead() 方法,這個方法我們在服務端代碼中已經了解過了就不再闡述了。


測試服務端客戶端代碼

首先運行服務端 main() 方法,然后再運行客戶端 main() 方法,執行效果如下:

客戶端

服務端

至此,通過這個小demo,客戶端與服務端可以完成雙向通信了。還不急著技術文章,但畢竟是局域網嗎,如果服務端的代碼部署在外網服務端效果會怎樣呢?

測試服務端在外網服務器

我們把服務端代碼部署在外網環境中試一下看看效果會怎樣。

首先我們修改一下客戶端的host地址為外網ip地址,然后本地起一下客戶端試試:

我們可以看到返回結果沒問題,那說明服務端也是沒問題的:

至此,外網服務端與局域網客戶端的雙向通信時沒問題了,測試具體細節就不展示了,后面章節我會一步一步將代碼遷移到SpringBoot Web項目中的,但是眼下這代碼還是有點問題,我們先在本地繼續完善一下。

本文首發于博客園:https://www.cnblogs.com/niceyoo/p/13269756.html

我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下

總結

以上是生活随笔為你收集整理的Netty入门篇-从双向通信开始的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。