日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop DFS源码研究之---Hadoop RPC机制

發布時間:2025/4/16 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop DFS源码研究之---Hadoop RPC机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

先記錄server端的機制

??最初接觸RPC,用自己的思路來猜測RPC的實現機制:

??Server端開啟socket監聽,listen()à accept()àread()àwrite()àclose()

??有請求來時開啟thread處理請求,原進程繼續監聽,請求完畢后將結果返回給client端


??這樣設計的缺點:

??當訪問量大時,并發開啟大量線程,會造成server端資源瓶頸。

??每個線程中,read()阻塞,直到有數據進來。?


Hadoop server端機制:使用JAVA NIO機制(new IO)?

?JAVA NIO常用的幾個類是 Listener,selector, reader, handler, responder

Hadoop中:


Listener:監聽RPC server的端口,接受連接,用selector篩選server端感興趣的連接,然后把連接轉發到某個Reader,讓Reader去讀取那個連接的數據。 Reader:Reader從某個客戶端連接中讀取數據流,把它轉化成調用對象(Call),放到調用隊列(call queue)里 Handler:從調用隊列中獲取調用信息,然后調用真正的對象,把調用結果放到響應隊列(response queue)里 Responder:它不斷地檢查響應隊列中是否有調用信息,如果有的話,就把調用的結果返回給客戶端。


首先Listener創建socketchannel 相當于傳統socket方式中的socket fd ?(個人理解。。)


? ? ??acceptChannel = ServerSocketChannel.open();
? ? ? acceptChannel.configureBlocking(false);


? ? ? // Bind the server socket to the local host and port
? ? ? bind(acceptChannel.socket(), address, backlogLength);

綁定之后創建selector

selector= Selector.open();

以及reader pool ?

? ? ? readers = new Reader[readThreads];
? ? ? readPool = Executors.newFixedThreadPool(readThreads);
? ? ? for (int i = 0; i < readThreads; i++) {
? ? ? ? Selector readSelector = Selector.open();
? ? ? ? Reader reader = new Reader(readSelector);
? ? ? ? readers[i] = reader;
? ? ? ? readPool.execute(reader);
? ? ? }
每個reader有一個readselector


然后listener 的?selector開始監聽:

