日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

selector多路复用_多路复用器Selector

發(fā)布時間:2023/12/20 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 selector多路复用_多路复用器Selector 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Unix系統(tǒng)有五種IO模型分別是阻塞IO(blocking IO),非阻塞IO( non-blocking IO),IO多路復用(IO multiplexing),信號驅(qū)動(SIGIO/Signal IO)和異步IO(Asynchronous IO)。而IO多路復用通常有select,poll,epoll,kqueue等方式。而多路復用器Selector,就是采用這些IO多路復用的機制獲取事件。JDK中的NIO(new IO)包,采用的就是IO多路復用的模型。

select,poll和epoll

阻塞IO下,應用程序調(diào)用IO函數(shù),如果沒有數(shù)據(jù)貯備好,那么IO操作會一直阻塞下去,阻塞IO不會占用大量CPU,但在這種IO模型下,一個線程只能處理一個文件描述符的IO事件;非阻塞IO下,應用程序調(diào)用IO事件,如果數(shù)據(jù)沒有準備好,會直接返回一個錯誤,應用程序不會阻塞,這樣就可以同時處理多個文件描述符的IO事件,但是需要不間斷地輪詢來獲取IO事件,對CPU是很大的浪費。并且阻塞IO和非阻塞IO調(diào)用一個IO函數(shù)只能獲取一個IO事件。

select,poll和epoll是最常見的三種IO多路復用的方式,它們都支持同時感知多個IO事件,它們的工作特點和區(qū)別如下:

select可以在一定時間內(nèi)監(jiān)視多個文件描述符的IO事件,select函數(shù)需要傳入要監(jiān)視的文件描述符的句柄作為參數(shù),并且返回所有文件描述符,應用程序需要遍歷所有的循環(huán)來看每一個文件描述符是否有IO事件發(fā)生,效率較低。并且,select默認只能監(jiān)視1024個文件描述符,這些文件描述符采用數(shù)組進行存儲,可以修改FD_SETSIZE的值來修改文件描述符的數(shù)量限制。

poll和select類似,poll采用鏈表存儲監(jiān)視的文件描述符,可以超過1024的限制。

epoll可以監(jiān)控的文件描述符數(shù)量是可以打開文件的數(shù)量上限。與select和poll不同,epoll獲取事件不是通過輪詢得到,而是通過給每個文件描述符定義回調(diào)得到,因此,在監(jiān)視的文件描述符很多的情況下,epoll的效率不會有明顯的下降。并且,select和poll返回給應用程序的是所有的文件描述符,而epoll返回的是就緒(有事件發(fā)生的)的文件描述符。

JDK NIO包中的各種Selector

JDK中的Selector是一個抽象類,創(chuàng)建一個Selector通常以下面代碼中的方式進行:

/**

* 代碼片段1 創(chuàng)建Selector

*/

Selector selector = Selector.open();

下面是具體的實現(xiàn):

/**

* 代碼片段2 Selector中的open方法和SelectorProvider中的provider方法

*/

//調(diào)用SelectorProvider的openSelector創(chuàng)建Selector

public static Selector open() throws IOException {

return SelectorProvider.provider().openSelector();

}

//創(chuàng)建SelectorProvider,最終調(diào)用sun.nio.ch.DefaultSelectorProvider.create(), 這個方法在不同平臺上有不同的實現(xiàn)

public static SelectorProvider provider() {

synchronized (lock) {

if (provider != null)

return provider;

return AccessController.doPrivileged(

new PrivilegedAction() {

public SelectorProvider run() {

if (loadProviderFromProperty())

return provider;

if (loadProviderAsService())

return provider;

provider = sun.nio.ch.DefaultSelectorProvider.create();

return provider;

}

});

}

}

在不同的操作系統(tǒng)平臺下,SelectorProvider的實現(xiàn)也不相同,創(chuàng)建出來的Selector的實現(xiàn)也不一樣。

windows下的多路復用實現(xiàn)

