日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

HDFS2.x之RPC流程分析

發(fā)布時(shí)間:2024/4/17 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 HDFS2.x之RPC流程分析 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

HDFS2.x之RPC流程分析

1 概述

??? Hadoop提供了一個(gè)統(tǒng)一的RPC機(jī)制來(lái)處理client-namenode, namenode-dataname,client-dataname之間的通信。RPC是整個(gè)Hadoop中通信框架的核心,目前采用ProtocolBuf作為RPC的默認(rèn)實(shí)現(xiàn)。RPC的整體調(diào)用流程如下:

?

2 Protobuf

??? Protocol buffer(以下簡(jiǎn)稱PB),PB是Google開(kāi)源的一種輕便高效的結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)格式,可以用于結(jié)構(gòu)化數(shù)據(jù)的序列化和反序列化,很適合做數(shù)據(jù)存儲(chǔ)或 RPC 數(shù)據(jù)交換格式,目前提供了 C++、Java、Python 三種語(yǔ)言的 API。序列化/反序列化速度快,網(wǎng)絡(luò)或者磁盤IO傳輸?shù)臄?shù)據(jù)少。

RPC就是一臺(tái)機(jī)器上的某個(gè)進(jìn)程要調(diào)用另外一臺(tái)機(jī)器上的某個(gè)進(jìn)程的方法,中間通信傳輸?shù)木褪穷愃朴凇胺椒?shù)1、參數(shù)2……”這樣的信息,是結(jié)構(gòu)化的。

我們要定義一種PB實(shí)現(xiàn)的RPC傳輸格式,首先要定義相應(yīng)的.proto文件,在Hadoop common工程里,這些文件放在hadoop-common\src\main\proto目錄下;在Hadoop HDFS工程里這些文件放在hadoop-hdfs\src\main\proto目錄下,以此類推。Hadoop編譯腳本會(huì)調(diào)用相應(yīng)的protoc二進(jìn)制程序來(lái)編譯這些以.proto結(jié)尾的文件,生成相應(yīng)的.java文件。

?

由proto文件生成的類,均提供了讀寫二進(jìn)制數(shù)據(jù)的方法:

(1)byte[] toByteArray():序列化message并且返回一個(gè)原始字節(jié)類型的字節(jié)數(shù)組;

(2)static Person parseFrom(byte[] data): 將給定的字節(jié)數(shù)組解析為message;

(3)void writeTo(OutputStream output): 將序列化后的message寫入到輸出流;

(4)static Person parseFrom(InputStream input): 讀入并且將輸入流解析為一個(gè)message;

??? 另外,PB類中都有一些Builder子類,利用其中的build方法,可以完成對(duì)象的創(chuàng)建。PB的具體應(yīng)用會(huì)在下面的RPC的Client和Server的分析中說(shuō)明。

3 RPC Client端

以create方法為例,來(lái)說(shuō)明RPC的具體執(zhí)行流程。首先看下在Client端的執(zhí)行過(guò)程。

?

??? 由HDFS客戶端發(fā)起的create操作,在經(jīng)過(guò)一系列的前置步驟之后,會(huì)通過(guò)DFSClient類中的namenode代理來(lái)完成,其定義如下:

final ClientProtocol namenode; ……NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);this.dtService = proxyInfo.getDelegationTokenService();this.namenode = proxyInfo.getProxy();

?

這說(shuō)明此處的namenode實(shí)現(xiàn)的接口是ClientProtocol,也就是Client與NameNode之間RPC通信的協(xié)議。

HDFS2.x引入了NameNode的HA,這就使得Client端的底層代理是有多個(gè)的,分別連接Active NN和Standby NN。但是在實(shí)際運(yùn)行過(guò)程中需要對(duì)Client調(diào)用呈現(xiàn)統(tǒng)一的接口,那么就出現(xiàn)了一個(gè)上層代理來(lái)統(tǒng)一上述這兩個(gè)底層代理。所有由Clientfa來(lái)的方法調(diào)用都是先到達(dá)上層代理,通過(guò)上層代理轉(zhuǎn)發(fā)到下層代理。并且,上層代理還會(huì)根據(jù)底層代理返回的Exception來(lái)決定是否進(jìn)行Failover或者Retry等操作。

