Friday, January 13, 2012

Nutch/GORA - Scoring and Indexing Plugins for Metadata Inheritance

Last week, I described my initial explorations with Nutch/GORA and Cassandra to crawl a small external site. At the end of the post, I raised three issues which, as Julien Nioche points out, are already handled by Nutch. This week, I describe a pair of Nutch plugins that do metadata inheritance. The work is based heavily on the ideas in the Nutch Writing Plugin Example Wiki Page - the example on that page does almost what I want, but works with the traditional segment files rather than the Nutch/GORA based WebPage data structure. So I figured the code may be worth sharing, for people who would like to leverage Nutch/GORA.

What is Metadata Inheritance?

What I mean by this is that I want to inject some metadata along with the seed URL, and have that metadata propagated recursively to all the children of that page. I want to use the same infrastructure to crawl the web as well as internal resources, and I need some way to distinguish the various sources, so the one piece of metadata I add to the seed URL is a u_idx value.
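For example, the seed for the test crawl described below is tagged with u_idx=test, while a seed for an internal source could carry a different value (the intranet URL here is just a made-up placeholder), so documents from the two crawls can be told apart downstream:

http://localhost:8080/test/A u_idx=test
http://intranet.mycompany.com/docs/index.html u_idx=internal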

Mocking the Target Site

Before I started, I decided to mock out my own "site" using a bunch of autogenerated HTML files served up using CherryPy. Since I was just starting out with Nutch/GORA, I expected it would take a few iterations before I got things to work correctly, and it seemed a bit unfair to slam an external site every 5-10 minutes. Plus, doing this allows me to work on the code without an internet connection, which is how I work most of the time (most of the stuff I write about is done on a train with no internet connection, during my commute to and from work). In any case, my mock site looks like this:

  A.html
  |
  +-- B.html
  |   |
  |   +-- D.html
  |   |
  |   +-- E.html
  |
  +-- C.html
      |
      +-- F.html
      |
      +-- G.html

The code to build this site is quite simple - just a few lines of Python. Here it is if you are interested.

#!/usr/bin/python
import sys
import os

def gen_page(dir, prefix, pages):
  curr_page = pages[0]
  sub_pages = pages[1:]
  f = open(os.path.join(dir, curr_page + ".html"), "w")
  c = """<html>
    <head><title>Test Site Page %s</title></head>
    <body>
      <p>Random text for Page %s</p>
      <ul>
        <li><a href=\"%s%s\">Page %s</a></li>
        <li><a href=\"%s%s\">Page %s</a></li>
      </ul>
    </body>
    </html>
  """ 
  f.write(c % (curr_page, curr_page, prefix, sub_pages[0], \
    sub_pages[0], prefix, sub_pages[1], sub_pages[1]))
  f.close()

def main():
  prefix = "http://localhost:8080/test/"
  dir = "../test_site"
  # depth 0
  gen_page(dir, prefix, ["A", "B", "C"])
  # depth 1
  gen_page(dir, prefix, ["B", "D", "E"])
  gen_page(dir, prefix, ["C", "F", "G"])
  # depth 2
  gen_page(dir, prefix, ["D", "H", "I"])
  gen_page(dir, prefix, ["E", "J", "K"])
  gen_page(dir, prefix, ["F", "L", "M"])
  gen_page(dir, prefix, ["G", "N", "O"])

if __name__ == "__main__":
  main()

Below is the code to serve this site locally on port 8080, using CherryPy. I could have used a standard HTTP server like nginx or Apache HTTPD instead, but I had CherryPy already available from a previous project, and I expect I will need a programmable HTTP interface at some point in this project anyway, so the sooner the better.

#!/usr/bin/python
import os.path
import cherrypy
from cherrypy.lib.static import serve_file

class Root:

  @cherrypy.expose
  def test(self, name):
    return serve_file(os.path.join(""/path/to/site_root", "%s.html" % \
      (name)), content_type="text/html")

if __name__ == '__main__':
  current_dir = os.path.dirname(os.path.abspath(__file__))
  # Set up site-wide config first so we get a log if errors occur.
  cherrypy.config.update({'environment': 'production',
    'log.error_file': 'site.log',
    'log.screen': True,
    "server.socket_host" : "127.0.0.1",
    "server.socket_port" : 8080})
  cherrypy.quickstart(Root(), '/')

My seed file points to the A.html file, like so:

http://localhost:8080/test/A u_idx=test
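The seed file sits in a directory by itself and is handed to Nutch's injector - with the Nutch 2.x command line this should be just "bin/nutch inject <seed_dir>", if I have the usage right - and the crawl script further down tries to wrap this step as well.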

Project Setup

I also spent some time getting Nutch to work with Ivy, both inside Eclipse and on the command line. I found the instructions on this Pentaho Wiki Page very helpful. The way I got the sftp-protocol's ivy.xml configuration to work both in Eclipse and on the command line (pointed out by Alexis in his TechVineyard post) was to just put the full path in it - not terribly portable, but it works for me.

I also moved all the configuration overrides into conf/nutch-site.xml, like I should have done to begin with, and added the extra libraries I spoke of in my previous post into nutch/lib, so calls to "ant" just repopulate runtime/local with the updated code, libraries and configuration. Here is my nutch-site.xml file.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Source: conf/nutch-site.xml -->

<configuration>

<property>
  <name>http.agent.name</name>
  <value>CNC_Crawler</value>
  <description/>
</property>

<property>
  <name>http.robots.agents</name>
  <value>CNC_Crawler,*</value>
  <description/>
</property>

<property>
  <name>storage.data.store.class</name>
  <value>org.apache.gora.cassandra.store.CassandraStore</value>
  <description>Default class for storing data</description>
</property>

<property>
  <name>db.ignore.external.links</name>
  <value>true</value>
  <description>If true, outlinks leading from a page to external hosts
  will be ignored. This is an effective way to limit the crawl to include
  only initially injected hosts, without creating complex URLFilters.
  </description>
</property>

<property>
  <name>mycompany.usertags.inheritable</name>
  <value>u_idx</value>
  <description>Comma-separated list of user tags which should be inherited
  by child pages.
  </description>
</property>

<property>
  <name>plugin.includes</name>
  <value>protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|urlnormalizer-(pass|regex|basic)|scoring-opic|mycompany</value>
 <description>Regular expression naming plugin directory names to
  include.  Any plugin not matching this expression is excluded.
  In any case you need at least include the nutch-extensionpoints plugin. By
  default Nutch includes crawling just HTML and plain text via HTTP,
  and basic indexing and search plugins. In order to use HTTPS please enable
  protocol-httpclient, but be aware of possible intermittent problems with the
  underlying commons-httpclient library.
  </description>
</property>

</configuration>

I also built myself a little script that does the inject/generate/fetch/parse/updatedb/index dance for a specified depth. It's still a bit of a work in progress - the inject step doesn't work at the moment, but I am working on it :-). At the end of each iteration, it prints out the row counts in the "f" and "p" column families, as well as the count of records indexed into Solr. Here it is:

#!/usr/bin/python
# Takes a depth parameter, optionally injects a seed, and then repeatedly
# calls generate, fetch, parse, updatedb and solrindex that many times

import getopt
import os
import os.path
import pycassa
import simplejson
import subprocess
import sys
import urllib

from pycassa.pool import ConnectionPool
from pycassa.util import OrderedDict
from urllib2 import *

# configuration
NUTCH_HOME = "/path/to/your/nutch/runtime/local"
SOLR_URL = "http://127.0.0.1:8983/solr/"
CASSANDRA_HOST_PORT = "localhost:9160"

def inject(depth, seed):
  print "`:::: Injecting (depth %d) from seed: %s ::::" % (depth, seed)
  cwd = os.getcwd()
  os.chdir(NUTCH_HOME)
  p = os.popen("bin/nutch inject", 'r')
  for line in p.readlines():
    print line,
  p.close()
  os.chdir(cwd)

def generate(depth):
  print ":::: Generating (depth %d) ::::" % (depth)
  cwd = os.getcwd()
  os.chdir(NUTCH_HOME)
  p = os.popen("bin/nutch generate", 'r')
  batch_id = None
  for line in p.readlines():
    print line,
    if line.find("generated batch id") > -1:
      cols = line.split(":")
      if len(cols) == 3:
        batch_id = cols[2].strip()
  p.close()
  os.chdir(cwd)
  return batch_id

def fetch(depth, batch_id):
  print ":::: Fetching (depth %d) with batch_id: %s ::::" % (depth, batch_id)
  cwd = os.getcwd()
  os.chdir(NUTCH_HOME)
  p = os.popen("bin/nutch fetch %s" % (batch_id), 'r')
  for line in p.readlines():
    print line,
  p.close()
  os.chdir(cwd)

def parse(depth, batch_id):
  print ":::: Parsing (depth %d) with batch_id: %s ::::" % (depth, batch_id)
  cwd = os.getcwd()
  os.chdir(NUTCH_HOME)
  p = os.popen("bin/nutch parse %s" % (batch_id), 'r')
  for line in p.readlines():
    print line,
  p.close()
  os.chdir(cwd)

def updatedb(depth):
  print ":::: Update DB (depth %d) ::::" % (depth)
  cwd = os.getcwd()
  os.chdir(NUTCH_HOME)
  p = os.popen("bin/nutch updatedb", 'r')
  for line in p.readlines():
    print line,
  p.close()
  os.chdir(cwd)

def index(depth, batch_id):
  print ":::: Indexing (depth %d) with batch_id: %s ::::" % (depth, batch_id)
  cwd = os.getcwd()
  os.chdir(NUTCH_HOME)
  p = os.popen("bin/nutch solrindex %s %s" % (SOLR_URL, batch_id), 'r')
  for line in p.readlines():
    print line,
  p.close()
  os.chdir(cwd)

def cassandra_stats(casspool, depth):
  count_f = get_rowcount(casspool, "f")
  count_p = get_rowcount(casspool, "p")
  print ":::: In Cassandra (depth %d), count(f) = %d, count(p) = %d ::::" % \
    (depth, count_f, count_p)

def get_rowcount(casspool, cfname):
  cf = pycassa.ColumnFamily(casspool, cfname)
  res = cf.get_range(start="", finish="")
  count = 0
  for k, v in res:
    count = count + 1
  return count

def solr_stats(depth):
  server = "".join([SOLR_URL, "select"])
  params = urllib.urlencode({
    "q" : "*:*", 
    "rows" : "0", 
    "wt" : "json"
  })
  conn = urllib.urlopen(server, params)
  rsp = simplejson.load(conn)
  numfound = rsp["response"]["numFound"]
  print ":::: In Solr (depth %d), %s rows loaded into Solr ::::" % \
    (depth, numfound)

def usage(message=None):
  if message != None:
    print "Error: %s" % (message)
  print "Usage: crawl.py -d depth [-s seed_file]"
  print "       crawl.py -h"
  sys.exit(-1)

def main():
  try:
    (opts, args) = getopt.getopt(sys.argv[1:], 
      "d:s:h", ["depth=", "seed=", "help"])
  except getopt.GetoptError:
    usage()
  if len(filter(lambda x : x[0] in ("-h", "--help"), opts)) == 1:
    usage()
  crawl_depth = None
  seed = None
  for opt in opts:
    (key, value) = opt
    if key in ("-d", "--depth"):
      crawl_depth = value
    elif key in ("-s", "--seed"):
      seed = value
    else:
      usage()
  # crawl_depth must be specified
  if crawl_depth == None:
    usage("Depth must be specified")
  depth = int(crawl_depth)
  # seed directory if specified must exist
  if seed != None:
    print "seed", seed
    # if seed specified run inject
    if not os.path.isdir(seed):
      usage("Seed directory %s is not a directory" % (seed))
    # TODO: find why this doesn't work?
    #inject(depth, seed)
  # set up pointers to cassandra etc
  casspool = ConnectionPool("webpage", [CASSANDRA_HOST_PORT])
  # for depth iterations run the other methods
  for iter in range(0, int(depth)):
    batch_id = generate(iter)
    fetch(iter, batch_id)
    parse(iter, batch_id)
    updatedb(iter)
    index(iter, batch_id)
    cassandra_stats(casspool, iter)
    solr_stats(iter)

if __name__ == "__main__":
  main()
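Going by the usage() function above, a typical run against my test site looks something like "./crawl.py -d 3 -s /path/to/seed_dir" (the seed directory path is a placeholder): it runs three generate/fetch/parse/updatedb/index iterations and prints the Cassandra row counts and the Solr document count after each one.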

And finally, I put the project under local git control. I've been using git since my last project, and it's pretty addictive :-).

Coding the Plugins

While the Nutch Writing Plugins Wiki page was my main source of information for this work, I also trawled through the Nutch code to find other possible extension points; here is the list I came up with, so you (and I in about 6 months) don't have to :-).

  • URLNormalizer - called from nutch inject and generate.
  • URLFilter - called from nutch generate.
  • Protocol - called from nutch fetch.
  • Parser - called from nutch parse.
  • ScoringFilter - called from nutch updatedb and generate.
  • IndexingFilter - called from nutch solrindex.

The setup instructions are very detailed, and I followed them almost to the letter to set up my plugin project. Each plugin project has three XML files at its root - build.xml, ivy.xml and plugin.xml. Here is the Ant build.xml - it delegates all functions to the parent build.xml (supplied by Nutch):

<?xml version="1.0"?>
<!-- Source: src/plugin/mycompany/build.xml -->
<project name="mycompany" default="jar-core">
  <import file="../build-plugin.xml"/>
</project>

And the ivy.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Source: src/plugin/mycompany/ivy.xml -->
<ivy-module version="1.0">
  
  <info organisation="com.mycompany.nutch" module="${ant.project.name}">
    <license name="Apache 2.0"/>
    <ivyauthor name="Sujit Pal" url="http://sujitpal.blogspot.com"/>
    <description>Nutch Custom Plugins for CNC</description>
  </info>

  <configurations>
    <include file="${nutch.root}/ivy/ivy-configurations.xml"/>
  </configurations>

  <publications>
    <artifact conf="master"/>
  </publications>

  <dependencies>
  </dependencies>

</ivy-module>

And finally, the plugin descriptor plugin.xml file, which describes to the Nutch runtime the capabilities of the plugins in this project, so it can discover and include them at appropriate points in its lifecycle.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Source: src/plugin/mycompany/plugin.xml -->
<plugin id="mycompany" 
    name="Custom Company-specific plugins"
    version="1.0.0"
    provider-name="My Company">

  <runtime>
    <library name="mycompany.jar">
      <export name="*"/>
    </library>
  </runtime>

  <requires>
    <import plugin="nutch-extensionpoints"/>
  </requires>

  <extension id="com.mycompany.nutch.indexer.usertags"
      name="User Tag Indexing Filter"
      point="org.apache.nutch.indexer.IndexingFilter">
    <implementation id="mycompany-indexer-usertags"
        class="com.mycompany.nutch.indexer.usertags.UserTagIndexingFilter"/>
  </extension>
  <extension id="com.mycompany.nutch.scoring.usertags"
      name="User Tag Scoring Filter"
      point="org.apache.nutch.scoring.ScoringFilter">
    <implementation id="mycompany-scoring-metadata"
        class="com.mycompany.nutch.scoring.usertags.UserTagScoringFilter"/>
  </extension>

</plugin>

The plugins themselves, as expected, are mostly plain old Java, and have nothing to do with Hadoop or Cassandra - these are abstracted away in the innards of Nutch and GORA. However, the APIs specified by the various plugin interfaces are slightly different in the Nutch/GORA case, so I describe them below:

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/scoring/usertags/UserTagScoringFilter.java
package com.mycompany.nutch.scoring.usertags;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.scoring.ScoreDatum;
import org.apache.nutch.scoring.ScoringFilter;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.storage.WebPage.Field;
import org.apache.nutch.util.Bytes;
import org.apache.nutch.util.TableUtil;

public class UserTagScoringFilter implements ScoringFilter {

  private static final Log LOG = LogFactory.getLog(UserTagScoringFilter.class);
  private static final String INHERITABLE_USERTAGS_LIST_PROP = 
    "mycompany.usertags.inheritable";
  private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
  
  static {
    FIELDS.add(WebPage.Field.METADATA);
    FIELDS.add(WebPage.Field.OUTLINKS);
  }
  
  private Configuration conf;
  private Set<String> inheritableUserTags;

  @Override
  public void distributeScoreToOutlinks(String fromUrl, 
      WebPage page, Collection<ScoreDatum> scoreData, 
      int allCount) throws ScoringFilterException {
    // get the usertags from the parent
    LOG.info("Looking for user tags for: " + fromUrl);
    Map<String,String> userTags = new HashMap<String,String>();
    Map<Utf8,ByteBuffer> metadata = page.getMetadata();
    for (Utf8 key : metadata.keySet()) {
      String keyStr = TableUtil.toString(key);
      if (StringUtils.isEmpty(keyStr)) {
        continue;
      }
      if (inheritableUserTags.contains(keyStr)) {
        userTags.put(keyStr, Bytes.toString(
          Bytes.toBytes(metadata.get(key))));
      }
    }
    LOG.info("User tags for " + fromUrl + " = " + userTags);
    // distribute the userTags to all outlink records
    LOG.info("Propagating user tags to outlinks...");
    for (ScoreDatum scoreDatum : scoreData) {
      LOG.info("Processing outlink:" + scoreDatum.getUrl());
      for (String tagName : userTags.keySet()) {
        String tagValue = userTags.get(tagName);
        if (StringUtils.isNotEmpty(tagValue)) {
          LOG.info("Setting outlink.meta[" + tagName + 
              "]=" + tagValue + " for outlink:" + 
              scoreDatum.getUrl());
          scoreDatum.setMeta(tagName, Bytes.toBytes(tagValue));
        }
      }
    }
  }

  @Override
  public void updateScore(String url, WebPage page,
      List<ScoreDatum> inlinkedScoreData) 
      throws ScoringFilterException {
    // these are the outlinks, updateScore is called once
    // per outlink to which we propagated metadata to in
    // distributeScoreToOutlinks, so this is our chance
    // to persist it there...
    // extract all user tags from inlinks
    Map<Utf8,ByteBuffer> metadata = new HashMap<Utf8,ByteBuffer>();
    for (ScoreDatum scoreDatum : inlinkedScoreData) {
      LOG.info("Looking at inlink: " + scoreDatum.getUrl());
      for (String inheritableUserTag : inheritableUserTags) {
        String value = Bytes.toString(
          scoreDatum.getMeta(inheritableUserTag));
        if (StringUtils.isNotEmpty(value)) {
          metadata.put(new Utf8(inheritableUserTag), 
            ByteBuffer.wrap(Bytes.toBytes(value)));
        }
      }
    }
    if (metadata.size() > 0) {
      for (Utf8 key : metadata.keySet()) {
        LOG.info("Saving metadata[" + TableUtil.toString(key) + 
          "] to outlink: " + url);
        page.putToMetadata(key, metadata.get(key));
      }
    }
    return;
  }

  @Override
  public float generatorSortValue(String url, WebPage page, 
      float initSort) throws ScoringFilterException {
    return initSort;
  }

  @Override
  public float indexerScore(String url, NutchDocument doc, 
      WebPage page, float initScore) 
      throws ScoringFilterException {
    return 0;
  }

  @Override
  public void initialScore(String url, WebPage page)
      throws ScoringFilterException {
    return;
  }

  @Override
  public void injectedScore(String url, WebPage page)
      throws ScoringFilterException {
    return;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    inheritableUserTags = new HashSet<String>(
      Arrays.asList(StringUtils.split(
      conf.get(INHERITABLE_USERTAGS_LIST_PROP, ""), ",")));
    LOG.info("Property " + INHERITABLE_USERTAGS_LIST_PROP + 
      " set to " + inheritableUserTags);
  }

  @Override
  public Collection<Field> getFields() {
    return FIELDS;
  }
}

The two main methods are distributeScoreToOutlinks and updateScore (although we are really distributing and updating metadata rather than scores here). The first is called by the DbUpdaterMapper and is used to "scatter" the parent metadata to its outlinks, and the second is called by the DbUpdaterReducer and is used to "gather" the metadata back into the child page from each of its parent inlinks. It's a bit counter-intuitive to have this happen in two steps, but I think the reason is that in the mapper you don't have a handle to the child WebPages, and the ScoreDatum is just for passing data to the reducer and is not written to Cassandra. Additionally, the get/setConf() methods are used to read the property from nutch-site.xml. The getFields() method, as far as I could make out, works a bit like Lucene's FieldSelectors.
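A quick way to convince yourself that the scatter/gather actually persisted the inherited tags is to peek at the "mtdt" super column of the "sc" column family with pycassa, in the same spirit as the stats functions in the crawl script above. This is just a sketch, assuming the layout described in this post (keyspace "webpage", super column family "sc" containing "mtdt"); row keys and raw values are printed as-is.

#!/usr/bin/python
# Sketch: print the inherited metadata ("mtdt" super column) for every row
# in the "sc" column family. The keyspace/column family names are the ones
# described in this post; host/port match the crawl script above.
import pycassa
from pycassa.pool import ConnectionPool

CASSANDRA_HOST_PORT = "localhost:9160"

def dump_inherited_metadata():
  pool = ConnectionPool("webpage", [CASSANDRA_HOST_PORT])
  sc = pycassa.ColumnFamily(pool, "sc")
  for key, supercols in sc.get_range(start="", finish=""):
    mtdt = supercols.get("mtdt")
    if mtdt:
      # mtdt maps metadata names (e.g. u_idx) to their raw byte values
      print key, dict(mtdt)

if __name__ == "__main__":
  dump_inherited_metadata()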

The second plugin is the indexing filter. Its main method is filter(), which extracts the metadata from webpage["sc"]["mtdt"] and places it into the NutchDocument for publishing to the index. Here is the code - it's relatively simple.

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/indexer/usertags/UserTagIndexingFilter.java
package com.mycompany.nutch.indexer.usertags;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.indexer.IndexingException;
import org.apache.nutch.indexer.IndexingFilter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.storage.WebPage.Field;
import org.apache.nutch.util.Bytes;
import org.apache.nutch.util.TableUtil;

public class UserTagIndexingFilter implements IndexingFilter {

  private static final Log LOG = LogFactory.getLog(UserTagIndexingFilter.class);
  private static final String INHERITABLE_USERTAGS_LIST_PROP = 
    "mycompany.usertags.inheritable";
  
  private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
  
  static {
    FIELDS.add(WebPage.Field.METADATA);
  }
  
  private Configuration conf;
  private Set<String> inheritableUserTags;
  
  @Override
  public NutchDocument filter(NutchDocument doc, String url, 
      WebPage page) throws IndexingException {
    LOG.info("Adding inheritable user tags for page:" + url);
    Map<Utf8,ByteBuffer> metadata = page.getMetadata();
    for (Utf8 key : metadata.keySet()) {
      String keyStr = TableUtil.toString(key);
      if (StringUtils.isEmpty(keyStr)) {
        continue;
      }
      if (inheritableUserTags.contains(keyStr)) {
        String value = Bytes.toString(
          Bytes.toBytes(metadata.get(key)));
        doc.add(keyStr, value);
      }
    }
    return doc;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    inheritableUserTags = new HashSet<String>(
      Arrays.asList(StringUtils.split(
      conf.get(INHERITABLE_USERTAGS_LIST_PROP, ""), ",")));
    LOG.info("Property " + INHERITABLE_USERTAGS_LIST_PROP + 
      " set to " + inheritableUserTags);
  }

  @Override
  public Collection<Field> getFields() {
    return FIELDS;
  }
}

In addition, we need to add the u_idx field into the conf/solrindex-mapping.xml file so Nutch knows to index the field, like so:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Source: conf/solrindex-mapping.xml -->
<mapping>
  <fields>
    ...
    <field dest="u_idx" source="u_idx"/>
  </fields>
  <uniqueKey>id</uniqueKey>
</mapping>

and we also need to add the corresponding field definition to the schema.xml file that goes into the Solr instance.

  <field name="u_idx" type="string" stored="true" indexed="true"/>

And that's pretty much it - running the crawl to a depth of 3 against the test site produces 7 pages in the Solr index, as expected, and all 7 of these pages have u_idx set to "test".
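To verify that last bit, I find it handy to query Solr directly, along the same lines as solr_stats() in the crawl script - a minimal sketch, using the same Solr URL as the script and simply filtering on the inherited field:

#!/usr/bin/python
# Sketch: count the documents that carry the inherited u_idx value, using
# the same Solr select handler that solr_stats() in the crawl script uses.
import simplejson
import urllib

SOLR_URL = "http://127.0.0.1:8983/solr/"

params = urllib.urlencode({
  "q" : "u_idx:test",   # only documents that inherited u_idx=test
  "rows" : "0",         # we only need the count
  "wt" : "json"
})
conn = urllib.urlopen("".join([SOLR_URL, "select"]), params)
rsp = simplejson.load(conn)
# should print 7 for a depth-3 crawl of the mock site
print rsp["response"]["numFound"]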
