日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop RPC框架

發(fā)布時間:2025/3/11 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop RPC框架 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

原文:http://blog.csdn.net/thomas0yang/article/details/41211259

----------------------------------------------------------------------------------------------

1、RPC框架概述 1.1 RPC(Remote Procedure Call Protocol)——遠程過程調(diào)用協(xié)議,它是一種通過網(wǎng)絡從遠程計算機程序上請求服務,而不需要了解底層網(wǎng)絡技術的協(xié)議。RPC協(xié)議假定某些傳輸協(xié)議的存在,如TCP或UDP,為通信程序之間攜帶信息數(shù)據(jù)。在OSI網(wǎng)絡通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發(fā)包括網(wǎng)絡分布式多程序在內(nèi)的應用程序更加容易。 1.2 RPC通常采用客戶端服務器模型,其框架主要有以下幾部分
  • 通信模塊:實現(xiàn)請求應該協(xié)議。主要分為同步方式和異步方式。
  • stub程序:客戶端和服務器均包含stub程序,可以看做代理程序。使得遠程函數(shù)表現(xiàn)的跟本地調(diào)用一樣,對用戶程序完全透明。
  • 調(diào)度程序:接受來自通信模塊的請求消息,根據(jù)標識選擇stub程序處理。并發(fā)量大一般采用線程池處理。
  • 客戶程序/服務過程:請求發(fā)出者和請求的處理者。
1.3 RPC流程圖

2、Hadoop RPC基本框架 2.1Hadoop RPC的使用方法見代碼
服務
public interface MyBiz extends VersionedProtocol {
??? long PROTOCOL_VERSION = 12321443L;
??? String hello(String name);
}
public class MyBizImpl implements MyBiz {
??? @Override
??? public long getProtocolVersion(String arg0, long arg1) throws IOException {
??????? return PROTOCOL_VERSION;
??? }

??? @Override
??? public String hello(String name) {
??????? System. out.println( "invoked");
??????? return "hello " + name;
??? }
}

服務器
public class MyServer {
??? public static final String SERVER_ADDRESS = "localhost";
??? public static final int SERVER_PORT = 12345;

??? public static void main(String[] args) throws IOException {
??????? Server server = RPC. getServer(new MyBizImpl(), SERVER_ADDRESS, SERVER_PORT , new Configuration());
??????? server.start();
??? }
}

客戶端
public class MyClient {
??? public static void main(String[] args) throws IOException {
??????? MyBiz proxy = (MyBiz) RPC. getProxy(MyBiz.class, MyBiz.PROTOCOL_VERSION,
??????????????? new InetSocketAddress(MyServer. SERVER_ADDRESS,MyServer.SERVER_PORT),
??????????????? new Configuration());
??????? String result = proxy.hello( "5");
??????? System. out.println(result);
??????? RPC.stopProxy(proxy);
??? }
}

2.2 org.apache.hadoop.ipc.RPC類解析 RPC類主要包含三部分:
  • ClientCache(成員變量):根據(jù)用戶提供的SocketFactory來緩存Client對象,以便重用Client對象。
  • Server(內(nèi)部類):繼承Server抽象類,利用反射實現(xiàn)了call方法,即客戶端請求的方法和對應參數(shù)完成方法調(diào)用。
  • Invocation(內(nèi)部類):將要調(diào)用的方法名和參數(shù)打包成可序列化的對象,方便客戶端和服務器之間傳遞。

2.3 客戶端和服務器端的關系
  • Client-NameNode之間,其中NameNode是服務器
  • Client-DataNode之間,其中DataNode是服務器
  • DataNode-NameNode之間,其中NameNode是服務器
  • DataNode-DateNode之間,其中某一個DateNode是服務器,另一個是客戶端
