日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RPC-非阻塞通信下的同步API实现原理,以Dubbo为例

發(fā)布時間:2025/3/15 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RPC-非阻塞通信下的同步API实现原理,以Dubbo为例 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

  Netty在Java NIO領(lǐng)域基本算是獨占鰲頭,涉及到高性能網(wǎng)絡(luò)通信,基本都會以Netty為底層通信框架,Dubbo 也不例外。以下將以Dubbo實現(xiàn)為例介紹其是如何在NIO非阻塞通信基礎(chǔ)上實現(xiàn)同步通信的。

? ? Dubbo為一種RPC通信框架,提供進程間的通信,在使用dubbo協(xié)議+Netty作為傳輸層時,提供三種API調(diào)用方式:

  • 同步接口
  • 異步帶回調(diào)接口
  • 異步不帶回調(diào)接口
  • ? ? 同步接口適用在大部分環(huán)境,通信方式簡單、可靠,客戶端發(fā)起調(diào)用,等待服務(wù)端處理,調(diào)用結(jié)果同步返回。這種方式下,在高吞吐、高性能(響應(yīng)時間很快)的服務(wù)接口場景中最為適用,可以減少異步帶來的額外的消耗,也方便客戶端做一致性保證。

    ?

    ? ? 異步帶回調(diào)接口,用在任務(wù)處理時間較長,客戶端應(yīng)用線程不愿阻塞等待,而是為了提高自身處理能力希望服務(wù)端處理完成后可以異步通知應(yīng)用線程。這種方式可以大大提升客戶端的吞吐量,避免因為服務(wù)端的耗時問題拖死客戶端。

    ? ? 異步不帶回調(diào)接口,一些場景為了進一步提升客戶端的吞吐能力,只需發(fā)起一次服務(wù)端調(diào)用,不需關(guān)系調(diào)用結(jié)果,可以使用此種通信方式。一般在不需要嚴格保證數(shù)據(jù)一致性或者有其他補償措施的情況下,選用這種,可以最小化遠程調(diào)用帶來的性能損耗。

    ????

    ? ? 來看一下Dubbo是如何實現(xiàn)這三種API的。核心代碼在com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker,如下圖對應(yīng)的位置,屬于協(xié)議層的實現(xiàn)部分。為方便大家可以準確定位代碼所在位置,使用截圖的方式,而不是直接貼代碼了。

    ????上文描述的是三種API方式,Dubbo里面通過參數(shù)isOneway、isAsync來控制,isOneway=true表示異步不帶回調(diào),isAsync=true表示異步帶回調(diào),否則是同步API。具體是如何控制,看以下代碼:

    ????isOneway==true時,客戶端send完請求后,直接return一個空結(jié)果的RpcResult;isAsync==true時,客戶端發(fā)起請求,設(shè)置一個ResponseFuture,直接return一個空結(jié)果的RpcResult,接下來當服務(wù)端處理完成,客戶端Netty層在收到響應(yīng)后會通過Future通知應(yīng)用線程;最后是同步情況下,客戶端發(fā)起請求,并通過get()方法阻塞等待服務(wù)端的響應(yīng)結(jié)果。

    ? ? 異步API情況下,結(jié)合NIO模型比較好理解是如何實現(xiàn)的(當然需要先了解NIO的reactor模型),接下來重點理解下,這個get()阻塞方法是如何做到基于非阻塞NIO實現(xiàn)同步阻塞效果。

    ? ? 直接進入get()方法內(nèi)部。

    ? ? 可以看到是利用Java的鎖機制實現(xiàn),循環(huán)判斷是否收到響應(yīng),如果收到或者等待超時則返回。done的實例對象如下:

    private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition();

    ? ? 使用可重入鎖ReentrantLock,獲取一個Condition對象在其上做await操作。這里有await操作,何時被喚醒呢,有兩個條件,第一個是等待timeout超時,默認dubbo是1s,第二個就是被其他線程喚醒,即收到了服務(wù)端的響應(yīng)。

    ? ? signal信號一發(fā)出,上文循環(huán)檢測內(nèi)的await操作會立即返回,下一次isDone判斷會變成true,直接跳出循環(huán)。

    ? ? 仔細看代碼會發(fā)現(xiàn),被喚醒的地方還有一個是在DefaultFuture內(nèi)部有一個超時輪詢檢測的線程,這個線程主要是處理響應(yīng)超時后觸發(fā)資源回收、記錄異常日志等操作。????

    private static class RemotingInvocationTimeoutScan implements Runnable {public void run() {while (true) {try {for (DefaultFuture future : FUTURES.values()) {if (future == null || future.isDone()) {continue;}if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {// create exception response.Response timeoutResponse = new Response(future.getId());// set timeout status.timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));// handle response. DefaultFuture.received(future.getChannel(), timeoutResponse);}}Thread.sleep(30);} catch (Throwable e) {logger.error("Exception when scan the timeout invocation of remoting.", e);}}}}static {Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");th.setDaemon(true);th.start();}

    ?

    ? ?   可能會有疑問,這個觸發(fā)操作為何不直接在get()方法內(nèi)部檢測到超時直接調(diào)用DefaultFuture.received(Channel channel, Response response)來清理,而是要額外開啟一個后臺線程。

    ????單獨啟動一個超時線程有兩個好處:

  • ?提高超時精度
  • ????  get()方法內(nèi)部的輪詢有一個timeout,每次超時喚醒的時間間隔至少是timeout時長,最差的情況可能會等待2*timeout作出超時反應(yīng)。在超時輪詢線程中,每隔30ms遍歷檢測一次,可以很大程度的提升超時精度。

    ? ? ? ?2.? 提升性能,降低響應(yīng)時間

    ? ?   剝離超時處理邏輯到一個單獨線程,可以減少對業(yè)務(wù)線程的時間占用,這個超時后的處理對應(yīng)用來說并無直接作用,完全可以放到后臺異步去處理。另外單獨在一個線程中,實際上有批量處理的表現(xiàn)。

    ? ? 以上是就NIO通信基礎(chǔ)上實現(xiàn)三種API調(diào)用的實現(xiàn)原理,或許有更多優(yōu)于Dubbo的處理方式,可以拿出來討論。

    轉(zhuǎn)載于:https://www.cnblogs.com/yaohonv/p/rpc_sycn_nio.html

    總結(jié)

    以上是生活随笔為你收集整理的RPC-非阻塞通信下的同步API实现原理,以Dubbo为例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。