Hadoop HDFS (3) JAVA訪问HDFS之二 文件分布式读写策略
生活随笔
收集整理的這篇文章主要介紹了
Hadoop HDFS (3) JAVA訪问HDFS之二 文件分布式读写策略
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
先把上節未完畢的部分補全,再剖析一下HDFS讀寫文件的內部原理
在讀取過程中,假設DFSInputStream在與Datanode通信時發生了錯誤,它會試著向下一個近期的datanode節點獲取當前block數據,DFSInputStream也會記錄下錯誤發生的datanode節點,以便在以后block數據的讀取時,不再去這些節點上嘗試。 DFSInputStream在讀取到datanode上的block數據后也會做checksum校驗,假設checksum失敗,它會先向namenode報告這臺datanode上的數據有問題,然后再去嘗試一下個存有當前block的datanode。 在這一整套的設計上,最重要的一點是:client在namenode的指引下,直接向最優datanode讀取數據,這種設計讓HDFS支持大規模的并發,由于client讀取數據的流量分布在集群的每一個節點上,namenode僅僅是通過內存提供位置信息而不提供數據,假設client都通過namenode獲得數據,那client的數量就大大受限制了。
最后,Hadoop是無法知道你的網絡拓撲結構的,所以你得通過配置告訴它。默認情況下,它覺得全部的節點都是同一個機架上的節點,也就是隨意兩臺之間的距離都是同樣的。在小規模的集群中,這個默認配置就夠用了,可是大的集群須要很多其它的配置,以后講到集群配置時再說。
這一數據一致性模型對于應用程序是有影響的。應用程序的開發人員應該心里清楚,當寫操作進行時假設讀數據,或者當client或系統出現故障時,可能會有最多一個block的數據丟失。假設你的應用不能接受,那就要彩取適當的策略在適當的時候調用hflush()方法,可是頻繁調用hflush會影響吞吐量,所以你要在程序健壯性和吞吐量雙方面做出權衡,選擇適當的調用hflush的頻率。
列舉文件
FileSystem(org.apache.hadoop.fs.FileSystem)的listStatus()方法能夠列出一個文件夾下的內容。 public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException; public FileStatus[] listStatus(Path[] files) throws FileNotFoundException, IOException; public FileStatus[] listStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException; public FileStatus[] listStatus(Path[] files, PathFilter filter) throws FileNotFoundException, IOException; 這一組方法,都接收Path參數,假設Path是一個文件,返回值是一個數組,數組里僅僅有一個元素,是這個Path代表的文件的FileStatus對象;假設Path是一個文件夾,返回值數組是該文件夾下的全部文件和文件夾的FileStatus組成的數組,有可能是一個0長數組;假設參數是Path[],則返回值相當于多次調用單Path然后把返回值整合到一個數組里;假設參數中包括PathFilter,則PathFilter會對返回的文件或文件夾進行過濾,返回滿足條件的文件或文件夾,條件由開發人員自己定義,使用方法與java.io.FileFilter相似。 以下這個程序接收一組paths,然后列出當中的FileStatus import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; public class ListStatus {public static void main(String[] args) throws Exception {String uri = args[0];Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);Path[] paths = new Path[args.length];for (int i = 0; i < paths.length; i++) {paths[i] = new Path(args[i]);}FileStatus[] status = fs.listStatus(paths);Path[] listedPaths = FileUtil.stat2Paths(status);for (Path p : listedPaths) {System.out.println(p);}} } 上傳程序,然后運行: $hadoop?ListStatus?/ /user /user/norris 則列出/下,/user/下,/user/norris/下的全部文件和文件夾。 在Hadoop下運行程序的方法見上一篇博客(http://blog.csdn.net/norriszhang/article/details/39648857)File patterns 用通配符列出文件和文件夾
FileSystem的globStatus方法就是利用通配符來列出文件和文件夾的。glob就是通配的意思。 FileSystem支持的通配符有: *:匹配0個或多個字符 ?:匹配1個字符 [ab]:匹配方括號里列出的字符 [^ab]:匹配方括號里沒有列出的字符 [a-b]:匹配方括號里列出的字符范圍 [^a-b]:匹配方括號里列出的字符范圍以外的字符 {a,b}:或者匹配a或者匹配b \c:轉義,假設c是一個元字符,就代表這個字符本身,比方\[,就表示字符[ public FileStatus[] globStatus(Path pathPattern) throws IOException; public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException; 盡管pathPattern非常強大,可是也有些情況不能滿足,比方就是要排除某個特定文件,這時就須要使用PathFilter了。 package org.apache.hadoop.fs; public interface PathFilter {boolean accept(Path path); } 這是PathFilter這個接口的定義,使用時僅僅要實現accept方法,返回是否選中該Path即可了。這個accept方法接收的參數是一個Path,也就是說,實用的信息差點兒僅僅能拿到路徑和文件名稱,像改動時間啊、權限啊、全部者呀、大小啊什么的,都拿不到,沒有FileStatus那樣強大,所以,假設我們希望按改動時間來選文件時,就要在文件的命名時帶上時間戳了。(當然,通過FileSystem也能再次獲得那些信息,可是。。。那也太不值當的了吧?我也不確定會有多大消耗)刪除文件
FileSystem的delete方法刪除一個文件或文件夾(永久刪除)。 public boolean delete(Path f, boolean recursive) throws IOException; 刪除f,假設f是一個文件或空文件夾,則無論recursive傳什么,都刪除,假設f是一個非空文件夾,則recursive為true時文件夾下內容所有刪除,假設recursive為false,不刪除,并拋出IOException。以下再深入剖析HDFS讀寫文件時的數據流向過程
讀文件剖析
第一步:client通過調用FileSystem.open()方法打開一個文件,對于HDFS來講,事實上是調用的DistributedFileSystem實例的open方法; 第二步:DistributedFileSystem通過遠程方法調用(RPC)訪問namenode,獲取該文件的前幾個blocks所在的位置信息;針對每一個block,namenode都會返回有該block數據信息的全部datanodes節點,比方配置的dfs.replication為3,就會每一個block返回3個datanodes節點信息,這些節點是按距離client的遠近排序的,假設發起讀文件的client就在包括該block的datanode上,那該datanode就排第一位(這樣的情況在做Map任務時常見),client就會從本機讀取數據。關于怎樣推斷離client的距離遠近的問題,一會兒的網絡拓撲理論上會講到。 DistributedFileSystem的open方法返回一個FSDataInputStream,FSDataInputStream里包裝著一個DFSInputStream,DFSInputStream真正管理datanodes和namenode的I/O; 第三步:client調用FSDataInputStream.read()方法,FSDataInputStream里已經緩存了該文件前幾個block所在的datanode的地址,于是從第一個block的第一個地址(也就是近期的datanode)開始連接讀取; 第四步:重復調用read()方法,數據不斷地從datanode流向client; 第五步:當一個block的數據讀完了,DFSInputStream會關閉當前datanode的連接,打開下一個block所在的最優datanode的連接繼續讀取;這些對client是透明的,在client看來,就是在讀一個連續的流; 第六步:這樣一個block一個block讀下去,當須要很多其它block的存儲信息時,DFSInputStream會再次調用namenode,獲取下一批block的存儲位置信息,直到client停止讀取,調用FSDataInputStream.close()方法,整個讀取過程結束。在讀取過程中,假設DFSInputStream在與Datanode通信時發生了錯誤,它會試著向下一個近期的datanode節點獲取當前block數據,DFSInputStream也會記錄下錯誤發生的datanode節點,以便在以后block數據的讀取時,不再去這些節點上嘗試。 DFSInputStream在讀取到datanode上的block數據后也會做checksum校驗,假設checksum失敗,它會先向namenode報告這臺datanode上的數據有問題,然后再去嘗試一下個存有當前block的datanode。 在這一整套的設計上,最重要的一點是:client在namenode的指引下,直接向最優datanode讀取數據,這種設計讓HDFS支持大規模的并發,由于client讀取數據的流量分布在集群的每一個節點上,namenode僅僅是通過內存提供位置信息而不提供數據,假設client都通過namenode獲得數據,那client的數量就大大受限制了。
Hadoop怎樣決定哪個datanode離client近期
在網絡上什么叫“近”?在大數據流動時,帶寬是最稀缺的資源,因此,用兩個節點之前的帶寬來定義它們之間的距離非常合理。 在實踐中,那么多節點,在每兩個節點之間都測量帶寬是不現實的,Hadoop採取了折中的方式,它把網絡結構想象成一棵樹,兩個結點之間的距離就是兩個節點分別向上找父、祖父、祖宗。。。直到兩個節點有一個共同祖宗時,它們倆走的步數之和。沒有人規定樹必須有多少級,但通常的做法是分成“數據中心”、“機架”、“節點”三級,越排在前面的,之間通信帶寬越小,比方兩個數據中心之間通信要比同數據中心兩個機架之間通信要慢,兩個機架之間通信要比同機架的兩個節點通信慢。所以,依照由快到慢各自是: - 本機 - 同機架的兩個節點 - 同數據中心不同機架的兩個節點 - 不同數據中心的兩個節點 假如用d表示數據中心,r表示機架,n表示節點,那/d1/r1/n1就表示1數據中心1機架上的1號節點。 - distance(/d1/r1/n1, /d1/r1/n1) = 0 //同一臺機器 - distance(/d1/r1/n1,?/d1/r1/n2) = 2 //同一個機架上的兩臺機器,它們各自到共同父結點r1的步數都是1,因此距離是2 - distance(/d1/r1/n1,?/d1/r2/n3) = 4 //同數據中心的兩個機架 - distance(/d1/r1/n1,?/d2/r3/n4) = 6 //不同的數據中心最后,Hadoop是無法知道你的網絡拓撲結構的,所以你得通過配置告訴它。默認情況下,它覺得全部的節點都是同一個機架上的節點,也就是隨意兩臺之間的距離都是同樣的。在小規模的集群中,這個默認配置就夠用了,可是大的集群須要很多其它的配置,以后講到集群配置時再說。
寫文件剖析
文件是怎么被寫進HDFS的呢?以下的介紹可能過于仔細了,可是這樣才干有助于理解HDFS的數據一致性模型。 我們來討論一下創建新文件,向里寫數據,然后關閉文件的一個過程: 第一步:client調用DistributedFileSystem.create()方法創建一個文件; 第二步:DistributedFileSystem向namenode發起遠程方法調用(RPC),創建一個文件,可是namenode沒有把它關聯到不論什么block上去;namenode在這一步做了非常多檢查工作,保證該文件當前不存在,client有創建該文件的權限等。假設這些檢查都通過了,namenode創建一條新文件記錄,否則,創建失敗,client返回IOException。DistributedFileSystem返回一個FSDataOutputStream,像讀文件時一樣,這個FSDataOutputStream里包裝著一個DFSOutputStream,由它來實際處理與datanodes和namenode的通信。 第三步:client向DFSOutputStream里寫數據,DFSOutputStream把數據分成包,丟進一個稱為data queue的隊列中,DataStreamer負責向namenode申請新的block,新的block被分配在了n(默認3)個節點上,這3個節點就形成一個管道。 第四步:DataStreamer把data queue里的包拿出來通過管道輸送給第一個節點,第一個節點再通過管道輸送給第二個節點,第二個再輸送給第三個。 第五步:DFSOutputStream同一時候還在內部維護一個通知隊列,名叫ack queue,里面是發過的數據包,一個包僅僅有被全部管道上的datanodes通知收到了,才會被移除。假設隨意一個datanode接收失敗了,首先,管道關閉,然后把ack queue里的包都放回到data queue的頭部,以便讓失敗節點下游節點不會丟失這些數據。當前已經成功接收數據了的節點將會經與namenode協商后分配一個新的標識,以便當壞節點以后恢復回來時能夠把上面的不完整數據刪除。然后打開管道把壞節點移出,數據會繼續向其他好節點輸送,直到管道上的節點都完畢了,這時事實上是少復制了一個節點,向namenode報告一下說如今這個block沒有達到設定的副本數,然后就返回成功了,后期namenode會組織一個異步的任務,把副本數恢復到設定值。然后,接下來的數據包和數據塊正常寫入。以上操作,對client都是透明的,client不知道發生了這些事情,僅僅知道寫文件成功了。 假設多個datanodes都失敗了怎么辦呢?hdfs-site.xml里有個配置dfs.replication.min,默認值是1,意思是僅僅要有1個datanode接收成功,就覺得數據寫入成功了。client就會收到寫入成功的返回。后期Hadoop會發起異步任務把副本數恢復到dfs.replication設置的值(默認3)。 第六步:當client完畢數據寫入,調用流的close()方法,這個操作把data queue里的全部剩余的包都發給管道。 第七步:等全部包都收到了寫成功的反饋,client通知namenode寫文件完畢了。由于DataStream寫文件前就先向namenode申請block的位置信息了,所以寫文件完畢時,namenode早已知道每一個block都在哪了,它僅僅需等最小的副本數寫成功,就能夠返回成功了。Namenode怎樣選擇一個block被寫到哪幾個節點上去?
Hadoop在這個算法上是做了權衡處理的。都寫到同一個節點上,或者寫在同一個機架的節點上,肯定是效率最高的,由于傳輸數據的帶寬最大,但這就不是分布式冗余了,萬一這個節點失敗,或者這個機架掉電,這份數據就再也讀不到了。當然任意寫到三臺機器上,最好分在不同的數據中心才最安全,可是那樣又太損失效率了。即使是在同一個數據中心的節點上寫,也有非常多種選擇策略。從1.x版開始,這個策略就變成可插拔的了。 當前Hadoop默認的策略是: 第一份:假設client就執行在當前集群上,那第一個副本就存在當前節點上,假設client不執行在當前集群上,則隨機選擇第一個副本節點。當然這個隨機是會考慮不要選已經有了非常多數據或當前正在處理非常大流量的datanode的; 第二份:選擇與第一份不在同一個機架上的隨機一個節點; 第三份:選擇與第二份在同一個機架上的還有一個隨機節點; 很多其它份:假設須要復制很多其它份,其它節點是隨機選擇的,僅僅是盡量分布在多個機架上,不讓一個機架上有太多份副本。 *該書寫作時Hadoop不支持跨數據中心部署,如今的版本號不知道是不是去掉了這個限制,假設是,那這個策略是不是也會考慮跨數據中心,臨時還不清楚。 整體來看,這種策略平衡考慮了可靠性(數據分布在不同的機架上)、寫帶寬(僅僅有一次寫須要跨機架)、讀性能(讀數據時有兩上機架上的datanodes可選),數據分布在整個集群上。數據一致性模型
對于文件系統來講,所謂數據一致性模型,就是說一個寫文件操作寫進的數據,在什么時機能夠被其他讀文件的操作看到。HDFS在數據一致性方面做了平衡,因此可能不像本地文件系統那樣,寫進去的數據立即能夠讀到。 當一個文件被創建時,它是立即能夠被看到的,可是當數據寫進時,即使調用flush,讀文件的操作也未必能看到,這個文件的長度可能還是0。(前一節講文件寫入時的progress回調時,我曾做了實驗,一邊往里寫,非常次回調時睡1秒,然后還有一邊不停看文件寫進去多大了,結果發現一直是0,直到程序結束完畢了寫入,才看到文件的真實大小,當時以為是沒有flush,如今看來事實上是這樣的特殊的數據一致性模型導致的。) HDFS的數據一致性模型是以block為單位的,一個block被寫完了,會看到一個block的數據,沒寫完一個block,就看不到這個block的數據。block 1寫完了,其他讀操作能看到這個block的內容,這時block 2正在寫入,可是其他讀操作卻看不到,直到block 2完畢,開始寫block 3,block 2的數據才干夠被其他讀操作看到。 HDFS提供了一個方法,FSDataOutputStrean.sync(),強制讓當前已寫入的數據對其他讀操作可見。在1.x以后的版本號中,這個sync()方法被廢棄了,改用hflush(),另外另一個hsync()方法,聲明說是更強的保證數據一致性,但到寫書時為止,hsync()方法沒有被實現,僅僅是簡單地調用了hflush()而已。 關閉文件時會非顯示調用sync()方法,也就是被關閉了的文件,其所有數據都能夠被其他讀者看到了。這一數據一致性模型對于應用程序是有影響的。應用程序的開發人員應該心里清楚,當寫操作進行時假設讀數據,或者當client或系統出現故障時,可能會有最多一個block的數據丟失。假設你的應用不能接受,那就要彩取適當的策略在適當的時候調用hflush()方法,可是頻繁調用hflush會影響吞吐量,所以你要在程序健壯性和吞吐量雙方面做出權衡,選擇適當的調用hflush的頻率。
用Flume和Sqoop導入數據
敲代碼把數據放入HDFS,不如用已有的工具。由于如今已經有非常成熟的工具來完畢這件事,并且已經覆蓋了大部分的需求。 Flume是Apache的大量數據移動的一個工具。當中一個典型的應用就是把Flume部署在web server的機器上,把web server上的日志收集起來導入到HDFS。它同一時候也支持各種日志寫入。 Sqoop也是Apache的工具,它用于把大量結構化數據批量導入HDFS,比方把關系型數據庫里的數據導入到Hive里。Hive是執行在Hadoop上的數據倉庫,后面章節講到。轉載于:https://www.cnblogs.com/blfshiye/p/4280060.html
總結
以上是生活随笔為你收集整理的Hadoop HDFS (3) JAVA訪问HDFS之二 文件分布式读写策略的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Foundation框架: 9.OC中的
- 下一篇: Oracle 10G select工作原