在使用HA模式時(shí),客戶端創(chuàng)建代理的總體流程是:

?

其中,

(1)RetryProxy.create方法會(huì)創(chuàng)建上層代理,用于接收客戶端的請(qǐng)求,并根據(jù)情況調(diào)用連接到當(dāng)前兩個(gè)NameNode的底層代理。

Proxy.newProxyInstance(proxyProvider.getInterface().getClassLoader(),new Class<?>[] { iface },new RetryInvocationHandler(proxyProvider, retryPolicy) );

?

生成的這個(gè)代理對(duì)象實(shí)現(xiàn)了ClientProtocol接口,Client可以通過(guò)這個(gè)代理對(duì)象調(diào)用ClientProtocol接口中相應(yīng)的方法。根據(jù)Java的動(dòng)態(tài)代理機(jī)制,用戶對(duì)這個(gè)代理對(duì)象的方法調(diào)用都轉(zhuǎn)換為對(duì)RetryInvocationHandler(proxyProvider, retryPolicy)對(duì)象中invoke()方法的調(diào)用了。RetryInvocationHandler是與FailoverProxyProvider密切相關(guān)的,因?yàn)樗枰狥ailoverProxyProvider提供底層代理的支持。

(2)當(dāng)代理對(duì)象接收到請(qǐng)求的時(shí)候,會(huì)調(diào)用invoke方法來(lái)進(jìn)行處理,這里的invoke方法是上層代理中的RetryInvocationHanlder.invoke方法。

首先要獲取一個(gè)RetryPolicy,默認(rèn)的策略是在構(gòu)造RetryInvocationHandler時(shí)的參數(shù)。在Client與NameNode之間的ClientProtocol的RetryPolicy是:

RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.failoverSleepBaseMillis,

????????????? config.failoverSleepMaxMillis)

??? 接著,會(huì)調(diào)用invokeMethod方法調(diào)用底層的代理進(jìn)行實(shí)際的處理:

Object ret = invokeMethod(method, args);

? ? ?????????? -> method.invoke(currentProxy, args);

currentProxy是現(xiàn)在正在使用的底層代理。當(dāng)NN發(fā)生主從切換的時(shí)候,這個(gè)currentProxy也會(huì)發(fā)生相應(yīng)的變化。

??? 如果在調(diào)用過(guò)程中出現(xiàn)了異常,則針對(duì)不同的異常會(huì)做出不同的處理,這里的判斷是根據(jù)生成動(dòng)態(tài)代理(上層代理)的時(shí)候給定的RetryPolicy策略,默認(rèn)的RetryPolicy是FailoverOnNetworkExceptionRetry,所以調(diào)用對(duì)應(yīng)的shouldRetry()函數(shù)。

(2.1)如果Retry的次數(shù)已經(jīng)超過(guò)最大嘗試的次數(shù)了,那么就返回一個(gè)

RetryAction.RetryDecision.FAIL的RetryAction。

(2.2) 如果拋出的異常是ConnectionException、NoRouteToHostException、UnKnownHostException、StandbyException、RemoteException中的一個(gè),說(shuō)明底層代理在RPC過(guò)程中Active NN連不上或者宕機(jī)或者已經(jīng)發(fā)生主從切換了,那么就需要返回一個(gè)RetryAction.RetryDecision.FAILOVER_AND_RETRY的RetryAction,需要執(zhí)行performFailover()操作,然后用另外一個(gè)NN的底層代理重試。

