日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Thrift源码学习二——Server层

發(fā)布時間:2025/3/8 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Thrift源码学习二——Server层 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Thrift 提供了如圖五種模式:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadSelectorServer

??

TSimpleServer、TThreadPoolServer 屬于阻塞模型

TNonblockingServer、THsHaServer、TThreadedSelectorServer 屬于非阻塞模型

TServer

TServer 為抽象類

public static class Args extends AbstractServerArgs<Args> {public Args(TServerTransport transport) {super(transport);} }public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {final TServerTransport serverTransport;// 處理層工廠 TProcessorFactory processorFactory;// 傳輸層工廠TTransportFactory inputTransportFactory = new TTransportFactory();TTransportFactory outputTransportFactory = new TTransportFactory();// 協(xié)議層工廠TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory(); }

TServer 定義的對外方法

/*** The run method fires up the server and gets things going.*/public abstract void serve(); /*** Stop the server. This is optional on a per-implementation basis. Not* all servers are required to be cleanly stoppable.*/public void stop() {}

stop 并不是每個服務(wù)都需要優(yōu)雅的退出,所以沒有定義為抽象方法

抽象方法 serve() 由具體的 TServer 實例實現(xiàn)

TSimpleServer

TSimpleServer 實現(xiàn)比較簡單,是單線程阻塞模型,只適合測試開發(fā)使用

serve 方法源碼分析

public void serve() {// 啟動監(jiān)聽 socket serverTransport.listen();// 設(shè)置服務(wù)狀態(tài)setServing(true);// 不斷的等待與處理 socket 請求while(!stopped) {// accept 一個業(yè)務(wù) socket 請求client = serverTransport_.accept();if (client != null) {// 通過工廠獲取 server 定義的處理層、傳輸層和協(xié)議層processor = processorFactory_.getProcessor(client);inputTransport = inputTransportFactory_.getTransport(client);outputTransport = outputTransportFactory_.getTransport(client);inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);if (eventHandler_ != null) {connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);}// 阻塞式處理while (true) {// 處理請求事件if (eventHandler_ != null) {eventHandler_.processContext(connectionContext, inputTransport, outputTransport);}// 如果處理層為異步,則退出if(!processor.process(inputProtocol, outputProtocol)) {break;}}}// 關(guān)閉 eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);inputTransport.close();outputTransport.close();setServing(false);} }

TSimpleServer 工作圖

TThreadPoolServer

ThreadPoolServer 解決了 TSimple 不支持并發(fā)和多連接的問題,引入了線程池

與 TSimple 相同,主線程負(fù)責(zé)阻塞式監(jiān)聽 socket,而剩下的業(yè)務(wù)處理則全部交由線程池去處理

public void serve() {// 主線程啟動監(jiān)聽 socket serverTransport_.listen();// 設(shè)置服務(wù)狀態(tài)stopped_ = false;setServing(true);// 等待并處理 socket 請求while (!stopped_) {TTransport client = serverTransport_.accept();// Runnable run 邏輯與 TSimpleServer 類似WorkerProcess wp = new WorkerProcess(client);int retryCount = 0;long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);while(true) {// 交由線程池來處理 executorService_.execute(wp);break;} }executorService_.shutdown();setServing(false); }

TThreadPoolServer 的缺點:

處理能力的好壞受限于線程池的設(shè)置

TNoblockingServer

TNoblockingServer 是單線程工作,但該模式采用了 NIO,所有的 socket 被注冊到 selector 中,通過一個線程循環(huán) selector 來監(jiān)控所有 socket,當(dāng)有就緒的 socket 時,根據(jù)不同的請求做不同的動作(讀取、寫入數(shù)據(jù)或 accept 新連接)

TNoblockingServer 的 serve 方法在其父類 AbstractNonblockingServer 中定義

/*** Begin accepting connections and processing invocations.* 開始接受并處理調(diào)用*/ public void serve() {// start any IO threads// 注冊一些監(jiān)聽 socket 的線程到 selector 中if (!startThreads()) {return;}// start listening, or exit// 開始監(jiān)聽if (!startListening()) {return;}// 設(shè)置服務(wù)狀態(tài)setServing(true);// this will block while we serve// TNonblocking 中實現(xiàn)為 selectAcceptThread_.join(); // 主線程等待 selectAcceptThread 執(zhí)行完畢// SelectAcceptThread 的 run 方法為 select();// 取出一個就緒的 socket waitForShutdown();setServing(false);// do a little cleanup stopListening(); }// SelectAcceptThread run 方法 public void run() {while (!stopped_) {select();processInterestChanges();}for (SelectionKey selectionKey : selector.keys()) {cleanupSelectionKey(selectionKey);} }// SelectAcceptThread Select 過程 private void select() {try {// wait for io events.// NIO 取出一個 selector.select();// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();// 遍歷就緒的 socketwhile (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// if the key is marked Accept, then it has to be the server// transport.// accept 新 socket 并將其注冊到 selector 中if (key.isAcceptable()) {handleAccept();} else if (key.isReadable()) {// deal with reads// 處理讀數(shù)據(jù)的 socket 請求 handleRead(key);} else if (key.isWritable()) {// deal with writes// 處理寫數(shù)據(jù)的 socket 請求 handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}} catch (IOException e) {LOGGER.warn("Got an IOException while selecting!", e);} }// 接收新的連接 private void handleAccept() throws IOException {SelectionKey clientKey = null;TNonblockingTransport client = null;// accept the connectionclient = (TNonblockingTransport)serverTransport.accept();// 注冊到 selector 中clientKey = client.registerSelector(selector, SelectionKey.OP_READ);// add this key to the mapFrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);clientKey.attach(frameBuffer); }

TNonblockingServer 模式的缺點:

其還是采用單線程順序來完成,當(dāng)業(yè)務(wù)處理比較復(fù)雜耗時,該模式的效率將會下降

TNonblockingServer 工作圖:

THsHaServer

THsHaServer 是 TNoblockingServer 的子類,處理邏輯基本相同,不同的是,在處理讀取請求時,THsHaServer 將處理過程交由線程池來完成,主線程直接返回進行下一次循環(huán),提高了效率

THsHaServer 模式的缺點:

其主線程需要完成對所有 socket 的監(jiān)聽一級數(shù)據(jù)的寫操作,當(dāng)大請求量時,效率較低

TThreadedSelectorServer

TThreadedSelectorServer 是 Thrift 目前提供的最高級模式,生產(chǎn)環(huán)境的首選,其對 TNonblockingServer 進行了擴展

TThreadedSelectorServer 源碼中一些關(guān)鍵的屬性

public static class Args extends AbstractNonblockingServerArgs<Args> {// 在已接收的連接中選擇線程的個數(shù)public int selectorThreads = 2;// 執(zhí)行線程池 ExecutorService 的線程個數(shù)private int workerThreads = 5;// 執(zhí)行請求具體任務(wù)的線程池private ExecutorService executorService = null; } // The thread handling all accepts private AcceptThread acceptThread; // Threads handling events on client transports private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); // This wraps all the functionality of queueing and thread pool management // for the passing of Invocations from the selector thread(s) to the workers // (if any). private final ExecutorService invoker; /*** 循環(huán)模式的負(fù)載均衡器,用于為新的連接選擇 SelectorThread*/ protected static class SelectorThreadLoadBalancer {}
  • AcceptThread 線程對象,用于監(jiān)聽 socket 的新連接

  • 多個 SelectorThread 線程對象,用于處理 socket 的讀寫操作

  • 一個負(fù)載均衡對象 SelectorThreadLoadBalancer,用于決定將 AcceptThread 接收到的 socket 請求分配給哪個 SelectorThread 線程

  • SelectorThread 線程執(zhí)行過讀寫操作后,通過 ExecutorService 線程池來完成此次調(diào)用的具體執(zhí)行

  • SelectorThread 對象源碼解析

    /*** 多個 SelectorThread 負(fù)責(zé)處理 socket 的 I/O 操作*/ protected class SelectorThread extends AbstractSelectThread {/*** The work loop. Handles selecting (read/write IO), dispatching, and* managing the selection preferences of all existing connections.* 選擇(處理 socket 的網(wǎng)絡(luò)讀寫 IO),分配和管理現(xiàn)有連接*/public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// skip if not validif (!key.isValid()) {cleanupSelectionKey(key);continue;}if (key.isReadable()) {// deal with reads handleRead(key);} else if (key.isWritable()) {// deal with writes handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}} }

    AcceptThread 對象源碼解析

    /*** 在服務(wù)器傳輸中選擇線程(監(jiān)聽 socket 請求)并向 IO 選擇器(SelectorThread)提供新連接*/ protected class AcceptThread extends Thread {// The listen socket to accept onprivate final TNonblockingServerTransport serverTransport;private final Selector acceptSelector;// 負(fù)載均衡器,決定將連接分配給哪個 SelectorThreadprivate final SelectorThreadLoadBalancer threadChooser;public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// 處理接收的新情求if (key.isAcceptable()) {handleAccept();} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}}/*** Accept a new connection.*/private void handleAccept() {final TNonblockingTransport client = doAccept();if (client != null) {// 從負(fù)載均衡器中,獲取 SelectorThread 線程final SelectorThread targetThread = threadChooser.nextThread();if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {doAddAccept(targetThread, client);} else {// FAIR_ACCEPTinvoker.submit(new Runnable() {public void run() {// 將選擇到的線程和連接放入 線程池 處理// 用 targetThread 線程取處理一個給接受的鏈接 client,如果新連接的隊列處于滿的狀態(tài),則將處于阻塞狀態(tài) doAddAccept(targetThread, client);}});}}}private TNonblockingTransport doAccept() {return (TNonblockingTransport) serverTransport.accept();}// 用 targetThread 線程取處理一個給接受的鏈接 client,如果新連接的隊列處于滿的狀態(tài),則將處于阻塞狀態(tài)private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {if (!thread.addAcceptedConnection(client)) {client.close();}} }

    TThreadedSelectorServer 工作圖

    參考資料

    • Thrift server端的幾種工作模式分析:http://blog.csdn.net/houjixin/article/details/42779915
    • Thrift 網(wǎng)絡(luò)服務(wù)模型:http://www.cnblogs.com/mumuxinfei/p/3875165.html

    總結(jié)

    以上是生活随笔為你收集整理的Thrift源码学习二——Server层的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。