tomcat源码 Connector
Connector容器主要負責解析socket請求,在tomcat中的源碼位于org.apache.catalina.connector和org.apache.coyote包路徑下;通過上兩節(jié)的分析,我們知道了Connector是Service的子容器,而Service又是Server的子容器。在server.xml文件中配置,然后在Catalina類中通過Digester完成實例化。在server.xml中默認配置了兩種Connector的實現(xiàn),分別用來處理Http請求和AJP請求。
Connector的實現(xiàn)一共有以下三種:
1、Http Connector:解析HTTP請求,又分為BIO Http Connector和NIO Http Connector,即阻塞IO Connector和非阻塞IO Connector。本文主要分析NIO Http Connector的實現(xiàn)過程。
2、AJP:基于AJP協(xié)議,用于Tomcat與HTTP服務器通信定制的協(xié)議,能提供較高的通信速度和效率。如與Apache服務器集成時,采用這個協(xié)議。
3、APR HTTP Connector:用C實現(xiàn),通過JNI調用的。主要提升對靜態(tài)資源(如HTML、圖片、CSS、JS等)的訪問性能。
具體要使用哪種Connector可以在server.xml文件中通過protocol屬性配置如下:
<Connector port="8080" protocol="HTTP/1.1"connectionTimeout="20000"redirectPort="8443" />然后看一下Connector的構造器:
public Connector(String protocol) {setProtocol(protocol);// Instantiate protocol handlerProtocolHandler p = null;try {Class<?> clazz = Class.forName(protocolHandlerClassName);p = (ProtocolHandler) clazz.getConstructor().newInstance();} catch (Exception e) {log.error(sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"), e);} finally {this.protocolHandler = p;}if (Globals.STRICT_SERVLET_COMPLIANCE) {uriCharset = StandardCharsets.ISO_8859_1;} else {uriCharset = StandardCharsets.UTF_8;} }public void setProtocol(String protocol) {boolean aprConnector = AprLifecycleListener.isAprAvailable() &&AprLifecycleListener.getUseAprConnector();if ("HTTP/1.1".equals(protocol) || protocol == null) {if (aprConnector) {setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");} else {setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");}} else if ("AJP/1.3".equals(protocol)) {if (aprConnector) {setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");} else {setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");}} else {setProtocolHandlerClassName(protocol);} }通過分析Connector構造器的源碼可以知道,每一個Connector對應了一個protocolHandler,一個protocolHandler被設計用來監(jiān)聽服務器某個端口的網(wǎng)絡請求,但并不負責處理請求(處理請求由Container組件完成)。下面就以Http11NioProtocol為例分析Http請求的解析過程。
在Connector的startInterval方法中啟動了protocolHandler,代碼如下:
Http11NioProtocol創(chuàng)建一個org.apache.tomcat.util.net.NioEndpoint實例,然后將監(jiān)聽端口并解析請求的工作全被委托給NioEndpoint實現(xiàn)。tomcat在使用Http11NioProtocol解析HTTP請求時一共設計了三種線程,分別為Acceptor,Poller和Worker。
1、Acceptor線程
Acceptor實現(xiàn)了Runnable接口,根據(jù)其命名就知道它是一個接收器,負責接收socket,其接收方法是serverSocket.accept()方式,獲得SocketChannel對象,然后封裝成tomcat自定義的org.apache.tomcat.util.net.NioChannel。雖然是Nio,但在接收socket時仍然使用傳統(tǒng)的方法,使用阻塞方式實現(xiàn)。Acceptor以線程池的方式被創(chuàng)建和管理,在NioEndpoint的startInternal()方法中完成Acceptor的啟動,源碼如下:
public void startInternal() throws Exception {if (!running) {running = true;paused = false;processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getProcessorCache());eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getEventCache());nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getBufferPool());// Create worker collectionif ( getExecutor() == null ) {createExecutor();}//設置最大連接數(shù),默認值為maxConnections = 10000,通過同步器AQS實現(xiàn)。 initializeConnectionLatch();//默認是2個,Math.min(2,Runtime.getRuntime().availableProcessors());和虛擬機處理器個數(shù)比較// Start poller threadspollers = new Poller[getPollerThreadCount()];for (int i=0; i<pollers.length; i++) {pollers[i] = new Poller();Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);pollerThread.setPriority(threadPriority);pollerThread.setDaemon(true);pollerThread.start();} startAcceptorThreads();} }繼續(xù)追蹤startAcceptorThreads的源碼
protected final void startAcceptorThreads() {//啟動Acceptor線程,默認是1個int count = getAcceptorThreadCount();acceptors = new Acceptor[count];for (int i = 0; i < count; i++) {acceptors[i] = createAcceptor();String threadName = getName() + "-Acceptor-" + i;acceptors[i].setThreadName(threadName);Thread t = new Thread(acceptors[i], threadName);t.setPriority(getAcceptorThreadPriority());t.setDaemon(getDaemon());t.start();} }Acceptor線程的核心代碼在它的run方法中:
protected class Acceptor extends AbstractEndpoint.Acceptor {@Overridepublic void run() {int errorDelay = 0;// Loop until we receive a shutdown commandwhile (running) {// Loop if endpoint is pausedwhile (paused && running) {state = AcceptorState.PAUSED;try {Thread.sleep(50);} catch (InterruptedException e) {// Ignore }}if (!running) {break;}state = AcceptorState.RUNNING;try {//if we have reached max connections, wait countUpOrAwaitConnection();SocketChannel socket = null;try {// Accept the next incoming connection from the server// socket//接收socket請求socket = serverSock.accept();} catch (IOException ioe) {// We didn't get a socket countDownConnection();if (running) {// Introduce delay if necessaryerrorDelay = handleExceptionWithDelay(errorDelay);// re-throwthrow ioe;} else {break;}}// Successful accept, reset the error delayerrorDelay = 0;// Configure the socketif (running && !paused) {// setSocketOptions() will hand the socket off to// an appropriate processor if successfulif (!setSocketOptions(socket)) {closeSocket(socket);}} else {closeSocket(socket);}} catch (Throwable t) {ExceptionUtils.handleThrowable(t);log.error(sm.getString("endpoint.accept.fail"), t);}}state = AcceptorState.ENDED;}private void closeSocket(SocketChannel socket) {countDownConnection();try {socket.socket().close();} catch (IOException ioe) {if (log.isDebugEnabled()) {log.debug(sm.getString("endpoint.err.close"), ioe);}}try {socket.close();} catch (IOException ioe) {if (log.isDebugEnabled()) {log.debug(sm.getString("endpoint.err.close"), ioe);}}} }Acceptor完成了socket請求的接收,然后交給NioEndpoint 進行配置,繼續(xù)追蹤Endpoint的setSocketOptions方法。
protected boolean setSocketOptions(SocketChannel socket) {// Process the connectiontry {//disable blocking, APR style, we are gonna be polling it//設置為非阻塞socket.configureBlocking(false);Socket sock = socket.socket();socketProperties.setProperties(sock);NioChannel channel = nioChannels.pop();if (channel == null) {SocketBufferHandler bufhandler = new SocketBufferHandler(socketProperties.getAppReadBufSize(),socketProperties.getAppWriteBufSize(),socketProperties.getDirectBuffer());if (isSSLEnabled()) {channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);} else {channel = new NioChannel(socket, bufhandler);}} else {channel.setIOChannel(socket);channel.reset();}//輪訓pollers數(shù)組元素,調用Poller的register方法,完成channel的注冊。 getPoller0().register(channel);} catch (Throwable t) {ExceptionUtils.handleThrowable(t);try {log.error("",t);} catch (Throwable tt) {ExceptionUtils.handleThrowable(tt);}// Tell to close the socketreturn false;}return true; }分析setSocketOptions的源碼可以知道,該方法的主要功能是利用傳入的SocketChannel參數(shù)生成SecureNioChannel或者NioChannel,然后注冊到Poller線程的selector中,可以進一步了解Java nio的相關知識,對這一塊內容有更深的理解。
2、Poller線程
?Poller同樣實現(xiàn)了Runnable接口,是NioEndpoint類的內部類。在Endpoint的startInterval方法中創(chuàng)建、配置并啟動了Poller線程,見代碼清單4。Poller主要職責是不斷輪詢其selector,檢查準備就緒的socket(有數(shù)據(jù)可讀或可寫),實現(xiàn)io的多路復用。其構造其中初始化了selector。
public Poller() throws IOException {this.selector = Selector.open(); }在分析Acceptor的時候,提到了Acceptor接受到一個socket請求后,調用NioEndpoint的setSocketOptions方法(代碼清單6),該方法生成了NioChannel后調用Poller的register方法生成PoolorEvent后加入到Eventqueue,register方法的源碼如下:
public void register(final NioChannel socket) {socket.setPoller(this);NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);socket.setSocketWrapper(ka);ka.setPoller(this);ka.setReadTimeout(getSocketProperties().getSoTimeout());ka.setWriteTimeout(getSocketProperties().getSoTimeout());ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());ka.setSecure(isSSLEnabled());ka.setReadTimeout(getConnectionTimeout());ka.setWriteTimeout(getConnectionTimeout());PollerEvent r = eventCache.pop();ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.//生成PoolorEvent并加入到Eventqueueif ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);else r.reset(socket,ka,OP_REGISTER);addEvent(r); }Poller的核心代碼也在其run方法中:
public void run() {// Loop until destroy() is called// 調用了destroy()方法后終止此循環(huán)while (true) {boolean hasEvents = false;try {if (!close) {hasEvents = events();if (wakeupCounter.getAndSet(-1) > 0) {//if we are here, means we have other stuff to do//do a non blocking select//非阻塞的 selectkeyCount = selector.selectNow();} else {//阻塞selector,直到有準備就緒的socketkeyCount = selector.select(selectorTimeout);}wakeupCounter.set(0);}if (close) {//該方法遍歷了eventqueue中的所有PollerEvent,然后依次調用PollerEvent的run,將socket注冊到selector中。 events();timeout(0, false);try {selector.close();} catch (IOException ioe) {log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);}break;}} catch (Throwable x) {ExceptionUtils.handleThrowable(x);log.error("",x);continue;}//either we timed out or we woke up, process events firstif ( keyCount == 0 ) hasEvents = (hasEvents | events());Iterator<SelectionKey> iterator =keyCount > 0 ? selector.selectedKeys().iterator() : null;// Walk through the collection of ready keys and dispatch// any active event.//遍歷就緒的socket事件while (iterator != null && iterator.hasNext()) {SelectionKey sk = iterator.next();NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();// Attachment may be null if another thread has called// cancelledKey()if (attachment == null) {iterator.remove();} else {iterator.remove();//調用processKey方法對有數(shù)據(jù)讀寫的socket進行處理,在分析Worker線程時會分析該方法 processKey(sk, attachment);}}//while//process timeouts timeout(keyCount,hasEvents);}//while getStopLatch().countDown(); }run方法中調用了events方法:
public boolean events() {boolean result = false;PollerEvent pe = null;for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {result = true;try {//將pollerEvent中的每個socketChannel注冊到selector中 pe.run();pe.reset();if (running && !paused) {//將注冊了的pollerEvent加到endPoint.eventCache eventCache.push(pe);}} catch ( Throwable x ) {log.error("",x);}}return result; }繼續(xù)跟進PollerEvent的run方法:
public void run() {if (interestOps == OP_REGISTER) {try {//將SocketChannel注冊到selector中,注冊時間為SelectionKey.OP_READ讀事件 socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);} catch (Exception x) {log.error(sm.getString("endpoint.nio.registerFail"), x);}} else {final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());try {if (key == null) {// The key was cancelled (e.g. due to socket closure)// and removed from the selector while it was being// processed. Count down the connections at this point// since it won't have been counted down when the socket// closed. socket.socketWrapper.getEndpoint().countDownConnection();((NioSocketWrapper) socket.socketWrapper).closed = true;} else {final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();if (socketWrapper != null) {//we are registering the key to start with, reset the fairness counter.int ops = key.interestOps() | interestOps;socketWrapper.interestOps(ops);key.interestOps(ops);} else {socket.getPoller().cancelledKey(key);}}} catch (CancelledKeyException ckx) {try {socket.getPoller().cancelledKey(key);} catch (Exception ignore) {}}} }3、Worker線程
Worker線程即SocketProcessor是用來處理Socket請求的。SocketProcessor也同樣是Endpoint的內部類。在Poller的run方法中(代碼清單8)監(jiān)聽到準備就緒的socket時會調用processKey方法進行處理:
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {try {if ( close ) {cancelledKey(sk);} else if ( sk.isValid() && attachment != null ) {//有讀寫事件就緒時if (sk.isReadable() || sk.isWritable() ) {if ( attachment.getSendfileData() != null ) {processSendfile(sk,attachment, false);} else {unreg(sk, attachment, sk.readyOps());boolean closeSocket = false;// Read goes before write// socket可讀時,先處理讀事件if (sk.isReadable()) {//調用processSocket方法進一步處理if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {closeSocket = true;}}//寫事件if (!closeSocket && sk.isWritable()) {//調用processSocket方法進一步處理if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {closeSocket = true;}}if (closeSocket) {cancelledKey(sk);}}}} else {//invalid key cancelledKey(sk);}} catch ( CancelledKeyException ckx ) {cancelledKey(sk);} catch (Throwable t) {ExceptionUtils.handleThrowable(t);log.error("",t);} }繼續(xù)跟蹤processSocket方法:
public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch) {try {if (socketWrapper == null) {return false;}// 嘗試循環(huán)利用之前回收的SocketProcessor對象,如果沒有可回收利用的則創(chuàng)建新的SocketProcessor對象SocketProcessorBase<S> sc = processorCache.pop();if (sc == null) {sc = createSocketProcessor(socketWrapper, event);} else {// 循環(huán)利用回收的SocketProcessor對象 sc.reset(socketWrapper, event);}Executor executor = getExecutor();if (dispatch && executor != null) {//SocketProcessor實現(xiàn)了Runneble接口,可以直接傳入execute方法進行處理 executor.execute(sc);} else {sc.run();}} catch (RejectedExecutionException ree) {getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);return false;} catch (Throwable t) {ExceptionUtils.handleThrowable(t);// This means we got an OOM or similar creating a thread, or that// the pool and its queue are fullgetLog().error(sm.getString("endpoint.process.fail"), t);return false;}return true; }//NioEndpoint中createSocketProcessor創(chuàng)建一個SocketProcessor。 protected SocketProcessorBase<NioChannel> createSocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {return new SocketProcessor(socketWrapper, event); }總結:
Http11NioProtocol是基于Java Nio實現(xiàn)的,創(chuàng)建了Acceptor、Poller和Worker線程實現(xiàn)多路io的復用。三類線程之間的關系如下圖所示:
Acceptor和Poller之間是生產(chǎn)者消費者模式的關系,Acceptor不斷向EventQueue中添加PollerEvent,Pollor輪詢檢查EventQueue中就緒的PollerEvent,然后發(fā)送給Work線程進行處理。
?
轉載于:https://www.cnblogs.com/grasp/p/10099897.html
總結
以上是生活随笔為你收集整理的tomcat源码 Connector的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ubuntu navicat删除目录破解
- 下一篇: greenplum gpfdist应用