windows下的jdk中只有一個非抽象的SelectorProvider的實現(xiàn)類——WindowsSelectorProvider。顯然,sun.nio.ch.DefaultSelectorProvider.create()返回的也是一個WindowsSelectorProvider對象:

/**

* 代碼片段3 windows環(huán)境下jdk中的sun.nio.ch.DefaultSelectorProvider.create()方法

*/

public static SelectorProvider create() {

return new WindowsSelectorProvider();

}

WindowsSelectorProvider的openSelector方法會返回一個WindowsSelectorImpl對象,WindowsSelectorImpl繼承了SelectorImpl這個抽象類:

/**

* 代碼片段4

*/

public AbstractSelector openSelector() throws IOException {

return new WindowsSelectorImpl(this);

}

WindowsSelectorImpl的成員變量pollWrapper是一個PollArrayWrapper對象,PollArrayWrapper類在openjdk的源碼中有這樣的一段文檔注釋:

/**

* 代碼片段5 windows下的PollArrayWrapper類中的注釋

*/

/**

* Manipulates a native array of structs corresponding to (fd, events) pairs.

*

* typedef struct pollfd {

* ? ?SOCKET fd; ? ? ? ? ? ?// 4 bytes

* ? ?short events; ? ? ? ? // 2 bytes

* } pollfd_t;

*

* @author Konstantin Kladko

* @author Mike McCloskey

*/

PollArrayWrapper是用來操作(fd,events)對相對應的結(jié)構(gòu)的原生數(shù)組。這個原生數(shù)組的結(jié)構(gòu)就是上面的注釋中的結(jié)構(gòu)體所定義的。PollArrayWrapper類中通過操作AllocatedNativeObject類型的成員變量pollArray來操作文件描述符和事件。AllocatedNativeObject類繼承了NativeObject類,NativeObject類型是駐留在本地內(nèi)存中的對象的代理,提供了在堆外內(nèi)存中存放和取出除boolean外的基本類型數(shù)據(jù)的方法。以byte類型為例,其存取方法如下:

/**

* 代碼片段6 ?NativeObject中的getByte和putByte方法

*/

/**

* Reads a byte starting at the given offset from base of this native

* object.

*

* @param ?offset

* ? ? ? ? The offset at which to read the byte

*

* @return The byte value read

*/

final byte getByte(int offset) {

return unsafe.getByte(offset + address);

}

/**

* Writes a byte at the specified offset from this native object's

* base address.

*

* @param ?offset

* ? ? ? ? The offset at which to write the byte

*

* @param ?value

* ? ? ? ? The byte value to be written

*/

final void putByte(int offset, byte value) {

unsafe.putByte(offset + address, ?value);

}

PollArrayWrapper中提供了方法存儲事件和文件描述符,這些方法都通過來pollArray存取int和short類型,這些方法和PollArrayWrapper構(gòu)造方法如下:

/**

* 代碼片段7 ?PollArrayWrapper構(gòu)造方法和對文件描述符和事件的操作方法

*/

PollArrayWrapper(int newSize) {

int allocationSize = newSize * SIZE_POLLFD;

pollArray = new AllocatedNativeObject(allocationSize, true);

pollArrayAddress = pollArray.address();

this.size = newSize;

}

// Access methods for fd structures

void putDescriptor(int i, int fd) {

pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);

}

void putEventOps(int i, int event) {

pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);

}

int getEventOps(int i) {

return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);

}

int getDescriptor(int i) {

return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);

}

因為pollfd結(jié)構(gòu)體中,fd占用4個字節(jié),events占用2個字節(jié)分別對應int和short的長度。FD_OFFSET,EVENT_OFFSET和SIZE_POLLFD分別是final修飾的int常量0,4和8。PollArrayWrapper會用8個字節(jié)來存儲一個event和fd的配對,構(gòu)造一個PollArrayWrapper對象會從堆外內(nèi)存分配newSize8字節(jié)的空間。獲取第i個fd則獲取對應的第8i個字節(jié)對應的int,獲取第i個event則只要獲取第8*i+4個字節(jié)對應的short。所以構(gòu)造一個size大小的PollArrayWrapper對象就可以存儲size個fd,event對,并且他們在內(nèi)存上是連續(xù)的(每對的空間末尾有2個字節(jié)用不到),所以這也是一個數(shù)組。

