Thrift源码学习二——Server层
Thrift 提供了如圖五種模式:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadSelectorServer
??
TSimpleServer、TThreadPoolServer 屬于阻塞模型
TNonblockingServer、THsHaServer、TThreadedSelectorServer 屬于非阻塞模型
TServer
TServer 為抽象類
public static class Args extends AbstractServerArgs<Args> {public Args(TServerTransport transport) {super(transport);} }public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {final TServerTransport serverTransport;// 處理層工廠 TProcessorFactory processorFactory;// 傳輸層工廠TTransportFactory inputTransportFactory = new TTransportFactory();TTransportFactory outputTransportFactory = new TTransportFactory();// 協(xié)議層工廠TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory(); }TServer 定義的對外方法
/*** The run method fires up the server and gets things going.*/public abstract void serve(); /*** Stop the server. This is optional on a per-implementation basis. Not* all servers are required to be cleanly stoppable.*/public void stop() {}stop 并不是每個服務(wù)都需要優(yōu)雅的退出,所以沒有定義為抽象方法
抽象方法 serve() 由具體的 TServer 實例實現(xiàn)
TSimpleServer
TSimpleServer 實現(xiàn)比較簡單,是單線程阻塞模型,只適合測試開發(fā)使用
serve 方法源碼分析
public void serve() {// 啟動監(jiān)聽 socket serverTransport.listen();// 設(shè)置服務(wù)狀態(tài)setServing(true);// 不斷的等待與處理 socket 請求while(!stopped) {// accept 一個業(yè)務(wù) socket 請求client = serverTransport_.accept();if (client != null) {// 通過工廠獲取 server 定義的處理層、傳輸層和協(xié)議層processor = processorFactory_.getProcessor(client);inputTransport = inputTransportFactory_.getTransport(client);outputTransport = outputTransportFactory_.getTransport(client);inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);if (eventHandler_ != null) {connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);}// 阻塞式處理while (true) {// 處理請求事件if (eventHandler_ != null) {eventHandler_.processContext(connectionContext, inputTransport, outputTransport);}// 如果處理層為異步,則退出if(!processor.process(inputProtocol, outputProtocol)) {break;}}}// 關(guān)閉 eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);inputTransport.close();outputTransport.close();setServing(false);} }TSimpleServer 工作圖
TThreadPoolServer
ThreadPoolServer 解決了 TSimple 不支持并發(fā)和多連接的問題,引入了線程池
與 TSimple 相同,主線程負(fù)責(zé)阻塞式監(jiān)聽 socket,而剩下的業(yè)務(wù)處理則全部交由線程池去處理
public void serve() {// 主線程啟動監(jiān)聽 socket serverTransport_.listen();// 設(shè)置服務(wù)狀態(tài)stopped_ = false;setServing(true);// 等待并處理 socket 請求while (!stopped_) {TTransport client = serverTransport_.accept();// Runnable run 邏輯與 TSimpleServer 類似WorkerProcess wp = new WorkerProcess(client);int retryCount = 0;long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);while(true) {// 交由線程池來處理 executorService_.execute(wp);break;} }executorService_.shutdown();setServing(false); }TThreadPoolServer 的缺點:
處理能力的好壞受限于線程池的設(shè)置
TNoblockingServer
TNoblockingServer 是單線程工作,但該模式采用了 NIO,所有的 socket 被注冊到 selector 中,通過一個線程循環(huán) selector 來監(jiān)控所有 socket,當(dāng)有就緒的 socket 時,根據(jù)不同的請求做不同的動作(讀取、寫入數(shù)據(jù)或 accept 新連接)
TNoblockingServer 的 serve 方法在其父類 AbstractNonblockingServer 中定義
/*** Begin accepting connections and processing invocations.* 開始接受并處理調(diào)用*/ public void serve() {// start any IO threads// 注冊一些監(jiān)聽 socket 的線程到 selector 中if (!startThreads()) {return;}// start listening, or exit// 開始監(jiān)聽if (!startListening()) {return;}// 設(shè)置服務(wù)狀態(tài)setServing(true);// this will block while we serve// TNonblocking 中實現(xiàn)為 selectAcceptThread_.join(); // 主線程等待 selectAcceptThread 執(zhí)行完畢// SelectAcceptThread 的 run 方法為 select();// 取出一個就緒的 socket waitForShutdown();setServing(false);// do a little cleanup stopListening(); }// SelectAcceptThread run 方法 public void run() {while (!stopped_) {select();processInterestChanges();}for (SelectionKey selectionKey : selector.keys()) {cleanupSelectionKey(selectionKey);} }// SelectAcceptThread Select 過程 private void select() {try {// wait for io events.// NIO 取出一個 selector.select();// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();// 遍歷就緒的 socketwhile (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// if the key is marked Accept, then it has to be the server// transport.// accept 新 socket 并將其注冊到 selector 中if (key.isAcceptable()) {handleAccept();} else if (key.isReadable()) {// deal with reads// 處理讀數(shù)據(jù)的 socket 請求 handleRead(key);} else if (key.isWritable()) {// deal with writes// 處理寫數(shù)據(jù)的 socket 請求 handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}} catch (IOException e) {LOGGER.warn("Got an IOException while selecting!", e);} }// 接收新的連接 private void handleAccept() throws IOException {SelectionKey clientKey = null;TNonblockingTransport client = null;// accept the connectionclient = (TNonblockingTransport)serverTransport.accept();// 注冊到 selector 中clientKey = client.registerSelector(selector, SelectionKey.OP_READ);// add this key to the mapFrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);clientKey.attach(frameBuffer); }TNonblockingServer 模式的缺點:
其還是采用單線程順序來完成,當(dāng)業(yè)務(wù)處理比較復(fù)雜耗時,該模式的效率將會下降
TNonblockingServer 工作圖:
THsHaServer
THsHaServer 是 TNoblockingServer 的子類,處理邏輯基本相同,不同的是,在處理讀取請求時,THsHaServer 將處理過程交由線程池來完成,主線程直接返回進行下一次循環(huán),提高了效率
THsHaServer 模式的缺點:
其主線程需要完成對所有 socket 的監(jiān)聽一級數(shù)據(jù)的寫操作,當(dāng)大請求量時,效率較低
TThreadedSelectorServer
TThreadedSelectorServer 是 Thrift 目前提供的最高級模式,生產(chǎn)環(huán)境的首選,其對 TNonblockingServer 進行了擴展
TThreadedSelectorServer 源碼中一些關(guān)鍵的屬性
public static class Args extends AbstractNonblockingServerArgs<Args> {// 在已接收的連接中選擇線程的個數(shù)public int selectorThreads = 2;// 執(zhí)行線程池 ExecutorService 的線程個數(shù)private int workerThreads = 5;// 執(zhí)行請求具體任務(wù)的線程池private ExecutorService executorService = null; } // The thread handling all accepts private AcceptThread acceptThread; // Threads handling events on client transports private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); // This wraps all the functionality of queueing and thread pool management // for the passing of Invocations from the selector thread(s) to the workers // (if any). private final ExecutorService invoker; /*** 循環(huán)模式的負(fù)載均衡器,用于為新的連接選擇 SelectorThread*/ protected static class SelectorThreadLoadBalancer {}AcceptThread 線程對象,用于監(jiān)聽 socket 的新連接
多個 SelectorThread 線程對象,用于處理 socket 的讀寫操作
一個負(fù)載均衡對象 SelectorThreadLoadBalancer,用于決定將 AcceptThread 接收到的 socket 請求分配給哪個 SelectorThread 線程
SelectorThread 線程執(zhí)行過讀寫操作后,通過 ExecutorService 線程池來完成此次調(diào)用的具體執(zhí)行
SelectorThread 對象源碼解析
/*** 多個 SelectorThread 負(fù)責(zé)處理 socket 的 I/O 操作*/ protected class SelectorThread extends AbstractSelectThread {/*** The work loop. Handles selecting (read/write IO), dispatching, and* managing the selection preferences of all existing connections.* 選擇(處理 socket 的網(wǎng)絡(luò)讀寫 IO),分配和管理現(xiàn)有連接*/public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// skip if not validif (!key.isValid()) {cleanupSelectionKey(key);continue;}if (key.isReadable()) {// deal with reads handleRead(key);} else if (key.isWritable()) {// deal with writes handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}} }AcceptThread 對象源碼解析
/*** 在服務(wù)器傳輸中選擇線程(監(jiān)聽 socket 請求)并向 IO 選擇器(SelectorThread)提供新連接*/ protected class AcceptThread extends Thread {// The listen socket to accept onprivate final TNonblockingServerTransport serverTransport;private final Selector acceptSelector;// 負(fù)載均衡器,決定將連接分配給哪個 SelectorThreadprivate final SelectorThreadLoadBalancer threadChooser;public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// 處理接收的新情求if (key.isAcceptable()) {handleAccept();} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}}/*** Accept a new connection.*/private void handleAccept() {final TNonblockingTransport client = doAccept();if (client != null) {// 從負(fù)載均衡器中,獲取 SelectorThread 線程final SelectorThread targetThread = threadChooser.nextThread();if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {doAddAccept(targetThread, client);} else {// FAIR_ACCEPTinvoker.submit(new Runnable() {public void run() {// 將選擇到的線程和連接放入 線程池 處理// 用 targetThread 線程取處理一個給接受的鏈接 client,如果新連接的隊列處于滿的狀態(tài),則將處于阻塞狀態(tài) doAddAccept(targetThread, client);}});}}}private TNonblockingTransport doAccept() {return (TNonblockingTransport) serverTransport.accept();}// 用 targetThread 線程取處理一個給接受的鏈接 client,如果新連接的隊列處于滿的狀態(tài),則將處于阻塞狀態(tài)private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {if (!thread.addAcceptedConnection(client)) {client.close();}} }TThreadedSelectorServer 工作圖
參考資料
- Thrift server端的幾種工作模式分析:http://blog.csdn.net/houjixin/article/details/42779915
- Thrift 網(wǎng)絡(luò)服務(wù)模型:http://www.cnblogs.com/mumuxinfei/p/3875165.html
總結(jié)
以上是生活随笔為你收集整理的Thrift源码学习二——Server层的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 《程序设计与数据结构》第八周学习总结
- 下一篇: 一些adb的常用命令