Hadoop Getting Started (11): The InputFormat Subclasses of MapReduce
1. TextInputFormat
TextInputFormat extends FileInputFormat<LongWritable, Text> and is the default input format used to split and read files. Its LineRecordReader reads the content of one line at a time.

The key produced in LineRecordReader.nextKeyValue() comes from the values set in initialize(): key = start = split.getStart(). If the split corresponds to a file hello.txt containing "hello you" and "hello me", the start position is 0, and end = start + split.getLength().

The line text itself is read by readLine(...), whose return value is the number of bytes read for that line; this delegates to LineReader.readLine(...) (around line 170 in the source).

The current pair is then exposed through getCurrentKey() (a LongWritable byte offset) and getCurrentValue() (a Text holding the line).

The Mapper is then driven by a loop of the form:

    while (lineRecordReader.nextKeyValue()) {
        key = lineRecordReader.getCurrentKey();
        value = lineRecordReader.getCurrentValue();
        map(key, value, context);   // keep emitting key-value pairs
    }
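For reference, this is essentially what the framework's Mapper.run() does (a simplified sketch; the Context methods delegate to the RecordReader, here LineRecordReader):

    // Simplified shape of org.apache.hadoop.mapreduce.Mapper.run()
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        // nextKeyValue()/getCurrentKey()/getCurrentValue() are forwarded to LineRecordReader
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }

So map() is called once per line of the split, with the byte offset as the key and the line as the value.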
2. DBInputFormat
When DBInputFormat reads data, it produces key-value pairs of the form <LongWritable, instance of DBWritable>.
The LongWritable key is again an offset: see org.apache.hadoop.mapreduce.lib.db.DBRecordReader.nextKeyValue() (around line 232), which contains

    key.set(pos + split.getStart());

confirming that the key still represents the position of the record.
package inputformat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Goal: read the id and name columns of the myuser table in the MySQL "test" database
 * and write the records to HDFS via MapReduce (map only, no reduce step).
 * mysql ---> map ---> hdfs
 * To run this example:
 * 1. Put the MySQL JDBC driver into the Hadoop lib directory on every TaskTracker node.
 * 2. Restart the cluster.
 */
public class MyDBInputFormatApp {

    private static final String OUT_PATH = "hdfs://hadoop0:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Configure the database connection as early as possible; doing it after the Job is created fails.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://hadoop0:3306/test", "root", "admin");

        final FileSystem filesystem = FileSystem.get(new URI(OUT_PATH), conf);
        if (filesystem.exists(new Path(OUT_PATH))) {
            filesystem.delete(new Path(OUT_PATH), true);
        }

        // Create the job
        final Job job = new Job(conf, MyDBInputFormatApp.class.getSimpleName());
        job.setJarByClass(MyDBInputFormatApp.class);
        // Specify the InputFormat implementation that creates the InputSplits
        job.setInputFormatClass(DBInputFormat.class);
        // Parameters: the JavaBean to fill, the table name, the query condition,
        // the order-by clause, and the columns to read
        DBInputFormat.setInput(job, MyUser.class, "myuser", null, null, "id", "name");

        // Map class and the key/value types of the map output
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // No reduce phase: the map output is written straight to HDFS
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);           // job output key type
        job.setOutputValueClass(NullWritable.class); // job output value type
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        job.waitForCompletion(true);
    }

    // <k1,v1> is the position of the record in the table plus the JavaBean for that row;
    // <k2,v2> is the result produced by map.
    public static class MyMapper extends Mapper<LongWritable, MyUser, Text, NullWritable> {
        protected void map(LongWritable key, MyUser value, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(new Text(value.toString()), NullWritable.get());
        }
    }

    /**
     * Writable is required so the object can be shipped between Hadoop nodes;
     * DBWritable is required so the object can be exchanged with the database.
     */
    public static class MyUser implements Writable, DBWritable {
        int id;
        String name;

        // Methods required by Writable
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            Text.writeString(out, name);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
        }

        // Methods required by DBWritable
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, id);
            statement.setString(2, name);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
        }

        @Override
        public String toString() {
            return id + "\t" + name;
        }
    }
}
3. NLineInputFormat
With this format the number of splits is no longer determined by the number of blocks of the input file, but by how many lines each split should contain. For example, given a 100-line file and NLineInputFormat configured to 2 lines per split, 50 map tasks are created; each map task still processes its lines one by one, so its map() function is called twice.
package inputformat;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * TextInputFormat reads data from an InputSplit, and InputSplits are created by size.
 * NLineInputFormat makes every mapper process the same number of records.
 * The more lines each map processes, the fewer map tasks are created.
 */
public class MyNLineInputFormatApp {

    private static final String INPUT_PATH = "hdfs://hadoop0:9000/hello";
    private static final String OUT_PATH = "hdfs://hadoop0:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // How many lines each map processes; the default is 1, here every map gets 2 lines
        conf.setInt("mapreduce.input.lineinputformat.linespermap", 2);

        final FileSystem filesystem = FileSystem.get(new URI(OUT_PATH), conf);
        if (filesystem.exists(new Path(OUT_PATH))) {
            filesystem.delete(new Path(OUT_PATH), true);
        }

        // Define the job
        final Job job = new Job(conf, MyNLineInputFormatApp.class.getSimpleName());
        job.setJarByClass(MyNLineInputFormatApp.class);

        // Input path and the InputFormat implementation
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(NLineInputFormat.class);

        // Map settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Reduce settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Output path of the final result
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        // Parsing the source file produces two key-value pairs, <0,hello you> and <10,hello me>,
        // so the map function is called twice.
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // Convert the Hadoop Text to a Java String so the line can be split into words
            final String line = value.toString();
            final String[] splited = line.split("\t");
            for (String word : splited) {
                // Inside this loop each occurrence of word counts as 1
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    // After map, the output pairs are <hello,1> <you,1> <hello,1> <me,1>.
    // Partitioning: one partition by default.
    // After sorting: <hello,1> <hello,1> <me,1> <you,1>.
    // After grouping: <hello,{1,1}> <me,{1}> <you,{1}>.
    // Combining is optional. The transfer of map output to reduce is called shuffle.
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        // reduce is called once per group, i.e. three times here.
        // Questions to think about: how does the number of groups relate to the number of
        // reduce calls, and how do the reduce calls relate to the number of output <k,v> pairs?
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws java.io.IOException, InterruptedException {
            // count is the number of occurrences of the word key in the whole file
            long count = 0L;
            for (LongWritable times : values) {
                count += times.get();
            }
            context.write(key, new LongWritable(count));
        }
    }
}
4. KeyValueTextInputFormat
If a line contains the separator, the part before the separator becomes the key and the part after it becomes the value.
If a line contains no separator, the whole line becomes the key and the value is empty.
The default separator is the tab character (\t).
package inputformat;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Example for a file "hello" whose content is:
 *   hello you
 *   hello me
 *
 * Behaviour: each line is divided into key and value parts by a separator byte.
 * If no such byte exists, the key is the entire line and the value is empty.
 * If a line contains several separators, only the first one is used; the rest
 * of the line becomes the value and is not split again.
 *
 * KeyValueTextInputFormat builds its key and value from a specific separator;
 * in the source (KeyValueLineRecordReader) the default separator is the tab character.
 *
 * Output:
 *   hello 1
 *   you   1
 *   hello 1
 *   me    1
 */
public class MyKeyValueTextInputFormatApp {

    private static final String INPUT_PATH = "hdfs://hadoop0:9000/hello";
    private static final String OUT_PATH = "hdfs://hadoop0:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");

        final FileSystem filesystem = FileSystem.get(new URI(OUT_PATH), conf);
        if (filesystem.exists(new Path(OUT_PATH))) {
            filesystem.delete(new Path(OUT_PATH), true);
        }

        // Create the job
        final Job job = new Job(conf, MyKeyValueTextInputFormatApp.class.getSimpleName());
        job.setJarByClass(MyKeyValueTextInputFormatApp.class);

        // Input path and the InputFormat implementation
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Map settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // No reduce phase in this example
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Output path of the final result
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<Text, Text, Text, LongWritable> {
        protected void map(Text key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(key, new LongWritable(1));
            context.write(value, new LongWritable(1));
        }
    }
}

GenericWritable

GenericWritable is used when several mappers over different input sources emit different value types and those types have to be unified into a single map output value class.
package inputformat;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MyMapper emits LongWritable values and MyMapper2 emits Text values; the two types
 * have to be unified so the job can call job.setMapOutputValueClass(MyGenericWritable.class).
 *
 * File "hello":
 *   hello you
 *   hello me
 *
 * File "hello2":
 *   hello,you
 *   hello,me
 *
 * Result:
 *   [root@master hadoop]# hadoop fs -text /out/part-r-00000
 *   hello 4
 *   me    2
 *   you   2
 *
 * @author zm
 */
public class MyGenericWritableApp {

    private static final String OUT_PATH = "hdfs://master:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem filesystem = FileSystem.get(new URI(OUT_PATH), conf);
        if (filesystem.exists(new Path(OUT_PATH))) {
            filesystem.delete(new Path(OUT_PATH), true);
        }

        final Job job = new Job(conf, MyGenericWritableApp.class.getSimpleName());
        job.setJarByClass(MyGenericWritableApp.class);

        // For each input: its path, its InputFormat, and the mapper that handles it
        MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/hello"),
                KeyValueTextInputFormat.class, MyMapper.class);
        MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/hello2"),
                TextInputFormat.class, MyMapper2.class);

        // Map output types; do not call job.setMapperClass here, the mappers are set above
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MyGenericWritable.class);

        // Reduce settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Output path of the final result
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<Text, Text, Text, MyGenericWritable> {
        // Parsing the "hello" file produces <hello,you> and <hello,me>, so map is called twice.
        // Result: <hello,(MyGenericWritable(1),MyGenericWritable(1))> <you,(MyGenericWritable(1))> <me,(MyGenericWritable(1))>
        protected void map(Text key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(key, new MyGenericWritable(new LongWritable(1)));
            context.write(value, new MyGenericWritable(new LongWritable(1)));
        }
    }

    public static class MyMapper2 extends Mapper<LongWritable, Text, Text, MyGenericWritable> {
        // Parsing the "hello2" file produces <0,"hello,you"> and <10,"hello,me">, so map is called twice.
        // Result: <hello,(MyGenericWritable("1"),MyGenericWritable("1"))> <you,(MyGenericWritable("1"))> <me,(MyGenericWritable("1"))>
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // Convert the Hadoop Text to a Java String so the line can be split into words
            final String line = value.toString();
            final String[] splited = line.split(",");
            for (String word : splited) {
                System.out.println("MyMapper2 word is:" + word);
                // Each occurrence of word counts as 1, emitted here as a Text value
                final Text text = new Text("1");
                context.write(new Text(word), new MyGenericWritable(text));
            }
        }
    }

    // The transfer of map output to reduce is called shuffle.
    public static class MyReducer extends Reducer<Text, MyGenericWritable, Text, LongWritable> {
        // reduce is called once per group, i.e. three times here.
        protected void reduce(Text key, Iterable<MyGenericWritable> values, Context context)
                throws java.io.IOException, InterruptedException {
            // count is the number of occurrences of the word key across both files
            long count = 0L;
            for (MyGenericWritable times : values) {
                final Writable writable = times.get();
                if (writable instanceof LongWritable) {
                    count += ((LongWritable) writable).get();
                }
                if (writable instanceof Text) {
                    count += Long.parseLong(((Text) writable).toString());
                }
            }
            context.write(key, new LongWritable(count));
        }
    }

    public static class MyGenericWritable extends GenericWritable {

        public MyGenericWritable() {
        }

        public MyGenericWritable(Text text) {
            super.set(text);
        }

        public MyGenericWritable(LongWritable longWritable) {
            super.set(longWritable);
        }

        // The array holds the value types this wrapper can carry
        @Override
        protected Class<? extends Writable>[] getTypes() {
            return new Class[]{LongWritable.class, Text.class};
        }
    }
}
5. CombineTextInputFormat

CombineTextInputFormat packs the many small files under the input directory into a single split handed to MapReduce, so that only one map task is created. For example, if the user's files are all around 10 KB, the records are still read line by line as with TextInputFormat; once the combined size exceeds the configured maximum split size (e.g. the 64 MB or 128 MB block size), a corresponding number of splits is produced. A minimal configuration sketch follows.
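A minimal driver sketch, assuming the org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat API; the 4 MB limit and the input path are illustrative values only:

    // Replace the default TextInputFormat so that many small files are grouped into few splits
    job.setInputFormatClass(CombineTextInputFormat.class);
    // Upper bound on the size of one combined split (4 MB here is only an example value);
    // small files are packed together until this limit is reached
    CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
    FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop0:9000/small_files"));

With this in place, the rest of the job (mapper, reducer, output path) is configured exactly as in the TextInputFormat examples above.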
SequenceFile
A SequenceFile is also a way of merging small files. The difference from CombineTextInputFormat is that CombineTextInputFormat only groups the existing small files into larger splits when the job is planned, while a SequenceFile physically stores them as key-value records inside a single HDFS file, as in the example below:
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

public class SequenceFileMore {

    public static void main(String[] args) throws IOException, URISyntaxException {
        final Configuration conf = new Configuration();
        final FileSystem fs = FileSystem.get(new URI("hdfs://h2single:9000/"), conf);
        Path path = new Path("/sf_logs");

        // Write: every .log file under /usr/local/logs is appended to /sf_logs,
        // using the file name as the key and the file content (as bytes) as the value.
        final Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, BytesWritable.class);
        // "false" means do not recurse into subdirectories
        Collection<File> listFiles = FileUtils.listFiles(new File("/usr/local/logs"), new String[]{"log"}, false);
        for (File file : listFiles) {
            String fileName = file.getName();
            Text key = new Text(fileName);
            byte[] bytes = FileUtils.readFileToByteArray(file);
            BytesWritable value = new BytesWritable(bytes);
            writer.append(key, value);
        }
        IOUtils.closeStream(writer);

        // Read: restore every record to /usr/local/logs_bak/<original file name>
        final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        final Text key = new Text();
        final BytesWritable val = new BytesWritable();
        while (reader.next(key, val)) {
            String fileName = "/usr/local/logs_bak/" + key.toString();
            File file = new File(fileName);
            FileUtils.writeByteArrayToFile(file, val.getBytes());
        }
        IOUtils.closeStream(reader);
    }
}
MultipleInputs

MultipleInputs is for jobs that have to handle several kinds of input at the same time, for example database records alongside small files. Only the main-method wiring is shown in the sketch below; the matching Mapper classes are left for you to write.
Summary

This article walked through the common InputFormat subclasses: TextInputFormat (the default), DBInputFormat for database tables, NLineInputFormat for a fixed number of lines per split, KeyValueTextInputFormat for separator-delimited lines, and CombineTextInputFormat for packing small files, together with the related helpers GenericWritable, SequenceFile, and MultipleInputs.