Selector中的doSelect方法是具體對文件描述符的操作,WindowsSelectorImpl中的doSelector方法如下:

/**

* 代碼片段8 ?WindowsSelectorImpl內(nèi)部類SubSelector的poll方法中的doSelect方法及其調(diào)用的方法具體實現(xiàn)

*/

protected int doSelect(long timeout) throws IOException {

if (channelArray == null)

throw new ClosedSelectorException();

this.timeout = timeout; // set selector timeout

processDeregisterQueue();

if (interruptTriggered) {

resetWakeupSocket();

return 0;

}

// ?計算輪詢所需的輔助線程數(shù)。如果需要,在這里創(chuàng)建線程并開始等待startLock

adjustThreadsCount();

// 重置finishLock

finishLock.reset();

// ?喚醒輔助線程,等待啟動鎖,線程啟動后會開始輪詢。冗余線程將在喚醒后退出。

startLock.startThreads();

// 在主線程中進行輪詢。主線程負責pollArray中的前MAX_SELECTABLE_FDS(默認1024)個fd,event對。

try {

begin();

try {

subSelector.poll();

} catch (IOException e) {

// 保存異常

finishLock.setException(e);

}

// ?主線程poll()調(diào)用結(jié)束。喚醒其他線程并等待他們

if (threads.size()0)

finishLock.waitForHelperThreads();

} finally {

end();

}

finishLock.checkForException();

processDeregisterQueue();

// 更新相應channel的操作。將就緒的key添加到就緒隊列。

int updated = updateSelectedKeys();

// poll()調(diào)用完成。為下一次運行,將wakeupSocket設(shè)置為nonsigned。

resetWakeupSocket();

return updated;

}

//WindowsSelectorImpl內(nèi)部類SubSelector的poll方法

private int poll() throws IOException{ // poll for the main thread

return poll0(pollWrapper.pollArrayAddress,

Math.min(totalChannels, MAX_SELECTABLE_FDS),

readFds, writeFds, exceptFds, timeout);

}

//WindowsSelectorImpl內(nèi)部類SubSelector的poll0方法

private native int poll0(long pollAddress, int numfds,

int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

poll0方法的C語言源碼:

/**

* 代碼片段9 ?WindowsSelectorImpl內(nèi)部類SubSelector的poll方法的c源碼

*/

JNIEXPORT jint JNICALL

Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,

jlong pollAddress, jint numfds,

jintArray returnReadFds, jintArray returnWriteFds,

jintArray returnExceptFds, jlong timeout)

{

... //省略部分代碼

/* Call select */

if ((result = select(0 , &readfds, &writefds, &exceptfds, tv))//調(diào)用系統(tǒng)的select函數(shù)

== SOCKET_ERROR) {

/* Bad error - this should not happen frequently */

/* Iterate over sockets and call select() on each separately */

FD_SET errreadfds, errwritefds, errexceptfds;

readfds.fd_count = 0;

writefds.fd_count = 0;

exceptfds.fd_count = 0;

for (i = 0; i < numfds; i++) {

/* prepare select structures for the i-th socket */

errreadfds.fd_count = 0;

errwritefds.fd_count = 0;

if (fds[i].events & POLLIN) {

errreadfds.fd_array[0] = fds[i].fd;

errreadfds.fd_count = 1;

}

if (fds[i].events & (POLLOUT | POLLCONN))

{

errwritefds.fd_array[0] = fds[i].fd;

errwritefds.fd_count = 1;

}

errexceptfds.fd_array[0] = fds[i].fd;

errexceptfds.fd_count = 1;

/* call select on the i-th socket */

if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)//調(diào)用系統(tǒng)的select函數(shù)

== SOCKET_ERROR) {

/* This socket causes an error. Add it to exceptfds set */

exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;

exceptfds.fd_count++;

} else {

/* This socket does not cause an error. Process result */

if (errreadfds.fd_count == 1) {

readfds.fd_array[readfds.fd_count] = fds[i].fd;

readfds.fd_count++;

}

if (errwritefds.fd_count == 1) {

writefds.fd_array[writefds.fd_count] = fds[i].fd;

writefds.fd_count++;

}

if (errexceptfds.fd_count == 1) {

exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;

exceptfds.fd_count++;

}

}

}

}

