日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

通过java api提交自定义hadoop 作业

發布時間:2023/12/19 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 通过java api提交自定义hadoop 作业 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

通過API操作之前要先了解幾個基本知識

一、hadoop的基本數據類型和java的基本數據類型是不一樣的,但是都存在對應的關系

如下圖


如果需要定義自己的數據類型,則必須實現Writable

hadoop的數據類型可以通過get方法獲得對應的java數據類型

而java的數據類型可以通過hadoop數據類名的構造函數,或者set方法轉換

二、hadoop提交作業的的步驟分為八個,可以理解為天龍八步

如下:

map端工作:

1.1 讀取要操作的文件--這步會將文件的內容格式化成鍵值對的形式,鍵為每一行的起始位置偏移,值為每一行的內容

1.2 調用map進行處理--在這步使用自定義的Mapper類來實現自己的邏輯,輸入的數據為1.1格式化的鍵值對,輸入的數據也是鍵值對的形式

1.3 對map的處理結果進行分區--map處理完畢之后可以根據自己的業務需求來對鍵值對進行分區處理,比如,將類型不同的結果保存在不同的文件中等。這里設置幾個分區,后面就會有對應的幾個Reducer來處理相應分區中的內容

1.4 分區之后,對每個分區的數據進行排序,分組--排序按照從小到大進行排列,排序完畢之后,會將鍵值對中,key相同的選項 的value進行合并。如,所有的鍵值對中,可能存在

hello 1

hello 1

key都是hello,進行合并之后變成

hello 2

可以根據自己的業務需求對排序和合并的處理進行干涉和實現

1.5 歸約(combiner)--簡單的說就是在map端進行一次reduce處理,但是和真正的reduce處理不同之處在于:combiner只能處理本地數據,不能跨網絡處理。通過map端的combiner處理可以減少輸出的數據,因為數據都是通過網絡傳輸的,其目的是為了減輕網絡傳輸的壓力和后邊reduce的工作量。并不能取代reduce

reduce端工作:

2.1 通過網絡將數據copy到各個reduce

2.2 調用reduce進行處理--reduce接收的數據是整個map端處理完畢之后的鍵值對,輸出的也是鍵值對的集合,是最終的結果

2.3 將結果輸出到hdfs文件系統的路徑中


新建一個java項目,并導入hadoop包,在項目選項上右鍵,如圖選擇


找到hadoop的安裝目錄,選擇所有的包


在找到hadoop安裝目錄下的lib,導入其中的所有包



新建JMapper類為自定義的Mapper類

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;//自定義的Mapper類必須繼承Mapper類,并重寫map方法實現自己的邏輯 public class JMapper extends Mapper<LongWritable, Text, Text, LongWritable> {//處理輸入文件的每一行都會調用一次map方法,文件有多少行就會調用多少次protected void map(LongWritable key,Text value,org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {//key為每一行的起始偏移量//value為每一行的內容//每一行的內容分割,如hello world,分割成一個String數組有兩個數據,分別是hello,worldString[] ss = value.toString().toString().split("\t");//循環數組,將其中的每個數據當做輸出的鍵,值為1,表示這個鍵出現一次for (String s : ss) {//context.write方法可以將map得到的鍵值對輸出context.write(new Text(s), new LongWritable(1));}}; }
新建JReducer類為自定義的Reducer

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;//自定義的Reducer類必須繼承Reducer,并重寫reduce方法實現自己的邏輯,泛型參數分別為輸入的鍵類型,值類型;輸出的鍵類型,值類型;之后的reduce類似 public class JReducer extends Reducer<Text, LongWritable, Text, LongWritable> {//處理每一個鍵值對都會調用一次reduce方法,有多少個鍵值對就調用多少次protected void reduce(Text key,java.lang.Iterable<LongWritable> value,org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {//key為每一個單獨的單詞,如:hello,world,you,me等//value為這個單詞在文本中出現的次數集合,如{1,1,1},表示總共出現了三次long sum = 0;//循環value,將其中的值相加,得到總次數for (LongWritable v : value) {sum += v.get();}//context.write輸入新的鍵值對(結果)context.write(key, new LongWritable(sum));}; }
新建執行提交作業的類,取名JSubmit

import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class JSubmit {public static void main(String[] args) throws IOException,URISyntaxException, InterruptedException, ClassNotFoundException {//Path類為hadoop API定義,創建兩個Path對象,一個輸入文件的路徑,一個輸入結果的路徑Path outPath = new Path("hdfs://localhost:9000/out");//輸入文件的路徑為本地linux系統的文件路徑Path inPath = new Path("/home/hadoop/word");//創建默認的Configuration對象Configuration conf = new Configuration();//根據地址和conf得到hadoop的文件系統獨享//如果輸入路徑已經存在則刪除FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);if (fs.exists(outPath)) {fs.delete(outPath, true);}//根據conf創建一個新的Job對象,代表要提交的作業,作業名為JSubmit.class.getSimpleName()Job job = new Job(conf, JSubmit.class.getSimpleName());//1.1//FileInputFormat類設置要讀取的文件路徑FileInputFormat.setInputPaths(job, inPath);//setInputFormatClass設置讀取文件時使用的格式化類job.setInputFormatClass(TextInputFormat.class);//1.2調用自定義的Mapper類的map方法進行操作//設置處理的Mapper類job.setMapperClass(JMapper.class);//設置Mapper類處理完畢之后輸出的鍵值對 的 數據類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);//1.3分區,下面的兩行代碼寫和沒寫都一樣,默認的設置<span style="white-space:pre"> </span>job.setPartitionerClass(HashPartitioner.class); <span style="white-space:pre"> </span>job.setNumReduceTasks(1);//1.4排序,分組//1.5歸約,這三步都有默認的設置,如果沒有特殊的需求可以不管 //2.1將數據傳輸到對應的Reducer//2.2使用自定義的Reducer類操作//設置Reducer類job.setReducerClass(JReducer.class);//設置Reducer處理完之后 輸出的鍵值對 的數據類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//2.3將結果輸出//FileOutputFormat設置輸出的路徑FileOutputFormat.setOutputPath(job, outPath);//setOutputFormatClass設置輸出時的格式化類job.setOutputFormatClass(TextOutputFormat.class);//將當前的job對象提交job.waitForCompletion(true);} }
運行java程序,可以再控制臺看到提交作業的提示


在hdfs中查看輸出的文件


運行成功!


轉載于:https://www.cnblogs.com/jchubby/p/4429702.html

總結

以上是生活随笔為你收集整理的通过java api提交自定义hadoop 作业的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。