Nutch集成slor的索引方法介绍
/**
? ?* 建立索引
? ?* @param solrUrl solr的web地址
? ?* @param crawlDb 爬取DB的存放路径:crawlcrawldb
? ?* @param linkDb 爬取link的存放路径:crawllinkdb
? ?* @param segments 元数据存放路径:crawlsegments
? ?* @param noCommit ?是否提交slor服务器跟下slor索引
? ?* @param deleteGone 是否删除过时的文档
? ?* @param solrParams solr的参数
? ?* @param filter 是否启用URL过滤
? ?* @param normalize 是否格式化 URL
? ?* @throws IOException
? ?*/
? public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
? ? ? List<Path> segments, boolean noCommit, boolean deleteGone, String solrParams,
? ? ? boolean filter, boolean normalize) throws IOException {
? ? ? ? ...
? ? ?? IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
? ? ? ? ...
? ?}
Nutch的索引,是通过一个MR完成的。
map的输入是Nutch爬去目录下的sequenceFile ,key是Nutch爬取下来的URL,value使用的java泛型,将所有nutch自定义的数据类型抽象成一个NutchWritable对象。
Nutchwritable的包含的的数据类型如下源代码:
CLASSES = new Class[] {
? ? ? org.apache.hadoop.io.NullWritable.class,
? ? ? org.apache.hadoop.io.BooleanWritable.class,
? ? ? org.apache.hadoop.io.LongWritable.class,
? ? ? org.apache.hadoop.io.BytesWritable.class,
? ? ? org.apache.hadoop.io.FloatWritable.class,
? ? ? org.apache.hadoop.io.IntWritable.class,
? ? ? org.apache.hadoop.io.MapWritable.class,
? ? ? org.apache.hadoop.io.Text.class,
? ? ? org.apache.hadoop.io.MD5Hash.class,
? ? ? org.apache.nutch.crawl.CrawlDatum.class,
? ? ? org.apache.nutch.crawl.Inlink.class,
? ? ? org.apache.nutch.crawl.Inlinks.class,
? ? ? org.apache.nutch.fetcher.FetcherOutput.class,
? ? ? org.apache.nutch.metadata.Metadata.class,
? ? ? org.apache.nutch.parse.Outlink.class,
? ? ? org.apache.nutch.parse.ParseText.class,
? ? ? org.apache.nutch.parse.ParseData.class,
? ? ? org.apache.nutch.parse.ParseImpl.class,
? ? ? org.apache.nutch.parse.ParseStatus.class,
? ? ? org.apache.nutch.protocol.Content.class,
? ? ? org.apache.nutch.protocol.ProtocolStatus.class,
? ? ? org.apache.nutch.scoring.webgraph.LinkDatum.class,
? ? };
这些数据类型分别抽象了Nutch在爬取时各个阶段的数据类型。
map阶段不对value进行处理,只对URL进行处理,处理代码如下:
? ?String urlString = filterUrl(normalizeUrl(key.toString()));
调用是对URL根据定义好的过滤规则对URL进行过滤和格式化,当然是否进行这步操作可以通过调用命令时参数设置。
reduce是对所有爬取的数据进行处理,代码注释如下:
?/**
? ?* 输出格式:url作为key,索引的action作为value
? ?*/
? public void reduce(Text key, Iterator<NutchWritable> values,
? ? ? ? ? ? ? ? ? ? ?OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
? ? throws IOException {
? ? Inlinks inlinks = null;
? ? CrawlDatum dbDatum = null;
? ? CrawlDatum fetchDatum = null;
? ? ParseData parseData = null;
? ? ParseText parseText = null;
?
? ? while (values.hasNext()) {
? ? ? final Writable value = values.next().get(); // unwrap
? ? ? //如果是URL注入的数据类型
? ? ? if (value instanceof Inlinks) {
? ? ? ? inlinks = (Inlinks)value;
? ? ? ? //如果是爬取的数据类型
? ? ? } else if (value instanceof CrawlDatum) {
? ? ? ? final CrawlDatum datum = (CrawlDatum)value;
? ? ? ? //如果当前数据处于db注入状态
? ? ? ? if (CrawlDatum.hasDbStatus(datum)) {
? ? ? ? ? dbDatum = datum;
? ? ? ? }
? ? ? ? //如果当前数据处于爬取完成状态。
? ? ? ? else if (CrawlDatum.hasFetchStatus(datum)) {
?
? ? ? ? ? // don't index unmodified (empty) pages
? ? ? ? ? //判断爬去的是否进行了修改
? ? ? ? ? if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
? ? ? ? ? ? fetchDatum = datum;
?
? ? ? ? ? ? /**
? ? ? ? ? ? ?* Check if we need to delete 404 NOT FOUND and 301 PERMANENT REDIRECT.
? ? ? ? ? ? ?*/
? ? ? ? ? ? //参数中如果设置删除为true则删除错误及过时的页面
? ? ? ? ? ? if (delete) {
? ? ? ? ? ? //如果爬取的页面过期,采取删除操作。
? ? ? ? ? ? ? if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
? ? ? ? ? ? ? ? reporter.incrCounter("IndexerStatus", "Documents deleted", 1);
?
? ? ? ? ? ? ? ? NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
? ? ? ? ? ? ? ? output.collect(key, action);
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? }
? ? ? ? ? ? ? //如果爬去的页面已经重定向到另外一个页面,才去删除操作。
? ? ? ? ? ? ? if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM) {
? ? ? ? ? ? ? ? reporter.incrCounter("IndexerStatus", "Perm redirects deleted", 1);
?
? ? ? ? ? ? ? ? NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
? ? ? ? ? ? ? ? output.collect(key, action);
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? }
? ? ? ? ?//URL是通过其他URL被发现的 ||页面的签名||页面的元数据是通过解析器产生的
? ? ? ? } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||
? ? ? ? ? ? ? ? ? ?CrawlDatum.STATUS_SIGNATURE == datum.getStatus() ||
? ? ? ? ? ? ? ? ? ?CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
? ? ? ? ? continue;
? ? ? ? } else {
? ? ? ? ? throw new RuntimeException("Unexpected status: "+datum.getStatus());
? ? ? ? }
? ? ? ?//如果是解析的数据类型
? ? ? } else if (value instanceof ParseData) {
? ? ? ? parseData = (ParseData)value;
?
? ? ? ? // Handle robots meta? https://issues.apache.org/jira/browse/NUTCH-1434
? ? ? ? if (deleteRobotsNoIndex) {
? ? ? ? ? // Get the robots meta data
? ? ? ? ? String robotsMeta = parseData.getMeta("robots");
?
? ? ? ? ? // Has it a noindex for this url?
? ? ? ? ? if (robotsMeta != null && robotsMeta.toLowerCase().indexOf("noindex") != -1) {
? ? ? ? ? ? // Delete it!
? ? ? ? ? ? NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
? ? ? ? ? ? output.collect(key, action);
? ? ? ? ? ? return;
? ? ? ? ? }
? ? ? ? }
? ? ? ? //解析完的Text文件
? ? ? } else if (value instanceof ParseText) {
? ? ? ? parseText = (ParseText)value;
? ? ? } else if (LOG.isWarnEnabled()) {
? ? ? ? LOG.warn("Unrecognized type: "+value.getClass());
? ? ? }
? ? }
? ? //如果只有链接,没有爬取历史的记录或者爬取数据直接返回
? ? if (fetchDatum == null || dbDatum == null
? ? ? ? || parseText == null || parseData == null) {
? ? ? return; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // only have inlinks
? ? }
?
? ? // Whether to skip DB_NOTMODIFIED pages
? ? //如果页面被爬取过,但是没有进行修过,在传进来的命令中设置了跳过则跳过。
? ? if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
? ? ? reporter.incrCounter("IndexerStatus", "Skipped", 1);
? ? ? return;
? ? }
? ? //页面爬去成功,但是解析失败的,直接返回
? ? if (!parseData.getStatus().isSuccess() ||
? ? ? ? fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
? ? ? return;
? ? }
?
? ? NutchDocument doc = new NutchDocument();
? ? //解析完的数据中获取页面的元数据
? ? final Metadata metadata = parseData.getContentMeta();
?
? ? // add segment, used to map from merged index back to segment files
? ? doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));
? ? //页面摘要
? ? // add digest, used by dedup
? ? doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
? ??
? ? final Parse parse = new ParseImpl(parseText, parseData);
? ? try {
? ? ? // extract information from dbDatum and pass it to
? ? ? // fetchDatum so that indexing filters can use it
? ? ? final Text url = (Text) dbDatum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
? ? ? if (url != null) {
? ? ? ? fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
? ? ? }
? ? ? // run indexing filters
? ? ? //执行所有过滤器
? ? ? doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
? ? } catch (final IndexingException e) {
? ? ? if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }
? ? ? reporter.incrCounter("IndexerStatus", "Errors", 1);
? ? ? return;
? ? }
?
? ? // skip documents discarded by indexing filters
? ? if (doc == null) {
? ? ? reporter.incrCounter("IndexerStatus", "Skipped by filters", 1);
? ? ? return;
? ? }
?
? ? float boost = 1.0f;
? ? // run scoring filters
? ? //执行评分过滤器
? ? try {
? ? ? boost = this.scfilters.indexerScore(key, doc, dbDatum,
? ? ? ? ? ? ? fetchDatum, parse, inlinks, boost);
? ? } catch (final ScoringFilterException e) {
? ? ? if (LOG.isWarnEnabled()) {
? ? ? ? LOG.warn("Error calculating score " + key + ": " + e);
? ? ? }
? ? ? return;
? ? }
? ? //将评分作为文档的权重
? ? // apply boost to all indexed fields.
? ? doc.setWeight(boost);
? ? // store boost for use by explain and dedup
? ? doc.add("boost", Float.toString(boost));
?
? ? reporter.incrCounter("IndexerStatus", "Documents added", 1);
?
? ? NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
? ? output.collect(key, action);
? }
目前研究到此,后面的待续。。