日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

大数据 NIO

發布時間:2024/4/30 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据 NIO 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

NIO

一、基礎回顧

a 、 進程與線程

  • 進程
  • 進程: 程序加載到內存中之后被CPU所計算的過程 — 進程是計算機資源分配 、 任務調度的最小單位
  • 三個維度考慮進程:
  • 物理內存維度:每一個進程都要分配一塊連續的內存空間(首地址 、 尾地址)
  • 進程執行維度/邏輯維度: 每一個進程都能被CPU計算 , 每一個進程都能掛起然后讓另外的進程被CPU計算 — 對于單核CPU而言 , 每一個時刻只能計算一個進程 。 對于windows而言 , (即使是多核CPU)默認只用一個核處理 。 對于Linux而言 , 有幾個核就用幾個。 — 從微觀上 , 計算機是串行處理進程 。 從宏觀上而言 , 是多個進程來并行執行 。 — 引入多道編程
  • 時間維度: 在每一個時間段內 , 進程一定是向前撲進的 。
  • 為什么要引入進程模型?
  • 減少程序響應時間 , 提高使用效率 。
  • 提高CPU利用率 。 IO事件(程序與硬件之間的交互)
  • 進程的產生事件:
  • 系統啟動時 , 會創建系統進程 。
  • 用戶請求創建進程時 , 創建用戶進程 。 (打開應用)
  • 主進程自動啟動子進程 。 (QQ等程序啟動之后 , 會自動啟動安全守護進程 。 )
  • 進程的消亡事件
  • 進程任務執行完畢 , 自動銷毀
  • 進程執行過程中出現錯誤或異常, 導致進程退出 。 意外身亡
  • 一個進程被另外的進程強制關閉 他殺
  • 進程的狀態 (除啟動 、 消亡)
  • 就緒 就緒->運行
  • 運行 運行->阻塞 運行->就緒
  • 阻塞 阻塞->就緒
  • 線程
  • 線程: 是進程中執行某一個具體的任務 。 線程本質上是簡化版的進程 。 線程不具有進程資源分配 、 任務調度的資格 。 一個進程中至少有一個線程是在執行的 。 — 線程是任務執行的最小單位 。
  • 進程的任務調度算法
  • 時間片輪轉算法
  • 優先級調度算法
  • 短任務優先算法
  • FICS 先來先執行
  • b、 Socket

  • BIO BlockingIO— 阻塞式IO — 阻塞在一些場景下會相對影響效率 ; 由于流具有方向性, 所以在傳輸數據時往往要創建多個流對象。 如果一些流長時間不使用 , 卻依然保持連接, 會造成資源的大量浪費 。 無法從流中準確的抽取一段數據
  • 引入 NIO
  • 二、 NIO

  • NIO — NewIO — NonBlockingIO — 非阻塞式IO — 基于通道和緩沖區。
  • Buffer類 — 一個基本數據類型的容器 。 緩沖區
  • Buffer子類 ByteBuffer

  • 底層是依靠字節數組來存儲數據 理解 容量位capacity 、 position 操作位 、 limit限制位
  • 在讀取數據之前往往要做一次filp操作 , 反轉緩沖區 , 先將限制位設置為操作位 , 然后將操作位歸0
  • 重繞緩沖區 – 將操作位歸0 , 限制位不變
  • 代碼1

    import java.nio.ByteBuffer;public class BufferDemo {public static void main(String[] args) { // //創建緩沖區 , 并指定了大小為1024個字節//當創建好緩沖區的時候 , 就有了一下屬性//1. capacity 容量位 --- 表示緩沖區容量//2. position 操作位 --- 表示要操作的位置 ---- 當緩沖區剛剛創建的時候 , 操作位默認為0 , 每添加一個字節的數據 , position就會向后挪一位//3. limit 限制位 ---- 表示position 所能達到的最大位置 --- 當緩沖區剛剛創建的時候 , limit就是容量位 。//獲取數據時 , 默認是從操作位開始獲取的 // ByteBuffer buffer = ByteBuffer.allocate(1024);//最多能存放1k數據 // //向緩沖區添加數據 // buffer.put("hello".getBytes());//以上方法存在資源浪費//******************************************************* // //在已知具體數據的情況下 , 建議使用這種方法創建緩沖區//使用wrap方式創建緩沖區 , 參數實際上是一個字節數組 , 底層實際上就是將參數字節數組復制給底層的實際存儲數據的數組 , 此時操作位并沒有改變還是0 //為什么是數組使用復制 , 而不是直接使用賦值?//保持數據的不變和唯一 // ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());//創建與數據大小相對應的緩沖區 // // //獲取數據 , 每一次獲取 , 只能獲取一個字節byte b = buffer.get();System.out.println(b); // // //獲取緩沖區所有數據 // while(buffer.hasRemaining()) {//判斷是否還有剩余數據 // // byte b = buffer.get(); // System.out.println(b); // }//*******************************************************//但是使用固定緩沖區大小的情況下獲取數據會出現獲取到0的情況 , 需要將默認的操作位歸0 , 并且讀取到有效數據結束即可 ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put("hello".getBytes());//遍歷方法一 : 記錄操作位位置后循環遍歷 // int position = buffer.position(); // for(int i = 0 ;i < position ; i++) { // System.out.println(buffer.get(i)); // }//遍歷方法二: 設置限制位為操作位后 , 操作位歸0 遍歷 // buffer.limit(buffer.position()); // buffer.position(0); // while(buffer.hasRemaining()) { // System.out.println(buffer.get()); // }//遍歷方法三: 反轉緩沖區//先將限制位設置為當前的操作位 , 然后把操作位歸0buffer.flip(); // buffer.hasRemaining()該方法 本質上就是判斷操作位是否小于限制位while(buffer.hasRemaining()) {System.out.println(buffer.get());}//獲取緩沖區中的底層數組byte[] array = buffer.array();//底層也是使用的數組復制 , 返回的是整個底層數組 , 而不是有效數據System.out.println(new String(array , 0 , buffer.position()));//如果使用過反轉buffer.flip();System.out.println(new String(array , 0 , buffer.limit()));} }
  • 代碼2

    import java.nio.ByteBuffer;public class BufferDemo2 {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put("hello".getBytes());System.out.println("操作位:"+ buffer.position());System.out.println("限制位:"+ buffer.limit()); // buffer.flip(); // System.out.println("操作位:"+ buffer.position()); // System.out.println("限制位:"+ buffer.limit());//重繞緩沖區buffer.rewind(); //作用: 將操作位歸0 , 限制位不變 。 System.out.println("操作位:"+ buffer.position());System.out.println("限制位:"+ buffer.limit());} }
  • Channel 通道

  • SocketChannel

  • 客戶端步驟
  • 打開客戶端通道 – open
  • 將客戶端通道設置為非阻塞
  • 發起連接
  • 人為阻塞 , 防止產生無效連接 finishConnect()
  • 寫出數據
  • 服務器端步驟
  • 打開服務器端通道
  • 綁定偵聽的端口號
  • 設置為非阻塞
  • 接收連接
  • 人為阻塞 , 防止沒有獲取真正的連接
  • 讀取數據
  • 代碼示例:

    客戶端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel;public class SocketChannelDemo {public static void main(String[] args) throws IOException, InterruptedException {//打開通道//SocketChannel 默認為阻塞連接 此時和Socket基本一樣SocketChannel s = SocketChannel.open();//設置SoceketChannel為非阻塞的s.configureBlocking(false);//發起連接s.connect(new InetSocketAddress("localhost", 8090));//由于SoceketChannel為非阻塞的 , 所以不能保證連接的真正建立//在實際開發中往往會認為的設置阻塞 , 來保證連接的建立//判斷連接是否成功 , 如果沒有連接成功finishConnect()底層會試圖再次建立連接//如果多次試圖連接沒有成功 , 則報錯while(!s.finishConnect()) ;//寫出數據s.write(ByteBuffer.wrap("hello".getBytes()));//獲取服務器端的響應Thread.sleep(100);ByteBuffer b = ByteBuffer.allocate(100);s.read(b);b.flip();System.out.println(new String(b.array() , 0 , b.limit())); // 關閉通道 s.close();}服務端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel;public class ServerSocketChannelDemo {public static void main(String[] args) throws IOException, InterruptedException {//打開服務器通道ServerSocketChannel s= ServerSocketChannel.open();//綁定偵聽的端口s.bind(new InetSocketAddress( 8090));//設置非阻塞s.configureBlocking(false);//接收連接SocketChannel accept = s.accept();//由于ServerSocketChannel是非阻塞的 , 所以可能出現還沒有客戶端聯入 但是服務器已經結束的現象//所以需要人為的設置為阻塞的 。 while(accept == null) {accept = s.accept();}//將socketChannel設置為非阻塞accept.configureBlocking(false);//讀取數據ByteBuffer buffer = ByteBuffer.allocate(100);accept.read(buffer);buffer.flip();System.out.println(new String(buffer.array() , 0 , buffer.limit()));//向客戶端做出響應accept.write(ByteBuffer.wrap("服務器端接收成功!".getBytes()));Thread.sleep(1000);//如果不加延時 , 服務器端寫出數據立即結束 , 此時客戶端還沒有接收完數據會報錯} }
  • 通道特點 :

  • 能夠進行數據的雙向傳輸 , 減少流的數量 , 降低服務器的內存消耗
  • 由于數據時存儲在緩沖區的 , 所以我們可以根據緩沖區的數據做定向操作 **
  • 能夠利用一個或者少量的服務器來完成大量的用戶的請求處理(一個服務器能夠接受多個客戶端的請求) — NIO適用于短任務場景 , BIO適用于長任務場景
  • Selector 選擇器

  • 每一個客戶端 或者服務器端都需要注冊到選擇器上 , 讓這個選擇器進行管理 , 選擇器管理的時候需要監聽事件:
  • 可連接事件 — 一般是客戶端
  • 可接受事件 — 一般是服務器端
  • 可讀事件
  • 可寫事件
  • 代碼:

    客戶端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;public class ClientDemo {public static void main(String[] args) throws IOException {//打開客戶端的通道SocketChannel sc = SocketChannel.open();//設置為非阻塞sc.configureBlocking(false);//獲取選擇器Selector selc = Selector.open();//將通道注冊到選擇器上sc.register(selc, SelectionKey.OP_CONNECT);//并給予連接權限//發起連接sc.connect(new InetSocketAddress("localhost", 8080));while(true) {//進行選擇 , 篩選出有用的連接selc.select();//獲取篩選之后有用的事件Set<SelectionKey> keys = selc.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();while(iterator.hasNext()) {//將遍歷到的事件讀取出來SelectionKey next = iterator.next();//可能向服務器發起連接//可能向服務器寫數據//可能接收服務器的數據if(next.isConnectable()) {//判斷是否是一個連接事件//從該事件中獲取到對應的通道SocketChannel scx = (SocketChannel) next.channel();//判斷之前的連接是否成功while(!scx.finishConnect());//連接成功之后 進行讀寫操作scx.register(selc, SelectionKey.OP_READ | SelectionKey.OP_WRITE);}if(next.isWritable()) {//從該事件中獲取到對應的通道SocketChannel scx = (SocketChannel) next.channel();//寫數據scx.write(ByteBuffer.wrap("讀取數據成功!".getBytes()));//執行完寫操作之后 , 需要將這個通道的寫權限注銷掉 ,防止不停地向服務器寫數據scx.register(selc, next.interestOps() ^ SelectionKey.OP_WRITE);//可用^ 或& ~}if(next.isReadable()) {//從該事件中獲取到對應的通道SocketChannel scx = (SocketChannel) next.channel();//讀數據ByteBuffer buffer = ByteBuffer.allocate(100);scx.read(buffer);buffer.flip();System.out.println(new String(buffer.array() , 0 , buffer.limit()));//移除可讀事件scx.register(selc, next.interestOps() & ~SelectionKey.OP_READ);}//為了防止事件移除失敗 , 處理完成后將事件移除iterator.remove();}}} }服務器端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;import javax.swing.plaf.SliderUI;public class ServerDemo {public static void main(String[] args) throws IOException {//打開服務器端通道ServerSocketChannel ssc = ServerSocketChannel.open();//綁定偵聽的端口號ssc.bind(new InetSocketAddress(8080));//接收任何IP客戶端8080端口傳來的數據//將通道設置為非阻塞ssc.configureBlocking(false);//將服務器注冊到選擇器上Selector selc = Selector.open();//為服務器注冊一個接受請求的權限ssc.register(selc, SelectionKey.OP_ACCEPT);while(true) {//進行選擇selc.select();//將選擇后的事件獲取出來Set<SelectionKey> keys = selc.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while(it.hasNext()) {//獲取這個事件SelectionKey key = it.next();//可能是接受連接事件//可能是可讀事件//可能是可寫事件if(key.isAcceptable()) {//獲取事件的通道ServerSocketChannel sscx = (ServerSocketChannel) key.channel();//接受連接SocketChannel sc = sscx.accept();while(sc == null) {sscx.accept();}//設置為非阻塞sc.configureBlocking(false);//注冊一個可讀事件sc.register(selc, SelectionKey.OP_READ | SelectionKey.OP_WRITE);}if(key.isReadable()) {//獲取事件的通道SocketChannel scx = (SocketChannel) key.channel();//讀取數據ByteBuffer buffer = ByteBuffer.allocate(100); scx.read(buffer);buffer.flip();System.out.println(new String (buffer.array() , 0 , buffer.limit()));//消除可讀事件scx.register(selc, key.interestOps() ^ SelectionKey.OP_READ);}if(key.isWritable()) {//獲取事件的通道SocketChannel scx = (SocketChannel) key.channel();//寫出數據scx.write(ByteBuffer.wrap("hello".getBytes()));//消除可以寫事件scx.register(selc, key.interestOps() & ~SelectionKey.OP_WRITE);}it.remove();}}} }
  • 三 、 考慮 : 數據粘包怎么處理?

  • 數據定長 — 如果數據長度不夠 , 填充無用數據 — 怎樣區分無用數據
  • 約定數據結尾符號 — 結尾符號可能會和實際的數據內容沖突
  • 約定協議 — 序列化/反序列化 — 底層實際上是約束了起始和結束的協議
  • 總結

    以上是生活随笔為你收集整理的大数据 NIO的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。