(2.3)如果拋出的異常是SocketException、 IOException或者其他非RemoteException的異常,那么就無(wú)法判斷這個(gè)RPC命令到底是不是執(zhí)行成功了。可能是本地的Socket或者IO出問(wèn)題,也可能是NN端的Socket或者IO問(wèn)題。那就進(jìn)行進(jìn)一步的判斷:如果被調(diào)用的方法是idempotent的,也就是多次執(zhí)行是沒(méi)有副作用的,那么就連接另外的一個(gè)底層代理重試;否則直接返回RetryAction.RetryDecision.FAIL。

(3)FailoverProxyProvider類的當(dāng)前實(shí)現(xiàn)類為ConfiguredFailoverProxyProvider。它負(fù)責(zé)管理那兩個(gè)activeNN和standbyNN的代理,當(dāng)上層代理接收到來(lái)自用戶的一個(gè)RPC命令之后,轉(zhuǎn)發(fā)給當(dāng)前正在使用的底層代理(由ConfiguredFailoverProxyProvider.currentProxyIndex決定,表示當(dāng)前的代理對(duì)象的序號(hào))執(zhí)行,然后看是否拋出異常。如果拋出了異常,根據(jù)異常的種類來(lái)判斷是執(zhí)行failover,還是retry,或者兩者都不做。如果需要切換NameNode代理的話,則會(huì)執(zhí)行:

currentProxyIndex = (currentProxyIndex + 1) % proxies.size();

??? 底層代理的實(shí)現(xiàn)是用的非HA模式:

current.namenode = NameNodeProxies.createNonHAProxy(conf,

??????????? current.address, xface, ugi, false).getProxy();

進(jìn)一步調(diào)用->NameNodeProxies.createNNProxyWithClientProtocol

??????????? ->RPC.getProtocolProxy

方法,并把生成的ClientNamenodeProtocolPB類型的代理對(duì)象proxy封裝成ClientNamenodeProtocolTranslatorPB類型。

??? 這里又會(huì)涉及到Java的動(dòng)態(tài)代理,是在RPC.getProtocolProxy方法生成proxy對(duì)象的時(shí)候,RPC.getProtocolProxy的實(shí)現(xiàn)代碼為:

return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);

這里的引擎就是protocolbuf,所以,所有的RPC請(qǐng)求最終都會(huì)調(diào)用ProtobufRpcEngine類中的invoke方法進(jìn)行和RPC的Server端通信以及數(shù)據(jù)的序列化和反序列化操作。

??? 把Client的請(qǐng)求封裝成call的操作返回也是在invoke中進(jìn)行的:

val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,

??????????? new RpcRequestWritable(rpcRequest), remoteId);

??? 封裝的具體實(shí)現(xiàn)是調(diào)用的Client類中的call方法:

//封裝成call

Call call = new Call(rpcKind, rpcRequest);

//建立和NameNode的連接

Connection connection = getConnection(remoteId, call);

//向NameNode發(fā)送數(shù)據(jù)

connection.sendParam(call);

RPC客戶端的執(zhí)行流程(HA模式)為:

?

4 RPC Server端

RPC的Server端的初始化方法是NameNode中被調(diào)用的:

rpcServer = createRpcServer(conf);

實(shí)際上初始化NameNodeRpcServer對(duì)象,調(diào)用其構(gòu)造函數(shù):

return new NameNodeRpcServer(conf, this);

??? 在構(gòu)造方法中,會(huì)初始化兩個(gè)RPCServer,一個(gè)是serviceRpcServer,用來(lái)處理數(shù)據(jù)節(jié)點(diǎn)發(fā)來(lái)的RPC請(qǐng)求,另一個(gè)是clientRpcServer,用于處理客戶端發(fā)來(lái)的RPC請(qǐng)求。