2.4?org.apache.hadoop.ipc.Client類解析 2.4.1 Client類中主要包含:
  • Call(內(nèi)部類):封裝了一個RPC請求,包含5個成員變量,唯一表示id、函數(shù)調(diào)用信息param、函數(shù)返回值value、函數(shù)異常信息error、函數(shù)完成標識done。Hadoop rpc?server采用異步方式處理客戶端請求,使得遠程過程調(diào)用的發(fā)生順序和返回順序無直接關系,而客戶端正是通過id識別不同的函數(shù)調(diào)用。當客戶端向服務器發(fā)送請求,只需填充id和param兩個變量,其余3個變量由服務器端根據(jù)函數(shù)執(zhí)行情況填充。
  • Connection(內(nèi)部類,一個線程):是client和server之間的一個通信連接,封裝了連接先關的基本信息和操作?;拘畔?#xff1a;通信連接唯一標識remoteId(ConnectionId)、與Server端通信的scoket、網(wǎng)絡輸入輸出流in/out、保存RPC請求的哈希表calls(Hashtable<Integer, Call>)。操作包括:addCall將一個Call對象添加到哈希表中;sendParam想服務器端發(fā)送RPC請求;receiveResponse從服務器端接收已經(jīng)處理完成的RPC請求;run調(diào)用receiveResponse方法,等待返回結(jié)果。
  • ConnectionId(內(nèi)部類):連接的標記(包括server地址,協(xié)議,其他一些連接的配置項信息)
  • ParallelCall(內(nèi)部類):實現(xiàn)并行調(diào)用的請求
  • ParallelResults(內(nèi)部類):并行調(diào)用的執(zhí)行結(jié)果
2.4.2 Client類中主要對外通過兩個接口,分別用于單個遠程調(diào)用和批量遠程調(diào)用。 public Writable call(Writable param, ConnectionId remoteId)? throws InterruptedException, IOException public Writable call(Writable param, InetSocketAddress addr,? Class<?> protocol, UserGroupInformation ticket, int rpcTimeout, Configuration conf)? throws InterruptedException, IOException
2.4.3 調(diào)用流程分析,當調(diào)用call函數(shù)執(zhí)行某個遠程方法時,有以下幾個步驟: 1)創(chuàng)建一個Connection對象,并將遠程方法調(diào)用信息封裝成Call對象,放到Connection對象中的哈希表中;
2)調(diào)用Connection類中的sendRpcRequest()方法將當前Call對象發(fā)送給Server端;
3)Server端處理完RPC請求后,將結(jié)果通過網(wǎng)絡返回給Client端,Client端通過receiveRpcResponse()函數(shù)獲取結(jié)果;
4)Client檢查結(jié)果處理狀態(tài)(成功還是失敗),并將對應Call對象從哈希表中刪除。
2.4.4 一個Client包含多個連接,private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
2.5?org.apache.hadoop.ipc.Server類解析
2.5.1 背景 Hadoop采用了Master/Slave結(jié)構(gòu),其中Master是整個系統(tǒng)的單點,如NameNode或JobTracker,這是制約系統(tǒng)性能和可擴展性的最關鍵因素之一;而Master通過ipc.Server接收并處理所有Slave發(fā)送的請求,這就要求ipc.Server 將高并發(fā)和可擴展性作為設計目標。為此,ipc.Server采用了很多提高并發(fā)處理能力的技術,主要包括線程池、事件驅(qū)動和Reactor設計模式等,這些技術均采用了JDK自帶的庫實現(xiàn),這里重點分析它是如何利用Reactor設計模式提高整體性能的。
2.5.2 reactor設計模式 Reactor是并發(fā)編程中的一種基于事件驅(qū)動的設計模式,它具有以下兩個特點:通過派發(fā)/分離I/O操作事件提高系統(tǒng)的并發(fā)性能;提供了粗粒度的并發(fā)控制,使用單線程實現(xiàn),避免了復雜的同步處理。典型的Reactor實現(xiàn)原理如圖所示。
典型的Reactor模式中主要包括以下幾個角色。
  • Reactor:I/O事件的派發(fā)者。
  • Acceptor:接受來自Client的連接,建立與Client對應的Handler,并向Reactor注冊此Handler。
  • Handler:與一個Client通信的實體,并按一定的過程實現(xiàn)業(yè)務的處理。Handler內(nèi)部往往會有更進一步的層次劃分,用來抽象諸如read、decode、compute、encode和send等過程。在Reactor模式中,業(yè)務邏輯被分散的I/O事件所打破,所以Handler需要有適當?shù)臋C制在所需的信息還不全(讀到一半)的時候保存上下文,并在下一次I/O事件到來的時候(另一半可讀)能繼續(xù)上次中斷的處理。
  • Reader/Sender:為了加速處理速度,Reactor模式往往構(gòu)建一個存放數(shù)據(jù)處理線程的線程池,這樣數(shù)據(jù)讀出后,立即扔到線程池中等待后續(xù)處理即可。為此,Reactor模式一般分離Handler中的讀和寫兩個過程,分別注冊成單獨的讀事件和寫事件,并由對應的Reader和Sender線程處理。
