NIO基础
non-blocking io 非阻塞IO
1.三大組件
1.1Channel&Buffer
Channel:雙向通道,可讀可寫? ?Buffer:緩沖區(qū)
1.2 Selector
來1000個socket,要1000個線程服務(wù)。損耗太大。
阻塞模式的含義:thread處理socket1的時候就不能處理socket3(eg:服務(wù)器服務(wù)第一個客人點菜,買單的過程不能服務(wù)下一個客人)
所以適合短連接,早點處理完去服務(wù)下一個線程
selector版設(shè)計
selector監(jiān)視這些channel要求,有人有處理需求就去處理
和線程池版的區(qū)別就是線程不會吊死在一個連接上,可以服務(wù)多個請求,線程利用率就提高了很多。
所以適合連接數(shù)多,但是流量低 的情況,一個channel流量多的話就不能讓線程很好的服務(wù)其他的channel。
2.ByteBuffer
public class TestByteBuffer {@Testpublic void test(){//FileChannel//1.輸入輸出流 2.RandomAccessFiletry(FileChannel channel = new FileInputStream("data").getChannel()){//讀取需要一個緩沖區(qū)ByteBuffer buffer = ByteBuffer.allocate(10);while(true){//從channel讀取數(shù)據(jù),向buffer寫入int len = channel.read(buffer);//Log.debug("讀取到的字節(jié)數(shù){}",len);if(len==-1){//沒有內(nèi)容了break;}//打印buffer內(nèi)容buffer.flip();//切換至讀模式while(buffer.hasRemaining()){//是否還有剩余未讀數(shù)據(jù)byte b = buffer.get();System.out.println((char) b);}buffer.clear();//切換為寫模式}}catch (IOException ex){System.out.println(ex.toString());}} }2.2ByteBuffer結(jié)構(gòu)
capacity:容量
position:索引? 寫入位置/讀取位置
limit:寫入/讀取多少內(nèi)容
調(diào)用flip會改兩個指針的位置,position從寫位置轉(zhuǎn)到讀的位置,limit從寫的限制變?yōu)樽x的限制
compact:沒讀完但是現(xiàn)在要馬上寫,compact會把后面沒讀的移到起始位置,寫只能從沒讀那些位置后面開始寫。
package com.netty.demo;import org.junit.jupiter.api.Test;import java.nio.ByteBuffer; import static com.netty.demo.ByteBufferUtil.debugAll;public class TestBufferReadWrite {@Testpublic static void main(String[] args){ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put((byte)0x61);//'a'debugAll(buffer);} }?
顯示限制是10,現(xiàn)在可以從位置1開始寫
如果不切換讀模式,直接讀,那么會直接讀當前位置也就是position=1,讀到的會是0
package com.netty.demo;import org.junit.jupiter.api.Test;import java.nio.ByteBuffer; import static com.netty.demo.ByteBufferUtil.debugAll;public class TestBufferReadWrite {@Testpublic static void main(String[] args){ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put((byte)0x61);//'a'debugAll(buffer);System.out.println( buffer.get());buffer.flip();System.out.println(buffer.get());} }2.3ByteBuffer常見方法
put,get,clean,字符串與bytebuffer幾種轉(zhuǎn)換方法
2.4 Scattering Reads
分散讀集中寫
粘包半包分析
package com.netty.demo;import org.junit.jupiter.api.Test;import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Random;import static com.netty.demo.ByteBufferUtil.debugAll;public class TestScatteringReads {private static void split(ByteBuffer source){source.flip();for(int i=0;i<source.limit();i++){if(source.get(i)=='\n'){int length = i+1-source.position();ByteBuffer target = ByteBuffer.allocate(length);//從source讀,向target寫for(int j=0;j<length;j++){target.put(source.get());}debugAll(target);}}source.compact();//第一次未讀的部分向前移動}@Testpublic static void main(String[] args){ByteBuffer source = ByteBuffer.allocate(32);source.put("hello,world\nI'm zhangsan\nHo".getBytes());split(source);source.put("w are you?\n".getBytes());split(source);} }3. 文件編程
3.1 FileChannel
FileChannel只能工作在阻塞模式下
data中文件寫到to文件
package com.netty.demo;import org.junit.jupiter.api.Test;import java.io.FileInputStream; import java.io.FileOutputStream; import java.nio.channels.FileChannel;public class TestFileChannelTransferTo {@Testpublic static void main(String[] args){try{FileChannel from = new FileInputStream("data").getChannel();FileChannel to = new FileOutputStream("to").getChannel();//效率高,底層會使用操作系統(tǒng)的零拷貝進行優(yōu)化from.transferTo(0,from.size(),to);}catch (Exception ex){}} }3.2 傳輸數(shù)據(jù)大于2g時,
可以多次使用FileChannel進行傳輸,
package com.netty.demo;import org.junit.jupiter.api.Test;import java.io.FileInputStream; import java.io.FileOutputStream; import java.nio.channels.FileChannel;public class TestFileChannelTransferTo {@Testpublic static void main(String[] args){try{FileChannel from = new FileInputStream("data").getChannel();FileChannel to = new FileOutputStream("to").getChannel();//效率高,底層會使用操作系統(tǒng)的零拷貝進行優(yōu)化long size = from.size();//left表示每次傳輸完剩余的字節(jié)數(shù)量for(long left = size;left>0;){left-=from.transferTo(size-left,size,to);}from.transferTo(0,from.size(),to);}catch (Exception ex){}} }3.3 Path
3.4 Files
遍歷文件目錄
package com.netty.demo;import org.junit.jupiter.api.Test;import java.io.IOException; import java.nio.file.*; import java.nio.file.attribute.BasicFileAttributes; import java.util.concurrent.atomic.AtomicInteger;public class TestWakFileTree {@Testpublic static void main(String[] args) throws IOException {Path path = Paths.get("D:\\many tools\\JDK");// 文件目錄數(shù)目AtomicInteger dirCount = new AtomicInteger();// 文件數(shù)目AtomicInteger fileCount = new AtomicInteger();Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println("===>"+dir);// 增加文件目錄數(shù)dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {System.out.println(file);// 增加文件數(shù)fileCount.incrementAndGet();return super.visitFile(file, attrs);}});// 打印數(shù)目System.out.println("文件目錄數(shù):"+dirCount.get());System.out.println("文件數(shù):"+fileCount.get());} }4. 網(wǎng)絡(luò)編程
package com.netty.demo;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List;import static com.netty.demo.ByteBufferUtil.debugRead;public class Server {private static Logger logger = LoggerFactory.getLogger(Server.class);public static void main(String[] args) throws IOException {//使用nio來理解阻塞模式,單線程ByteBuffer buffer = ByteBuffer.allocate(16);//1.創(chuàng)建了服務(wù)器ServerSocketChannel ssc = ServerSocketChannel.open();//2.綁定監(jiān)聽端口ssc.bind(new InetSocketAddress(8080));//3.連接集合List<SocketChannel> channels = new ArrayList<>();while(true){//4.accept建立與客戶端連接 SocketChannel用來與客戶端通信logger.debug("connecting...");SocketChannel sc = ssc.accept();//accept是阻塞方法,會讓線程暫停logger.debug("connection...{}",sc);channels.add(sc);for(SocketChannel channel:channels){//5.接受客戶端發(fā)送的數(shù)據(jù)logger.debug("before read... {}",channel);channel.read(buffer);buffer.flip();debugRead(buffer);buffer.clear();logger.debug("after read...{}",channel);}}} } package com.netty.demo;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel;public class Client {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8080));System.out.println("waiting...");} }處理消息的邊界
服務(wù)器一次發(fā)送過多數(shù)據(jù)
服務(wù)端代碼:
package com.netty.demo;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator;public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while(true){// 若沒有事件就緒,線程會被阻塞,反之不會被阻塞。從而避免了CPU空轉(zhuǎn)selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);//1.向客戶端發(fā)送大量數(shù)據(jù)StringBuilder sb = new StringBuilder();for(int i=0;i<30000000;i++){sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());while (buffer.hasRemaining()){//2.返回值代表實際寫入的字節(jié)數(shù)int write = sc.write(buffer);System.out.println(write);}}}}} }客戶端代碼:
package com.netty.demo;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel;public class WriteClient {public static void main(String[] args) throws IOException{SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8080));//3.接收數(shù)據(jù)int count=0;while (true){ByteBuffer buffer = ByteBuffer.allocate(1024*1024);count+=sc.read(buffer);System.out.println(count);buffer.clear();}} }客戶端分多次接收,但是服務(wù)端需要一直輪詢,查詢緩沖區(qū)是不是滿的,還能不能繼續(xù)寫進數(shù)據(jù)
下面用可寫事件來處理,避免輪詢
package com.netty.demo;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator;public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while(true){// 若沒有事件就緒,線程會被阻塞,反之不會被阻塞。從而避免了CPU空轉(zhuǎn)selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey scKey = sc.register(selector,0,null);//1.向客戶端發(fā)送大量數(shù)據(jù)StringBuilder sb = new StringBuilder();for(int i=0;i<30000000;i++){sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());//2.返回值代表實際寫入的字節(jié)數(shù)int write = sc.write(buffer);System.out.println(write);//3.判斷是否有剩余內(nèi)容if (buffer.hasRemaining()){//4.關(guān)注可寫事件//scKey可能原來關(guān)注了其他事件,比如讀事件//不想覆蓋掉他,就+scKey.interestOps()scKey.interestOps(scKey.interestOps()+SelectionKey.OP_WRITE);//5.把未寫完的數(shù)據(jù)掛在scKey上scKey.attach(buffer);}}else if(key.isWritable()){ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel sc = (SocketChannel) key.channel();int write = sc.write(buffer);System.out.println(write);//6.清理一下,寫完了buffer一直掛著,消耗大if(!buffer.hasRemaining()){key.attach(null);key.interestOps(key.interestOps()-SelectionKey.OP_WRITE);}}}}} }小結(jié):
總結(jié)
- 上一篇: Leetcode--671. 合并二叉树
- 下一篇: Leetcode--17.电话号码的字母