??? NameNodeRpcServer的構(gòu)造方法會(huì)初始化RPC的Server端所需要的handler的數(shù)目(默認(rèn)為10個(gè)),設(shè)置好處理引擎為Protocolbuf,初始化ClientNamenodeProtocolServerSideTranslatorPB類型的對(duì)象clientProtocolServerTranslator用來(lái)對(duì)傳來(lái)的數(shù)據(jù)進(jìn)行反序列化,對(duì)發(fā)送的數(shù)據(jù)進(jìn)行序列化。

??? 另外,會(huì)初始化提供不同RPC服務(wù)的對(duì)象BlockingService,針對(duì)客戶端、數(shù)據(jù)節(jié)點(diǎn)端的有:

BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);

BlockingService dnProtoPbService = DatanodeProtocolService.newReflectiveBlockingService(dnProtoPbTranslator);

??? 緊接著,會(huì)獲取RPC的Server對(duì)象:

this.clientRpcServer = RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,clientNNPbService, socAddr.getHostName(),socAddr.getPort(), handlerCount,
false,
conf,
             namesystem.getDelegationTokenSecretManager());

?

此對(duì)象主要負(fù)責(zé)接收網(wǎng)絡(luò)連接,讀取數(shù)據(jù),調(diào)用處理數(shù)據(jù)函數(shù),返回結(jié)果。前兩個(gè)參數(shù)表示如果RPC發(fā)送過(guò)來(lái)的是ClientNamenodeProtocolPB協(xié)議,那么負(fù)責(zé)處理這個(gè)協(xié)議的服務(wù)(com.google.protobuf.BlockingService類型的對(duì)象)就是clientNNPbService。

這個(gè)Server對(duì)象里有Listener, Handler, Responder內(nèi)部類:

(1) Listener Thread:Server端會(huì)啟一個(gè)Listener線程主要用于監(jiān)聽(tīng)Client發(fā)送過(guò)來(lái)的Request,Listene會(huì)啟動(dòng)一個(gè)Reader的線程組,并把客戶端發(fā)來(lái)的Connection對(duì)象通過(guò)NIO的SelectionKey傳遞給Reader, Listener相當(dāng)于只作了一層轉(zhuǎn)發(fā);

(2) Reader Thread Pool:主要用于讀取Listener傳過(guò)來(lái)的Connection,并調(diào)用Connection的readAndProcess方法來(lái)讀取Request,并封裝成一個(gè)Call放到Call Queue中;

(3) Hanlder Thread Pool:Server會(huì)啟動(dòng)一組線程組來(lái)處理Call Queue中Call,并把處理的Respone中放到response queue中;

(4) Responder Thread:主要處理response queue中的response,并把response發(fā)送給client,如果當(dāng)前response queue為空,則第一個(gè)新增的response會(huì)馬上發(fā)送給client端,不會(huì)通過(guò)responer thread來(lái)發(fā)送。

這個(gè)RPC.getServer()會(huì)經(jīng)過(guò)層層調(diào)用,因?yàn)楝F(xiàn)在默認(rèn)的RPCEngine是ProtobufRpcEngine(ProtobufRpcEngine.java),就會(huì)調(diào)用到ProtobufRpcEngine.getServer這個(gè)函數(shù),在這生成了一個(gè)Server對(duì)象,就是用于接收client端RPC請(qǐng)求,處理,回復(fù)的Server。這個(gè)Server對(duì)象是一個(gè)純粹的網(wǎng)絡(luò)服務(wù)的Server,在RPC中起到基礎(chǔ)網(wǎng)絡(luò)IO服務(wù)的作用。

RPC的Server端創(chuàng)建的總體流程是:

?

4.1 Reader處理

Server里的Reader線程也是基于Selector的異步IO模式,每次Select選出一個(gè)SelectionKey之后,會(huì)調(diào)用SelectionKey.attachment()把這個(gè)SelectionKey所attach的Connection對(duì)象獲取(在Listener的run方法中進(jìn)行的attatch),然后執(zhí)行對(duì)應(yīng)的readAndProcess()方法,把這個(gè)SelectionKey所對(duì)應(yīng)的管道上的網(wǎng)絡(luò)IO數(shù)據(jù)讀入緩沖區(qū)。readAndProcess()方法會(huì)層層調(diào)用到Server.processData()方法,在這個(gè)方法內(nèi)部,會(huì)把剛才從網(wǎng)絡(luò)IO中讀取的數(shù)據(jù)反序列化成對(duì)象rpcRequest對(duì)象。

