日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

NIO 源码初探

發布時間:2024/4/13 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 NIO 源码初探 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

說到源碼先得從Selector 的open 方法開始看起,java.nio.channels.Selector:

public static Selector open() throws IOException {return SelectorProvider.provider().openSelector(); }

看看SelectorProvider.provider()做了什么:

public static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});} }

其中provider = sun.nio.ch.DefaultSelectorProvider.create();會根據操作系統來返回不同的實現類,windows 平臺就返回WindowsSelectorProvider;而if (provider != null) return provider;保證了整個server 程序中只有一個WindowsSelectorProvider 對象;

再看看WindowsSelectorProvider. openSelector():

public AbstractSelector openSelector() throws IOException {return new WindowsSelectorImpl(this); }

new WindowsSelectorImpl(SelectorProvider)代碼:

WindowsSelectorImpl(SelectorProvider sp) throws IOException {super(sp);pollWrapper = new PollArrayWrapper(INIT_CAP);wakeupPipe = Pipe.open();wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();// Disable the Nagle algorithm so that the wakeup is more immediateSinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();(sink.sc).socket().setTcpNoDelay(true);wakeupSinkFd = ((SelChImpl)sink).getFDVal();pollWrapper.addWakeupSocket(wakeupSourceFd, 0); }

其中Pipe.open()是關鍵,這個方法的調用過程是:

public static Pipe open() throws IOException {return SelectorProvider.provider().openPipe(); }

SelectorProvider 中:

public Pipe openPipe() throws IOException {return new PipeImpl(this); }

再看看怎么new PipeImpl()的:

PipeImpl(SelectorProvider sp) {long pipeFds = IOUtil.makePipe(true);int readFd = (int) (pipeFds >>> 32);int writeFd = (int) pipeFds;FileDescriptor sourcefd = new FileDescriptor();IOUtil.setfdVal(sourcefd, readFd);source = new SourceChannelImpl(sp, sourcefd);FileDescriptor sinkfd = new FileDescriptor();IOUtil.setfdVal(sinkfd, writeFd);sink = new SinkChannelImpl(sp, sinkfd); }

其中IOUtil.makePipe(true)是個native 方法:

/** * Returns two file descriptors for a pipe encoded in a long. * The read end of the pipe is returned in the high 32 bits, * while the write end is returned in the low 32 bits. */ staticnativelong makePipe(boolean blocking);

具體實現:

JNIEXPORT jlong JNICALL Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking) {int fd[2];if (pipe(fd) < 0) {JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");return 0;}if (blocking == JNI_FALSE) {if ((configureBlocking(fd[0], JNI_FALSE) < 0)|| (configureBlocking(fd[1], JNI_FALSE) < 0)) {JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");close(fd[0]);close(fd[1]);return 0;}}return ((jlong) fd[0] << 32) | (jlong) fd[1]; } static int configureBlocking(int fd, jboolean blocking) {int flags = fcntl(fd, F_GETFL);int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags); }

正如這段注釋所描述的:

/** * Returns two file descriptors for a pipe encoded in a long. * The read end of the pipe is returned in the high 32 bits, * while the write end is returned in the low 32 bits. */

High32 位存放的是通道read 端的文件描述符FD(file descriptor),low 32 bits 存放的是write 端的文件描述符。所以取到makepipe()返回值后要做移位處理。

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

這行代碼把返回的pipe 的write 端的FD 放在了pollWrapper 中(后面會發現,這么做是為了實現selector 的wakeup())ServerSocketChannel.open()的實現:

public static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel(); }

SelectorProvider:

public ServerSocketChannel openServerSocketChannel() throws IOException {return new ServerSocketChannelImpl(this); }

可見創建的ServerSocketChannelImpl 也有WindowsSelectorImpl 的引用。

public ServerSocketChannelImpl(SelectorProvider sp) throws IOException {super(sp);this.fd = Net.serverSocket(true);this.fdVal = IOUtil.fdVal(fd);this.state = ST_INUSE; }

