[Nutch 2.2.1 Source Code Analysis, Part 5] The Basic Indexing Flow
II. Program call flow
The launcher script in Nutch (bin/nutch) contains the following:

    elif [ "$COMMAND" = "solrindex" ] ; then
      CLASS=org.apache.nutch.indexer.solr.SolrIndexerJob

So the entry point of the solrindex command is the SolrIndexerJob class.
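(1) org.apache.nutch.indexer.solr.SolrIndexerJob
1. Program entry point

    public static void main(String[] args) throws Exception {
      final int res = ToolRunner.run(NutchConfiguration.create(),
          new SolrIndexerJob(), args);
      System.exit(res);
    }

The program is launched through ToolRunner.run(); for background, see the article "Analysis of the basic principles of running Hadoop programs with ToolRunner".
The first argument loads the Nutch-related configuration, mainly Hadoop's core-default.xml and core-site.xml plus Nutch's nutch-default.xml and nutch-site.xml.
The second argument is the tool to run: ToolRunner ends up calling the run(String[]) method of SolrIndexerJob.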
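The ToolRunner pattern described above can be illustrated without Hadoop on the classpath. The sketch below is a simplified stand-in, not Hadoop's implementation: MiniTool, EchoTool, the Map-based configuration and the example.key property are all hypothetical names invented here, and the option handling is reduced to a single "-D key=value" form. It only shows the idea that the runner folds generic options into the configuration, hands the configuration to the tool, and then calls the tool's run(String[]) with the remaining arguments.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ToolRunnerSketch {

    /** Hypothetical stand-in for Hadoop's Tool interface. */
    interface MiniTool {
        void setConf(Map<String, String> conf);
        int run(String[] args);
    }

    /** Hypothetical stand-in for ToolRunner.run(conf, tool, args). */
    static int run(Map<String, String> conf, MiniTool tool, String[] args) {
        List<String> remaining = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            // Simplified handling of a generic "-D key=value" option.
            if ("-D".equals(args[i]) && i + 1 < args.length && args[i + 1].contains("=")) {
                String[] kv = args[++i].split("=", 2);
                conf.put(kv[0], kv[1]);
            } else {
                remaining.add(args[i]);
            }
        }
        tool.setConf(conf);                       // configuration goes in first
        return tool.run(remaining.toArray(new String[0])); // then the tool runs
    }

    /** Toy tool: succeeds only when it receives at least two arguments. */
    static final class EchoTool implements MiniTool {
        Map<String, String> conf;
        String[] seenArgs;

        @Override public void setConf(Map<String, String> conf) { this.conf = conf; }

        @Override public int run(String[] args) {
            seenArgs = args;
            return args.length < 2 ? -1 : 0;
        }
    }

    public static void main(String[] args) {
        EchoTool tool = new EchoTool();
        int res = run(new HashMap<>(), tool,
                new String[] {"-D", "example.key=value", "solr-url-placeholder", "-all"});
        System.out.println("exit code = " + res + ", conf = " + tool.conf);
    }
}
```

The value returned by the tool is what main() in SolrIndexerJob passes to System.exit().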
2. The run(String[]) method of SolrIndexerJob

    public int run(String[] args) throws Exception {
      if (args.length < 2) {
        System.err.println("Usage: SolrIndexerJob <solr url> (<batchId> | -all | -reindex) [-crawlId <id>]");
        return -1;
      }

      if (args.length == 4 && "-crawlId".equals(args[2])) {
        getConf().set(Nutch.CRAWL_ID_KEY, args[3]);
      }
      try {
        indexSolr(args[0], args[1]);
        return 0;
      } catch (final Exception e) {
        LOG.error("SolrIndexerJob: " + StringUtils.stringifyException(e));
        return -1;
      }
    }

The method first validates the arguments (at least a Solr URL and a batch id are required, optionally followed by -crawlId <id>), then calls indexSolr(String, String). Any exception is logged and turned into the exit code -1.
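The argument checks in run(String[]) can be tried out in isolation. The following is a minimal sketch that mirrors only the two conditions in the listing; the class name, the parse method and the map keys "solrUrl", "batchId" and "crawlId" are hypothetical (the real job stores the crawl id under Nutch.CRAWL_ID_KEY in the Hadoop configuration).

```java
import java.util.HashMap;
import java.util.Map;

final class SolrIndexArgsSketch {

    /**
     * Mirrors the checks in SolrIndexerJob.run(String[]).
     * Returns null where the real method would print the usage line and return -1.
     */
    static Map<String, String> parse(String[] args) {
        if (args.length < 2) {
            return null; // not enough arguments: usage error
        }
        Map<String, String> parsed = new HashMap<>();
        parsed.put("solrUrl", args[0]);  // hypothetical key
        parsed.put("batchId", args[1]);  // hypothetical key
        // Same condition as the original: exactly four arguments, third is -crawlId.
        if (args.length == 4 && "-crawlId".equals(args[2])) {
            parsed.put("crawlId", args[3]); // hypothetical key
        }
        return parsed;
    }

    public static void main(String[] args) {
        System.out.println(parse(new String[] {"solr-url-placeholder", "-all"}));
        System.out.println(parse(new String[] {"solr-url-placeholder", "-all", "-crawlId", "test"}));
        System.out.println(parse(new String[] {"solr-url-placeholder"}));
    }
}
```

One consequence of the args.length == 4 test is that -crawlId is only honoured when it is the third of exactly four arguments; in any other position or with extra arguments it is silently ignored.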
3. The indexSolr(String, String) method

    public void indexSolr(String solrUrl, String batchId) throws Exception {
      LOG.info("SolrIndexerJob: starting");

      run(ToolUtil.toArgMap(
          Nutch.ARG_SOLR, solrUrl,
          Nutch.ARG_BATCH, batchId));
      // do the commits once and for all the reducers in one go
      getConf().set(SolrConstants.SERVER_URL, solrUrl);
      SolrServer solr = SolrUtils.getCommonsHttpSolrServer(getConf());
      if (getConf().getBoolean(SolrConstants.COMMIT_INDEX, true)) {
        solr.commit();
      }
      LOG.info("SolrIndexerJob: done.");
    }

The method packs the Solr URL and the batch id into an argument map, runs the indexing job through run(Map), and then issues a single commit to Solr unless SolrConstants.COMMIT_INDEX is set to false.
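indexSolr() passes its parameters to run(Map) as alternating name/value pairs via ToolUtil.toArgMap(). The sketch below shows one way such a helper can work; it is an assumption about the behaviour, not a copy of the Nutch source, and the keys "solr" and "batch" are placeholders for the Nutch.ARG_SOLR and Nutch.ARG_BATCH constants.

```java
import java.util.HashMap;
import java.util.Map;

final class ArgMapSketch {

    /**
     * Builds a map from alternating name/value arguments.
     * Assumed behaviour: an odd number of arguments is an error,
     * and pairs whose value is null are skipped.
     */
    static Map<String, Object> toArgMap(Object... args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("expected name/value pairs");
        }
        Map<String, Object> res = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (args[i + 1] != null) {
                res.put(String.valueOf(args[i]), args[i + 1]);
            }
        }
        return res;
    }

    public static void main(String[] args) {
        // "solr" and "batch" are placeholder keys for this sketch.
        Map<String, Object> m = toArgMap("solr", "solr-url-placeholder", "batch", "-all");
        System.out.println(m);
    }
}
```

The receiving side then reads the values back with args.get(...) and a cast, as run(Map) does for the Solr URL and the batch id.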
4. The run(Map<...>) method

    @Override
    public Map<String,Object> run(Map<String,Object> args) throws Exception {
      String solrUrl = (String)args.get(Nutch.ARG_SOLR);
      String batchId = (String)args.get(Nutch.ARG_BATCH);
      NutchIndexWriterFactory.addClassToConf(getConf(), SolrWriter.class);
      getConf().set(SolrConstants.SERVER_URL, solrUrl);

      currentJob = createIndexJob(getConf(), "solr-index", batchId);

      currentJob.waitForCompletion(true);
      ToolUtil.recordJobStatus(null, currentJob, results);
      return results;
    }

This method registers SolrWriter as the index writer, stores the Solr URL in the configuration, creates the job with createIndexJob() (defined in the parent class IndexerJob, see below) and waits for it to complete.
(2) org.apache.nutch.indexer.IndexerJob
1. The createIndexJob() method

    protected Job createIndexJob(Configuration conf, String jobName, String batchId)
        throws IOException, ClassNotFoundException {
      conf.set(GeneratorJob.BATCH_ID, batchId);
      Job job = new NutchJob(conf, jobName);
      // TODO: Figure out why this needs to be here
      job.getConfiguration().setClass("mapred.output.key.comparator.class",
          StringComparator.class, RawComparator.class);

      Collection<WebPage.Field> fields = getFields(job);
      StorageUtils.initMapperJob(job, fields, String.class, NutchDocument.class,
          IndexerMapper.class);
      job.setNumReduceTasks(0);
      job.setOutputFormatClass(IndexerOutputFormat.class);
      return job;
    }

The job is map-only (the number of reduce tasks is 0): IndexerMapper is the mapper and IndexerOutputFormat is the output format.
2. The mapper methods: setup(), map() and cleanup()

    public static class IndexerMapper
        extends GoraMapper<String, WebPage, String, NutchDocument> {
      public IndexUtil indexUtil;
      public DataStore<String, WebPage> store;

      protected Utf8 batchId;

      @Override
      public void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
        indexUtil = new IndexUtil(conf);
        try {
          store = StorageUtils.createWebStore(conf, String.class, WebPage.class);
        } catch (ClassNotFoundException e) {
          throw new IOException(e);
        }
      }

      protected void cleanup(Context context) throws IOException, InterruptedException {
        store.close();
      }

      @Override
      public void map(String key, WebPage page, Context context)
          throws IOException, InterruptedException {
        ParseStatus pstatus = page.getParseStatus();
        if (pstatus == null || !ParseStatusUtils.isSuccess(pstatus)
            || pstatus.getMinorCode() == ParseStatusCodes.SUCCESS_REDIRECT) {
          return; // filter urls not parsed
        }

        Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
        if (!batchId.equals(REINDEX)) {
          if (!NutchJob.shouldProcess(mark, batchId)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Skipping " + TableUtil.unreverseUrl(key)
                  + "; different batch id (" + mark + ")");
            }
            return;
          }
        }

        NutchDocument doc = indexUtil.index(key, page);
        if (doc == null) {
          return;
        }
        if (mark != null) {
          Mark.INDEX_MARK.putMark(page, Mark.UPDATEDB_MARK.checkMark(page));
          store.put(key, page);
        }
        context.write(key, doc);
      }
    }

setup() reads the batch id from the configuration and opens the web page store, and cleanup() closes the store. map() skips pages that were not parsed successfully or that are redirects, skips pages that do not belong to the requested batch (unless the batch id is REINDEX), builds a NutchDocument with IndexUtil.index(), writes the index mark back to the store, and finally emits the document.
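The skip conditions at the top of map() can be condensed into one predicate. The sketch below models them on plain strings and booleans. Two things are assumptions rather than facts taken from the listing: the behaviour of NutchJob.shouldProcess (assumed to return false for a missing mark and to accept any mark when the batch id is "-all"), and the literal values "-all" and "-reindex", which are taken from the usage message of SolrIndexerJob. All class and method names are hypothetical.

```java
final class IndexerMapDecisionSketch {

    static final String ALL = "-all";         // assumed value, from the usage message
    static final String REINDEX = "-reindex"; // assumed value, from the usage message

    /** Assumed behaviour of NutchJob.shouldProcess, modelled on plain strings. */
    static boolean shouldProcess(String mark, String batchId) {
        if (mark == null) {
            return false; // page was never marked by updatedb
        }
        return ALL.equals(batchId) || mark.equals(batchId);
    }

    /**
     * Returns true when map() would go on to build a document.
     * parsedOk and redirect stand in for the ParseStatus checks.
     */
    static boolean shouldIndex(boolean parsedOk, boolean redirect,
                               String mark, String batchId) {
        if (!parsedOk || redirect) {
            return false; // "filter urls not parsed"
        }
        if (!REINDEX.equals(batchId) && !shouldProcess(mark, batchId)) {
            return false; // different batch id
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(shouldIndex(true, false, "batch-1", "batch-1"));
        System.out.println(shouldIndex(true, false, "batch-1", "batch-2"));
        System.out.println(shouldIndex(true, false, null, REINDEX));
    }
}
```

Under these assumptions, -reindex is the only mode that indexes a page without an updatedb mark, which matches the later mark != null guard in map() before the index mark is written.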
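3. The call to context.write()
Because createIndexJob() calls job.setOutputFormatClass(IndexerOutputFormat.class) and sets the number of reduce tasks to 0, each (key, doc) pair written by the mapper goes straight to IndexerOutputFormat, which writes it to the index.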
(3) public class IndexUtil
1. The index() method

    public NutchDocument index(String key, WebPage page) {
      NutchDocument doc = new NutchDocument();
      doc.add("id", key);
      doc.add("digest", StringUtil.toHexString(page.getSignature()));
      if (page.getBatchId() != null) {
        doc.add("batchId", page.getBatchId().toString());
      }

      String url = TableUtil.unreverseUrl(key);

      if (LOG.isDebugEnabled()) {
        LOG.debug("Indexing URL: " + url);
      }

      try {
        doc = filters.filter(doc, url, page);
      } catch (IndexingException e) {
        LOG.warn("Error indexing " + key + ": " + e);
        return null;
      }

      // skip documents discarded by indexing filters
      if (doc == null) return null;

      float boost = 1.0f;
      // run scoring filters
      try {
        boost = scoringFilters.indexerScore(url, doc, page, boost);
      } catch (final ScoringFilterException e) {
        LOG.warn("Error calculating score " + key + ": " + e);
        return null;
      }

      doc.setScore(boost);
      // store boost for use by explain and dedup
      doc.add("boost", Float.toString(boost));

      return doc;
    }

index() creates a NutchDocument with the id, digest and (if present) batchId fields, passes it through the indexing filters, then lets the scoring filters compute a boost, which is stored both as the document score and as the boost field. The method returns null if a filter discards the document or if either filter stage throws an exception.
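The way index() treats a null result from the filters can be shown with a small stand-alone chain. In this sketch a document is a plain Map and each filter is a UnaryOperator; these types, the class name and the example key are hypothetical stand-ins for NutchDocument, IndexingFilter and the reversed-URL row key. Stopping at the first filter that returns null is an assumption about how the filter chain behaves; the listing itself only shows that a null document is discarded.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class IndexFilterChainSketch {

    /**
     * Builds a document, runs it through the filters in order and adds the boost.
     * Returns null as soon as a filter discards the document.
     */
    static Map<String, String> index(String key,
                                     List<UnaryOperator<Map<String, String>>> filters,
                                     float boost) {
        Map<String, String> doc = new LinkedHashMap<>();
        doc.put("id", key);
        for (UnaryOperator<Map<String, String>> filter : filters) {
            doc = filter.apply(doc);
            if (doc == null) {
                return null; // skip documents discarded by indexing filters
            }
        }
        // store boost as a field, as the original does for explain and dedup
        doc.put("boost", Float.toString(boost));
        return doc;
    }

    public static void main(String[] args) {
        List<UnaryOperator<Map<String, String>>> filters = new ArrayList<>();
        // A filter that adds a field, in the spirit of a basic indexing filter.
        filters.add(doc -> { doc.put("title", "Example"); return doc; });
        System.out.println(index("example-key", filters, 1.0f));

        // A filter that rejects every document.
        filters.add(doc -> null);
        System.out.println(index("example-key", filters, 1.0f));
    }
}
```

A null return from index() is what makes the mapper return early without calling context.write(), so a rejecting filter keeps the page out of the index entirely.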
III. Field indexing in plugins
1. The basic fields are indexed in public class BasicIndexingFilter implements IndexingFilter.