? ? while (running) {
? ? ? ? SelectionKey key = null;
? ? ? ? try {
? ? ? ? ??selector.select();
? ? ? ? ? Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
? ? ? ? ? while (iter.hasNext()) {
? ? ? ? ? ? key = iter.next();
? ? ? ? ? ? iter.remove();
? ? ? ? ? ? try {
? ? ? ? ? ? ? if (key.isValid()) {
? ? ? ? ? ? ? ? if (key.isAcceptable())
? ? ? ? ? ? ? ? ??doAccept(key); ??
? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? }
? ? ? ? ? ? key = null;
? ? ? ? ? }
? ? ? ? }?

accept之后

將對應的selectionkey 轉給Reader

void doAccept(SelectionKey key) throws IOException, ?OutOfMemoryError {
? ? ? Connection c = null;
? ? ??ServerSocketChannel server = (ServerSocketChannel) key.channel();//獲取這個key對應的socket句柄 或者說channel
? ? ? SocketChannel channel;
? ? ? while ((channel = server.accept()) != null) {
? ? ? ? channel.configureBlocking(false);
? ? ? ? channel.socket().setTcpNoDelay(tcpNoDelay);
? ? ? ? Reader reader = getReader();
? ? ? ? try {
? ? ? ? ? reader.startAdd();
? ? ? ? ? SelectionKey readKey = reader.registerChannel(channel);
? ? ? ? ? c = new Connection(readKey, channel, System.currentTimeMillis());
? ? ? ? ??readKey.attach(c);
? ? ? ? ? synchronized (connectionList) {
? ? ? ? ? ? connectionList.add(numConnections, c);
? ? ? ? ? ? numConnections++;
? ? ? ? ? }
? ? ? ? ? if (LOG.isDebugEnabled())
? ? ? ? ? ? LOG.debug("Server connection from " + c.toString() +
? ? ? ? ? ? ? ? "; # active connections: " + numConnections +
? ? ? ? ? ? ? ? "; # queued calls: " + callQueue.size()); ? ? ? ? ?
? ? ? ? } finally {
? ? ? ? ? reader.finishAdd();?
? ? ? ? }


? ? ? }
? ? }
將連接轉給reader


reader是一直在阻塞的

? ? public void run() {
? ? ? ? LOG.info("Starting SocketReader");
? ? ? ? synchronized (this) {
? ? ? ? ? while (running) {
? ? ? ? ? ? SelectionKey key = null;
? ? ? ? ? ? try {
? ? ? ? ? ? ??readSelector.select();
? ? ? ? ? ? ? while (adding) {
? ? ? ? ? ? ? ? this.wait(1000);
? ? ? ? ? ? ? } ? ? ? ? ? ? ?


? ? ? ? ? ? ? Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
? ? ? ? ? ? ? while (iter.hasNext()) {
? ? ? ? ? ? ? ? key = iter.next();
? ? ? ? ? ? ? ? iter.remove();
? ? ? ? ? ? ? ? if (key.isValid()) {
? ? ? ? ? ? ? ? ? if (key.isReadable()) {
? ? ? ? ? ? ? ? ? ??doRead(key);
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? key = null;
? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? if (running) { ? ? ? ? ? ? ? ? ? ? ?// unexpected -- log it
? ? ? ? ? ? ? ? LOG.info(getName() + " caught: " +
? ? ? ? ? ? ? ? ? ? ? ? ?StringUtils.stringifyException(e));
? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (IOException ex) {
? ? ? ? ? ? ? LOG.error("Error in Reader", ex);
? ? ? ? ? ? }
? ? ? ? ? }
? ? ? ? }
? ? ? }


? void doRead(SelectionKey key) throws InterruptedException {
? ? ? int count = 0;
? ? ??Connection c = (Connection)key.attachment();//獲取這個連接
? ? ? if (c == null) {
? ? ? ? return; ?
? ? ? }
? ? ? c.setLastContact(System.currentTimeMillis());
? ? ??
? ? ? try {
? ? ??? count = c.readAndProcess();//在這個函數中通過channel讀內容 調用channelRead到Buffer里面讀數據
? ? ? } catch (InterruptedException ieo) {
? ? ? ? LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
? ? ? ? throw ieo;
? ? ? } catch (Exception e) {
? ? ? ? LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
? ? ? ? count = -1; //so that the (count < 0) block is executed
? ? ? }
? ? ? if (count < 0) {
? ? ? ? if (LOG.isDebugEnabled())
? ? ? ? ? LOG.debug(getName() + ": disconnecting client " +?
? ? ? ? ? ? ? ? ? ? c + ". Number of active connections: "+
? ? ? ? ? ? ? ? ? ? numConnections);
? ? ? ? closeConnection(c);
? ? ? ? c = null;
? ? ? }
? ? ? else {
? ? ? ? c.setLastContact(System.currentTimeMillis());
? ? ? }
? ? } ??


個人這樣理解:

傳統的socket過程是這樣的:在一個socket 上listen --> accept-->返回一個新的fd,這個fd轉給了別的線程,去讀、寫數據

JAVA NIO 是這樣的: 在一個socket上listen --> 用一個selector阻塞 accept -->將一個有channel信息的connection轉給reader,可以有很多reader,比如hadoop的reader pool -->每個reader有一個selector,selector負責阻塞等待感興趣的信息,等到后通過keys獲得channel的信息,然后通過channel讀內容,將內容轉換成call,給call queue ? ?后續處理。

所以client端的數據傳輸并不是通過server端listen的那個socket 而是和傳統的socket一樣,是通過listen accept后返回的那個socket。

才看一兩天 并不是特別確定這個結論

請大家指正,謝謝



======續========


其實JAVA NIO機制中,也可以在listener的selector部分 讀取客戶端的信息 也就是除了判斷信息是不是acceptable之外 也可以判斷channel是否readable、writable,但是這樣仍然會引發一個阻塞的問題,就是如果readable的信息遲遲不來,這樣仍然會阻塞listen線程 ?hadoop的優化是 把讀取信息也異步化了 ?在reader里面讀 ?有一個reader pool。


NIO的selector負責的是監聽和內容分發

關于selector:

SelectorChannel之間的關聯由一個SelectionKey實例表示。(注意:一個信道可以注冊多個Selector實例,因此可以有多個關聯的SelectionKey實例)。?SelectionKey維護了一個信道上感興趣的操作類型信息,并將這些信息存放在一個int型的位圖中,該int型數據的每一位都有相應的含義。SelectionKey類中的常量定義了信道上可能感興趣的操作類型,每個這種常量都是只有一位設置為1的位掩碼。

它在內部可以同時管理多個I/O,當一個信道有I/O操作的時 候,他會通知Selector,Selector就是記住這個信道有I/O操作,并且知道是何種I/O操作,hadoop中,acceptable的操作在listener的selector中來監聽,可讀的信道操作在reader pool中每個reader的selector來監聽。

這樣,對客戶端請求的accept、信息的read ?就分散到幾個線程中了,一個listen線程,幾個read線程,這些線程來輪詢channel。


關于channel:

一個 Channel實例代表了一個“可輪詢的”I/O目標,如套接字(或一個文件、設備等)。

Channel能夠注冊一個Selector類的實例。 Selector的select()方法允許你詢問“在一組信道中,哪一個當前需要服務(即,被接收,讀或寫)”,這兩個類都包含在 java.nio.channels包中。

一個 Selector實例可以同時檢查一組信道的I/O狀態。


SelectorChannel之間的關聯由一個SelectionKey實例表示。(注意:一個信道可以注冊多個Selector實例,因此可以有多個關聯的SelectionKey實例)。?SelectionKey維護了一個信道上感興趣的操作類型信息,并將這些信息存放在一個int型的位圖中,該int型數據的每一位都有相應的含義。SelectionKey類中的常量定義了信道上可能感興趣的操作類型,每個這種常量都是只有一位設置為1的位掩碼。


上面的內容部分摘自http://vaporz.blog.51cto.com/3142258/587229>


問題:selector和channel是不是多對多的


另外 JAVA NIO的另一個好處是 使用了Buffer 代替 stream ?使得性能可控 這個還需要再學習研究 具體沒有什么概念。


總結

以上是生活随笔為你收集整理的Hadoop DFS源码研究之---Hadoop RPC机制的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 911精品国产一区二区在线 | 欧洲做受高潮欧美裸体艺术 | 内裤摩擦1v1h | 日本不卡免费在线 | 色综合天天综合网天天看片 | 91在线视频播放 | 向日葵视频在线播放 | 神马午夜一区 | 自拍偷拍亚洲天堂 | 日韩一卡二卡三卡 | 日韩欧美高清视频 | 日韩精品一二区 | 日韩中文字幕免费视频 | 成人7777| 成人免费在线电影 | 97精品熟女少妇一区二区三区 | 亚洲精品视频免费 | 在线观看网站污 | 91久久久久久久久久久久 | 动漫玉足吸乳羞免费网站玉足 | 免费的毛片网站 | 日日夜夜免费精品 | 噼里啪啦免费观看 | 午夜影音| 日本乱偷人妻中文字幕在线 | av综合在线观看 | 欧美美女性生活 | 亚洲国产综合久久 | 中文字幕一区二区三区精品 | 未满十八18禁止免费无码网站 | 精品国产一区二区三区久久狼黑人 | av福利在线免费观看 | 亚洲欧美日韩动漫 | 一卡二卡三卡在线视频 | 欧美人妻一区二区 | 国产精品欧美激情在线 | 青青草在线播放 | 亚洲免费在线观看av | 北条麻妃一区二区三区四区五区 | 国产欧美一区二区三区在线 | 在线观看国产小视频 | 成年人午夜视频 | 亚洲精品综合在线 | 鲁在线视频 | 老子影院午夜精品无码 | 毛片网站在线 | 久久网站免费看 | 亚洲男女激情 | 亚洲精品激情 | 欧美日韩中字 | 少妇一级淫片免费放2 | av看片资源| 丝袜美腿中文字幕 | av色图在线 | 黄瓜视频色 | 春闺艳妇(h)高h产乳 | 精品视频在线一区二区 | 少妇精品一区二区 | 久久久久久综合 | xxxwww黄色 | 国内毛片视频 | 亚洲免费网址 | 亚洲av无码一区二区三区人 | 成人区人妻精品一区 | 精品人妻一区二区三区四区久久 | 天天激情站 | 污污污www精品国产网站 | 成人免费毛片东京热 | 天天插美女 | 97超碰人人爱 | 国产一级片av | 熟妇人妻久久中文字幕 | 日韩精品――色哟哟 | 在线观看视频你懂得 | 亚洲乱码中文字幕久久孕妇黑人 | 成年人免费视频网站 | 日韩在线播放av | 天躁夜夜躁狼狠躁 | 欧洲视频在线观看 | 在线久草 | 欧美特黄色片 | 美女操出白浆 | 欧美交换国产一区内射 | 黄色无遮挡网站 | 色女人在线 | 国产福利一区二区 | 国产精品美女久久久久久久久 | 解开人妻的裙子猛烈进入 | 少妇扒开粉嫩小泬视频 | 伊人青草 | 一本之道高清无码视频 | 男人的天堂久久 | 日韩一区二区a片免费观看 伊人网综合在线 | av无遮挡 | 夜色综合网 | 日日干av | 中文字幕乱码av | xxxxx黄色 | 欧美日韩精品久久久免费观看 |