然后通過serverChannel1.register(selector, SelectionKey.OP_ACCEPT);把selector 和channel 綁定在一起,也就是把newServerSocketChannel 時創建的FD 與selector 綁定在了一起。

到此,server 端已啟動完成了,主要創建了以下對象:

WindowsSelectorProvider:單例WindowsSelectorImpl 中包含:

pollWrapper:保存selector 上注冊的FD,包括pipe 的write 端FD 和ServerSocketChannel 所用的FD

wakeupPipe:通道(其實就是兩個FD,一個read,一個write)

再到Server 中的run():

selector.select();主要調用了WindowsSelectorImpl 中的這個方法:

protected int doSelect(long timeout) throws IOException {if (channelArray == null)throw new ClosedSelectorException();this.timeout = timeout; // set selector timeoutprocessDeregisterQueue();if (interruptTriggered) {resetWakeupSocket();return 0;}// Calculate number of helper threads needed for poll. If necessary// threads are created here and start waiting on startLockadjustThreadsCount();finishLock.reset(); // reset finishLock// Wakeup helper threads, waiting on startLock, so they start polling.// Redundant threads will exit here after wakeup.startLock.startThreads();// do polling in the main thread. Main thread is responsible for// first MAX_SELECTABLE_FDS entries in pollArray.try {begin();try {subSelector.poll();} catch (IOException e) {finishLock.setException(e); // Save this exception}// Main thread is out of poll(). Wakeup others and wait for themif (threads.size() > 0)finishLock.waitForHelperThreads();} finally {end();}// Done with poll(). Set wakeupSocket to nonsignaled for the next run.finishLock.checkForException();processDeregisterQueue();int updated = updateSelectedKeys();// Done with poll(). Set wakeupSocket to nonsignaled for the next run.resetWakeupSocket();return updated; }

其中subSelector.poll()是核心,也就是輪訓pollWrapper 中保存的FD;具體實現是調用native 方法poll0:

private int poll() throws IOException{ // poll for the main threadreturn poll0(pollWrapper.pollArrayAddress,Math.min(totalChannels, MAX_SELECTABLE_FDS),readFds, writeFds, exceptFds, timeout); } private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout); // These arrays will hold result of native select(). // The first element of each array is the number of selected sockets. // Other elements are file descriptors of selected sockets. private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存發生read 的FD private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存發生write 的FD private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存發生except 的FD

這個poll0()會監聽pollWrapper 中的FD 有沒有數據進出,這會造成IO 阻塞,直到有數據讀寫事件發生。比如,由于pollWrapper 中保存的也有ServerSocketChannel 的FD,所以只要ClientSocket 發一份數據到ServerSocket,那么poll0()就會返回;又由于pollWrapper 中保存的也有pipe 的write 端的FD,所以只要pipe 的write 端向FD 發一份數據,也會造成poll0()返回;如果這兩種情況都沒有發生,那么poll0()就一直阻塞,也就是selector.select()會一直阻塞;如果有任何一種情況發生,那么selector.select()就會返回,所有在OperationServer 的run()里要用while (true) {,這樣就可以保證在selector 接收到數據并處理完后繼續監聽poll();

這時再來看看WindowsSelectorImpl. Wakeup():

public Selector wakeup() {synchronized (interruptLock) {if (!interruptTriggered) {setWakeupSocket();interruptTriggered = true;}}return this; } // Sets Windows wakeup socket to a signaled state. private void setWakeupSocket() {setWakeupSocket0(wakeupSinkFd); } private native void setWakeupSocket0(int wakeupSinkFd); JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd) {/* Write one byte into the pipe */const char byte = 1;send(scoutFd, &byte, 1, 0); }

可見wakeup()是通過pipe 的write 端send(scoutFd, &byte, 1, 0),發生一個字節1,來喚醒poll()。所以在需要的時候就可以調用selector.wakeup()來喚醒selector。

?

總結

以上是生活随笔為你收集整理的NIO 源码初探的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。