AIO(异步IO)
前言
AIO是異步IO的縮寫,即Asynchronized IO。雖然NIO在網絡操作中,提供了非阻塞的方法,但是NIO的IO行為還是同步的,對于NIO來說,我們的業務線程是在IO操作準備好時,得到通知,接著就由這個線程自行進行IO操作,IO操作本身還是同步的。
但是對于AIO來說,則更加的進了一步,它不是在IO準備好時再通知線程,而是在IO操作已經完成后,再給線程發出通知。因此,AIO是完全不會阻塞的。此時,我們的業務邏輯將變成一個回調函數,等待IO操作完成后,由系統自動觸發。
NIO和AIO的使用場景
NIO方式適用于連接數目多且連接比較短(輕操作)的架構,比如聊天服務器,并發局限于應用中,JDK1.4開始支持。
AIO方式使用于連接數目多且連接比較長(重操作)的架構,比如HTTP服務器等,充分調用OS參與并發操作,JDK7開始支持
下面來通過AIO實現的服務器來加深了解AIO:
AIOEchoServer:
1 public class AIOEchoServer { 2 public static final int PORT = 8000; 3 private AsynchronousServerSocketChannel server;//異步通道 4 5 public AIOEchoServer() throws IOException { 6 server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT)); 7 } 8 9 //接收和處理 10 public void start(){ 11 System.out.println("Server listen on " + PORT); 12 server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { 13 final ByteBuffer buffer = ByteBuffer.allocate(1024); 14 public void completed(AsynchronousSocketChannel result,Object attachment){ 15 System.out.println(Thread.currentThread().getName()); 16 Future<Integer> writeResult = null; 17 buffer.clear(); 18 try { 19 result.read(buffer).get(100, TimeUnit.SECONDS); 20 buffer.flip(); 21 writeResult = result.write(buffer); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } catch (ExecutionException e) { 25 e.printStackTrace(); 26 } catch (TimeoutException e) { 27 e.printStackTrace(); 28 }finally { 29 30 server.accept(null,this); 31 try { 32 writeResult.get(); 33 result.close(); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } catch (ExecutionException e) { 37 e.printStackTrace(); 38 } catch (IOException e) { 39 e.printStackTrace(); 40 } 41 } 42 } 43 @Override 44 public void failed(Throwable exc, Object attachment) { 45 System.out.println("failed : " + exc); 46 } 47 }); 48 }49 }
? 異步IO(AIO)需要使用異步通道。這里使用的是AsynchronousServerSocketChannel。
上述代碼定義的start()方法開啟了服務器,值得注意的是,這里只是調用了一個函數server.accept()。之后,這一大堆的代碼只是這個函數的參數。
AsynchronousServerSocketChannel.accept()方法會立即返回。它并不會真的等待客戶端的到來,這里使用的accept()方法的簽名是:
public final <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler)它的第一個參數是一個附件,可以是任意類型,作用是讓當前線程和后續的回調方法可以共享這個信息,它會在后續的調用中,傳遞給handler。它的第二個參數是CompletionHandler接口。這個接口有兩個方法:
void completed(V result,A attachment)void failed(Throwable exc,A attachment)這兩個方法分別在異步操作accept()成功調用completed()和失敗調用failed()。
因此,AsynchronousServerSocketChannel.accept()實際上做了兩件事,第一就是發起accept請求,告訴系統可以開始監聽端口了。第二,注冊CompletionHandler實例,告訴系統,一旦有客戶端前來連接,如果連接成功,就去執行CompletionHandler.completed()方法;如果連接失敗,就去執行CompletionHandler.failed()方法。
所以,server.accept()方法不會阻塞,它會立即返回。
到這里,上述代碼的意思其實也就差不多明白了:當completed()被執行時,意味著已經有客戶端連接成功了。在第19行,使用read()方法讀取客戶端的數據,這里需要注意,AsynchronousServerSocketChannel.read()方法也是異步的,換句話說,就是它不會等到數據讀取完成了再返回,而是立即返回,返回的結果是一個Future對象,因此這里是Future模式的典型應用。在這里為了編程方便,直接調用了Future.get()方法(第32行),進行等待,將這個異步方法變為了同步方法。因此,在19行執行完成后,數據讀取就已經完成了。
之后,將數據回寫給客戶端(第21行),這里調用的是AsynchronousServerSocketChannel.write()方法,這個方法也是異步的,同樣的返回一個Future對象。
再之后,第30行,服務器進行下一個客戶端的連接準備。同時關閉當前正在處理的客戶端連接。但是在關閉之前,得先確認之前的write()操作已經完成,因此,使用Future.get()方法進行等待(第32行)。
接下來,我們只需要在main函數中調用這個start()方法就可以開啟服務器了:
1 public static void main(String[] args) throws IOException, InterruptedException { 2 new AIOEchoServer().start(); 3 while (true){ 4 Thread.sleep(1000); 5 } 6 }上述代碼第2行,調用start()方法開啟服務器。但是由于start()方法中使用的是異步方法,因此它會立即返回,它并不會像阻塞方法那樣會進行等待,因此,如果想讓程序駐守執行,第3~5行的等待語句是必須的。否則,在start()方法結束后,不等客戶端到來,程序就已經運行完成,主線程就將退出。
AIOClient:
1 public class AIOClient { 2 public static void main(String[] args) throws IOException, InterruptedException { 3 final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); 4 client.connect(new InetSocketAddress("localhost", 8000), null, new CompletionHandler<Void, Object>() { 5 @Override 6 public void completed(Void result, Object attachment) { 7 client.write(ByteBuffer.wrap("Hello!".getBytes()), null, new CompletionHandler<Integer, Object>() { 8 @Override 9 public void completed(Integer result, Object attachment) { 10 ByteBuffer buffer = ByteBuffer.allocate(1024); 11 client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { 12 @Override 13 public void completed(Integer result, ByteBuffer attachment) { 14 buffer.flip(); 15 System.out.println(new String(buffer.array())); 16 try { 17 client.close(); 18 } catch (IOException e) { 19 e.printStackTrace(); 20 } 21 } 22 @Override 23 public void failed(Throwable exc, ByteBuffer attachment) { 24 } 25 }); 26 } 27 @Override 28 public void failed(Throwable exc, Object attachment) { 29 } 30 }); 31 } 32 @Override 33 public void failed(Throwable exc, Object attachment) { 34 } 35 }); 36 //由于主線程會立即結束,所以這里等待上述處理全部完成 37 Thread.sleep(1000); 38 } 39 }上述的AIOClient代碼看起來很長,實際上只有三個語句:
第一個語句:代碼第3行,打開AsynchronousSocketChannel通道。
第二個語句:代碼第4~35行,它讓客戶端去連接指定的服務器,并注冊了一系列事件。
第三個語句:代碼第37行,讓主線程進行等待。
代碼的第4行,客戶端進行網絡連接,并注冊了連接成功的回調函數CompletionHandler<Void,Object>。待連接成功后,就會進入代碼第7行。第7行進行數據寫入,向服務端發送數據。這個過程是異步的,會很快返回,寫入完成后,會通知回調接口CompletionHandler<Integer,Object>,進入第10行。準備進行數據讀取,從服務端讀取回寫的數據。當然代碼的第11行的read()方法也是立即返回的,成功讀取所有的數據后,會回調CompletionHandler<Integer,ByteBuffer>接口,進入第14行。在第15行,打印接收到的數據。
AIO的特點
1.?讀完了再通知我
2.?不會加快IO,只是在讀完后進行通知
3.?使用回調函數,進行業務處理
參考:《Java高并發程序設計》 葛一鳴 郭超 編著:
轉載于:https://www.cnblogs.com/Joe-Go/p/9987894.html
總結
- 上一篇: 每日站立会议(第六天)
- 下一篇: NLP AI