Hadoop API: File Operations
- Hadoop API: file operations
- 1. Reading a file
- 2. Copying a file
- 3. Retrieving file attributes
- 4. Listing files in a directory
- 5. Reading a SequenceFile
- 6. Reading an HDFS file
Hadoop provides a rich API for working with files in the file system. The main operations are:
(1) Reading files
(2) Writing files
(3) Reading file attributes
(4) Listing files
(5) Deleting files
The full code is available at: https://github.com/lujinhong/lujinhong-commons/tree/master/lujinhong-commons-hadoop/src/main/java/com/lujinhong/commons/hadoop/fs
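Of the operations listed above, deletion (5) has no dedicated section below, so here is a minimal sketch. The class name FileDeleteDemo is illustrative and not part of the original repository; FileSystem.delete() takes a boolean flag that enables recursive deletion of directories:

package org.jediael.hadoopdemo.fsdemo;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative example; the class name is not from the original repository.
public class FileDeleteDemo {

    public static void main(String[] args) throws IOException {
        String fileName = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(fileName), conf);
        // Pass true instead of false to delete a non-empty directory recursively.
        boolean deleted = fs.delete(new Path(fileName), false);
        System.out.println(deleted ? "deleted: " + fileName : "not deleted: " + fileName);
    }
}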
1. Reading a file
The following example reads a file from HDFS and writes it to standard output.
package org.jediael.hadoopdemo.fsdemo;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemDoubleCat {

    public static void main(String[] args) throws IOException {
        String fileName = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(fileName), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(fileName));
            // Copy the file to stdout, then rewind to the start and copy it again.
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0);
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            // closeStream() is null-safe, unlike calling in.close() directly.
            IOUtils.closeStream(in);
        }
    }
}

Note that FSDataInputStream implements the Seekable interface, so the stream supports random access within the file. Be aware, however, that seek() is a relatively expensive operation and should be used sparingly.
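Besides Seekable, FSDataInputStream also implements the PositionedReadable interface, whose read(position, buffer, offset, length) method reads at a given offset without moving the stream's current position. A minimal sketch, assuming in is the open FSDataInputStream from the example above (the offset and buffer size are arbitrary illustrations):

// Read up to 128 bytes starting at byte offset 16, without moving the
// stream's current position; returns the number of bytes actually read.
byte[] buffer = new byte[128];
int bytesRead = in.read(16L, buffer, 0, buffer.length);
System.out.println("read " + bytesRead + " bytes");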
2. Copying a file
package org.jediael.hadoopdemo.fsdemo;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileCopy {

    public static void main(String[] args) throws IOException {
        String sourceFile = args[0];
        String destFile = args[1];
        InputStream in = null;
        OutputStream out = null;
        try {
            // 1. Prepare the input stream (a local file).
            in = new BufferedInputStream(new FileInputStream(sourceFile));
            // 2. Prepare the output stream (a file on HDFS).
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(destFile), conf);
            out = fs.create(new Path(destFile));
            // 3. Copy the bytes.
            IOUtils.copyBytes(in, out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
    }
}
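FileCopy writes to HDFS by streaming bytes from another source. To write content directly, the FSDataOutputStream returned by fs.create() can be used like any OutputStream. A minimal sketch, in which the class name and the sample text are illustrative:

package org.jediael.hadoopdemo.fsdemo;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Illustrative example; the class name is not from the original repository.
public class FileWriteDemo {

    public static void main(String[] args) throws IOException {
        String fileName = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(fileName), conf);
        FSDataOutputStream out = null;
        try {
            // create() overwrites an existing file by default.
            out = fs.create(new Path(fileName));
            out.write("hello hdfs\n".getBytes(StandardCharsets.UTF_8));
        } finally {
            IOUtils.closeStream(out);
        }
    }
}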
3. Retrieving file attributes
File attributes are encapsulated in a FileStatus object. Calling getFileStatus() on a FileSystem object returns the FileStatus of a file.
package org.jediael.hadoopdemo.fsdemo;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileStatusDemo {

    public static void main(String[] args) throws IOException {
        String fileName = args[0];
        Configuration conf = new Configuration();
        // Obtain the FileSystem object.
        FileSystem fs = FileSystem.get(URI.create(fileName), conf);
        FileStatus status = fs.getFileStatus(new Path(fileName));
        System.out.println(status.getOwner() + " " + status.getModificationTime());
    }
}
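FileStatus exposes more attributes than owner and modification time. The lines below, which could be appended to the main method of FileStatusDemo above, print the commonly used ones (all are standard FileStatus accessors):

System.out.println(status.getPath());         // full path of the file
System.out.println(status.getLen());          // length in bytes
System.out.println(status.isDirectory());     // whether it is a directory
System.out.println(status.getReplication());  // replication factor
System.out.println(status.getBlockSize());    // HDFS block size in bytes
System.out.println(status.getPermission());   // permissions, e.g. rw-r--r--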
4. Listing files in a directory
Calling listStatus() on a FileSystem object returns the FileStatus objects of all files under a directory.
package org.jediael.hadoopdemo.fsdemo;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class ListStatusDemo {

    public static void main(String[] args) throws IOException {
        String dir = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dir), conf);
        FileStatus[] stats = fs.listStatus(new Path(dir));
        // Convert the FileStatus array into an array of Paths.
        Path[] paths = FileUtil.stat2Paths(stats);
        for (Path path : paths) {
            System.out.println(path);
        }
    }
}

Recursively listing all files under a directory (Hadoop 2.0 and later):
// Recursively list all files under a directory.
public static List<String> getAllHdfsFile(String dir) throws IOException {
    List<String> fileList = new ArrayList<>();
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dir), conf);
    // The second argument of listFiles() enables recursive traversal.
    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(dir), true);
    while (iterator.hasNext()) {
        LocatedFileStatus fileStatus = iterator.next();
        fileList.add(fileStatus.getPath().toString());
    }
    return fileList;
}
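Note that listFiles() returns only files, never directories. For pattern-based listing, FileSystem also offers globStatus(), which expands shell-style wildcards. A minimal sketch in the same style as the method above; the path pattern in the usage comment is a hypothetical example:

// List all paths matching a shell-style glob pattern.
public static List<String> globHdfsFiles(String pattern) throws IOException {
    List<String> fileList = new ArrayList<>();
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(pattern), conf);
    // globStatus() may return null when the path does not exist.
    FileStatus[] matched = fs.globStatus(new Path(pattern));
    if (matched != null) {
        for (FileStatus status : matched) {
            fileList.add(status.getPath().toString());
        }
    }
    return fileList;
}
// Example (hypothetical paths): globHdfsFiles("/data/2024-*/part-*");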
5. Reading a SequenceFile
package com.lujinhong.commons.hadoop.fs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

/**
 * AUTHOR: LUJINHONG
 * CREATED ON: 17/1/11 11:32
 * PROJECT NAME: lujinhong-commons
 * DESCRIPTION: Shows how to read a SequenceFile compressed with Snappy. Although
 * no compression codec is specified, the file is decompressed successfully,
 * because a SequenceFile records its codec in the file header.
 */
public class SequenceSnappyFileReader {

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        Path path = new Path(uri);
        SequenceFile.Reader reader = null;
        try {
            SequenceFile.Reader.Option filePath = SequenceFile.Reader.file(path);
            reader = new SequenceFile.Reader(conf, filePath);
            // Instantiate key and value holders from the classes recorded in the file.
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                System.out.println(value);
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
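For completeness, the counterpart that produces such a file: a sketch of writing a Snappy-compressed SequenceFile, assuming the Snappy native libraries are available on the client. The class name, key/value types, and sample records are illustrative:

package com.lujinhong.commons.hadoop.fs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.SnappyCodec;

// Illustrative example; the class name is not from the original repository.
public class SequenceSnappyFileWriter {

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(new Path(uri)),
                    SequenceFile.Writer.keyClass(IntWritable.class),
                    SequenceFile.Writer.valueClass(Text.class),
                    // Block compression with Snappy; the codec is recorded in the
                    // file header, which is why the reader above needs no codec hint.
                    SequenceFile.Writer.compression(
                            SequenceFile.CompressionType.BLOCK, new SnappyCodec()));
            for (int i = 0; i < 10; i++) {
                writer.append(new IntWritable(i), new Text("record-" + i));
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}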
6. Reading an HDFS file

package org.jediael.hadoopdemo.fsdemo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Enclosing class supplied so the snippet compiles; the name is illustrative.
public class HdfsTextReader {

    public static void main(String[] args) throws IOException {
        String fileName = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(fileName), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(fileName));
        // Read the file line by line as UTF-8 text and print each line.
        BufferedReader in = new BufferedReader(new InputStreamReader(hdfsInStream, "UTF-8"));
        String line;
        while ((line = in.readLine()) != null) {
            System.out.println(line);
        }
        in.close();
    }
}