2.5.3 java nio代碼實例
package com.sohu.tv.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* NIO服務端
* @author 小路
*/
public class NIOServer {
??? //通道管理器
??? private Selector selector;

??? /**
???? * 獲得一個ServerSocket通道,并對該通道做一些初始化的工作
???? * @param port? 綁定的端口號
???? * @throws IOException
???? */
??? public void initServer(int port) throws IOException {
??????? // 獲得一個ServerSocket通道
??????? ServerSocketChannel serverChannel = ServerSocketChannel.open();
??????? // 設置通道為非阻塞
??????? serverChannel.configureBlocking(false);
??????? // 將該通道對應的ServerSocket綁定到port端口
??????? serverChannel.socket().bind(new InetSocketAddress(port));
??????? // 獲得一個通道管理器
??????? this.selector = Selector.open();
??????? //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_ACCEPT事件,注冊該事件后,
??????? //當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。
??????? serverChannel.register(selector, SelectionKey.OP_ACCEPT);
??? }

??? /**
???? * 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件,如果有,則進行處理
???? * @throws IOException
???? */
??? @SuppressWarnings("unchecked")
??? public void listen() throws IOException {
??????? System.out.println("服務端啟動成功!");
??????? // 輪詢訪問selector
??????? while (true) {
??????????? //當注冊的事件到達時,方法返回;否則,該方法會一直阻塞
????????????selector.select();
??????????? // 獲得selector中選中的項的迭代器,選中的項為注冊的事件
??????????? Iterator ite = this.selector.selectedKeys().iterator();
??????????? while (ite.hasNext()) {
??????????????? SelectionKey key = (SelectionKey) ite.next();
??????????????? // 刪除已選的key,以防重復處理
??????????????? ite.remove();
??????????????? // 客戶端請求連接事件
??????????????? if (key.isAcceptable()) {
??????????????????? ServerSocketChannel server = (ServerSocketChannel) key
??????????????????????????? .channel();
??????????????????? // 獲得和客戶端連接的通道
??????????????????? SocketChannel channel = server.accept();
??????????????????? // 設置成非阻塞
??????????????????? channel.configureBlocking(false);

??????????????????? //在這里可以給客戶端發(fā)送信息哦
??????????????????? channel.write(ByteBuffer.wrap(new String("向客戶端發(fā)送了一條信息").getBytes()));
??????????????????? //在和客戶端連接成功之后,為了可以接收到客戶端的信息,需要給通道設置讀的權限。
????????????????????channel.register(this.selector, SelectionKey.OP_READ);

??????????????????? // 獲得了可讀的事件
??????????????? } else if (key.isReadable()) {
??????????????????? read(key);
??????????????? }

??????????? }

??????? }
??? }
??? /**
???? * 處理讀取客戶端發(fā)來的信息 的事件
???? * @param key
???? * @throws IOException
???? */
??? public void read(SelectionKey key) throws IOException{
??????? // 服務器可讀取消息:得到事件發(fā)生的Socket通道
??????? SocketChannel channel = (SocketChannel) key.channel();
??????? // 創(chuàng)建讀取的緩沖區(qū)
??????? ByteBuffer buffer = ByteBuffer.allocate(10);
??????? channel.read(buffer);
??????? byte[] data = buffer.array();
??????? String msg = new String(data).trim();
??????? System.out.println("服務端收到信息:"+msg);
??????? ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
??????? channel.write(outBuffer);// 將消息回送給客戶端
??? }

??? /**
???? * 啟動服務端測試
???? * @throws IOException
???? */
??? public static void main(String[] args) throws IOException {
??????? NIOServer server = new NIOServer();
??????? server.initServer(8000);
??????? server.listen();
??? }
}