... //省略部分代碼

}

可見Window環(huán)境的JDK的nio是調(diào)用select系統(tǒng)函數(shù)來進行的。

linux下的多路復用實現(xiàn)

linux的jdk中有2個非抽象的Selector的子類——PollSelectorImpl和EPollSelectorImpl。

PollSelectorImpl

顧名思義,PollSelectorImpl是采用poll來進行多路復用。PollSelectorImpl繼承了AbstractPollSelectorImpl。AbstractPollSelectorImpl中也維護了一個PollArrayWrapper來存儲文件描述符和事件對,但linux下的PollArrayWrapper和windows下的實現(xiàn)并不相同。先看PollSelectorImpl的doSelect方法如下:

/**

* 代碼片段10 ?PollSelectorImpl的doSelect方法

*/

protected int doSelect(long timeout)

throws IOException

{

if (channelArray == null)

throw new ClosedSelectorException();

processDeregisterQueue();

try {

begin();

pollWrapper.poll(totalChannels, 0, timeout);

} finally {

end();

}

processDeregisterQueue();

// 將pollfd結(jié)構(gòu)中的信息復制到相應通道的ops中。將就緒的key添加到就緒隊列。

int numKeysUpdated = updateSelectedKeys();

if (pollWrapper.getReventOps(0) != 0) {

// Clear the wakeup pipe

pollWrapper.putReventOps(0, 0);

synchronized (interruptLock) {

IOUtil.drain(fd0);

interruptTriggered = false;

}

}

return numKeysUpdated;

}

doSelect方法中會調(diào)用PollArrayWrapper中的poll方法,linux下的PollArrayWrapper和windows下的不太一樣。PollArrayWrapper源碼中的文檔注釋如下:

/**

* 代碼片段11 ?linux下的PollArrayWrapper類中的注釋

*/

/**

* Manipulates a native array of pollfd structs on Solaris:

*

* typedef struct pollfd {

* ? ?int fd;

* ? ?short events;

* ? ?short revents;

* } pollfd_t;

*

* @author Mike McCloskey

* @since 1.4

*/

可以發(fā)現(xiàn),與windows的相比,linux下的PollArrayWrapper操作的結(jié)構(gòu)體多了一個revents(實際發(fā)生的事件)的字段,linux下的PollArrayWrapper類繼承了抽象類AbstractPollArrayWrapper,AbstractPollArrayWrapper定義了對文件描述符和事件的操作方法:

/**

* 代碼片段12 ?AbstractPollArrayWrapper中定義的幾個final常量和對文件描述符、事件的操作方法

*/

static final short SIZE_POLLFD ? = 8;

static final short FD_OFFSET ? ? = 0;

static final short EVENT_OFFSET ?= 4;

static final short REVENT_OFFSET = 6;

protected AllocatedNativeObject pollArray;

// Access methods for fd structures

int getEventOps(int i) {

int offset = SIZE_POLLFD * i + EVENT_OFFSET;

return pollArray.getShort(offset);

}

int getReventOps(int i) {

int offset = SIZE_POLLFD * i + REVENT_OFFSET;

return pollArray.getShort(offset);

}

