/ 中存储网

深入Nutch index源代码解析(一)

2014-09-05 00:13:25 来源:中存储

    Nutch集成slor的索引方法介绍

    /**

    ? ?* 建立索引

    ? ?* @param solrUrl solr的web地址

    ? ?* @param crawlDb 爬取DB的存放路径:crawlcrawldb

    ? ?* @param linkDb 爬取link的存放路径:crawllinkdb

    ? ?* @param segments 元数据存放路径:crawlsegments

    ? ?* @param noCommit ?是否提交slor服务器跟下slor索引

    ? ?* @param deleteGone 是否删除过时的文档

    ? ?* @param solrParams solr的参数

    ? ?* @param filter 是否启用URL过滤

    ? ?* @param normalize 是否格式化 URL

    ? ?* @throws IOException

    ? ?*/

    ? public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,

    ? ? ? List<Path> segments, boolean noCommit, boolean deleteGone, String solrParams,

    ? ? ? boolean filter, boolean normalize) throws IOException {

    ? ? ? ? ...

    ? ? ?? IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);

    ? ? ? ? ...

    ? ?}

    Nutch的索引,是通过一个MR完成的。

    map的输入是Nutch爬去目录下的sequenceFile ,key是Nutch爬取下来的URL,value使用的java泛型,将所有nutch自定义的数据类型抽象成一个NutchWritable对象。

    Nutchwritable的包含的的数据类型如下源代码:

    CLASSES = new Class[] {

    ? ? ? org.apache.hadoop.io.NullWritable.class,

    ? ? ? org.apache.hadoop.io.BooleanWritable.class,

    ? ? ? org.apache.hadoop.io.LongWritable.class,

    ? ? ? org.apache.hadoop.io.BytesWritable.class,

    ? ? ? org.apache.hadoop.io.FloatWritable.class,

    ? ? ? org.apache.hadoop.io.IntWritable.class,

    ? ? ? org.apache.hadoop.io.MapWritable.class,

    ? ? ? org.apache.hadoop.io.Text.class,

    ? ? ? org.apache.hadoop.io.MD5Hash.class,

    ? ? ? org.apache.nutch.crawl.CrawlDatum.class,

    ? ? ? org.apache.nutch.crawl.Inlink.class,

    ? ? ? org.apache.nutch.crawl.Inlinks.class,

    ? ? ? org.apache.nutch.fetcher.FetcherOutput.class,

    ? ? ? org.apache.nutch.metadata.Metadata.class,

    ? ? ? org.apache.nutch.parse.Outlink.class,

    ? ? ? org.apache.nutch.parse.ParseText.class,

    ? ? ? org.apache.nutch.parse.ParseData.class,

    ? ? ? org.apache.nutch.parse.ParseImpl.class,

    ? ? ? org.apache.nutch.parse.ParseStatus.class,

    ? ? ? org.apache.nutch.protocol.Content.class,

    ? ? ? org.apache.nutch.protocol.ProtocolStatus.class,

    ? ? ? org.apache.nutch.scoring.webgraph.LinkDatum.class,

    ? ? };

    这些数据类型分别抽象了Nutch在爬取时各个阶段的数据类型。

    map阶段不对value进行处理,只对URL进行处理,处理代码如下:

    ? ?String urlString = filterUrl(normalizeUrl(key.toString()));

    调用是对URL根据定义好的过滤规则对URL进行过滤和格式化,当然是否进行这步操作可以通过调用命令时参数设置。

    reduce是对所有爬取的数据进行处理,代码注释如下:

    ?/**

    ? ?* 输出格式:url作为key,索引的action作为value

    ? ?*/

    ? public void reduce(Text key, Iterator<NutchWritable> values,

    ? ? ? ? ? ? ? ? ? ? ?OutputCollector<Text, NutchIndexAction> output, Reporter reporter)

    ? ? throws IOException {

    ? ? Inlinks inlinks = null;

    ? ? CrawlDatum dbDatum = null;

    ? ? CrawlDatum fetchDatum = null;

    ? ? ParseData parseData = null;

    ? ? ParseText parseText = null;

    ?

    ? ? while (values.hasNext()) {

    ? ? ? final Writable value = values.next().get(); // unwrap

    ? ? ? //如果是URL注入的数据类型

    ? ? ? if (value instanceof Inlinks) {

    ? ? ? ? inlinks = (Inlinks)value;

    ? ? ? ? //如果是爬取的数据类型

    ? ? ? } else if (value instanceof CrawlDatum) {

    ? ? ? ? final CrawlDatum datum = (CrawlDatum)value;

    ? ? ? ? //如果当前数据处于db注入状态

    ? ? ? ? if (CrawlDatum.hasDbStatus(datum)) {

    ? ? ? ? ? dbDatum = datum;

    ? ? ? ? }

    ? ? ? ? //如果当前数据处于爬取完成状态。

    ? ? ? ? else if (CrawlDatum.hasFetchStatus(datum)) {

    ?

    ? ? ? ? ? // don't index unmodified (empty) pages

    ? ? ? ? ? //判断爬去的是否进行了修改

    ? ? ? ? ? if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {

    ? ? ? ? ? ? fetchDatum = datum;

    ?

    ? ? ? ? ? ? /**

    ? ? ? ? ? ? ?* Check if we need to delete 404 NOT FOUND and 301 PERMANENT REDIRECT.

    ? ? ? ? ? ? ?*/

    ? ? ? ? ? ? //参数中如果设置删除为true则删除错误及过时的页面

    ? ? ? ? ? ? if (delete) {

    ? ? ? ? ? ? //如果爬取的页面过期,采取删除操作。

    ? ? ? ? ? ? ? if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {

    ? ? ? ? ? ? ? ? reporter.incrCounter("IndexerStatus", "Documents deleted", 1);

    ?

    ? ? ? ? ? ? ? ? NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);

    ? ? ? ? ? ? ? ? output.collect(key, action);

    ? ? ? ? ? ? ? ? return;

    ? ? ? ? ? ? ? }

    ? ? ? ? ? ? ? //如果爬去的页面已经重定向到另外一个页面,才去删除操作。

    ? ? ? ? ? ? ? if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM) {

    ? ? ? ? ? ? ? ? reporter.incrCounter("IndexerStatus", "Perm redirects deleted", 1);

    ?

    ? ? ? ? ? ? ? ? NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);

    ? ? ? ? ? ? ? ? output.collect(key, action);

    ? ? ? ? ? ? ? ? return;

    ? ? ? ? ? ? ? }

    ? ? ? ? ? ? }

    ? ? ? ? ? }

    ? ? ? ? ?//URL是通过其他URL被发现的 ||页面的签名||页面的元数据是通过解析器产生的

    ? ? ? ? } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||

    ? ? ? ? ? ? ? ? ? ?CrawlDatum.STATUS_SIGNATURE == datum.getStatus() ||

    ? ? ? ? ? ? ? ? ? ?CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {

    ? ? ? ? ? continue;

    ? ? ? ? } else {

    ? ? ? ? ? throw new RuntimeException("Unexpected status: "+datum.getStatus());

    ? ? ? ? }

    ? ? ? ?//如果是解析的数据类型

    ? ? ? } else if (value instanceof ParseData) {

    ? ? ? ? parseData = (ParseData)value;

    ?

    ? ? ? ? // Handle robots meta? https://issues.apache.org/jira/browse/NUTCH-1434

    ? ? ? ? if (deleteRobotsNoIndex) {

    ? ? ? ? ? // Get the robots meta data

    ? ? ? ? ? String robotsMeta = parseData.getMeta("robots");

    ?

    ? ? ? ? ? // Has it a noindex for this url?

    ? ? ? ? ? if (robotsMeta != null && robotsMeta.toLowerCase().indexOf("noindex") != -1) {

    ? ? ? ? ? ? // Delete it!

    ? ? ? ? ? ? NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);

    ? ? ? ? ? ? output.collect(key, action);

    ? ? ? ? ? ? return;

    ? ? ? ? ? }

    ? ? ? ? }

    ? ? ? ? //解析完的Text文件

    ? ? ? } else if (value instanceof ParseText) {

    ? ? ? ? parseText = (ParseText)value;

    ? ? ? } else if (LOG.isWarnEnabled()) {

    ? ? ? ? LOG.warn("Unrecognized type: "+value.getClass());

    ? ? ? }

    ? ? }

    ? ? //如果只有链接,没有爬取历史的记录或者爬取数据直接返回

    ? ? if (fetchDatum == null || dbDatum == null

    ? ? ? ? || parseText == null || parseData == null) {

    ? ? ? return; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // only have inlinks

    ? ? }

    ?

    ? ? // Whether to skip DB_NOTMODIFIED pages

    ? ? //如果页面被爬取过,但是没有进行修过,在传进来的命令中设置了跳过则跳过。

    ? ? if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {

    ? ? ? reporter.incrCounter("IndexerStatus", "Skipped", 1);

    ? ? ? return;

    ? ? }

    ? ? //页面爬去成功,但是解析失败的,直接返回

    ? ? if (!parseData.getStatus().isSuccess() ||

    ? ? ? ? fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {

    ? ? ? return;

    ? ? }

    ?

    ? ? NutchDocument doc = new NutchDocument();

    ? ? //解析完的数据中获取页面的元数据

    ? ? final Metadata metadata = parseData.getContentMeta();

    ?

    ? ? // add segment, used to map from merged index back to segment files

    ? ? doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));

    ? ? //页面摘要

    ? ? // add digest, used by dedup

    ? ? doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));

    ? ??

    ? ? final Parse parse = new ParseImpl(parseText, parseData);

    ? ? try {

    ? ? ? // extract information from dbDatum and pass it to

    ? ? ? // fetchDatum so that indexing filters can use it

    ? ? ? final Text url = (Text) dbDatum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);

    ? ? ? if (url != null) {

    ? ? ? ? fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);

    ? ? ? }

    ? ? ? // run indexing filters

    ? ? ? //执行所有过滤器

    ? ? ? doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);

    ? ? } catch (final IndexingException e) {

    ? ? ? if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }

    ? ? ? reporter.incrCounter("IndexerStatus", "Errors", 1);

    ? ? ? return;

    ? ? }

    ?

    ? ? // skip documents discarded by indexing filters

    ? ? if (doc == null) {

    ? ? ? reporter.incrCounter("IndexerStatus", "Skipped by filters", 1);

    ? ? ? return;

    ? ? }

    ?

    ? ? float boost = 1.0f;

    ? ? // run scoring filters

    ? ? //执行评分过滤器

    ? ? try {

    ? ? ? boost = this.scfilters.indexerScore(key, doc, dbDatum,

    ? ? ? ? ? ? ? fetchDatum, parse, inlinks, boost);

    ? ? } catch (final ScoringFilterException e) {

    ? ? ? if (LOG.isWarnEnabled()) {

    ? ? ? ? LOG.warn("Error calculating score " + key + ": " + e);

    ? ? ? }

    ? ? ? return;

    ? ? }

    ? ? //将评分作为文档的权重

    ? ? // apply boost to all indexed fields.

    ? ? doc.setWeight(boost);

    ? ? // store boost for use by explain and dedup

    ? ? doc.add("boost", Float.toString(boost));

    ?

    ? ? reporter.incrCounter("IndexerStatus", "Documents added", 1);

    ?

    ? ? NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);

    ? ? output.collect(key, action);

    ? }

    目前研究到此,后面的待续。。