Saturday, January 22, 2011

Payloads with Solr

I started looking at Solr again recently - the last time I used it (as a user, not a developer) was at CNET years ago, when Solr was being developed and deployed in-house. Reading the Solr 1.4 Enterprise Search Server book, I was struck by how far Solr (post version 1.3) has come in terms of features since I last saw it.

Of course, using Solr is not that hard - it's just an HTTP-based API. What I really wanted to do was understand how to customize it for my needs, and since I learn best by doing, I decided to work through some scenarios that are common at work. One such scenario is concept searching. I have written about this before, using Lucene payloads to provide a possible solution. This time, I decided to extend that solution to run on Solr.

Schema

It turns out that a lot of this functionality is already available in Solr (at least in the SVN version). The default schema.xml contains a field definition for payload fields, as well as the corresponding analyzer chain definitions, which I simply copied. I decided to use a simple schema for my experiments, adapted from the default Solr schema.xml file. My schema file (plex, for PayLoad EXtension) is shown below:

<?xml version="1.0" encoding="UTF-8" ?>
<!-- Source: solr/example/plex/conf/schema.xml -->
<schema name="plex" version="1.3">
  <types>
    <fieldType name="string" class="solr.StrField" 
      sortMissingLast="true" omitNorms="true"/>
    <fieldType name="text" class="solr.TextField" 
      positionIncrementGap="100" autoGeneratePhraseQueries="true">
      <analyzer>
        <tokenizer class="solr.WhitespaceTokenizerFactory"/>
        <filter class="solr.StopFilterFactory"
          ignoreCase="true" words="stopwords.txt" 
          enablePositionIncrements="true"/>
        <filter class="solr.WordDelimiterFilterFactory" 
          generateWordParts="1" generateNumberParts="1" 
          catenateWords="1" catenateNumbers="1" 
          catenateAll="0" splitOnCaseChange="1"/>
        <filter class="solr.LowerCaseFilterFactory"/>
        <filter class="solr.KeywordMarkerFilterFactory" 
          protected="protwords.txt"/>
        <filter class="solr.PorterStemFilterFactory"/>
      </analyzer>
    </fieldType>
    <fieldtype name="payloads" stored="false" indexed="true" 
      class="solr.TextField" >
      <analyzer>
        <tokenizer class="solr.WhitespaceTokenizerFactory"/>
        <filter class="solr.DelimitedPayloadTokenFilterFactory" 
          delimiter="$" encoder="float"/>
      </analyzer>
    </fieldtype>
  </types>
  <fields>
    <field name="id" type="string" indexed="true" stored="true" 
      required="true" /> 
    <field name="url" type="string" indexed="false" stored="true" 
      required="true"/>
    <field name="title" type="text" indexed="true" stored="true"/>
    <field name="keywords" type="text" indexed="true" stored="true" 
      multiValued="true"/>
    <field name="concepts" type="payloads" indexed="true" stored="true"/>
    <field name="description" type="text" indexed="true" stored="true"/>
    <field name="author" type="string" indexed="true" stored="true"/>
    <field name="content" type="text" indexed="true" stored="false"/>
  </fields>
  <uniqueKey>id</uniqueKey>
  <defaultSearchField>content</defaultSearchField>
  <solrQueryParser defaultOperator="OR"/>
  <similarity class="org.apache.solr.search.ext.MyPerFieldSimilarityWrapper"/>
</schema>

Ignore the <similarity> tag towards the bottom of the file for now. The schema describes a record containing a payload field called "concepts" of type "payloads", which is defined, along with its analyzer chain, in the types section of this file.
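
Just to make the payload format concrete, here is a tiny standalone snippet (not part of the Solr setup itself) showing how the number after the "$" delimiter is stored as a float payload on the term, using Lucene's PayloadHelper - the same class used for decoding in the PayloadSimilarity further down.

// Minimal sketch: encode/decode a float payload the way the "payloads" field
// type above does for a token such as 123456$12.0.
import org.apache.lucene.analysis.payloads.PayloadHelper;

public class PayloadEncodingDemo {
  public static void main(String[] args) {
    // the text after the "$" delimiter is encoded into a 4-byte float payload
    byte[] payload = PayloadHelper.encodeFloat(12.0f);
    // at scoring time the bytes are decoded back into the concept score
    float score = PayloadHelper.decodeFloat(payload, 0);
    System.out.println("decoded payload score: " + score); // prints 12.0
  }
}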

Indexing

For my experiment, I just cloned the example/solr instance into example/plex, and copied the schema.xml file into it. Then I started the instance with the following command from the solr/example directory:

sujit@cyclone:example$ java -Dsolr.solr.home=plex -jar start.jar

On another terminal, I deleted the current records (none to begin with, but you will need to do this for testing iterations), then added two records with payloads.

sujit@cyclone:tmp$ curl http://localhost:8983/solr/update?commit=true -d \
  '<delete><query>*:*</query></delete>'
sujit@cyclone:tmp$ curl http://localhost:8983/solr/update \
  -H "Content-Type: text/xml" --data-binary @upload.xml

The contents of upload.xml are shown below - it's basically two records, followed by a commit call (to make the data show up on the search interface) and an optimize (not mandatory).

<update>
  <add allowDups="false">
    <doc>
      <field name="id">1</field>
      <field name="url">http://www.myco.com/doc-1.html</field>
      <field name="title">My First Document</field>
      <field name="keywords">keyword_1</field>
      <field name="keywords">keyword_2</field>
      <field name="concepts">123456$12.0 234567$22.4</field>
      <field name="description">Description for My First Document</field>
      <field name="author">Pig Me</field>
      <field name="content">This is the house that Jack built. It was a mighty \
        fine house, but it was built out of straw. So the wicked old fox \
        huffed, and puffed, and blew the house down. Which was just as well, \
        since Jack built this house for testing purposes.
      </field>
    </doc>
    <doc>
      <field name="id">2</field>
      <field name="url">http://www.myco.com/doc-2.html</field>
      <field name="title">My Second Document</field>
      <field name="keywords">keyword_3</field>
      <field name="keywords">keyword_2</field>
      <field name="concepts">123456$44.0 345678$20.4</field>
      <field name="description">Description for My Second Document</field>
      <field name="author">Will E Coyote</field>
      <field name="content">This is the story of the three little pigs who \
        went to the market to find material to build a house with so the \
        wily old fox would not be able to blow their houses down with some \
        random huffing and puffing.
      </field>
    </doc>
  </add>
  <commit/>
  <optimize/>
</update>

Searching

At this point, we still need to verify that the payload fields were correctly added, and that we can search using the payloads. Our requirement is that a payload search such as "concepts:123456" would return all records where such a concept exists, in descending order of the concept score.

Solr does not support such a search handler out of the box, but it is fairly simple to build one by creating a custom QParserPlugin extension and attaching it (in solrconfig.xml) to an instance of solr.SearchHandler. The relevant snippet from solrconfig.xml is shown below:

  <!-- Request Handler to do payload queries -->
  <queryParser name="payloadQueryParser" 
    class="org.apache.solr.search.ext.PayloadQParserPlugin"/>
  <requestHandler name="/concept-search" class="solr.SearchHandler">
    <lst name="defaults">
      <str name="defType">payloadQueryParser</str>
    </lst>
  </requestHandler>

Here's the code for the PayloadQParserPlugin (modeled after the example code in FooQParserPlugin in the Solr codebase). It is just a container for the PayloadQParser class (defined in the same file), which parses the incoming query and returns a BooleanQuery made up of PayloadTermQuery clauses. The parser has rudimentary support for AND-ing and OR-ing multiple payload queries. For payload fields, we want to use only the payload scores for scoring, so we specify that in the PayloadTermQuery constructor.

// Source: src/java/org/apache/solr/search/ext/PayloadQParserPlugin.java
package org.apache.solr.search.ext;

import org.apache.commons.lang.StringUtils;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.payloads.AveragePayloadFunction;
import org.apache.lucene.search.payloads.PayloadTermQuery;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.QParser;
import org.apache.solr.search.QParserPlugin;

/**
 * Parser plugin to parse payload queries.
 */
public class PayloadQParserPlugin extends QParserPlugin {

  @Override
  public QParser createParser(String qstr, SolrParams localParams,
      SolrParams params, SolrQueryRequest req) {
    return new PayloadQParser(qstr, localParams, params, req);
  }

  @Override
  public void init(NamedList args) {
  }
}

class PayloadQParser extends QParser {

  public PayloadQParser(String qstr, SolrParams localParams, SolrParams params,
      SolrQueryRequest req) {
    super(qstr, localParams, params, req);
  }

  @Override
  public Query parse() throws ParseException {
    BooleanQuery q = new BooleanQuery();
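    // the incoming query is a whitespace-separated list of [+]field:term clauses;
    // a leading "+" makes a clause required (MUST), otherwise it is optional (SHOULD);
    // includeSpanScore=false below means only the payload contributes to the score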
    String[] nvps = StringUtils.split(qstr, " ");
    for (int i = 0; i < nvps.length; i++) {
      String[] nv = StringUtils.split(nvps[i], ":");
      if (nv[0].startsWith("+")) {
        q.add(new PayloadTermQuery(new Term(nv[0].substring(1), nv[1]), 
          new AveragePayloadFunction(), false), Occur.MUST);
      } else {
        q.add(new PayloadTermQuery(new Term(nv[0], nv[1]), 
          new AveragePayloadFunction(), false), Occur.SHOULD);
      }
    }
    return q;
  }
}

To deploy these changes, I ran the following commands at the root of the Solr project, then restarted the plex instance using the java -jar start.jar command shown above.

sujit@cyclone:solr$ ant dist-war
sujit@cyclone:solr$ cp dist/apache-solr-4.0-SNAPSHOT.war \
  example/webapps/solr.war

At this point, we can search for concepts with payload queries, via the URL of the custom handler we defined in solrconfig.xml:

http://localhost:8983/solr/concept-search/?q=concepts:234567\
  &version=2.2&start=0&rows=10&indent=on
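
The parser's rudimentary AND support can be exercised from the same handler. A hypothetical example that requires both sample concepts is shown below; the "%2B" escapes are needed because a literal "+" in a URL query string would otherwise be decoded as a space. Against the sample data this should match only document 1, the only record tagged with both concepts:

http://localhost:8983/solr/concept-search/?q=%2Bconcepts:123456+%2Bconcepts:234567\
  &fl=*,score&version=2.2&start=0&rows=10&indent=on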

We still need to tell Solr in what order to return the records in the result. By default, Solr uses DefaultSimilarity; we need it to use the payload scores for payload queries and DefaultSimilarity for everything else. Currently, however, Solr supports only a single Similarity per schema, so to get around that I built a similarity wrapper that delegates based on field name, similar to PerFieldAnalyzerWrapper on the indexing side. I believe LUCENE-2236 addresses this in a much more elegant way; I will make the necessary change when that becomes available. Here is the code for the Similarity wrapper class.

// Source: src/java/org/apache/solr/search/ext/MyPerFieldSimilarityWrapper.java
package org.apache.solr.search.ext;

import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.search.DefaultSimilarity;
import org.apache.lucene.search.Similarity;

/**
 * A delegating Similarity implementation similar to PerFieldAnalyzerWrapper.
 */
public class MyPerFieldSimilarityWrapper extends Similarity {

  private static final long serialVersionUID = -7777069917322737611L;

  private Similarity defaultSimilarity;
  private Map<String,Similarity> fieldSimilarityMap; 
  
  public MyPerFieldSimilarityWrapper() {
    this.defaultSimilarity = new DefaultSimilarity();
    this.fieldSimilarityMap = new HashMap<String,Similarity>();
    this.fieldSimilarityMap.put("concepts", new PayloadSimilarity());
  }
  
  @Override
  public float coord(int overlap, int maxOverlap) {
    return defaultSimilarity.coord(overlap, maxOverlap);
  }

  @Override
  public float idf(int docFreq, int numDocs) {
    return defaultSimilarity.idf(docFreq, numDocs);
  }

  @Override
  public float lengthNorm(String fieldName, int numTokens) {
    Similarity sim = fieldSimilarityMap.get(fieldName);
    if (sim == null) {
      return defaultSimilarity.lengthNorm(fieldName, numTokens);
    } else {
      return sim.lengthNorm(fieldName, numTokens);
    }
  }

  @Override
  public float queryNorm(float sumOfSquaredWeights) {
    return defaultSimilarity.queryNorm(sumOfSquaredWeights);
  }

  @Override
  public float sloppyFreq(int distance) {
    return defaultSimilarity.sloppyFreq(distance);
  }

  @Override
  public float tf(float freq) {
    return defaultSimilarity.tf(freq);
  }
  
  @Override
  public float scorePayload(int docId, String fieldName,
      int start, int end, byte[] payload, int offset, int length) {
    Similarity sim = fieldSimilarityMap.get(fieldName);
    if (sim == null) {
      return defaultSimilarity.scorePayload(docId, fieldName, 
        start, end, payload, offset, length);
    } else {
      return sim.scorePayload(docId, fieldName, 
        start, end, payload, offset, length);
    }
  }
}

As you can see, the methods that take a field name switch between the default Similarity implementation and the field-specific ones. We have only one of these, the PayloadSimilarity, the code for which is shown below:

// Source: src/java/org/apache/solr/search/ext/PayloadSimilarity.java
package org.apache.solr.search.ext;

import org.apache.lucene.analysis.payloads.PayloadHelper;
import org.apache.lucene.search.DefaultSimilarity;

/**
 * Payload Similarity implementation. Uses Payload scores for scoring.
 */
public class PayloadSimilarity extends DefaultSimilarity {

  private static final long serialVersionUID = -2402909220013794848L;

  @Override
  public float scorePayload(int docId, String fieldName,
      int start, int end, byte[] payload, int offset, int length) {
    if (payload != null) {
      return PayloadHelper.decodeFloat(payload, offset);
    } else {
      return 1.0F;
    }
  }
}

Once again, we deploy the Solr WAR file with these new classes and restart the plex instance, and this time we can verify that the records come back in the correct order - for concepts:123456, document 2 (payload 44.0) is returned ahead of document 1 (payload 12.0).

http://localhost:8983/solr/concept-search/?q=concepts:123456\
  &fl=*,score&version=2.2&start=0&rows=10&indent=on

We need a quick check to verify that queries other than concept queries don't use our PayloadSimilarity. Our example concept payload scores are in the range 1-100, while the scores in the results for the URL below are in the range 0-1, indicating that DefaultSimilarity was used for this query, which is what we wanted.

http://localhost:8983/solr/select/?q=huffing\
  &fl=*,score&version=2.2&start=0&rows=10&indent=on


Friday, January 14, 2011

Feed Aggregation using ROME Fetcher and Nutch 2.0 - Part II

Motivation

Last week, I described a simple feed aggregator that runs periodically to generate a seed list of URLs for Nutch (with an HBase backend) to crawl. Included with each URL is a set of metadata extracted from the feed (title, summary, published date), provided as name-value pairs in the seed list file. What was left was to actually generate a Lucene index out of the crawled data.

Nutch 2.0 does away with the local indexer and search subsystem and uses Solr instead, so it provides a SolrIndexerJob, which requires a running Solr installation. Since we currently use Nutch only as a crawler, this decision works fine for us. However, I did not have a running Solr installation last week, so I had to skip the indexing portion.

This week, I set up a Solr installation following the instructions in this Lucid Imagination blog post by Sami Siren, then tried running the Solr indexer from the runtime/local subdirectory of my Nutch project with this command:

sujit@cyclone:local$ bin/nutch http://localhost:8893/solr 1294515214-1766305875

But it failed with errors indicating a Lucene version mismatch, probably because my Solr is the SVN version and consequently uses LUCENE_40 as its version string, while Nutch 2.0 (also from SVN) comes with Lucene 3.0 JARs. In any case, I didn't investigate this error much further, because:

  • the index generated is the standard Nutch index - since I need to interface the index with existing front-end code, I need a custom format.
  • it seems a bit impractical to have to generate large indexes over HTTP - I can understand its use for pushing deltas to the index periodically; in fact, this feature is one reason we are considering Solr at work. But for the initial index, my preference would be to build it locally, then scp it over to Solr.

For these reasons, I decided to build my own little map-reduce job to collect the information from the HBase table and write it out to a local index. This post describes the code, and the configuration changes I needed to make.

Update Lucene and Solr JARs in Nutch

The first step was to replace some of the JAR files with the later versions required by Solr, and pull in some additional JAR files. The following JAR files were pulled from the Solr distribution into the lib subdirectory of my Nutch (Eclipse) project, and the .classpath for the Nutch project updated.

  • lucene-core-4.0-SNAPSHOT.jar - replaces older lucene-core JAR
  • lucene-analyzers-common-4.0-SNAPSHOT.jar - replaces older lucene-misc JAR
  • solr-core-4.0-SNAPSHOT.jar - added
  • solr-solrj-4.0-SNAPSHOT.jar - replaces older solr-solrj JAR

Running "ant runtime" will now create a local runtime with these JAR files instead of the original ones.

Create Solr Schema File

Since the index will ultimately be used by Solr, we need to create a Solr schema. Although the schema file is not read directly by the indexer job described below, the information in it drives how the job stores and analyzes fields, so it made sense to create it up-front. Here is the Solr schema for my index:

<?xml version="1.0" encoding="UTF-8" ?>
<schema name="nutch" version="1.1">
  <types>
    <fieldType name="string" class="solr.StrField"
      sortMissingLast="true" omitNorms="true"/>
    <fieldType name="long" class="solr.LongField"
      omitNorms="true"/>
    <fieldType name="float" class="solr.FloatField"
      omitNorms="true"/>
    <fieldType name="text" class="solr.TextField"
      positionIncrementGap="100">
      <analyzer>
        <tokenizer class="solr.WhitespaceTokenizerFactory"/>
        <filter class="solr.StopFilterFactory"
          ignoreCase="true" words="stopwords.txt"/>
        <filter class="solr.WordDelimiterFilterFactory"
          generateWordParts="1" generateNumberParts="1"
          catenateWords="1" catenateNumbers="1" catenateAll="0"
          splitOnCaseChange="1"/>
        <filter class="solr.LowerCaseFilterFactory"/>
        <filter class="solr.EnglishPorterFilterFactory"
          protected="protwords.txt"/>
        <filter class="solr.RemoveDuplicatesTokenFilterFactory"/>
      </analyzer>
    </fieldType>
    <fieldType name="url" class="solr.TextField"
      positionIncrementGap="100">
      <analyzer>
        <tokenizer class="solr.StandardTokenizerFactory"/>
        <filter class="solr.LowerCaseFilterFactory"/>
        <filter class="solr.WordDelimiterFilterFactory"
          generateWordParts="1" generateNumberParts="1"/>
        <filter class="solr.RemoveDuplicatesTokenFilterFactory"/>
      </analyzer>
    </fieldType>
  </types>
  <fields>
    <!-- fields for news index -->
    <field name="id" type="string" stored="true" indexed="true"/>
    <field name="idx" type="string" stored="true" indexed="true"/>
    <field name="digest" type="string" stored="true" indexed="true"/>
    <field name="url" type="url" stored="true" indexed="true" required="true"/>
    <field name="title" type="text" stored="true" indexed="true"/>
    <field name="content" type="text" stored="false" indexed="true"/>
    <field name="publisher" type="text" stored="true" indexed="true"/>
    <field name="pubDate" type="string" stored="true" indexed="true"/>
  </fields>
  <uniqueKey>id</uniqueKey>
  <defaultSearchField>content</defaultSearchField>
  <solrQueryParser defaultOperator="OR"/>
</schema>

Basically, I copied schema.xml from the Solr example/solr/conf directory and replaced the fields section with my own list.

Code up the Local Indexer Job

I decided to model the local indexer job as a map-reduce program, following the lead of ParserJob and SolrIndexerJob in the Nutch codebase. I could also have built a simple HBase client that queries the WebPage table by batch id and writes selected fields out to a Lucene index; a rough sketch of what that alternative might look like appears right after this paragraph. In my opinion, there is not much justification for distributing the extraction across multiple mappers, since there is almost no computation involved in this process. But I don't know enough yet to say which approach is better, so I basically went with the flow.
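
For reference, here is what that simpler alternative could look like. This is purely an illustrative sketch: the table name, column family, and qualifier ("webpage", "p", "t") are hypothetical placeholders (the real names come from Nutch's gora-hbase-mapping.xml), it ignores the batch id, and it indexes only a title field.

// Illustrative only - not part of the actual indexer described in this post.
import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class SimpleHBaseIndexerSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "webpage");   // hypothetical table name
    IndexWriter writer = new IndexWriter(
      FSDirectory.open(new File("/tmp/newsindex")),
      new IndexWriterConfig(Version.LUCENE_40,
        new StandardAnalyzer(Version.LUCENE_40)));
    ResultScanner scanner = table.getScanner(new Scan());
    try {
      for (Result result : scanner) {
        // hypothetical cf:col holding the page title
        byte[] title = result.getValue(Bytes.toBytes("p"), Bytes.toBytes("t"));
        if (title == null) {
          continue;
        }
        Document doc = new Document();
        doc.add(new Field("title", Bytes.toString(title),
          Field.Store.YES, Field.Index.ANALYZED));
        writer.addDocument(doc);
      }
    } finally {
      scanner.close();
      writer.close();
      table.close();
    }
  }
}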

Here is the code for my Local Indexer Job. Note that I have only run this in local mode, so some changes relating to the placement of the local files may be needed when running it on a Hadoop cluster. Also note that this code is not generic, although I think that would be relatively simple to fix by passing the schema.xml file in as an input parameter to the job - I plan on doing that later, once I am more familiar with the Solr code.

// Source: src/java/org/apache/nutch/indexer/ext/MyLocalIndexerJob.java
package org.apache.nutch.indexer.ext;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;

import org.apache.avro.util.Utf8;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom indexer job to pull data from HBase into an offline index
 * that can be copied over to a Solr installation.
 */
public class MyLocalIndexerJob extends NutchTool implements Tool {

  private static final Logger LOG = 
    LoggerFactory.getLogger(MyLocalIndexerJob.class);
  
  private static final String PATH_TO_INDEX = "pathToIndex";
  private static final Collection<WebPage.Field> FIELDS = 
    new HashSet<WebPage.Field>();
  
  static {
    FIELDS.addAll(Arrays.asList(WebPage.Field.values()));
  }
  
  public static class MyLocalIndexerJobMapper 
      extends GoraMapper<String,WebPage,Text,NutchDocument> {
    
    private Utf8 batchId;
    
    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      batchId = new Utf8(conf.get(Nutch.ARG_BATCH));
    }
    
    @Override
    public void map(String key, WebPage page, Context context)
        throws IOException, InterruptedException {
      Utf8 mark = Mark.PARSE_MARK.checkMark(page);
      String url = TableUtil.unreverseUrl(key.toString());
      if (! NutchJob.shouldProcess(mark, batchId)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Skipping " + url + "; different batch id");
        }
        return;
      }
      NutchDocument doc = buildNutchDoc(url, page);
      context.write(new Text(key), doc);
    }

    // TODO: use the Solr schema to get list of fields. May also need
    // a mapping file to map the cf:col names to Solr fields
    private NutchDocument buildNutchDoc(String url, WebPage page) {
      NutchDocument doc = new NutchDocument();
      doc.add("id", DigestUtils.md5Hex(url));
      Map<Utf8,ByteBuffer> metadata = page.getMetadata();
      doc.add("idx", new String(metadata.get(new Utf8("u_idx")).array()));
      byte[] content = page.getContent().array();
      doc.add("digest", DigestUtils.md5Hex(content));
      doc.add("content", new String(content));
      doc.add("url", url);
      String title = page.getTitle().toString();
      String metaTitle = new String(metadata.get(new Utf8("u_title")).array());
      doc.add("title", StringUtils.isNotEmpty(metaTitle) ? metaTitle : title);
      doc.add("pubDate", 
        new String(metadata.get(new Utf8("u_pubdate")).array()));
      return doc;
    }
  }
  
  public static class MyLocalIndexerJobReducer 
      extends Reducer<Text,NutchDocument,Text,NutchDocument> {
    
    private IndexWriter indexWriter;
    private Analyzer analyzer;
    
    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      String pathToIndex = conf.get(PATH_TO_INDEX);
      LOG.info("pathToIndex=" + pathToIndex);
      analyzer = MyAnalyzerUtils.buildAnalyzer();
      IndexWriterConfig iwconf = 
        new IndexWriterConfig(Version.LUCENE_40, analyzer);
      indexWriter = new IndexWriter(
        FSDirectory.open(new File(pathToIndex)), iwconf);
    }
    
    @Override
    public void cleanup(Context context) throws IOException {
      indexWriter.commit();
      indexWriter.optimize();
      indexWriter.close();
    }
    
    @Override
    public void reduce(Text key, Iterable<NutchDocument> values, 
        Context context) throws IOException, InterruptedException {
      for (NutchDocument nutchdoc : values) {
        Document doc = new Document();
        for (String fieldName : nutchdoc.getFieldNames()) {
          doc.add(new Field(fieldName, nutchdoc.getFieldValue(fieldName),
            MyAnalyzerUtils.getStoreMetadata(fieldName),
            MyAnalyzerUtils.getIndexMetadata(fieldName)));
        }
        indexWriter.addDocument(doc);
      }
    }
  }
  
  @Override
  public Map<String,Object> run(Map<String,Object> args) throws Exception {
    String batchId = (String) args.get(Nutch.ARG_BATCH);
    if (batchId != null) {
      getConf().set(Nutch.ARG_BATCH, batchId);
    }
    String pathToIndex = (String) args.get(PATH_TO_INDEX);
    if (pathToIndex != null) {
      getConf().set(PATH_TO_INDEX, pathToIndex);
    }
    currentJob = new NutchJob(getConf(), "local-index");
    Collection<WebPage.Field> fields = getFields(currentJob);
    StorageUtils.initMapperJob(currentJob, fields, Text.class, 
      NutchDocument.class, MyLocalIndexerJobMapper.class);
    currentJob.setMapOutputKeyClass(Text.class);
    currentJob.setMapOutputValueClass(NutchDocument.class);
    currentJob.setReducerClass(MyLocalIndexerJobReducer.class);
    currentJob.setNumReduceTasks(1);
    currentJob.waitForCompletion(true);
    ToolUtil.recordJobStatus(null, currentJob, results);
    return results;
  }

  private Collection<WebPage.Field> getFields(Job currentJob) {
    return FIELDS;
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MyLocalIndexerJob <batchId> <path_to_index>");
      return -1;
    }
    String batchId = args[0];
    String pathToIndex = args[1];
    return index(batchId, pathToIndex);
  }

  private int index(String batchId, String pathToIndex) throws Exception {
    LOG.info("MyLocalIndexerJob: starting");
    LOG.info("MyLocalIndexerJob: batchId:\t" + batchId);
    LOG.info("MyLocalIndexerJob: index path:\t" + pathToIndex);
    run(ToolUtil.toArgMap(
      Nutch.ARG_BATCH, batchId,
      PATH_TO_INDEX, pathToIndex));
    LOG.info("MyLocalIndexerJob: success");
    return 0;
  }

  public static void main(String[] args) throws Exception {
    final int res = ToolRunner.run(NutchConfiguration.create(), 
      new MyLocalIndexerJob(), args);
    System.exit(res);
  }
}

The indexer job shown above uses the MyAnalyzerUtils class below for some more one-off, non-generic work. Recall that the Solr schema.xml file defines custom analyzer chains for the "content" and "url" fields. Since these analyzer chains will be used at query time, we should use the same chains when indexing the data. So one of the things the class below does is build the analyzer chains for "content" and "url" programmatically and wrap them in a PerFieldAnalyzerWrapper, with KeywordAnalyzer as the default.

The second thing it does is provide a mapping from field name to whether the field is indexed and stored.

Both of these can be computed from the contents of the schema.xml file, but I did not feel like writing an XML parser, and I am almost positive that Solr already contains this functionality somewhere - once I find it, I will pass the schema.xml file into the indexer job and replace the class below with the equivalent Solr functionality.

// Source: src/java/org/apache/nutch/indexer/ext/MyAnalyzerUtils.java 
package org.apache.nutch.indexer.ext;

import java.io.Reader;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.ArrayUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.analysis.core.WhitespaceTokenizer;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.solr.analysis.EnglishPorterFilterFactory;
import org.apache.solr.analysis.LowerCaseFilterFactory;
import org.apache.solr.analysis.RemoveDuplicatesTokenFilterFactory;
import org.apache.solr.analysis.StandardTokenizerFactory;
import org.apache.solr.analysis.StopFilterFactory;
import org.apache.solr.analysis.WhitespaceTokenizerFactory;
import org.apache.solr.analysis.WordDelimiterFilterFactory;
import org.apache.solr.core.SolrResourceLoader;

/**
 * Analyzer building utilities.
 */
public class MyAnalyzerUtils {

  @SuppressWarnings("unchecked")
  private static final Map<String,Boolean> UNSTORED_FIELDS = ArrayUtils.toMap(
    new Object[][] {
      new Object[] {"content", Boolean.FALSE},
  });
  @SuppressWarnings("unchecked")
  private static final Map<String,Boolean> UNINDEXED_FIELDS = ArrayUtils.toMap(
    new Object[][] {});
  
  public static Store getStoreMetadata(String fieldName) {
    return UNSTORED_FIELDS.containsKey(fieldName) ? Store.NO : Store.YES;
  }
  
  public static Index getIndexMetadata(String fieldName) {
    return UNINDEXED_FIELDS.containsKey(fieldName) ? Index.NO : Index.ANALYZED;
  }
  
  public static Analyzer buildAnalyzer() {
    PerFieldAnalyzerWrapper analyzer = 
      new PerFieldAnalyzerWrapper(new KeywordAnalyzer());
    analyzer.addAnalyzer("url", getUrlAnalyzer());
    Analyzer textAnalyzer = getTextAnalyzer();
    analyzer.addAnalyzer("title", textAnalyzer);
    analyzer.addAnalyzer("content", textAnalyzer);
    return analyzer;
  }

  private static Analyzer getUrlAnalyzer() {
    return new Analyzer() {
      @SuppressWarnings("unchecked")
      @Override
      public TokenStream tokenStream(String fieldName, Reader reader) {
        StandardTokenizerFactory tokenizer = new StandardTokenizerFactory();
        tokenizer.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"}
        }));
        TokenStream tokenStream = tokenizer.create(reader);
        LowerCaseFilterFactory filter1 = new LowerCaseFilterFactory();
        filter1.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"}
        }));
        WordDelimiterFilterFactory filter2 = new WordDelimiterFilterFactory();
        filter2.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"},
          new String[] {"generateWordParts", "1"},
          new String[] {"generateNumberParts", "1"}
        }));
        RemoveDuplicatesTokenFilterFactory filter3 = 
          new RemoveDuplicatesTokenFilterFactory();
        filter3.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"}
        }));
        return filter3.create(filter2.create(filter1.create(tokenStream)));
      }
    };
  }

  private static Analyzer getTextAnalyzer() {
    return new Analyzer() {
      @SuppressWarnings({ "deprecation", "unchecked" })
      @Override
      public TokenStream tokenStream(String fieldName, Reader reader) {
        SolrResourceLoader loader = new SolrResourceLoader(null, null);
        WhitespaceTokenizerFactory tokenizer = new WhitespaceTokenizerFactory();
        tokenizer.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"}
        }));
        TokenStream tokenStream = tokenizer.create(reader);
        StopFilterFactory filter1 = new StopFilterFactory();
        filter1.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"},
          new String[] {"ignoreCase", "true"},
          new String[] {"words", "solr-stopwords.txt"}
        }));
        filter1.inform(loader);
        WordDelimiterFilterFactory filter2 = new WordDelimiterFilterFactory();
        filter2.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"},
          new String[] {"generateWordParts", "1"},
          new String[] {"generateNumberParts", "1"},
          new String[] {"catenateWords", "1"},
          new String[] {"catenateNumbers", "1"},
          new String[] {"catenateAll", "0"},
          new String[] {"splitOnCaseChange", "1"}
        }));
        LowerCaseFilterFactory filter3 = new LowerCaseFilterFactory();
        filter3.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"}
        }));
        EnglishPorterFilterFactory filter4 = new EnglishPorterFilterFactory();
        filter4.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"},
          new String[] {"protected", "solr-protwords.txt"}
        }));
        filter4.inform(loader);
        RemoveDuplicatesTokenFilterFactory filter5 = 
          new RemoveDuplicatesTokenFilterFactory();
        filter5.init(ArrayUtils.toMap(new Object[][] {
          new String[] {"luceneMatchVersion", "LUCENE_40"}
        }));
        return filter5.create(filter4.create(
          filter3.create(filter2.create(filter1.create(tokenStream)))));
      }
    };
  }
}

Copy Solr Analyzer Resource Files to Nutch

The getTextAnalyzer() method in the MyAnalyzerUtils class instantiates the StopFilterFactory and EnglishPorterFilterFactory classes, which require local resource files (referenced as "solr-stopwords.txt" and "solr-protwords.txt" in the code above - copies of Solr's stopwords.txt and protwords.txt). SolrResourceLoader looks in the conf/ subdirectory where the job is running, so I copied these files from the solr/example/solr/conf directory of the Solr project into the runtime/local/conf directory of my Nutch project.

Run Local Indexer Job

My two classes are compiled into the nutch-2.0-dev.jar by running "ant runtime" at the root of the Nutch project. Then we run the following command from the runtime/local subdirectory:

sujit@cyclone:local$ bin/nutch org.apache.nutch.indexer.ext.MyLocalIndexerJob \
  1294515214-1766305875 /tmp/newsindex

This produces output in runtime/local/logs/hadoop.log and the index in /tmp/newsindex.

Run Solr with the new index

To do this, I navigated to the solr/example subdirectory of my Solr project and cloned the solr instance into a new instance called nutch. After that, I copied the freshly generated newsindex into the new instance's data/index directory, and the schema.xml file into its conf directory.

sujit@cyclone:example$ cp -r solr nutch
sujit@cyclone:example$ cd nutch/data/index
sujit@cyclone:index$ cp /tmp/newsindex/* .
sujit@cyclone:index$ cd -
sujit@cyclone:example$ cp /tmp/schema.xml nutch/conf
sujit@cyclone:example$ java -Dsolr.solr.home=nutch -jar start.jar

This starts up Solr at port 8983 on localhost, and we can navigate to the admin console on our browser at http://localhost:8983/solr/admin/.

Entering a query such as "universe black holes" into the form returns the standard Solr XML response listing the matching documents.
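
The same query can also be issued directly against the default select handler; the parameters below are just the ones used earlier in this post:

http://localhost:8983/solr/select/?q=universe+black+holes\
  &fl=*,score&version=2.2&start=0&rows=10&indent=on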

And that's pretty much it. Now my feed aggregator uses Nutch as a crawler and indexer, generating an index that will be consumed using Solr. In the process, I learned how to write a Hadoop map-reduce job that uses Nutch and GORA, and a little bit of Solr. Over the next few weeks, I plan to explore these in greater detail, and write about them if the explorations prove interesting enough.