日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce基础开发之六Map多输入

發布時間:2025/4/16 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce基础开发之六Map多输入 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
在實際MapReduce開發中,常會遇到數據來自多個輸入路徑、且每個路徑需要由各自對應的Map函數來處理的情況,這時需要使用MultipleInputs.addInputPath(job, path, inputFormatClass, mapperClass)函數。


本文模擬從不同地市獲取數據,并按照地市區號輸出記錄,具體見代碼。


import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class DPIDigAdsl {public static class DPIDigMapperADSLGZ extends Mapper<Object, Text, Text, Text>{private Text oKey=new Text();public void map(Object key, Text value, Context context)throws IOException, InterruptedException {String strKey=DPIUtil.ADSLMapHandle(value.toString(),"020");//廣州if(!strKey.isEmpty()){oKey.set(strKey);context.write(oKey,new Text(""));}}} public static class DPIDigMapperADSLFS extends Mapper<Object, Text, Text, Text>{private Text oKey=new Text();public void map(Object key, Text value, Context context)throws IOException, InterruptedException {String strKey=DPIUtil.ADSLMapHandle(value.toString(),"0757");//佛山if(!strKey.isEmpty()){oKey.set(strKey);context.write(oKey,new Text(""));}}} public static class DPIDigReducer extends Reducer<Text,Text,Text,Text> {public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {context.write(key, new Text(""));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();//設置驅動類Job job = new Job(conf, "DPI dig");job.setJarByClass(DPIDigAdsl.class);//多輸入路徑對應多map函數String inPathgz="/gz";String inPathfs="/fs";MultipleInputs.addInputPath(job, new Path(inPathgz), TextInputFormat.class, DPIDigMapperADSLGZ.class);MultipleInputs.addInputPath(job, new Path(inPathfs), TextInputFormat.class, 
DPIDigMapperADSLFS.class);//設置Reduce函數、輸出數據類型、輸出路徑job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setReducerClass(DPIDigReducer.class);job.setNumReduceTasks(1);//設置reduce輸入文件一個,方便查看結果String outPath="/tmp/fjs/dpi";outPath=DPIUtil.changeToDir(outPath)+"adsl";FileOutputFormat.setOutputPath(job, new Path(outPath));System.exit(job.waitForCompletion(true) ? 0 : 1);} } import java.net.MalformedURLException; import java.net.URL; import java.text.SimpleDateFormat; import java.util.Date; import java.util.regex.Matcher; import java.util.regex.Pattern;public class DPIUtil {//Map處理public static String ADSLMapHandle(String value,String cityCode){String strKey="";//返回//獲取字段值String[] strDPIs=value.split("\\|");//獲取行,并按照|分隔符提取if(strDPIs.length < 10 ) return strKey;//數據不合規,直接返回String date=DPIUtil.DatetimeToDate(strDPIs[0]);//上網時間STARTDATE提取出YYYYMMDD,如20160430String account=strDPIs[1];//acc_nbr或account String city=cityCode;//LATN_ID或city提取,如020、0755String url=strDPIs[7];//url提取,含域名和參數及值String domain=DPIUtil.hostFromUrl(strDPIs[7]);//Domain域名提取,如www.jd.com String cookie=strDPIs[9];//cookie提取,含域名和參數及值//定義正則表達式String[] regExs={".*.;^imei$;^\\d{15}$",".*.; ^meid$;^\\d{14}$|^\\d{16}$",".*.; ^imsi$;.*.",".*.; ^biz$;.*."};//匹配正則表達式for(String regEx:regExs){String regExDomain=regEx.split(";")[0];//域名正則表達式Pattern patDomain=Pattern.compile(regExDomain);if (domain==null) domain="";Matcher matDomain = patDomain.matcher(domain);if(matDomain.find()){//域名匹配String regExPara=regEx.split(";")[1];//參數正則表達式Pattern patPara=Pattern.compile(regExPara);String regExParaVal=regEx.split(";")[2];//參數值正則表達式Pattern patParaVal=Pattern.compile(regExParaVal);//解析URL和cookie,提取參數和值Pattern patSplit= Pattern.compile("[?&]+"); //以多條件分割字符串 String[] strSigns = patSplit.split(url+"?"+cookie);for (String strSign:strSigns){if(strSign.contains("=") && strSign.split("=").length>1){//para=value參數及其值提取String para=strSign.split("=")[0];//等號右邊參數if (para==null) para="";Matcher matPara = 
patPara.matcher(para);String paraVal=strSign.split("=")[1];//等號左邊參數值if (paraVal==null) paraVal="";Matcher matParaVal = patParaVal.matcher(paraVal);if(matPara.find() && matParaVal.find()){strKey=account+"|"+date+"|"+city+"|"+domain+"|"+para+"|"+paraVal;return strKey;} }} } }return strKey;}//時間戳轉日期時間public static String timestamp2date(String _timeStamp){String dateFormat = "yyyyMMddHHmmss";SimpleDateFormat fm = new SimpleDateFormat(dateFormat);if (_timeStamp.equals("")){return "";}try{long timeStamp = Long.parseLong(_timeStamp);String dt = fm.format(new Date(timeStamp*1000));return dt;} catch (Exception ex){return "";}}//截取時間日期字符串的前8位,輸出日期public static String DatetimeToDate(String _datetime){return _datetime.substring(0,8);//從第一個字符開始,共8個字符輸出}//從url中提取域名public static String hostFromUrl(String _url){String host=null;try {host = new URL(_url).getHost().toLowerCase();// 此處獲取值轉換為小寫} catch (MalformedURLException e) {// TODO Auto-generated catch blocke.printStackTrace();}return host;}//hadoop目錄規范/public static String changeToDir(String dirPath){//目錄最后是否有/if(dirPath.charAt(dirPath.length()-1)!='/'){dirPath = dirPath + "/";}return dirPath;}public static void main(String[] args) throws Exception { } }


總結

以上是生活随笔為你收集整理的MapReduce基础开发之六Map多输入的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。