rpcRequest對(duì)象的類型是繼承自Writable類型的子類的對(duì)象,也就是說(shuō)可以序列化/反序列化的類。這里rpcRequest對(duì)象里包含的RPC請(qǐng)求的內(nèi)容對(duì)象是由.proto文件中Message生成的類,也就是說(shuō)PB框架自動(dòng)編譯出來(lái)的類,后面可以通過(guò)調(diào)用這個(gè)類的get方法獲取RPC中真正傳輸?shù)臄?shù)據(jù)。之后把生成的rpcRequest對(duì)象放到一個(gè)Call對(duì)象里面,再把Call對(duì)象放到隊(duì)列Server.callQueue里面。

Reader的處理流程圖如下:

?

4.2 Handler處理

Handler線程默認(rèn)有10個(gè),所以處理邏輯是多線程的。每個(gè)Handler線程會(huì)從剛才提到的callQueue中取一個(gè)Call對(duì)象,然后調(diào)用Server.call()方法執(zhí)行這個(gè)Call對(duì)象中蘊(yùn)含的RPC請(qǐng)求。Server.call()->RPC.Server.call()->Server.getRpcInvoker()->ProtobufRpcInvoker.call()在最后這個(gè)call()函數(shù)里面真正執(zhí)行。

call方法會(huì)首先校驗(yàn)這個(gè)請(qǐng)求發(fā)過(guò)來(lái)的數(shù)據(jù)是不是合理的。然后就是獲取實(shí)現(xiàn)這個(gè)協(xié)議的服務(wù)。實(shí)現(xiàn)協(xié)議的服務(wù)在初始化的時(shí)候已經(jīng)注冊(cè)過(guò)了,就是前面說(shuō)的那個(gè)com.google.protobuf.BlockingService類型的對(duì)象clientNNPbService

這個(gè)就是實(shí)現(xiàn)Client和NameNode之間的ClientNamenodeProtocol協(xié)議的服務(wù),通過(guò)調(diào)用這句代碼:

result = service.callBlockingMethod(methodDescriptor, null, param);

就會(huì)執(zhí)行這個(gè)RPC請(qǐng)求的邏輯。service對(duì)象會(huì)把相應(yīng)的方法調(diào)用轉(zhuǎn)移到一個(gè)繼承自BlockingInterface接口的實(shí)現(xiàn)類上。Service的真正實(shí)現(xiàn)類就是clientProtocolServerTranslator,是newReflectiveBlockingService()這個(gè)函數(shù)的參數(shù)。并且此類是ClientNamenodeProtocolProtos中的子類,是在HDFS編譯的時(shí)候根據(jù)proto文件創(chuàng)建的。由于clientProtocolServerTranslator的構(gòu)造方法中傳遞的參數(shù)是NameNodeRpcServer,因此進(jìn)一步的方法調(diào)用都在NameNodeRpcServer中實(shí)現(xiàn)的。

??? Handler處理流程如下:

?

如果元數(shù)據(jù)操作邏輯NameNodeRpcServer里面拋出IOException,那么它都會(huì)把它封裝成ServiceException,然后一路傳遞給client端。在client端,會(huì)通過(guò)ProtobufHelper.getRemoteException()把封裝在ServiceException中的IOException獲取出來(lái)。

??? RPC的Server端總體處理流程如下:

?

?

?

轉(zhuǎn)載于:https://www.cnblogs.com/Scott007/p/3273352.html

總結(jié)

以上是生活随笔為你收集整理的HDFS2.x之RPC流程分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。