int getDescriptor(int i) {

int offset = SIZE_POLLFD * i + FD_OFFSET;

return pollArray.getInt(offset);

}

void putEventOps(int i, int event) {

int offset = SIZE_POLLFD * i + EVENT_OFFSET;

pollArray.putShort(offset, (short)event);

}

void putReventOps(int i, int revent) {

int offset = SIZE_POLLFD * i + REVENT_OFFSET;

pollArray.putShort(offset, (short)revent);

}

void putDescriptor(int i, int fd) {

int offset = SIZE_POLLFD * i + FD_OFFSET;

pollArray.putInt(offset, fd);

}

可見,linux下的PollArrayWrapper中pollArray的每8個字節(jié)的后兩個字節(jié)不是空,而是存儲著兩個字節(jié)的revents。PollArrayWrapper的poll方法如下:

/**

* 代碼片段13 ?PollArrayWrapper中poll方法

*/

int poll(int numfds, int offset, long timeout) {

return poll0(pollArrayAddress + (offset * SIZE_POLLFD),

numfds, timeout);

}

private native int poll0(long pollAddress, int numfds, long timeout);

poll0方法的c語言源碼:

/**

* 代碼片段14 ?PollArrayWrapper中poll方法的c源碼

*/

JNIEXPORT jint JNICALL

Java_sun_nio_ch_PollArrayWrapper_poll0(JNIEnv *env, jobject this,

jlong address, jint numfds,

jlong timeout)

{

struct pollfd *a;

int err = 0;

a = (struct pollfd *) jlong_to_ptr(address);

if (timeout <= 0) { ? ? ? ? ? /* Indefinite or no wait */

//如果timeout<=0,立即調(diào)用系統(tǒng)的poll函數(shù)

RESTARTABLE (poll(a, numfds, timeout), err);

} else { ? ? ? ? ? ? ? ? ? ? /* Bounded wait; bounded restarts */

//如果timeout>0,會循環(huán)的調(diào)用poll函數(shù)直到到了timeout的時間

err = ipoll(a, numfds, timeout);

}

if (err < 0) {

JNU_ThrowIOExceptionWithLastError(env, "Poll failed");

}

return (jint)err;

}

static int ipoll(struct pollfd fds[], unsigned int nfds, int timeout)

{

jlong start, now;

int remaining = timeout;

struct timeval t;

int diff;

gettimeofday(&t, NULL);

start = t.tv_sec * 1000 + t.tv_usec / 1000;

for (;;) {

//調(diào)用poll函數(shù) remaining是剩余的timeout,其實也就調(diào)用一次,用循環(huán)應該是為了防止poll函數(shù)的進程被異常喚醒

int res = poll(fds, nfds, remaining);

if (res < 0 && errno == EINTR) {

if (remaining >= 0) {

gettimeofday(&t, NULL);

now = t.tv_sec * 1000 + t.tv_usec / 1000;

diff = now - start;

remaining -= diff;

if (diff < 0 || remaining <= 0) {

return 0;

}

start = now;

}

} else {

return res;

}

}

}

可見PollSelectorImpl確實是調(diào)用系統(tǒng)的poll函數(shù)實現(xiàn)多路復用的。

EPollSelectorImpl

EPollSelectorImpl中使用EPollArrayWrapper來操作文件描述符和事件,EPollArrayWrapper中的對EPoll事件結(jié)構(gòu)體的文檔注釋:

/**

* 代碼片段15 ?EPollArrayWrapper類中的注釋

*/

/**

* Manipulates a native array of epoll_event structs on Linux:

*

* typedef union epoll_data {

* ? ? void *ptr;

* ? ? int fd;

* ? ? __uint32_t u32;

* ? ? __uint64_t u64;

* ?} epoll_data_t;

*

* struct epoll_event {

* ? ? __uint32_t events;

* ? ? epoll_data_t data;

* };

*

* The system call to wait for I/O events is epoll_wait(2). It populates an

* array of epoll_event structures that are passed to the call. The data

* member of the epoll_event structure contains the same data as was set

* when the file descriptor was registered to epoll via epoll_ctl(2). In

* this implementation we set data.fd to be the file descriptor that we

* register. That way, we have the file descriptor available when we

* process the events.

*/

