

(Reposted) Nutch 2.0: A Brief Analysis of the Crawl Flow

Published: 2025/4/16
This article, collected by 生活随笔, gives a brief analysis of the crawl flow in Nutch 2.0 and is shared here for reference.
An Introduction to the Nutch 2.0 Crawl Flow
-------------------------------------------


1. Overall flow


InjectorJob => GeneratorJob => FetcherJob => ParserJob => DbUpdaterJob => SolrIndexerJob


InjectorJob:    reads a batch of seed URLs from a file and puts them into the crawl database
GeneratorJob:   selects pages due for fetching from the crawl database and puts them into the fetch queue
FetcherJob:     fetches the pages in the fetch queue; the reducer uses a producer/consumer model
ParserJob:      parses the fetched pages, producing new links and the parsed page content
DbUpdaterJob:   updates the crawl database with the newly discovered links
SolrIndexerJob: builds a search index from the parsed content


2. InjectorJob



Below is InjectorJob's entry point, its run method:

public Map<String,Object> run(Map<String,Object> args) throws Exception {
  getConf().setLong("injector.current.time", System.currentTimeMillis());
  Path input;
  Object path = args.get(Nutch.ARG_SEEDDIR);
  if (path instanceof Path) {
    input = (Path)path;
  } else {
    input = new Path(path.toString());
  }
  numJobs = 2;
  currentJobNum = 0;
  status.put(Nutch.STAT_PHASE, "convert input");
  currentJob = new NutchJob(getConf(), "inject-p1 " + input);
  FileInputFormat.addInputPath(currentJob, input);
  // the mapper parses URLs out of the seed file and writes them to the store
  currentJob.setMapperClass(UrlMapper.class);
  currentJob.setMapOutputKeyClass(String.class);
  // the map output value is WebPage, generated by the Gora compiler;
  // through Gora it can be mapped onto different backing databases
  currentJob.setMapOutputValueClass(WebPage.class);
  // output goes through GoraOutputFormat
  currentJob.setOutputFormatClass(GoraOutputFormat.class);
  DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(),
      String.class, WebPage.class);
  GoraOutputFormat.setOutput(currentJob, store, true);
  currentJob.setReducerClass(Reducer.class);
  currentJob.setNumReduceTasks(0);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  currentJob = null;

  status.put(Nutch.STAT_PHASE, "merge input with db");
  status.put(Nutch.STAT_PROGRESS, 0.5f);
  currentJobNum = 1;
  currentJob = new NutchJob(getConf(), "inject-p2 " + input);
  StorageUtils.initMapperJob(currentJob, FIELDS, String.class,
      WebPage.class, InjectorMapper.class);
  currentJob.setNumReduceTasks(0);
  ToolUtil.recordJobStatus(null, currentJob, results);
  status.put(Nutch.STAT_PROGRESS, 1.0f);
  return results;
}

InjectorJob extends NutchTool and implements its run method.


As the code shows, there are two MapReduce jobs here. The first reads the seed URLs from a file and writes them to the DataStore; the second goes over the WebPage objects already in the store and sets their score and fetch interval. It relies on the initMapperJob helper, shown below:
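Conceptually, the second pass gives each newly injected page a starting score and fetch interval. The sketch below illustrates that idea only; the Page class, field names, and default values are stand-ins, not Nutch's actual Gora-generated WebPage schema.

```java
// Illustrative sketch of the second inject pass: assign each newly
// injected page a default score and fetch interval. Page and the
// constants here are assumptions for illustration, not Nutch's API.
public class InjectDefaultsSketch {

    static class Page {
        float score;
        int fetchIntervalSecs;
    }

    static final float DEFAULT_SCORE = 1.0f;           // assumed injected score
    static final int DEFAULT_INTERVAL = 30 * 24 * 3600; // assumed ~30-day refetch interval

    static Page injectDefaults(Page p) {
        p.score = DEFAULT_SCORE;
        p.fetchIntervalSecs = DEFAULT_INTERVAL;
        return p;
    }
}
```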

public static <K, V> void initMapperJob(Job job,
    Collection<WebPage.Field> fields,
    Class<K> outKeyClass, Class<V> outValueClass,
    Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
    Class<? extends Partitioner<K, V>> partitionerClass, boolean reuseObjects)
    throws ClassNotFoundException, IOException {
  // create the DataStore abstraction; the concrete store can be backed
  // by different modules such as HBase, MySQL, etc.
  DataStore<String, WebPage> store = createWebStore(job.getConfiguration(),
      String.class, WebPage.class);
  if (store == null) throw new RuntimeException("Could not create datastore");
  Query<String, WebPage> query = store.newQuery();
  query.setFields(toStringArray(fields));
  GoraMapper.initMapperJob(job, query, store,
      outKeyClass, outValueClass, mapperClass, partitionerClass, reuseObjects);
  GoraOutputFormat.setOutput(job, store, true);
}

3. GeneratorJob



Below is GeneratorJob's run method:

public Map<String,Object> run(Map<String,Object> args) throws Exception {
  // map to inverted subset due for fetch, sort by score
  Long topN = (Long)args.get(Nutch.ARG_TOPN);
  Long curTime = (Long)args.get(Nutch.ARG_CURTIME);
  if (curTime == null) {
    curTime = System.currentTimeMillis();
  }
  Boolean filter = (Boolean)args.get(Nutch.ARG_FILTER);
  Boolean norm = (Boolean)args.get(Nutch.ARG_NORMALIZE);
  // map to inverted subset due for fetch, sort by score
  getConf().setLong(GENERATOR_CUR_TIME, curTime);
  if (topN != null)
    getConf().setLong(GENERATOR_TOP_N, topN);
  if (filter != null)
    getConf().setBoolean(GENERATOR_FILTER, filter);
  int randomSeed = Math.abs(new Random().nextInt());
  batchId = (curTime / 1000) + "-" + randomSeed;
  getConf().setInt(GENERATOR_RANDOM_SEED, randomSeed);
  getConf().set(BATCH_ID, batchId);
  getConf().setLong(Nutch.GENERATE_TIME_KEY, System.currentTimeMillis());
  if (norm != null)
    getConf().setBoolean(GENERATOR_NORMALISE, norm);
  String mode = getConf().get(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST);
  if (GENERATOR_COUNT_VALUE_HOST.equalsIgnoreCase(mode)) {
    getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST);
  } else if (GENERATOR_COUNT_VALUE_DOMAIN.equalsIgnoreCase(mode)) {
    getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_DOMAIN);
  } else {
    LOG.warn("Unknown generator.max.count mode '" + mode + "', using mode=" + GENERATOR_COUNT_VALUE_HOST);
    getConf().set(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST);
    getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST);
  }

  // the code above sets the constants used below
  numJobs = 1;
  currentJobNum = 0;
  // create the job
  currentJob = new NutchJob(getConf(), "generate: " + batchId);
  // initialize the mapper; its output type is <SelectorEntry, WebPage>,
  // partitioned with SelectorEntryPartitioner
  StorageUtils.initMapperJob(currentJob, FIELDS, SelectorEntry.class,
      WebPage.class, GeneratorMapper.class, SelectorEntryPartitioner.class, true);
  // initialize the reducer; GeneratorReducer does the aggregation
  StorageUtils.initReducerJob(currentJob, GeneratorReducer.class);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  results.put(BATCH_ID, batchId);
  return results;
}


This looks much simpler than the old Generate step. GeneratorMapper does the same work as in earlier versions, namely URL normalization, filtering, and score assignment, and GeneratorReducer likewise, except that output now goes to a DataStore such as HBase. When the job finishes, each WebPage is marked with a flag recording its current crawl state.
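As a sketch of what "URL normalization and filtering" means here, the rules below are illustrative stand-ins; real Nutch delegates this to its configuration-driven URLNormalizers and URLFilters plugin chains.

```java
import java.net.URL;
import java.util.Locale;

// Illustrative stand-ins for the normalize/filter step GeneratorMapper
// applies; not Nutch's actual URLNormalizers/URLFilters plugins.
public class GeneratorFilterSketch {

    // Lowercase the host, drop a default port and the fragment.
    static String normalize(String url) throws Exception {
        URL u = new URL(url);
        int port = (u.getPort() == u.getDefaultPort()) ? -1 : u.getPort();
        String file = u.getFile().isEmpty() ? "/" : u.getFile(); // path + query, no fragment
        return new URL(u.getProtocol(), u.getHost().toLowerCase(Locale.ROOT), port, file).toString();
    }

    // Keep http(s) URLs, skip obvious binary assets.
    static boolean accept(String url) {
        return url.startsWith("http") && !url.endsWith(".jpg") && !url.endsWith(".gif");
    }
}
```

With these rules, `HTTP://Example.COM:80/index.html#top` normalizes to `http://example.com/index.html`.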




4. FetcherJob



With Gora, the fetcher is much simpler than it used to be. Here is its run method:

public Map<String,Object> run(Map<String,Object> args) throws Exception {
  checkConfiguration();
  String batchId = (String)args.get(Nutch.ARG_BATCH);
  Integer threads = (Integer)args.get(Nutch.ARG_THREADS);
  Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME);
  Integer numTasks = (Integer)args.get(Nutch.ARG_NUMTASKS);

  if (threads != null && threads > 0) {
    getConf().setInt(THREADS_KEY, threads);
  }
  if (batchId == null) {
    batchId = Nutch.ALL_BATCH_ID_STR;
  }
  getConf().set(GeneratorJob.BATCH_ID, batchId);
  if (shouldResume != null) {
    getConf().setBoolean(RESUME_KEY, shouldResume);
  }

  LOG.info("FetcherJob : timelimit set for : " + getConf().getLong("fetcher.timelimit", -1));
  LOG.info("FetcherJob: threads: " + getConf().getInt(THREADS_KEY, 10));
  LOG.info("FetcherJob: parsing: " + getConf().getBoolean(PARSE_KEY, false));
  LOG.info("FetcherJob: resuming: " + getConf().getBoolean(RESUME_KEY, false));

  // set the actual time for the timelimit relative
  // to the beginning of the whole job and not of a specific task
  // otherwise it keeps trying again if a task fails
  long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
  if (timelimit != -1) {
    timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
    getConf().setLong("fetcher.timelimit", timelimit);
  }
  numJobs = 1;
  currentJob = new NutchJob(getConf(), "fetch");
  // get the fields this job reads
  Collection<WebPage.Field> fields = getFields(currentJob);
  // initialize the mapper; its output is <IntWritable, FetchEntry>
  // the mapper filters the input, skipping entries from other batches
  // and pages that have already been fetched
  StorageUtils.initMapperJob(currentJob, fields, IntWritable.class,
      FetchEntry.class, FetcherMapper.class, FetchEntryPartitioner.class, false);
  // initialize the reducer
  StorageUtils.initReducerJob(currentJob, FetcherReducer.class);
  if (numTasks == null || numTasks < 1) {
    currentJob.setNumReduceTasks(currentJob.getConfiguration().getInt("mapred.map.tasks",
        currentJob.getNumReduceTasks()));
  } else {
    currentJob.setNumReduceTasks(numTasks);
  }
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  return results;
}


The producer/consumer model that used to live in the mapper has moved into the reducer: its run method is overridden to start multiple fetch threads that crawl the URLs concurrently. If you are interested, take a look at the FetcherReducer class.
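The pattern can be sketched in plain Java with a shared queue drained by a pool of worker threads. This is a simplified illustration only, not FetcherReducer's actual code, which also deals with per-host queues and crawl politeness.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified illustration of the producer/consumer model in the fetcher:
// a shared queue of URLs consumed by several fetch threads.
public class FetchQueueSketch {

    public static List<String> fetchAll(List<String> urls, int nThreads) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(urls); // producer side: the reducer fills this
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(() -> {
                String url;
                while ((url = queue.poll()) != null) { // consumer side: each thread pulls the next URL
                    results.add("fetched:" + url);     // stand-in for the actual HTTP fetch
                }
            });
            t.start();
            workers.add(t);
        }
        for (Thread t : workers) t.join();
        return results;
    }
}
```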




5. ParserJob



Below is the run method from ParserJob.java:

@Override
public Map<String,Object> run(Map<String,Object> args) throws Exception {
  String batchId = (String)args.get(Nutch.ARG_BATCH);
  Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME);
  Boolean force = (Boolean)args.get(Nutch.ARG_FORCE);

  if (batchId != null) {
    getConf().set(GeneratorJob.BATCH_ID, batchId);
  }
  if (shouldResume != null) {
    getConf().setBoolean(RESUME_KEY, shouldResume);
  }
  if (force != null) {
    getConf().setBoolean(FORCE_KEY, force);
  }
  LOG.info("ParserJob: resuming:\t" + getConf().getBoolean(RESUME_KEY, false));
  LOG.info("ParserJob: forced reparse:\t" + getConf().getBoolean(FORCE_KEY, false));
  if (batchId == null || batchId.equals(Nutch.ALL_BATCH_ID_STR)) {
    LOG.info("ParserJob: parsing all");
  } else {
    LOG.info("ParserJob: batchId:\t" + batchId);
  }
  currentJob = new NutchJob(getConf(), "parse");

  Collection<WebPage.Field> fields = getFields(currentJob);
  // initialize the mapper; its output type is <String, WebPage>,
  // and all of the parsing happens in the mapper
  StorageUtils.initMapperJob(currentJob, fields, String.class, WebPage.class,
      ParserMapper.class);
  // initialize the reducer, which writes the <key, value> pairs to the datastore
  StorageUtils.initReducerJob(currentJob, IdentityPageReducer.class);
  currentJob.setNumReduceTasks(0);

  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  return results;
}


6. DbUpdaterJob


Below is DbUpdaterJob's run method:

public Map<String,Object> run(Map<String,Object> args) throws Exception {
  String crawlId = (String)args.get(Nutch.ARG_CRAWL);
  numJobs = 1;
  currentJobNum = 0;
  currentJob = new NutchJob(getConf(), "update-table");
  if (crawlId != null) {
    currentJob.getConfiguration().set(Nutch.CRAWL_ID_KEY, crawlId);
  }
  //job.setBoolean(ALL, updateAll);
  ScoringFilters scoringFilters = new ScoringFilters(getConf());
  HashSet<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
  fields.addAll(scoringFilters.getFields());

  // Partition by {url}, sort by {url,score} and group by {url}.
  // This ensures that the inlinks are sorted by score when they enter
  // the reducer.

  currentJob.setPartitionerClass(UrlOnlyPartitioner.class);
  currentJob.setSortComparatorClass(UrlScoreComparator.class);
  currentJob.setGroupingComparatorClass(UrlOnlyComparator.class);

  // the mapper reads each WebPage's outlinks field and computes a score
  // for every outlink
  StorageUtils.initMapperJob(currentJob, fields, UrlWithScore.class,
      NutchWritable.class, DbUpdateMapper.class);
  // the reducer sets score, state, and other metadata on the newly
  // discovered links, then writes the updated WebPages back to the datastore
  StorageUtils.initReducerJob(currentJob, DbUpdateReducer.class);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  return results;
}
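The partition/sort/group trick in the comments above can be illustrated with a plain comparator: order primarily by URL (the grouping key) and secondarily by score, descending, so each URL's inlinks reach the reducer best-score first. This is a standalone illustration; the Entry class and its fields are ours, not the actual layout of Nutch's UrlWithScore.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the secondary-sort idea behind
// UrlScoreComparator: sort by (url, score desc), group by url.
public class UrlScoreSortSketch {

    static class Entry {
        final String url;
        final float score;
        Entry(String url, float score) { this.url = url; this.score = score; }
    }

    static List<Entry> sorted(List<Entry> in) {
        List<Entry> out = new ArrayList<>(in);
        out.sort((a, b) -> {
            int c = a.url.compareTo(b.url);                      // primary key: url
            return c != 0 ? c : Float.compare(b.score, a.score); // secondary: score, descending
        });
        return out;
    }
}
```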



7. SolrIndexerJob


Below is its run method:

@Override
public Map<String,Object> run(Map<String,Object> args) throws Exception {
  String solrUrl = (String)args.get(Nutch.ARG_SOLR);
  String batchId = (String)args.get(Nutch.ARG_BATCH);
  NutchIndexWriterFactory.addClassToConf(getConf(), SolrWriter.class);
  getConf().set(SolrConstants.SERVER_URL, solrUrl);

  // initialize the job
  currentJob = createIndexJob(getConf(), "solr-index", batchId);
  Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
      + new Random().nextInt());
  // set the output path; the output format is IndexerOutputFormat, which by
  // default pushes the documents to Solr through its API to build the index
  FileOutputFormat.setOutputPath(currentJob, tmp);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  return results;
}



If you are interested, take a look at SolrWriter, which implements the NutchIndexWriter interface to write documents to a backend search engine. Solr is the default, but you can implement the interface yourself to target your own engine. Nutch also provides a plugin point for customizing the indexed fields: the IndexingFilter interface.
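The extension idea can be sketched as follows. The interface below is a simplified stand-in, not Nutch's exact NutchIndexWriter signature: the point is only that the indexing job pushes documents through a writer abstraction, and swapping the implementation targets a different backend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the NutchIndexWriter idea: the indexing job
// writes documents through this interface; the implementation decides
// which search backend receives them.
interface IndexWriterSketch {
    void write(Map<String, String> doc);
    void close();
}

// A toy backend that just collects documents in memory; a SolrWriter-like
// implementation would instead buffer them and send batches to Solr.
class InMemoryIndexWriter implements IndexWriterSketch {
    final List<Map<String, String>> docs = new ArrayList<>();
    public void write(Map<String, String> doc) { docs.add(doc); }
    public void close() { /* flush buffers, release connections */ }
}
```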


8. Summary

Personally I feel Nutch 2.0 is still immature; a number of features are unfinished. The main change is in the storage layer, which has been abstracted so that Nutch can run better in large-scale crawls and users can plug in their own concrete storage backends. The new storage layer also changes the flow in places: some operations can now be done directly as database operations, which removes a fair amount of code that previously needed MapReduce jobs. All in all, Nutch 2.0 shows us the direction Nutch is heading in. I hope it keeps getting better.


Reposted from: http://blog.csdn.net/amuseme_lu/article/details/7777426
