Motivation
Last week, I described a simple feed aggregator that runs periodically to generate a seed list of URLs for Nutch (with the HBase backend) to crawl. Included with the URLs is a set of metadata extracted from the feed (title, summary, published date), provided as name-value pairs in the seed list file. What was left was to actually generate a Lucene index out of the crawled data.
Nutch 2.0 does away with the local indexer and the search subsystem and uses Solr instead, so they have a SolrIndexerJob which requires a running Solr installation. Since we currently use Nutch only as a crawler, this decision works fine for us. However, I did not have a running Solr installation last week, so I had to skip the indexing portion.
This week, I set up a Solr installation following the instructions on this Lucid Imagination blog post by Sami Siren, then tried running the Solr indexer from the runtime/local subdirectory of my Nutch project with this command:
```bash
sujit@cyclone:local$ bin/nutch solrindex http://localhost:8893/solr 1294515214-1766305875
```
But it failed with errors indicating a Lucene version mismatch, probably because my Solr is the SVN version (and consequently uses LUCENE_40 as its version string), while Nutch 2.0 (also from SVN) comes with Lucene 3.0 JARs. In any case, I didn't investigate this error much further, because:
- the index generated is the standard Nutch index - since I need to interface the index with existing front-end code, I need a custom format.
- it seems a bit impractical to have to generate large indexes over HTTP - I can understand its use for pushing deltas to the index periodically; in fact, this feature is one reason we are considering Solr at work. But for the initial index, my preference would be to build it locally, then scp it over to Solr.
For these reasons, I decided to build my own little map-reduce job to collect the information from the HBase table and write it out to a local index. This post describes the code, and the configuration changes I needed to make.
Update Lucene and Solr JARs in Nutch
The first step was to replace some of the JAR files with the later versions required by Solr, and to pull in some additional JAR files. The following JAR files were pulled from the Solr distribution into the lib subdirectory of my Nutch (Eclipse) project, and the .classpath for the Nutch project was updated accordingly.
- lucene-core-4.0-SNAPSHOT.jar - replaces older lucene-core JAR
- lucene-analyzers-common-4.0-SNAPSHOT.jar - replaces older lucene-misc JAR
- solr-core-4.0-SNAPSHOT.jar - added
- solr-solrj-4.0-SNAPSHOT.jar - replaces older solr-solrj JAR
Running "ant runtime" will now create a local runtime with these JAR files instead of the original ones.
Create Solr Schema File
Since the index will ultimately be used by Solr, we need to create a Solr schema. Although the schema file is not read directly by the indexer job I describe below, the information in it drives how that job builds the index, so it made sense to create it up-front. Here is the Solr schema for my index:
```xml
<?xml version="1.0" encoding="UTF-8" ?>
<schema name="nutch" version="1.1">
<types>
<fieldType name="string" class="solr.StrField"
sortMissingLast="true" omitNorms="true"/>
<fieldType name="long" class="solr.LongField"
omitNorms="true"/>
<fieldType name="float" class="solr.FloatField"
omitNorms="true"/>
<fieldType name="text" class="solr.TextField"
positionIncrementGap="100">
<analyzer>
<tokenizer class="solr.WhitespaceTokenizerFactory"/>
<filter class="solr.StopFilterFactory"
ignoreCase="true" words="stopwords.txt"/>
<filter class="solr.WordDelimiterFilterFactory"
generateWordParts="1" generateNumberParts="1"
catenateWords="1" catenateNumbers="1" catenateAll="0"
splitOnCaseChange="1"/>
<filter class="solr.LowerCaseFilterFactory"/>
<filter class="solr.EnglishPorterFilterFactory"
protected="protwords.txt"/>
<filter class="solr.RemoveDuplicatesTokenFilterFactory"/>
</analyzer>
</fieldType>
<fieldType name="url" class="solr.TextField"
positionIncrementGap="100">
<analyzer>
<tokenizer class="solr.StandardTokenizerFactory"/>
<filter class="solr.LowerCaseFilterFactory"/>
<filter class="solr.WordDelimiterFilterFactory"
generateWordParts="1" generateNumberParts="1"/>
<filter class="solr.RemoveDuplicatesTokenFilterFactory"/>
</analyzer>
</fieldType>
</types>
<fields>
<!-- fields for news index -->
<field name="id" type="string" stored="true" indexed="true"/>
<field name="idx" type="string" stored="true" indexed="true"/>
<field name="digest" type="string" stored="true" indexed="true"/>
<field name="url" type="url" stored="true" indexed="true" required="true"/>
<field name="title" type="text" stored="true" indexed="true"/>
<field name="content" type="text" stored="false" indexed="true"/>
<field name="publisher" type="text" stored="true" indexed="true"/>
<field name="pubDate" type="string" stored="true" indexed="true"/>
</fields>
<uniqueKey>id</uniqueKey>
<defaultSearchField>content</defaultSearchField>
<solrQueryParser defaultOperator="OR"/>
</schema>
```
Basically, I copied schema.xml from the Solr examples/solr/conf directory and replaced the fields section with my own list.
Code up the Local Indexer Job
I decided to model the local indexer job as a map-reduce program, following the lead of ParserJob and SolrIndexerJob in the Nutch codebase. I could also have built a simple HBase client that queries the WebPage table by batch id and writes selected fields to a Lucene index (a sketch of that alternative follows below). In my opinion, there is not much justification for distributing the extraction across multiple mappers, since there is almost no computation involved in this process. But I don't know enough to say which is better, so I basically went with the flow.
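For comparison, here is roughly what the non-map-reduce alternative could look like - a hedged sketch only, not code from the project: the table name, column family and qualifier names (CF_* and COL_*) are placeholders that would have to be taken from Nutch's gora-hbase-mapping.xml, and the batch-id marker check is simplified.

```java
// Hypothetical sketch: scan the webpage table directly and write a local
// Lucene index without map-reduce. The CF_*/COL_* names are placeholders;
// the real ones come from gora-hbase-mapping.xml in the Nutch conf directory.
import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class SimpleWebPageIndexer {

  // Placeholder column family/qualifier names (see gora-hbase-mapping.xml).
  private static final byte[] CF_MARKERS = Bytes.toBytes("mk");
  private static final byte[] COL_PARSE_MARK = Bytes.toBytes("__prsmrk__");
  private static final byte[] CF_PARSE = Bytes.toBytes("p");
  private static final byte[] COL_TITLE = Bytes.toBytes("t");

  public static void main(String[] args) throws Exception {
    String batchId = args[0];
    String pathToIndex = args[1];
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "webpage");
    IndexWriter writer = new IndexWriter(
        FSDirectory.open(new File(pathToIndex)),
        new IndexWriterConfig(Version.LUCENE_40,
            new StandardAnalyzer(Version.LUCENE_40)));
    ResultScanner scanner = table.getScanner(new Scan());
    try {
      for (Result result : scanner) {
        // Simplified batch check: skip rows not parsed in this batch.
        byte[] mark = result.getValue(CF_MARKERS, COL_PARSE_MARK);
        if (mark == null || !batchId.equals(Bytes.toString(mark))) {
          continue;
        }
        Document doc = new Document();
        doc.add(new Field("id", Bytes.toString(result.getRow()),
            Field.Store.YES, Field.Index.NOT_ANALYZED));
        byte[] title = result.getValue(CF_PARSE, COL_TITLE);
        if (title != null) {
          doc.add(new Field("title", Bytes.toString(title),
              Field.Store.YES, Field.Index.ANALYZED));
        }
        writer.addDocument(doc);
      }
    } finally {
      scanner.close();
      writer.close();
      table.close();
    }
  }
}
```

The map-reduce version below does the same extraction, but gets the HBase plumbing (and the option of distributing the work later) from GORA for free.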
Here is the code for my local indexer job. Note that I have only run this in local mode, so some changes relating to the placement of the local files may be needed when running it on a Hadoop cluster. Also note that this code is not generic, although I think that would be relatively simple to fix by passing the schema.xml file as an input parameter to the job - I plan on doing that later, once I am more familiar with the Solr code.
```java
// Source: src/java/org/apache/nutch/indexer/ext/MyLocalIndexerJob.java
package org.apache.nutch.indexer.ext;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import org.apache.avro.util.Utf8;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Custom indexer job to pull data from HBase into an offline index
* that can be copied over to a Solr installation.
*/
public class MyLocalIndexerJob extends NutchTool implements Tool {
private static final Logger LOG =
LoggerFactory.getLogger(MyLocalIndexerJob.class);
private static final String PATH_TO_INDEX = "pathToIndex";
private static final Collection<WebPage.Field> FIELDS =
new HashSet<WebPage.Field>();
static {
FIELDS.addAll(Arrays.asList(WebPage.Field.values()));
}
public static class MyLocalIndexerJobMapper
extends GoraMapper<String,WebPage,Text,NutchDocument> {
private Utf8 batchId;
@Override
public void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
batchId = new Utf8(conf.get(Nutch.ARG_BATCH));
}
@Override
public void map(String key, WebPage page, Context context)
throws IOException, InterruptedException {
Utf8 mark = Mark.PARSE_MARK.checkMark(page);
String url = TableUtil.unreverseUrl(key.toString());
if (! NutchJob.shouldProcess(mark, batchId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping " + url + "; different batch id");
}
return;
}
NutchDocument doc = buildNutchDoc(url, page);
context.write(new Text(key), doc);
}
// TODO: use the Solr schema to get list of fields. May also need
// a mapping file to map the cf:col names to Solr fields
private NutchDocument buildNutchDoc(String url, WebPage page) {
NutchDocument doc = new NutchDocument();
doc.add("id", DigestUtils.md5Hex(url));
Map<Utf8,ByteBuffer> metadata = page.getMetadata();
doc.add("idx", new String(metadata.get(new Utf8("u_idx")).array()));
byte[] content = page.getContent().array();
doc.add("digest", DigestUtils.md5Hex(content));
doc.add("content", new String(content));
doc.add("url", url);
String title = page.getTitle().toString();
String metaTitle = new String(metadata.get(new Utf8("u_title")).array());
doc.add("title", StringUtils.isNotEmpty(metaTitle) ? metaTitle : title);
doc.add("pubDate",
new String(metadata.get(new Utf8("u_pubdate")).array()));
return doc;
}
}
public static class MyLocalIndexerJobReducer
extends Reducer<Text,NutchDocument,Text,NutchDocument> {
private IndexWriter indexWriter;
private Analyzer analyzer;
@Override
public void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
String pathToIndex = conf.get(PATH_TO_INDEX);
LOG.info("pathToIndex=" + pathToIndex);
analyzer = MyAnalyzerUtils.buildAnalyzer();
IndexWriterConfig iwconf =
new IndexWriterConfig(Version.LUCENE_40, analyzer);
indexWriter = new IndexWriter(
FSDirectory.open(new File(pathToIndex)), iwconf);
}
@Override
public void cleanup(Context context) throws IOException {
indexWriter.commit();
indexWriter.optimize();
indexWriter.close();
}
@Override
public void reduce(Text key, Iterable<NutchDocument> values,
Context context) throws IOException, InterruptedException {
for (NutchDocument nutchdoc : values) {
Document doc = new Document();
for (String fieldName : nutchdoc.getFieldNames()) {
doc.add(new Field(fieldName, nutchdoc.getFieldValue(fieldName),
MyAnalyzerUtils.getStoreMetadata(fieldName),
MyAnalyzerUtils.getIndexMetadata(fieldName)));
}
indexWriter.addDocument(doc);
}
}
}
@Override
public Map<String,Object> run(Map<String,Object> args) throws Exception {
String batchId = (String) args.get(Nutch.ARG_BATCH);
if (batchId != null) {
getConf().set(Nutch.ARG_BATCH, batchId);
}
String pathToIndex = (String) args.get(PATH_TO_INDEX);
if (pathToIndex != null) {
getConf().set(PATH_TO_INDEX, pathToIndex);
}
currentJob = new NutchJob(getConf(), "local-index");
Collection<WebPage.Field> fields = getFields(currentJob);
StorageUtils.initMapperJob(currentJob, fields, Text.class,
NutchDocument.class, MyLocalIndexerJobMapper.class);
currentJob.setMapOutputKeyClass(Text.class);
currentJob.setMapOutputValueClass(NutchDocument.class);
currentJob.setReducerClass(MyLocalIndexerJobReducer.class);
currentJob.setNumReduceTasks(1);
currentJob.waitForCompletion(true);
ToolUtil.recordJobStatus(null, currentJob, results);
return results;
}
private Collection<WebPage.Field> getFields(Job currentJob) {
return FIELDS;
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MyLocalIndexerJob <batchId> <path_to_index>");
return -1;
}
String batchId = args[0];
String pathToIndex = args[1];
return index(batchId, pathToIndex);
}
private int index(String batchId, String pathToIndex) throws Exception {
LOG.info("MyLocalIndexerJob: starting");
LOG.info("MyLocalIndexerJob: batchId:\t" + batchId);
LOG.info("MyLocalIndexerJob: index path:\t" + pathToIndex);
run(ToolUtil.toArgMap(
Nutch.ARG_BATCH, batchId,
PATH_TO_INDEX, pathToIndex));
LOG.info("MyLocalIndexerJob: success");
return 0;
}
public static void main(String[] args) throws Exception {
final int res = ToolRunner.run(NutchConfiguration.create(),
new MyLocalIndexerJob(), args);
System.exit(res);
}
}
```
The indexer job shown above uses the MyAnalyzerUtils class below for the remaining custom, one-off (and decidedly non-generic) pieces. Recall that the Solr schema.xml file declares custom analyzer chains for the "content" and "url" fields. Since these analyzer chains will be applied at query time, the same chains should be used when indexing the data. So one thing the code below does is construct, in Java, the analyzer chains for "content" and "url", and wrap them in a PerFieldAnalyzerWrapper with KeywordAnalyzer as the default.
The second thing it does is provide a mapping from field name to whether the field is indexed and stored.
Both of these could be computed from the contents of the schema.xml file, but I did not feel like writing an XML parser, and I am almost positive that Solr already contains this functionality somewhere - once I find it, I will pass the schema.xml file into the indexer job and replace the class below with the equivalent Solr functionality.
```java
// Source: src/java/org/apache/nutch/indexer/ext/MyAnalyzerUtils.java
package org.apache.nutch.indexer.ext;
import java.io.Reader;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.ArrayUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.analysis.core.WhitespaceTokenizer;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.solr.analysis.EnglishPorterFilterFactory;
import org.apache.solr.analysis.LowerCaseFilterFactory;
import org.apache.solr.analysis.RemoveDuplicatesTokenFilterFactory;
import org.apache.solr.analysis.StandardTokenizerFactory;
import org.apache.solr.analysis.StopFilterFactory;
import org.apache.solr.analysis.WhitespaceTokenizerFactory;
import org.apache.solr.analysis.WordDelimiterFilterFactory;
import org.apache.solr.core.SolrResourceLoader;
/**
* Analyzer building utilities.
*/
public class MyAnalyzerUtils {
@SuppressWarnings("unchecked")
private static final Map<String,Boolean> UNSTORED_FIELDS = ArrayUtils.toMap(
new Object[][] {
new Object[] {"content", Boolean.FALSE},
});
@SuppressWarnings("unchecked")
private static final Map<String,Boolean> UNINDEXED_FIELDS = ArrayUtils.toMap(
new Object[][] {});
public static Store getStoreMetadata(String fieldName) {
return UNSTORED_FIELDS.containsKey(fieldName) ? Store.NO : Store.YES;
}
public static Index getIndexMetadata(String fieldName) {
return UNINDEXED_FIELDS.containsKey(fieldName) ? Index.NO : Index.ANALYZED;
}
public static Analyzer buildAnalyzer() {
PerFieldAnalyzerWrapper analyzer =
new PerFieldAnalyzerWrapper(new KeywordAnalyzer());
analyzer.addAnalyzer("url", getUrlAnalyzer());
Analyzer textAnalyzer = getTextAnalyzer();
analyzer.addAnalyzer("title", textAnalyzer);
analyzer.addAnalyzer("content", textAnalyzer);
return analyzer;
}
private static Analyzer getUrlAnalyzer() {
return new Analyzer() {
@SuppressWarnings("unchecked")
@Override
public TokenStream tokenStream(String fieldName, Reader reader) {
StandardTokenizerFactory tokenizer = new StandardTokenizerFactory();
tokenizer.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"}
}));
TokenStream tokenStream = tokenizer.create(reader);
LowerCaseFilterFactory filter1 = new LowerCaseFilterFactory();
filter1.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"}
}));
WordDelimiterFilterFactory filter2 = new WordDelimiterFilterFactory();
filter2.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"}
}));
filter2.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"},
new String[] {"generateWordParts", "1"},
new String[] {"generateNumberParts", "1"}
}));
RemoveDuplicatesTokenFilterFactory filter3 =
new RemoveDuplicatesTokenFilterFactory();
filter3.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"}
}));
return filter3.create(filter2.create(filter1.create(tokenStream)));
}
};
}
private static Analyzer getTextAnalyzer() {
return new Analyzer() {
@SuppressWarnings({ "deprecation", "unchecked" })
@Override
public TokenStream tokenStream(String fieldName, Reader reader) {
SolrResourceLoader loader = new SolrResourceLoader(null, null);
WhitespaceTokenizerFactory tokenizer = new WhitespaceTokenizerFactory();
tokenizer.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"}
}));
TokenStream tokenStream = tokenizer.create(reader);
StopFilterFactory filter1 = new StopFilterFactory();
filter1.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"},
new String[] {"ignoreCase", "true"},
new String[] {"words", "solr-stopwords.txt"}
}));
filter1.inform(loader);
WordDelimiterFilterFactory filter2 = new WordDelimiterFilterFactory();
filter2.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"},
new String[] {"generateWordParts", "1"},
new String[] {"generateNumberParts", "1"},
new String[] {"catenateWords", "1"},
new String[] {"catenateNumbers", "1"},
new String[] {"catenateAll", "0"},
new String[] {"splitOnCaseChange", "1"}
}));
LowerCaseFilterFactory filter3 = new LowerCaseFilterFactory();
filter3.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"}
}));
EnglishPorterFilterFactory filter4 = new EnglishPorterFilterFactory();
filter4.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"},
new String[] {"protected", "solr-protwords.txt"}
}));
filter4.inform(loader);
RemoveDuplicatesTokenFilterFactory filter5 =
new RemoveDuplicatesTokenFilterFactory();
filter5.init(ArrayUtils.toMap(new Object[][] {
new String[] {"luceneMatchVersion", "LUCENE_40"}
}));
return filter5.create(filter4.create(
filter3.create(filter2.create(filter1.create(tokenStream)))));
}
};
}
}
```
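As an aside, Solr's IndexSchema class looks like the functionality I was hoping to find. Below is a rough, untested sketch of how it might eventually replace MyAnalyzerUtils - the SolrConfig/IndexSchema construction and the fallback defaults are my assumptions, so treat this as a direction rather than working code.

```java
// Hypothetical sketch: derive the per-field analyzer and the store/index
// flags from schema.xml via Solr's IndexSchema, instead of hard-coding them.
import java.io.FileInputStream;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.xml.sax.InputSource;

public class SchemaDrivenAnalyzerUtils {

  private final IndexSchema schema;

  // solrHome is assumed to contain a conf/solrconfig.xml; schemaFile is the
  // schema.xml shown earlier. Both arguments are assumptions about how this
  // would be wired into the indexer job.
  public SchemaDrivenAnalyzerUtils(String solrHome, String schemaFile)
      throws Exception {
    SolrConfig solrConfig = new SolrConfig(solrHome, "solrconfig.xml", null);
    schema = new IndexSchema(solrConfig, "schema.xml",
        new InputSource(new FileInputStream(schemaFile)));
  }

  // Per-field analyzer chain as declared in schema.xml (replaces buildAnalyzer()).
  public Analyzer getAnalyzer() {
    return schema.getAnalyzer();
  }

  // Replaces the hard-coded UNSTORED_FIELDS map.
  public Store getStoreMetadata(String fieldName) {
    SchemaField field = schema.getFieldOrNull(fieldName);
    return (field == null || field.stored()) ? Store.YES : Store.NO;
  }

  // Replaces the hard-coded UNINDEXED_FIELDS map.
  public Index getIndexMetadata(String fieldName) {
    SchemaField field = schema.getFieldOrNull(fieldName);
    return (field == null || field.indexed()) ? Index.ANALYZED : Index.NO;
  }
}
```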
Copy Solr Analyzer Resource Files to Nutch
The getTextAnalyzer() and getUrlAnalyzer() methods in the MyAnalyzerUtils class instantiate the StopFilterFactory and EnglishPorterFilterFactory classes, which require the local resource files "stopwords.txt" and "protwords.txt" respectively. SolrResourceLoader looks in the conf/ subdirectory of the directory the job runs from, so I copied these files from the solr/examples/solr/conf directory of the Solr project into the runtime/local/conf directory of my Nutch project.
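To verify that the copied files are actually visible to the analyzers, a small throwaway check like the one below can be run from runtime/local. This is just a hedged convenience sketch: it asks the same SolrResourceLoader that MyAnalyzerUtils constructs to open the two resources, using the file names the code above actually references (adjust the names if your copies keep stopwords.txt/protwords.txt).

```java
// Throwaway check: can SolrResourceLoader see the copied resource files?
import java.io.InputStream;

import org.apache.solr.core.SolrResourceLoader;

public class ResourceCheck {
  public static void main(String[] args) {
    // Same construction as in MyAnalyzerUtils above.
    SolrResourceLoader loader = new SolrResourceLoader(null, null);
    for (String name : new String[] {"solr-stopwords.txt", "solr-protwords.txt"}) {
      try {
        InputStream in = loader.openResource(name);
        in.close();
        System.out.println("found: " + name);
      } catch (Exception e) {
        System.out.println("NOT found: " + name + " (" + e.getMessage() + ")");
      }
    }
  }
}
```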
Run Local Indexer Job
My two classes are compiled into the nutch-2.0-dev.jar by running "ant runtime" at the root of the Nutch project. Then we run the following command from the runtime/local subdirectory:
```bash
sujit@cyclone:local$ bin/nutch org.apache.nutch.indexer.ext.MyLocalIndexerJob \
    1294515214-1766305875 /tmp/newsindex
```
This produces output in runtime/local/logs/hadoop.log and the index in /tmp/newsindex.
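Before pushing the index anywhere, it is worth a quick peek to confirm that documents were actually written. A minimal sketch using the Lucene API from the JARs pulled in earlier; the path is the one passed on the command line above:

```java
// Quick sanity check on the generated index: print the document count
// and the stored fields of the first document.
import java.io.File;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.FSDirectory;

public class IndexPeek {
  public static void main(String[] args) throws Exception {
    IndexReader reader = IndexReader.open(
        FSDirectory.open(new File("/tmp/newsindex")));
    try {
      System.out.println("numDocs=" + reader.numDocs());
      if (reader.numDocs() > 0) {
        Document doc = reader.document(0);
        System.out.println("first doc: " + doc);
      }
    } finally {
      reader.close();
    }
  }
}
```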
Run Solr with the New Index
To do this, I navigated to the solr/examples subdirectory of my Solr project, then cloned the solr instance. After that I copied the freshly generated newsindex to the instance's data/index directory, and the schema.xml file to its conf directory.
```bash
sujit@cyclone:example$ cp -r solr nutch
sujit@cyclone:example$ cd nutch/data/index
sujit@cyclone:index$ cp /tmp/newsindex/* .
sujit@cyclone:index$ cd -
sujit@cyclone:example$ cp /tmp/schema.xml nutch/conf
sujit@cyclone:example$ java -Dsolr.solr.home=nutch -jar start.jar
```
This starts up Solr on port 8983 on localhost, and we can navigate to the admin console in a browser at http://localhost:8983/solr/admin/. Entering a query such as "universe black holes" into the search form returns the standard Solr XML response listing the matching documents.
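The front end can issue the same query programmatically through SolrJ - a minimal sketch, assuming the solr-solrj JAR pulled in earlier and the default example port:

```java
// Minimal SolrJ query against the Solr instance started above.
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class QuerySolr {
  public static void main(String[] args) throws Exception {
    CommonsHttpSolrServer server =
        new CommonsHttpSolrServer("http://localhost:8983/solr");
    SolrQuery query = new SolrQuery("universe black holes");
    query.setRows(10);
    QueryResponse response = server.query(query);
    for (SolrDocument doc : response.getResults()) {
      System.out.println(doc.getFieldValue("title") + " :: " +
          doc.getFieldValue("url"));
    }
  }
}
```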
And that's pretty much it. My feed aggregator now uses Nutch as both crawler and indexer, generating an index that will be consumed through Solr. In the process, I learned how to write a Hadoop map-reduce job that uses Nutch and GORA, and a little bit of Solr. Over the next few weeks, I plan to explore these in greater detail, and write about them if the explorations prove interesting enough.
Comments

A reader asked: If you were writing the same feature on Nutch 1.3, how would you do it? What would be the proper extension point?

My reply: Thanks. I think it would be a combination of Parser (or HTMLParseFilter, for extracting the metadata supplied in the fetch list) and IndexingFilter.