等待IO時間的系統(tǒng)調(diào)用函數(shù)是epoll_wait(2),它填充了一個epoll_event結(jié)構(gòu)體的數(shù)組,這個數(shù)組被傳遞給系統(tǒng)調(diào)用。epoll_event結(jié)構(gòu)的數(shù)據(jù)成員包含的數(shù)據(jù)與通過epoll_ctl(2)將文件描述符注冊到epoll時設(shè)置的數(shù)據(jù)相同。在這個實現(xiàn)中,我們將data.fd設(shè)置為注冊的文件描述符。這樣,我們在處理事件時就有了可用的文件描述符。

很明顯,EPollSelectorImpl中操作的結(jié)構(gòu)體大小比PollSelectorImpl要大,這里不一一解讀了。EPoll的調(diào)用和select、poll不同,需要調(diào)用三個系統(tǒng)函數(shù),分別是epoll_create,epoll_ctl 和 epoll_wait,這點在JDK NIO中也得到驗證。在EPollArrayWrapper創(chuàng)建時會調(diào)用epollCreate方法:

/**

* 代碼片段16 ?EPollArrayWrapper的構(gòu)造方法和構(gòu)造方法中調(diào)用的epollCreate方法

*/

EPollArrayWrapper() throws IOException {

// creates the epoll file descriptor

epfd = epollCreate();

// the epoll_event array passed to epoll_wait

int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;

pollArray = new AllocatedNativeObject(allocationSize, true);

pollArrayAddress = pollArray.address();

// eventHigh needed when using file descriptors > 64k

if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)

eventsHigh = new HashMap<>();

}

private native int epollCreate();

這里的epollCreate方法也就是進行epoll_create系統(tǒng)調(diào)用,創(chuàng)建一個EPoll實例。以下是C源碼:

/**

* 代碼片段17 ?epollCreate方法的c源碼

*/

JNIEXPORT jint JNICALL

Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)

{

/*

* epoll_create expects a size as a hint to the kernel about how to

* dimension internal structures. We can't predict the size in advance.

*/

//進行epoll_create系統(tǒng)調(diào)用

int epfd = epoll_create(256);

if (epfd < 0) {

JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");

}

return epfd;

}

EPollSelectorImpl的構(gòu)造方法中創(chuàng)建完成一個EPollArrayWrapper實例后,會執(zhí)行該實例的initInterrupt方法,這個方法中調(diào)用了epollCtl方法:

/**

* 代碼片段18 ?EPollSelectorImpl的構(gòu)造方法、構(gòu)造方法中調(diào)用的EPollArrayWrapper中的initInterrupt方法

* 和initInterrupt中調(diào)用的epollCtl方法

*/

/**

* Package private constructor called by factory method in

* the abstract superclass Selector.

*/

EPollSelectorImpl(SelectorProvider sp) throws IOException {

super(sp);

long pipeFds = IOUtil.makePipe(false);

fd0 = (int) (pipeFds >>> 32);

fd1 = (int) pipeFds;

pollWrapper = new EPollArrayWrapper();

//調(diào)用initInterrupt方法

pollWrapper.initInterrupt方法(fd0, fd1);

fdToKey = new HashMap<>();

}

void initInterrupt(int fd0, int fd1) {

outgoingInterruptFD = fd1;

incomingInterruptFD = fd0;

//調(diào)用epollCtl

epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);

}

private native void epollCtl(int epfd, int opcode, int fd, int events);

這里的epollCtl方法也就是進行epoll_ctl系統(tǒng)調(diào)用,往剛剛創(chuàng)建的EPoll實例中添加要監(jiān)控的事件。以下是C源碼:

