HDFS dfsclient读文件过程 源码分析
HDFS讀取文件的重要概念
HDFS一個文件由多個block構成。HDFS在進行block讀寫的時候是以packet(默認每個packet為64K)為單位進行的。每一個packet由若干個chunk(默認512Byte)組成。Chunk是進行數據校驗的基本單位,對每一個chunk生成一個校驗和(默認4Byte)并將校驗和進行存儲。在讀取一個block的時候,數據傳輸的基本單位是packet,每個packet由若干個chunk組成。
?
HDFS客戶端讀文件示例代碼
FileSystem hdfs = FileSystem.get(new Configuration()); Path path = new Path("/testfile");// reading FSDataInputStream dis = hdfs.open(path); byte[] writeBuf = new byte[1024]; int len = dis.read(writeBuf); System.out.println(new String(writeBuf, 0, len, "UTF-8")); dis.close();hdfs.close();?
文件的打開
HDFS打開一個文件,需要在客戶端調用DistributedFileSystem.open(Path f, int bufferSize),其實現為:
public FSDataInputStream open(Path f, int bufferSize) throws IOException {return new DFSClient.DFSDataInputStream(dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)); }其中dfs為DistributedFileSystem的成員變量DFSClient,其open函數被調用,其中創建一個DFSInputStream(src, buffersize, verifyChecksum)并返回。
DFSClient.DFSDataInputStream實現了HDFS的FSDataInputStream,里面簡單包裝了DFSInputStream,實際實現是DFSInputStream完成的。
在DFSInputStream的構造函數中,openInfo函數被調用,其主要從namenode中得到要打開的文件所對應的blocks的信息,實現如下:
synchronized void openInfo() throws IOException {LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);this.locatedBlocks = newInfo;this.currentNode = null; }private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,String src, long start, long length) throws IOException {return namenode.getBlockLocations(src, start, length); }
LocatedBlocks主要包含一個鏈表的List<LocatedBlock> blocks,其中每個LocatedBlock包含如下信息:
- Block b:此block的信息
- long offset:此block在文件中的偏移量
- DatanodeInfo[] locs:此block位于哪些DataNode上
上面namenode.getBlockLocations是一個RPC調用,最終調用NameNode類的getBlockLocations函數。
NameNode返回的是根據客戶端請求的文件名字,文件偏移量,數據長度,返回文件對應的數據塊列表,數據塊所在的DataNode節點。
?
文件的順序讀取
?hdfs文件的順序讀取是最經常使用的.
文件順序讀取的時候,客戶端利用文件打開的時候得到的FSDataInputStream.read(byte[] buffer, int offset, int length)函數進行文件讀操作。
FSDataInputStream會調用其封裝的DFSInputStream的read(byte[] buffer, int offset, int length)函數,實現如下:
public synchronized int read(byte buf[], int off, int len) throws IOException {...if (pos < getFileLength()) {int retries = 2;while (retries > 0) {try {if (pos > blockEnd) {//首次pos=0,blockEnd=-1,必定調用方法blockSeekTo,初始化blockEnd,以后是讀完了當前塊,需要讀下一個塊,才會調用blockSeekTocurrentNode = blockSeekTo(pos);//根據pos選擇塊和數據節點,選擇算法是遍歷塊所在的所有數據節點,選擇第一個非死亡節點 }int realLen = Math.min(len, (int) (blockEnd - pos + 1));int result = readBuffer(buf, off, realLen);if (result >= 0) {pos += result;} else {throw new IOException("Unexpected EOS from the reader");}...return result;} catch (ChecksumException ce) {throw ce; } catch (IOException e) {...if (currentNode != null) { addToDeadNodes(currentNode); }//遇到無法讀的DataNode,添加到死亡節點if (--retries == 0) {//嘗試讀三次都失敗,就拋出異常throw e;}}}}return -1; }?blockSeekTo函數會更新blockEnd,并創建對應的BlockReader,這里的BlockReader的初始化和上面的fetchBlockByteRange差不多,如果客戶端和塊所屬的DataNode是同個節點,則初始化一個通過本地讀取的BlockReader,否則創建一個通過Socket連接DataNode的BlockReader。
BlockReader的創建也是通過BlockReader.newBlockReader創建的,具體分析請看后面。
readBuffer方法比較簡單,直接調用BlockReader的read方法直接讀取數據。
BlockReader的read方法就根據請求的塊起始偏移量,長度,通過socket連接DataNode,獲取塊內容,BlockReader的read方法不會做緩存優化。
?
文件的隨機讀取
對于MapReduce,在提交作業時,已經確定了每個map和reduce要讀取的文件,文件的偏移量,讀取的長度,所以MapReduce使用的大部分是文件的隨機讀取。
文件隨機讀取的時候,客戶端利用文件打開的時候得到的FSDataInputStream.read(long position, byte[] buffer, int offset, int length)函數進行文件讀操作。
FSDataInputStream會調用其封裝的DFSInputStream的read(long position, byte[] buffer, int offset, int length)函數,實現如下:
public int read(long position, byte[] buffer, int offset, int length)throws IOException { long filelen = getFileLength(); int realLen = length;if ((position + length) > filelen) {realLen = (int)(filelen - position);}//首先得到包含從offset到offset + length內容的block列表//比如對于64M一個block的文件系統來說,欲讀取從100M開始,長度為128M的數據,則block列表包括第2,3,4塊blockList<LocatedBlock> blockRange = getBlockRange(position, realLen);int remaining = realLen;//對每一個block,從中讀取內容//對于上面的例子,對于第2塊block,讀取從36M開始,讀取長度28M,對于第3塊,讀取整一塊64M,對于第4塊,讀取從0開始,長度為36M,共128M數據for (LocatedBlock blk : blockRange) {long targetStart = position - blk.getStartOffset();long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset);remaining -= bytesToRead;position += bytesToRead;offset += bytesToRead;}...return realLen; }getBlockRange方法根據文件的偏移量和長度,獲取對應的數據塊信息。主要是根據NameNode類的getBlockLocations方法實現,并做了緩存和二分查找等優化。
?fetchBlockByteRange方法真正從數據塊讀取內容,實現如下:
private void fetchBlockByteRange(LocatedBlock block, long start,long end, byte[] buf, int offset) throws IOException {Socket dn = null;int numAttempts = block.getLocations().length;//此while循環為讀取失敗后的重試次數while (dn == null && numAttempts-- > 0 ) {//選擇一個DataNode來讀取數據DNAddrPair retval = chooseDataNode(block);DatanodeInfo chosenNode = retval.info;InetSocketAddress targetAddr = retval.addr;BlockReader reader = null;int len = (int) (end - start + 1);try {if (shouldTryShortCircuitRead(targetAddr)) {//如果要讀取的塊所屬的DataNode與客戶端是同一個節點,直接通過本地磁盤訪問,減少網絡流量reader = getLocalBlockReader(conf, src, block.getBlock(),accessToken, chosenNode, DFSClient.this.socketTimeout, start);} else {//創建Socket連接到DataNodedn = socketFactory.createSocket();dn.connect(targetAddr, socketTimeout);dn.setSoTimeout(socketTimeout);//利用建立的Socket鏈接,生成一個reader負責從DataNode讀取數據reader = BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(), accessToken,block.getBlock().getGenerationStamp(), start, len, buffersize, verifyChecksum, clientName);} //讀取數據int nread = reader.readAll(buf, offset, len);return;} finally {IOUtils.closeStream(reader);IOUtils.closeSocket(dn);dn = null;}//如果讀取失敗,則將此DataNode標記為失敗節點 addToDeadNodes(chosenNode);} }讀取塊內容,會嘗試該數據塊所在的所有DataNode,如果失敗,就把對應的DataNode加入到失敗節點,下次選擇節點就會忽略失敗節點(只在獨立的客戶端緩存失敗節點,不上報到namenode)。
BlockReader的創建也是通過BlockReader.newBlockReader創建的,具體分析請看后面。
最后,通過BlockReader的readAll方法讀取塊的完整內容。
?
dfsclient和datanode的通信協議
dfsclient的連接
dfsclient首次連接datanode時,通信協議實現主要是BlockReader.newBlockReader方法的實現,如下:
public static BlockReader newBlockReader( Socket sock, String file,long blockId,long genStamp,long startOffset, long len,int bufferSize, boolean verifyChecksum,String clientName) throws IOException {//使用Socket建立寫入流,向DataNode發送讀指令DataOutputStream out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );out.write( DataTransferProtocol.OP_READ_BLOCK );out.writeLong( blockId );out.writeLong( genStamp );out.writeLong( startOffset );out.writeLong( len );Text.writeString(out, clientName);out.flush();//使用Socket建立讀入流,用于從DataNode讀取數據DataInputStream in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(sock),bufferSize));short status = in.readShort();//塊讀取的狀態標記,一般是成功DataChecksum checksum = DataChecksum.newDataChecksum( in );long firstChunkOffset = in.readLong();//生成一個reader,主要包含讀入流,用于讀取數據return new BlockReader( file, blockId, in, checksum, verifyChecksum,startOffset, firstChunkOffset, sock ); }這里的startOffset是相對于塊的起始偏移量,len是要讀取的長度。
DataChecksum.newDataChecksum(in),會從DataNode獲取該塊的checksum加密方式,加密長度。
BlockReader的readAll函數就是用上面生成的DataInputStream讀取數據。
?下面是是讀數據塊時,客戶端發送的信息:
| version | operator | blockid | generationStamp | startOffset | length | clientName | ?accessToken |
?
?
operator:byte Client所需要的操作,讀取一個block、寫入一個block等等
version:short Client所需要的數據與Datanode所提供數據的版本是否一致
blockId:long 所要讀取block的blockId
generationStamp:long 所需要讀取block的generationStamp
startOffset:long 讀取block的的起始位置
length:long 讀取block的長度
clientName:String Client的名字
accessToken:Token Client提供的驗證信息,用戶名密碼等
?
DataNode對dfsclient的響應
DataNode負責與客戶端代碼的通信協議交互的邏輯,主要是DataXceiver的readBlock方法實現的:
private void readBlock(DataInputStream in) throws IOException {//讀取指令long blockId = in.readLong(); Block block = new Block( blockId, 0 , in.readLong());long startOffset = in.readLong();long length = in.readLong();String clientName = Text.readString(in);//創建一個寫入流,用于向客戶端寫數據OutputStream baseStream = NetUtils.getOutputStream(s,datanode.socketWriteTimeout);DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));//生成BlockSender用于讀取本地的block的數據,并發送給客戶端//BlockSender有一個成員變量InputStream blockIn用于讀取本地block的數據BlockSender blockSender = new BlockSender(block, startOffset, length,true, true, false, datanode, clientTraceFmt);out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // 發送操作成功的狀態//向客戶端寫入數據long read = blockSender.sendBlock(out, baseStream, null);……} finally {IOUtils.closeStream(out);IOUtils.closeStream(blockSender);} }?
DataXceiver的sendBlock用于發送數據,數據發送包括應答頭和后續的數據包。應答頭如下(包含DataXceiver中發送的成功標識):
?
DataXceiver的sendBlock的實現如下:
long sendBlock(DataOutputStream out, OutputStream baseStream, BlockTransferThrottler throttler) throws IOException {...try {try {checksum.writeHeader(out);//寫入checksum的加密類型和加密長度if ( chunkOffsetOK ) {out.writeLong( offset );}out.flush();} catch (IOException e) { //socket errorthrow ioeToSocketException(e);}...ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);while (endOffset > offset) {//循環寫入數據包long len = sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);offset += len;totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*checksumSize);seqno++;}try {out.writeInt(0); //標記結束 out.flush();} catch (IOException e) { //socket errorthrow ioeToSocketException(e);}}...return totalRead; }DataXceiver的sendChunks盡可能在一個packet發送多個chunk,chunk的個數由maxChunks和剩余的塊內容決定,實現如下:
//默認是crc校驗,bytesPerChecksum默認是512,checksumSize默認是4,表示數據塊每512個字節,做一次checksum校驗,checksum的結果是4個字節 private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) throws IOException {int len = Math.min((int) (endOffset - offset),bytesPerChecksum * maxChunks);//len是要發送的數據長度if (len == 0) {return 0;}int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;//這次要發送的chunk數量int packetLen = len + numChunks*checksumSize + 4;//packetLen是整個包的長度,包括包頭,校驗碼,數據 pkt.clear();// write packet headerpkt.putInt(packetLen);//整個packet的長度pkt.putLong(offset);//塊的偏移量pkt.putLong(seqno);//序列號pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));//是否最后一個packetpkt.putInt(len);//發送的數據長度int checksumOff = pkt.position();int checksumLen = numChunks * checksumSize;byte[] buf = pkt.array();if (checksumSize > 0 && checksumIn != null) {try {checksumIn.readFully(buf, checksumOff, checksumLen);//填充chucksum的內容} catch (IOException e) {...}}int dataOff = checksumOff + checksumLen;if (blockInPosition < 0) {IOUtils.readFully(blockIn, buf, dataOff, len);//填充塊數據的內容if (verifyChecksum) {//默認是false,不驗證//校驗處理 }}try {//通過socket發送數據到客戶端 } catch (IOException e) {throw ioeToSocketException(e);}...return len; }數據組織成數據包來發送,數據包結構如下:
| packetLen | offset | sequenceNum | isLastPacket | startOffset | dataLen | checksum | ??data |
?
?
packetLen:int packet的長度,包括數據、數據的校驗等等
offset:long packet在block中的偏移量
sequenceNum:long 該packet在這次block讀取時的序號
isLastPacket:byte packet是否是最后一個
dataLen:int 該packet所包含block數據的長度,純數據不包括校驗和其他
checksum:該packet每一個chunk的校驗和,有多少個chunk就有多少個校驗和
data:該packet所包含的block數據
數據傳輸結束的標志,是一個packetLen長度為0的包。客戶端可以返回一個兩字節的應答OP_STATUS_CHECKSUM_OK(5)
?
dfsclient讀取塊內容
?hdfs文件的隨機和順序分析邏輯,都分析到BlockReader的readAll方法和read方法,這兩個方法完成對數據塊的內容讀取。
而readAll方法最后也是調用read方法,所以這里重點分析BlockReader的read方法,實現如下:
public synchronized int read(byte[] buf, int off, int len) throws IOException {//第一次read, 忽略前面的額外數據if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {int toSkip = (int)(startOffset - firstChunkOffset);if ( skipBuf == null ) {skipBuf = new byte[bytesPerChecksum];}if ( super.read(skipBuf, 0, toSkip) != toSkip ) {//忽略// should never happenthrow new IOException("Could not skip required number of bytes");}}boolean eosBefore = gotEOS;int nRead = super.read(buf, off, len);// if gotEOS was set in the previous read and checksum is enabled :if (dnSock != null && gotEOS && !eosBefore && nRead >= 0&& needChecksum()) {//checksum is verified and there are no errors. checksumOk(dnSock);}return nRead; }?
super.read即是FSInputChecker的read方法,實現如下
public synchronized int read(byte[] b, int off, int len) throws IOException {//參數檢查int n = 0;for (;;) {int nread = read1(b, off + n, len - n);if (nread <= 0) return (n == 0) ? nread : n;n += nread;if (n >= len)return n;} } //read1的len被忽略,只返回一個chunk的數據長度(最后一個chunk可能不足一個完整chunk的長度) private int read1(byte b[], int off, int len)throws IOException {int avail = count-pos;if( avail <= 0 ) {if(len>=buf.length) {//直接讀取一個數據chunk到用戶buffer,避免多余一次復制//很巧妙,buf初始化的大小是chunk的大小,默認是512,這里的代碼會在塊的剩余內容大于一個chunk的大小時調用
int nread = readChecksumChunk(b, off, len);return nread;} else {//讀取一個數據chunk到本地buffer,也是調用readChecksumChunk方法
//很巧妙,buf初始化大小是chunk的大小,默認是512,這里的代碼會在塊的剩余內容不足一個chunk的大小時進入調用
fill();if( count <= 0 ) {return -1;} else {avail = count;}}}//從本地buffer拷貝數據到用戶buffer,避免最后一個chunk導致數組越界int cnt = (avail < len) ? avail : len;System.arraycopy(buf, pos, b, off, cnt);pos += cnt;return cnt; }
?
FSInputChecker的readChecksumChunk會讀取一個數據塊的chunk,并做校驗,實現如下:
//只返回一個chunk的數據長度(默認512,最后一個chunk可能不足一個完整chunk的長度) private int readChecksumChunk(byte b[], int off, int len)throws IOException {// invalidate buffercount = pos = 0;int read = 0;boolean retry = true;int retriesLeft = numOfRetries; //本案例中,numOfRetries是1,也就是說不會多次嘗試do {retriesLeft--;try {read = readChunk(chunkPos, b, off, len, checksum);if( read > 0 ) {if( needChecksum() ) {//這里會做checksum校驗 sum.update(b, off, read);verifySum(chunkPos);}chunkPos += read;} retry = false;} catch (ChecksumException ce) {...if (retriesLeft == 0) {//本案例中,numOfRetries是1,也就是說不會多次嘗試,失敗了,直接拋出異常throw ce;}//如果讀取的chunk校驗失敗,以當前的chunkpos為起始偏移量,嘗試新的副本if (seekToNewSource(chunkPos)) {seek(chunkPos);} else {//找不到新的副本,拋出異常throw ce;}}} while (retry);return read; }?
readChunk方法由BlockReader實現,分析如下:
//只返回一個chunk的數據長度(默認512,最后一個chunk可能不足一個完整chunk的長度) protected synchronized int readChunk(long pos, byte[] buf, int offset,int len, byte[] checksumBuf) throws IOException {//讀取一個 DATA_CHUNK.long chunkOffset = lastChunkOffset;if ( lastChunkLen > 0 ) {chunkOffset += lastChunkLen;}//如果先前的packet已經讀取完畢,就讀下一個packet。if (dataLeft <= 0) {//讀包的頭部int packetLen = in.readInt();long offsetInBlock = in.readLong();long seqno = in.readLong();boolean lastPacketInBlock = in.readBoolean();int dataLen = in.readInt();//校驗長度 lastSeqNo = seqno;isLastPacket = lastPacketInBlock;dataLeft = dataLen;adjustChecksumBytes(dataLen);if (dataLen > 0) {IOUtils.readFully(in, checksumBytes.array(), 0,checksumBytes.limit());//讀取當前包的所有數據塊內容對應的checksum,后面的流程會講checksum和讀取的chunk內容做校驗 }}int chunkLen = Math.min(dataLeft, bytesPerChecksum); //確定此次讀取的chunk長度,正常情況下是一個bytesPerChecksum(512字節),當文件最后不足一個bytesPerChecksum,讀取剩余的內容。if ( chunkLen > 0 ) {IOUtils.readFully(in, buf, offset, chunkLen);//讀取一個數據塊的chunkchecksumBytes.get(checksumBuf, 0, checksumSize);}dataLeft -= chunkLen;lastChunkOffset = chunkOffset;lastChunkLen = chunkLen;...if ( chunkLen == 0 ) {return -1;}return chunkLen; }?
總結
?本文前面概要介紹了dfsclient讀取文件的示例代碼,順序讀取文件和隨機讀取文件的概要流程,最后還基于dfsclient和datanode讀取塊的過程,做了一個詳細的分析。
?參考?http://caibinbupt.iteye.com/blog/284979
? ? ? ??http://www.cnblogs.com/forfuture1978/archive/2010/11/10/1874222.html
總結
以上是生活随笔為你收集整理的HDFS dfsclient读文件过程 源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。