NIO和Reactor
本文參考Doug Lea的Scalable IO in Java.
網(wǎng)絡(luò)服務(wù)
隨著網(wǎng)絡(luò)服務(wù)的越來越多,我們對網(wǎng)絡(luò)服務(wù)的性能有了更高的要求,提供一個高性能,穩(wěn)定的web服務(wù)是一件很麻煩的事情,所以有了netty框架幫我們完成。
我們對各種各樣的網(wǎng)絡(luò)服務(wù)進(jìn)行抽象,得到最基本的業(yè)務(wù)流程:
1:讀取請求信息
2:對請求信息進(jìn)行解碼
3:進(jìn)行相關(guān)的業(yè)務(wù)處理
4:對處理結(jié)果進(jìn)行編碼
5:發(fā)送請求
看到這,netty的ChannelHandler就是干這幾件事情的。
傳統(tǒng)的網(wǎng)絡(luò)服務(wù)
在jdk1.4之前,我們只有BIO,所以當(dāng)時的網(wǎng)絡(luò)服務(wù)的架構(gòu)是這樣的。
每個線程處理一個請求, 由于線程個數(shù)和cpu個數(shù)的原因,這種設(shè)計性能是有上限的,所以就有了集群模式:tomcat集群。
/*** Created by gaoxing on 2015-01-20 .*/ public class ClasssicServer {public static void main(String[] args) {try {ServerSocket serverSocket=new ServerSocket(8888,10);System.out.println("server is start");while( ! Thread.interrupted()){new Thread(new Handler(serverSocket.accept())).start();}} catch (IOException e) {e.printStackTrace();}}static class Handler implements Runnable{final Socket socket;final static int MAX_SIZE=1024;public Handler(Socket socket){this.socket=socket;}@Overridepublic void run() {//TODO//在這里對socket中的數(shù)據(jù)進(jìn)行讀取和處理,然后把結(jié)果寫入到socket中去 }} }高性能的IO目標(biāo)和Reactor
1:高負(fù)載下可以穩(wěn)定的工作
2:提高資源的利用率
3:低延遲
這有就有了分而治之和事件驅(qū)動的思想。這樣沒有thread就不用瞎跑了,cpu就不用不停的切換Thread . 這樣提出了Reactor模式
1:Reactor接收IO事件發(fā)送給該事件的處理器處理
2:處理器的操作是非阻塞的。
3:管理事件和處理器的綁定。
?
?
這是一個單線程版本,所有的請求都是一個線程處理,當(dāng)然Reactor不是無緣無故的提出來的,因為jdk提供了nio包,nio使得Reator得于實現(xiàn)
1:channels: 通道就是數(shù)據(jù)源的抽象,通過它可以無阻塞的讀取數(shù)據(jù)
2:buffers? : 用來裝載數(shù)據(jù)的,可以把從channel讀取到buffer中,或者把buffer中的數(shù)據(jù)寫入到channel中
3:selector :?用來監(jiān)聽 有IO事件,并告訴channel
4:selectionkeys:? IO事件和處理器綁定
/*** Created by gaoxing on 2015-01-20 .*/ public class Reactor implements Runnable {final Selector selector ;final ServerSocketChannel serverSocketChannel;public Reactor(int port) throws IOException {this.selector=Selector.open();this.serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(port));//一定是非阻塞的,阻塞的一個通道就只能處理一個請求了serverSocketChannel.configureBlocking(false);//把OP_ACCEPT事件和Acceptor綁定SelectionKey sk=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);sk.attach(new Acceptor());}@Overridepublic void run() {while(!Thread.interrupted()){try {selector.select();//該方法是阻塞的,只有IO事件來了才向下執(zhí)行Set<SelectionKey> selected=selector.selectedKeys();Iterator<SelectionKey> it=selected.iterator();while(it.hasNext()){dispatch(it.next());}selected.clear() } catch (IOException e) {e.printStackTrace();}}}private void dispatch(SelectionKey next) {Runnable runnable= (Runnable) next.attachment();if(runnable!=null){runnable.run();}}private class Acceptor implements Runnable{@Overridepublic void run() {SocketChannel channel= null;try {channel = serverSocketChannel.accept();if (channel!=null){new Handler(selector,channel);}} catch (IOException e) {e.printStackTrace();}}} }class Handler implements Runnable {final SelectionKey sk;final SocketChannel channel;final static int MAXSIZE=1024;ByteBuffer input=ByteBuffer.allocate(MAXSIZE);ByteBuffer output=ByteBuffer.allocate(MAXSIZE);static final int READING=0,SENDING=1;int state=READING;public Handler(Selector selector,SocketChannel channel) throws IOException {this.channel=channel;this.channel.configureBlocking(false);/*** 這個有個問題,ppt上register是0,*/sk=this.channel.register(selector,SelectionKey.OP_READ);sk.attach(this);/*** 這里的作用是,設(shè)置key的狀態(tài)為可讀,然后讓后selector的selector返回。* 然后就可以處理OP_READ事件了*/sk.interestOps(SelectionKey.OP_READ);selector.wakeup();}@Overridepublic void run() {if (state==READING) read();if (state==SENDING) write();}void read(){state=SENDING;sk.interestOps(SelectionKey.OP_WRITE);}void write(){sk.cancel();} }
?上面代碼注解理解有誤:
sk=this.channel.register(selector,0); 這里是初始化一個SelectionKey 不監(jiān)聽事件sk.interestOps(SelectionKey.OP_READ); 這里設(shè)置監(jiān)聽的事件為OP_READ
多線程的Reactor模式
現(xiàn)在的CPU多核的,為了提高對硬件的使用效率需要考慮使用多線程的Reactor設(shè)計模式,Reactor主要用來處罰事件的,但是事件的處理會降低Reactor的性能,考慮把事件的處理放到別的線程上來做,有點想android的設(shè)計模式,UI線程用來接收用戶的事件,事件的處理放到線程去做來提高用戶的體驗。多線程Reactor有兩種一種是Reactor線程只關(guān)注io事件,事件處理放到別的線程,一種對事件分類主Reactor只關(guān)注Accept事件,子Reactor關(guān)注read和write事件。事件處理放到線程去做,這也是netty的設(shè)計模式。
?
class HandlerPool implements Runnable {final SelectionKey sk;final SocketChannel channel;final static int MAXSIZE=1024;ByteBuffer input=ByteBuffer.allocate(MAXSIZE);ByteBuffer output=ByteBuffer.allocate(MAXSIZE);static ExecutorService executor = Executors.newFixedThreadPool(100);static final int READING=0,SENDING=1;int state=READING;public HandlerPool(Selector selector,SocketChannel channel) throws IOException {this.channel=channel;this.channel.configureBlocking(false);/*** 這個有個問題,這里注冊的SelectionKey是不處理的,應(yīng)該他監(jiān)聽的事件為0*/sk=this.channel.register(selector,0);sk.attach(this);/*** 這里的作用是,SelectionKey的監(jiān)聽事件為OP_READ。interestOps對哪個事件感興趣*/sk.interestOps(SelectionKey.OP_READ);selector.wakeup();}@Overridepublic void run() {executor.execute(new Processer());}class Processer implements Runnable{@Overridepublic void run() {}} }這個就在Acceptor里面多實例化幾個Selector,它處理Read和Write事件。
?
大致架構(gòu)弄懂了。后面邊看netty源碼,邊學(xué)習(xí)nio
?
轉(zhuǎn)載于:https://www.cnblogs.com/gaoxing/p/4237789.html
總結(jié)
以上是生活随笔為你收集整理的NIO和Reactor的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 杀死占用端口的进程
- 下一篇: bash中通过设置PS1变量改变提示符颜