package com.sohu.tv.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* NIO客戶端
* @author 小路
*/
public class NIOClient {
??? //通道管理器
??? private Selector selector;

??? /**
???? * 獲得一個Socket通道,并對該通道做一些初始化的工作
???? * @param ip 連接的服務器的ip
???? * @param port? 連接的服務器的端口號
???? * @throws IOException
???? */
??? public void initClient(String ip,int port) throws IOException {
??????? // 獲得一個Socket通道
??????? SocketChannel channel = SocketChannel.open();
??????? // 設置通道為非阻塞
??????? channel.configureBlocking(false);
??????? // 獲得一個通道管理器
??????? this.selector = Selector.open();

??????? // 客戶端連接服務器,其實方法執(zhí)行并沒有實現(xiàn)連接,需要在listen()方法中調(diào)
??????? //用channel.finishConnect();才能完成連接
??????? channel.connect(new InetSocketAddress(ip,port));
??????? //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_CONNECT事件。
??????? channel.register(selector, SelectionKey.OP_CONNECT);
??? }

??? /**
???? * 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件,如果有,則進行處理
???? * @throws IOException
???? */
??? @SuppressWarnings("unchecked")
??? public void listen() throws IOException {
??????? // 輪詢訪問selector
??????? while (true) {
??????????? selector.select();
??????????? // 獲得selector中選中的項的迭代器
??????????? Iterator ite = this.selector.selectedKeys().iterator();
??????????? while (ite.hasNext()) {
??????????????? SelectionKey key = (SelectionKey) ite.next();
??????????????? // 刪除已選的key,以防重復處理
??????????????? ite.remove();
??????????????? // 連接事件發(fā)生
??????????????? if (key.isConnectable()) {
??????????????????? SocketChannel channel = (SocketChannel) key
??????????????????????????? .channel();
??????????????????? // 如果正在連接,則完成連接
??????????????????? if(channel.isConnectionPending()){
??????????????????????? channel.finishConnect();

??????????????????? }
??????????????????? // 設置成非阻塞
??????????????????? channel.configureBlocking(false);

??????????????????? //在這里可以給服務端發(fā)送信息哦
??????????????????? channel.write(ByteBuffer.wrap(new String("向服務端發(fā)送了一條信息").getBytes()));
??????????????????? //在和服務端連接成功之后,為了可以接收到服務端的信息,需要給通道設置讀的權限。
??????????????????? channel.register(this.selector, SelectionKey.OP_READ);

??????????????????? // 獲得了可讀的事件
??????????????? } else if (key.isReadable()) {
??????????????????? read(key);
??????????????? }
??????????? }
??????? }
??? }
??? /**
???? * 處理讀取服務端發(fā)來的信息 的事件
???? * @param key
???? * @throws IOException
???? */
??? public void read(SelectionKey key) throws IOException{
??????? //和服務端的read方法一樣
??? }


??? /**
???? * 啟動客戶端測試
???? * @throws IOException
???? */
??? public static void main(String[] args) throws IOException {
??????? NIOClient client = new NIOClient();
??????? client.initClient("localhost",8000);
??????? client.listen();
??? }

}


2.5.4 server處理流程 ipc.Server的主要功能是接收來自客戶端的RPC請求,經(jīng)過調(diào)用相應的函數(shù)獲取結(jié)果后,返回給對應的客戶端。為此,ipc.Server被劃分成3個階段:接收請求、處理請求和返回結(jié)果。 (1)接收請求
? ? ?該階段主要任務是接收來自各個客戶端的RPC請求,并將它們封裝成固定的格式(Call類)放到一個共享隊列(callQueue)中,以便進行后續(xù)處理。該階段內(nèi)部又分為建立連接和接收請求兩個子階段,分別由Listener和Reader兩種線程完成。
? ? ?整個Server只有一個Listener線程,統(tǒng)一負責監(jiān)聽來自客戶端的連接請求,一旦有新的請求到達,它會采用輪詢的方式從線程池中選擇一個Reader線程進行處理,而Reader線程可同時存在多個,它們分別負責接收一部分客戶端連接的RPC請求,至于每個Reader線程負責哪些客戶端連接,完全由Listener決定,當前Listener只是采用了簡單的輪詢分配機制。
? ? ?Listener和Reader線程內(nèi)部各自包含一個Selector對象,分別用于監(jiān)聽SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。對于Listener線程,主循環(huán)的實現(xiàn)體是監(jiān)聽是否有新的連接請求到達,并采用輪詢策略選擇一個Reader線程處理新連接;對于Reader線程,主循環(huán)的實現(xiàn)體是監(jiān)聽(它負責的那部分)客戶端連接中是否有新的RPC請求到達,并將新的RPC請求封裝成Call對象,放到共享隊列callQueue中。