/**

* 代碼片段19 ?epollCtl方法的c源碼

*/

JNIEXPORT void JNICALL

Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,

jint opcode, jint fd, jint events)

{

struct epoll_event event;

int res;

event.events = events;

event.data.fd = fd;

//調(diào)用epoll_ctl

RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

/*

* A channel may be registered with several Selectors. When each Selector

* is polled a EPOLL_CTL_DEL op will be inserted into its pending update

* list to remove the file descriptor from epoll. The "last" Selector will

* close the file descriptor which automatically unregisters it from each

* epoll descriptor. To avoid costly synchronization between Selectors we

* allow pending updates to be processed, ignoring errors. The errors are

* harmless as the last update for the file descriptor is guaranteed to

* be EPOLL_CTL_DEL.

*/

if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {

JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");

}

}

EPollSelectorImpl的doSelect方法會調(diào)用EPollArrayWrapper的poll方法,而在poll方法中會調(diào)用epollWait:

/**

* 代碼片段20 ?EPollSelectorImpl中的doSelect方法、doSelect方法中調(diào)用的EPollArrayWrapper的poll方法

* 和poll方法中調(diào)用的epollWait方法

*/

protected int doSelect(long timeout) throws IOException {

if (closed)

throw new ClosedSelectorException();

processDeregisterQueue();

try {

begin();

pollWrapper.poll(timeout);

} finally {

end();

}

processDeregisterQueue();

int numKeysUpdated = updateSelectedKeys();

if (pollWrapper.interrupted()) {

// Clear the wakeup pipe

pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);

synchronized (interruptLock) {

pollWrapper.clearInterrupted();

IOUtil.drain(fd0);

interruptTriggered = false;

}

}

return numKeysUpdated;

}

int poll(long timeout) throws IOException {

//更新注冊信息,如果監(jiān)視的實踐發(fā)生變化,會調(diào)用epoll_ctl往Epoll實例中增加或刪除事件

updateRegistrations();

//調(diào)用epollWait

updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);

for (int i=0; i

if (getDescriptor(i) == incomingInterruptFD) {

interruptedIndex = i;

interrupted = true;

break;

}

}

return updated;

}

private native int epollWait(long pollAddress, int numfds, long timeout,

int epfd) throws IOException;

這里的epollWait方法也就是進行epoll_wait系統(tǒng)調(diào)用,調(diào)用者進程被掛起,在等待內(nèi)核I/O事件的分發(fā)。以下是C源碼:

/**

* 代碼片段21 ?epollWait方法的c源碼

*/

JNIEXPORT jint JNICALL

Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,

jlong address, jint numfds,

jlong timeout, jint epfd)

{

struct epoll_event *events = jlong_to_ptr(address);

int res;

if (timeout <= 0) { ? ? ? ? ? /* Indefinite or no wait */

//如果timeout<=0,立即調(diào)用系統(tǒng)的epoll_wait函數(shù)

RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);

} else { ? ? ? ? ? ? ? ? ? ? ?/* Bounded wait; bounded restarts */

//如果timeout>0,循環(huán)調(diào)用直到超時時間到了,用循環(huán)應該是為了防止異常喚醒

res = iepoll(epfd, events, numfds, timeout);

}

if (res < 0) {

JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");

}

return res;

}

總結(jié)

至此,本文已經(jīng)對三種IO多路復用技術(shù)和在JDK中的應用進行了解讀。在windows環(huán)境下,JDK NIO中只有WindowsSelectorImpl這有一個Selector的非抽象實現(xiàn),采用的IO多路復用方式是select;在linux環(huán)境下PollSelectorImpl和EPollSelectorImpl兩種實現(xiàn),分別采用poll和epoll實現(xiàn)IO多路復用。本文還對這些Selector的具體實現(xiàn)進行了詳細的解讀,不足之處,敬請指正。

總結(jié)

以上是生活随笔為你收集整理的selector多路复用_多路复用器Selector的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。