(2)處理請求
? ? ?該階段主要任務是從共享隊列callQueue中獲取Call對象,執(zhí)行對應的函數(shù)調(diào)用,并將結(jié)果返回給客戶端,這全部由Handler線程完成。
? ? ?Server端可同時存在多個Handler線程,它們并行從共享隊列中讀取Call對象,經(jīng)執(zhí)行對應的函數(shù)調(diào)用后,將嘗試著直接將結(jié)果返回給對應的客戶端。但考慮到某些函數(shù)調(diào)用返回結(jié)果很大或者網(wǎng)絡速度過慢,可能難以將結(jié)果一次性發(fā)送到客戶端,此時Handler將嘗試著將后續(xù)發(fā)送任務交給Responder線程。

(3)返回結(jié)果
? ? ?前面提到,每個Handler線程執(zhí)行完函數(shù)調(diào)用后,會嘗試著將執(zhí)行結(jié)果返回給客戶端,但對于特殊情況,比如函數(shù)調(diào)用返回結(jié)果過大或者網(wǎng)絡異常情況(網(wǎng)速過慢),會將發(fā)送任務交給Responder線程。
? ? ?Server端僅存在一個Responder線程,它的內(nèi)部包含一個Selector對象,用于監(jiān)聽SelectionKey.OP_WRITE事件。當Handler沒能將結(jié)果一次性發(fā)送到客戶端時,會向該Selector對象注冊SelectionKey.OP_WRITE事件,進而由Responder線程采用異步方式繼續(xù)發(fā)送未發(fā)送完成的結(jié)果。






總結(jié)

以上是生活随笔為你收集整理的Hadoop RPC框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 成都4电影免费高清 | 亚洲小视频在线播放 | 奇米影视第四色7777 | 国产做爰视频免费播放 | 男人的天堂黄色 | 中国女人内精69xxxxxx | 国产精品无码一区二区桃花视频 | 日日精 | 欧美老女人性视频 | 色妞色| 国产精品亚洲五月天丁香 | 欧美美女性生活视频 | 国产伦精品一区二区三区视频免费 | 国产精品久久久久久久久久免费 | 人人亚洲| 亚洲天堂av在线播放 | 欧美无极品 | 可以免费看av的网址 | 国产一级片中文字幕 | 国产午夜精品久久久久久久 | 国产激情视频一区 | 欧美bdsm调教视频 | 91丝袜呻吟高潮美腿白嫩在线观看 | 久久久久无码国产精品 | 饥渴少妇伦色诱公 | 蜜桃久久久久久久 | 韩国裸体网站 | 91视频.com| 国产一区二区三区麻豆 | 免费网站在线观看视频 | 五月婷婷网 | 国产视色 | 欧美视频一二三区 | 春日野结衣av | 中文字字幕在线中文 | 无码人妻丰满熟妇区96 | 男性裸体全身精光gay | 尤物视频免费在线观看 | 秘密基地免费观看完整版中文 | 五月综合激情日本mⅴ | 欧美黑粗大| 亚洲精品水蜜桃 | 免费av一级片 | 色哟哟国产精品 | 少妇一级淫片免费 | 国产情侣激情自拍 | 亚洲 国产 欧美 日韩 | 欧美乱三级 | av中文网站 | 卡一卡二av | 天天干夜夜骑 | 天天综合久久 | 美日韩丰满少妇在线观看 | 中文字幕av播放 | 人人入人人 | 色婷婷一区二区三区 | 中文字幕第7页 | 重口另类| 久草视频在 | 印度午夜性春猛xxx交 | 青青91| 野花社区视频在线观看 | 日韩精品中文字幕一区 | 黄色三级a| 91精品网站 | 免费看女人裸体 | 性欢交69精品久久久 | 亚洲人人人 | 传媒视频在线观看 | 香蕉网站视频 | 女人裸体免费网站 | 人妻精油按摩bd高清中文字幕 | xzjzjzjzjzj欧美大片 | 欧美成人精品三级网站 | 久久精品亚洲一区 | 成年免费在线观看 | 亚洲 欧美 激情 小说 另类 | 国产黄a三级三级三级看三级男男 | 黄色污污网站在线观看 | www天堂在线 | 午夜精品在线视频 | 亚洲aa视频 | 老头糟蹋新婚少妇系列小说 | 淫人网 | 国产在线观看99 | 亚州av在线播放 | 国产高清不卡视频 | 日韩av大全 | 毛利兰被扒开腿做同人漫画 | 超碰人人射 | 午夜羞羞影院 | 欧美黑人猛交 | 天堂在线免费视频 | 色偷偷亚洲| 婷婷影视 | free性欧美hd精品4k | 国产精品乱码一区二区 | 成 人 免费 黄 色 | 久久香蕉av |