Introduction
I've been playing with Nutch the past couple of weeks (see here and here). One of the reasons I started looking at Nutch was the hope that its code would help me understand how to build real-life programs with Hadoop. Here is a toy example I came up with - a rather primitive semantic indexer that uses my blog pages as input, and my set of blog tags (manually enhanced, see below) as its dictionary.
Rather than insert additional logic into specific points in the Nutch lifecycle as I have done with my plugins in previous weeks, this code creates an entire pipeline of 3 Map-Reduce jobs to count and score occurrences of tags in my blog text. It piggybacks on the artifacts created by Nutch during its crawl, so I did not have to build everything from scratch and I learned a bit more about Nutch and Hadoop in the process. Experienced users of Nutch and/or Hadoop would probably find this code horribly amateurish - so if you have suggestions for improvement, I would greatly appreciate your comments.
The problem
I find that the hardest thing about Map-Reduce is defining the problem in terms of the framework, possibly because of my relative unfamiliarity with it. So it helps to state the problem first.
When you write a blog on Blogger (and I guess on pretty much any other blogging software/service), you are allowed to tag the blog with some keywords that describe the content. Since I don't have a computer for a mind, I generally end up tagging with whatever seems appropriate at the time - as you can see from the Tag Cloud on the sidebar, this approach has proved to be less than optimal.
What I really want is a program that will "know" the tags I have set in the past, scan my new content, and suggest a subset of these tags for tagging this content. If in addition, it could "roll up" the tags into higher level terms, that would be icing on the cake. For example, if one of my tags is "cx_oracle", it could roll up to tags such as "databases", "python", "scripting" and "oracle", because it is very likely (for me at least) that when I talk about "cx_oracle", I am actually talking about all of the others as well.
The Dictionary
I first collected the tags I have created so far by downloading the blog posts using the GData Java API, and parsing out the labels into a flat file, then using some simple Unix commands to get the unique sorted set of blog tags. The code is quite simple, it's basically adapted from the examples on the GData site, and it's peripheral to this post anyway, so I don't show it here.
I then manually enhanced it with "roll-up" tags as I discussed above. Here are some examples from my dictionary.txt file. The idea is that the roll-up tags will be counted each time the base tag is counted, and because a roll-up tag can be associated with multiple base tags, over time, they will end up larger in the Tag Cloud, reflecting the nature of the blog more accurately than a bunch of super-specific tags.
...
classification:ir,algorithms
clustering:ir,algorithms
cx_oracle:databases,python,scripting,oracle
...
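To make the dictionary format concrete, here is a minimal sketch of how such lines could be read into a map of base tag to roll-up tags. It uses only the JDK, the class name DictionarySketch is hypothetical, and the bare tag "solo" is an invented example of an entry with no roll-ups; the indexer further down does its own parsing with commons-lang.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the indexer code.
class DictionarySketch {

  // Parses lines of the form "base:rollup1,rollup2" into base -> roll-ups.
  // A line without ':' (or with nothing after it) maps to an empty list.
  static Map<String, List<String>> parse(List<String> lines) {
    Map<String, List<String>> dictionary =
        new LinkedHashMap<String, List<String>>();
    for (String line : lines) {
      String trimmed = line.trim();
      if (trimmed.isEmpty()) {
        continue; // skip blank lines
      }
      int colon = trimmed.indexOf(':');
      String base = (colon < 0) ? trimmed : trimmed.substring(0, colon);
      List<String> rollups = new ArrayList<String>();
      if (colon >= 0 && colon < trimmed.length() - 1) {
        for (String rollup : trimmed.substring(colon + 1).split(",")) {
          if (!rollup.trim().isEmpty()) {
            rollups.add(rollup.trim());
          }
        }
      }
      dictionary.put(base, rollups);
    }
    return dictionary;
  }

  public static void main(String[] args) {
    Map<String, List<String>> dictionary = parse(Arrays.asList(
        "classification:ir,algorithms",
        "cx_oracle:databases,python,scripting,oracle",
        "solo")); // "solo" is an invented tag with no roll-ups
    System.out.println(dictionary.get("cx_oracle"));
    // prints [databases, python, scripting, oracle]
  }
}
```

Keeping the roll-ups as a list per base tag mirrors the <LHS term,List<RHS term>> structure that the first Mapper builds.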
Algorithm
As mentioned before, this is a 3-stage Map-Reduce pipeline, in which the output of each job feeds the input of the next one in the chain. The Mappers and Reducers are described in terms of their inputs and outputs in the table below:
Job-# | Phase | Input | Output |
---|---|---|---|
1 | Map | <url,ParseData> or <url,ParseText> | List(<(url,term),count>) |
1 | Reduce (LongSumReducer) | <(url,term),List(count)> | List(<(url,term),count>) |
2 | Map | <(url,term),count> | List(<url,(term:count)>) |
2 | Reduce | <url,List(term:count)> | List(<url,CSV(term:count)>) |
3 | Map | <url,CSV(term:count)> | <digest(url,term),NutchDocument> |
3 | Reduce | List(<digest(url,term),NutchDocument>) | (used to create Lucene index) |
The first stage pulls the page text and page title from the <url,ParseText> and <url,ParseData> maps stored by Nutch during its crawl in the segments/../parse_text and segments/../parse_data directories respectively. Doug Cutting's slides on Hadoop usage in Nutch were immensely helpful in figuring out what to look for. The Mapper analyzes the text of the page or title for matches against the dictionary. If a match occurs in the text against one or more terms on the LHS part of a dictionary entry, then a map entry is written as <(url,term),count> for it. Additionally, if there are corresponding RHS terms for the matched term, each RHS term is written out with the same count. The output of this Map job is passed to a LongSumReducer, which aggregates the term counts for each (url,term) pair.
Before starting, the Mapper at this stage sets up an internal map of <LHS term,List<RHS term>> from the dictionary file, the path to which is passed into the job from the caller.
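The scoring rule of the first stage can be sketched outside Hadoop as follows. This is an illustration under assumptions, not the Mapper itself: the class Stage1ScoringSketch and its method names are hypothetical, and an in-memory map stands in for the LongSumReducer that does the summing in the real job.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free sketch of the stage 1 scoring rule.
class Stage1ScoringSketch {

  // Credits (occurrences * boost) to the matched term and to each of its
  // roll-up terms, accumulating into totals (the role LongSumReducer plays).
  static void score(Map<String, Long> totals,
      Map<String, List<String>> dictionary,
      String term, int occurrences, int boost) {
    if (occurrences <= 0) {
      return; // a term that does not occur contributes nothing
    }
    long score = (long) occurrences * boost;
    add(totals, term, score);
    List<String> rollups = dictionary.get(term);
    if (rollups != null) {
      for (String rollup : rollups) {
        add(totals, rollup, score);
      }
    }
  }

  private static void add(Map<String, Long> totals, String term, long score) {
    Long current = totals.get(term);
    totals.put(term, current == null ? score : current + score);
  }

  public static void main(String[] args) {
    Map<String, List<String>> dictionary =
        new HashMap<String, List<String>>();
    dictionary.put("cx_oracle", Arrays.asList("databases", "python"));
    Map<String, Long> totals = new TreeMap<String, Long>();
    score(totals, dictionary, "cx_oracle", 3, 1); // 3 matches in the body
    score(totals, dictionary, "cx_oracle", 1, 5); // 1 match in the title, 5x
    System.out.println(totals);
    // prints {cx_oracle=8, databases=8, python=8}
  }
}
```

Note that a roll-up term shared by several base tags accumulates the counts of all of them, which is what makes roll-ups grow in the Tag Cloud over time.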
The second stage flattens the entries so each URL has one entry with all the aggregated term counts. The Mapper changes the entry from <(url,term),count> to <url,(term:count)>, dropping terms whose counts do not exceed a specified cutoff (a term is kept only if its count is strictly greater than the cutoff). The Reducer takes the collection of (term:count) values for a given URL and strings them together into a single entry mapping the url to a comma-separated string of term:count pairs.
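As an illustration of the flattening step, here is a small JDK-only sketch that does the map and reduce work of stage 2 in one pass over an in-memory map. FlattenSketch and the example.com URLs are hypothetical, and it assumes every key is well formed as "url,term" with no comma inside the term.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Hadoop-free sketch of stage 2 (filter, re-key, join).
class FlattenSketch {

  // Input keys look like "url,term". Entries whose count does not exceed
  // the cutoff are dropped; the rest are joined per URL as "term:count,...".
  static Map<String, String> flatten(Map<String, Long> counts, long cutoff) {
    Map<String, StringBuilder> joined =
        new LinkedHashMap<String, StringBuilder>();
    for (Map.Entry<String, Long> entry : counts.entrySet()) {
      long count = entry.getValue();
      if (count <= cutoff) {
        continue; // mentioned only "in passing"
      }
      String key = entry.getKey();
      // split on the last comma, assuming terms never contain one
      int comma = key.lastIndexOf(',');
      String url = key.substring(0, comma);
      String term = key.substring(comma + 1);
      StringBuilder csv = joined.get(url);
      if (csv == null) {
        csv = new StringBuilder();
        joined.put(url, csv);
      } else {
        csv.append(',');
      }
      csv.append(term).append(':').append(count);
    }
    Map<String, String> result = new LinkedHashMap<String, String>();
    for (Map.Entry<String, StringBuilder> entry : joined.entrySet()) {
      result.put(entry.getKey(), entry.getValue().toString());
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Long> counts = new LinkedHashMap<String, Long>();
    counts.put("http://example.com/a,python", 7L);
    counts.put("http://example.com/a,oracle", 2L); // dropped at cutoff 2
    counts.put("http://example.com/a,databases", 4L);
    counts.put("http://example.com/b,ir", 3L);
    System.out.println(flatten(counts, 2));
  }
}
```

Splitting on the last comma rather than the first is a deliberate choice in this sketch, since a URL may itself contain commas.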
The third stage is responsible for merging information from the Lucene index already created by Nutch in the index directory with the <url,CSV(term:count)> structure, and creating a new Lucene index out of it. The Mapper looks up the Nutch Lucene index by URL using a two-step lookup (url to docId from an internal data structure, then docId to document), and merges the document with each (term:count) pair to create multiple NutchDocument objects for each URL. The Reducer takes each of these records and writes it to a new Lucene index in the index2 subdirectory.
The internal data structure used by the Mapper in this stage is a Map of <url,docId> built from the Nutch Lucene index. Because the url field is tokenized by Nutch, it was not possible to use the url to look up the record directly, so I read through the whole Nutch Lucene index and created an in-memory mapping between url and docId - this approach may not be feasible for large indexes.
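Each record emitted by the third Mapper is keyed by a digest of the (url,term) pair, which the code below computes with commons-codec's DigestUtils.md5Hex. For readers without that library, here is a roughly equivalent sketch using only the JDK. The class DigestKeySketch is hypothetical, and the choice of UTF-8 encoding is an assumption of this sketch.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical JDK-only stand-in for DigestUtils.md5Hex(url + "," + term).
class DigestKeySketch {

  // Returns a 32-character lowercase hex MD5 digest of "url,term".
  static String key(String url, String term) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      byte[] digest =
          md5.digest((url + "," + term).getBytes(StandardCharsets.UTF_8));
      // left-pad with zeros so the key always has 32 hex digits
      return String.format("%032x", new BigInteger(1, digest));
    } catch (NoSuchAlgorithmException e) {
      // MD5 is required of every Java platform implementation
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // the URL is an invented example
    System.out.println(key("http://example.com/a", "python"));
  }
}
```

The digest only serves as a unique record key here, so the known weaknesses of MD5 as a cryptographic hash do not matter for this use.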
Code
Build and Setup Info
I decided to build the indexer within the Nutch codebase, rather than do this externally and have Nutch and Hadoop as dependencies. This was mainly for convenience - I did not want to have to figure out Nutch's build system as well right away. So I created a sub-package called "custom" under Nutch's org.apache.nutch.indexer package and put all my classes in there.
This way I could use Nutch's build.xml to build a new nutch-1.0.jar file that included my code with "ant compile jar".
InvertedIndex.java
The code for the InvertedIndex data structure is shown below. I could also have used Lucene's term API to let Lucene do much of this logic, but this approach means less wrapping/unwrapping.
// Source: src/java/org/apache/nutch/indexer/custom/InvertedIndex.java
package org.apache.nutch.indexer.custom;
import java.text.BreakIterator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
/**
* A home grown inverted index to count occurrences of multi-word terms
* within a body of text. The underlying data structure is a Map keyed
* by individual words. The value mapped by the key is a Set of positions
* corresponding to the positions of the word (0-based) in the text. For
* multi-word terms, consecutive words are looked up and their positions
* used to find the number of times a multi-word term appears in the text.
*/
public class InvertedIndex {
private Map<String,Set<Integer>> termPositions;
public InvertedIndex(String text) {
termPositions = new HashMap<String,Set<Integer>>();
BreakIterator wordBreakIterator = BreakIterator.getWordInstance(
Locale.getDefault());
wordBreakIterator.setText(text);
int current = 0;
int wordPosition = 0;
for (;;) {
int end = wordBreakIterator.next();
if (end == BreakIterator.DONE) {
break;
}
String nextWord = text.substring(current, end);
current = end;
if (StringUtils.isBlank(nextWord) ||
nextWord.matches("\\p{Punct}")) {
continue;
}
String[] words = getMultiWords(nextWord);
for (String word : words) {
wordPosition = addPosition(word, wordPosition);
}
}
}
public boolean exists(String term) {
return countOccurrencesOf(term) > 0;
}
public int countOccurrencesOf(String term) {
String[] multiwords =
getMultiWords(StringUtils.replace(term, " ", "-"));
Set<Integer> newPrevPositions = new HashSet<Integer>();
Set<Integer> prevPositions = new HashSet<Integer>();
int termId = 0;
for (String word : multiwords) {
termId++;
if (termPositions.containsKey(word)) {
if (termId == 1) {
prevPositions.addAll(termPositions.get(word));
// if this is the only word, we've found it
if (multiwords.length == 1) {
newPrevPositions.addAll(prevPositions);
} else {
continue;
}
} else {
Set<Integer> currentPositions = termPositions.get(word);
for (Integer currentPosition : currentPositions) {
// check for the occurrence of (currentPosition - 1) in
// the prevPositions, if so, copy to the newPrevPositions
if (prevPositions.contains(currentPosition - 1)) {
newPrevPositions.add(currentPosition);
}
}
prevPositions.clear();
prevPositions.addAll(newPrevPositions);
newPrevPositions.clear();
}
} else {
// the current term is not found in our index, invalidating
// the results so far, we should exit at this point
prevPositions.clear();
break;
}
}
return prevPositions.size();
}
private String[] getMultiWords(String term) {
term = StringUtils.lowerCase(term);
if (term.indexOf('-') > -1) {
return StringUtils.split(term, "-");
} else {
return new String[] {term};
}
}
private int addPosition(String word, int position) {
Set<Integer> positions = (termPositions.containsKey(word) ?
termPositions.get(word) : new HashSet<Integer>());
positions.add(position);
termPositions.put(word, positions);
position++;
return position;
}
}
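The core idea of the class above, counting a multi-word term by chaining word positions, can be tried out with a smaller JDK-only sketch. PhraseCountSketch is hypothetical and simplifies two things: it tokenizes with a regular expression instead of BreakIterator, and it takes the words of the term as separate arguments.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified version of the positional phrase count.
class PhraseCountSketch {

  // Maps each lowercased word to the set of its 0-based word positions.
  static Map<String, Set<Integer>> index(String text) {
    Map<String, Set<Integer>> positions = new HashMap<String, Set<Integer>>();
    int position = 0;
    for (String word : text.toLowerCase().split("[^\\p{L}\\p{N}_]+")) {
      if (word.isEmpty()) {
        continue; // split() can yield a leading empty token
      }
      Set<Integer> set = positions.get(word);
      if (set == null) {
        set = new HashSet<Integer>();
        positions.put(word, set);
      }
      set.add(position++);
    }
    return positions;
  }

  // Counts occurrences of the words as a consecutive phrase.
  static int count(Map<String, Set<Integer>> positions, String... words) {
    Set<Integer> ends = null; // positions where the phrase so far ends
    for (String word : words) {
      Set<Integer> current = positions.get(word.toLowerCase());
      if (current == null) {
        return 0; // a missing word rules out the whole phrase
      }
      if (ends == null) {
        ends = new HashSet<Integer>(current);
        continue;
      }
      Set<Integer> next = new HashSet<Integer>();
      for (Integer p : current) {
        if (ends.contains(p - 1)) {
          next.add(p); // this word directly follows the phrase so far
        }
      }
      ends = next;
    }
    return ends == null ? 0 : ends.size();
  }

  public static void main(String[] args) {
    Map<String, Set<Integer>> positions =
        index("Ruby on Rails is fun. I like ruby on rails, not ruby on ice.");
    System.out.println(count(positions, "ruby", "on", "rails")); // prints 2
    System.out.println(count(positions, "ruby"));                // prints 3
  }
}
```

Each step keeps only the positions whose predecessor ended the phrase so far, so the size of the final set is the number of complete matches.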
Indexer2.java
Here is the code for the indexer. All the Map and Reduce classes are built as private static classes. The main() method calls ToolRunner.run(), which calls index(), which in turn calls the three Map-Reduce jobs in sequence.
// Source: src/java/org/apache/nutch/indexer/custom/Indexer2.java
package org.apache.nutch.indexer.custom;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseText;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
/**
* Component to build a primitive "semantic" index, using a dictionary
* based on blog tags. Occurrences of a tag are counted and stored in
* the index, along with their counts. Blog articles can then be looked
* up by the tag and ranked using the tag counts.
*
* The input to this index is the parse_data and parse_text information
* in the crawled segments, the dictionary, and the index generated by
* the default Nutch index mechanism.
*
* The format of the dictionary is a flat file, with tags (single or multi
* word) as the key, mapped to zero or more "semantic" meanings of the
* term. For example, when we notice a tag "cx_oracle", it is probably
* not only about cx_oracle, but also about databases, the Oracle database,
* the Python scripting language, and scripting in general. Hence, the
* entry for cx_oracle will look like this:
*
* <pre>
* cx_oracle:databases,oracle,scripting,python
* ...
* </pre>
*
* Tags appearing in the title are given a 5x (currently, configured by
* TITLE_BOOST in the code) boost, ie, each occurrence of a tag in a title
* is counted 5 times, and each occurrence of a tag in the body is counted
* once.
*/
public class Indexer2 extends Configured implements Tool {
private static final Logger LOGGER = Logger.getLogger(Indexer2.class);
private static final int TITLE_BOOST = 5;
private static final int LABEL_CUTOFF_SCORE = 2;
// ==========================
// **** STAGE 1: Analyze ****
// ==========================
/**
* Input: <url,ParseData> or <url,ParseText>
* Output: List(<(url,term),count>)
* Processing: extract text from ParseText and title from ParseData and
* generate a list of terms in the lexicon that are found in these texts.
* Send a List(<url,term>,count) to the reducer. The reducer is a
* (built-in) LongSumReducer, which aggregates counts for each
* <url,term> key.
* Configuration: the location of the dictionary is passed in, which is
* used to build up the lookup table that is used for the processing.
*/
private static class Mapper1 extends MapReduceBase
implements Mapper<WritableComparable<Text>,
Writable,Text,LongWritable> {
private static Map<String,List<String>> DICTIONARY = null;
private static Integer TITLE_BOOST = null;
@Override
public void configure(JobConf job) {
if (DICTIONARY == null || TITLE_BOOST == null) {
TITLE_BOOST = Integer.valueOf(job.get("title.boost"));
DICTIONARY = new HashMap<String,List<String>>();
String dictFile = job.get("index2.dictfile");
try {
FileSystem localFileSystem =
FileSystem.getLocal(new Configuration());
Path dictPath = new Path(dictFile);
FSDataInputStream istream = localFileSystem.open(dictPath);
BufferedReader reader =
new BufferedReader(new InputStreamReader(istream));
String line;
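while ((line = reader.readLine()) != null) {
String[] nvp = StringUtils.split(line, ":");
// skip blank lines, for which split() returns an empty array
if (nvp.length == 0) {
continue;
}
List<String> values = new ArrayList<String>();
if (nvp.length > 1) {
String[] vals = StringUtils.split(nvp[1], ",");
for (String val : vals) {
values.add(val);
}
}
DICTIONARY.put(nvp[0], values);
}
// release the file handle once the dictionary is loaded
reader.close();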
} catch (IOException e) {
LOGGER.error("Could not get DICTIONARY file: " + dictFile, e);
throw new RuntimeException(e);
}
}
}
@Override
public void map(WritableComparable<Text> key, Writable value,
OutputCollector<Text,LongWritable> output, Reporter reporter)
throws IOException {
String url = key.toString();
String text = null;
long occurrenceBoost = 1;
if (value instanceof ParseText) {
ParseText parseText = (ParseText) value;
text = parseText.getText();
} else if (value instanceof ParseData) {
ParseData parseData = (ParseData) value;
text = parseData.getTitle();
occurrenceBoost *= TITLE_BOOST;
}
if (text != null && (! StringUtils.trim(text).equals("null"))) {
InvertedIndex invertedIndex = new InvertedIndex(text);
for (String term : DICTIONARY.keySet()) {
int occurrences = invertedIndex.countOccurrencesOf(term);
if (occurrences > 0) {
String newKey = StringUtils.join(new String[] {url, term}, ",");
LongWritable score =
new LongWritable(occurrences * occurrenceBoost);
output.collect(new Text(newKey), score);
List<String> synonyms = DICTIONARY.get(term);
if (synonyms != null && synonyms.size() > 0) {
for (String synonym : synonyms) {
newKey = StringUtils.join(new String[] {url, synonym}, ",");
output.collect(new Text(newKey), score);
}
}
}
}
}
}
}
// ==========================
// **** STAGE 2: Flatten ****
// ==========================
/**
* Input: <(url,term),count>
* Output: List(<url,(term:count)>)
* Processing: each record is re-parsed to be keyed by URL and passed to
* the OutputCollector. Only terms with counts above a preconfigured cutoff
* are collected. This is an attempt to remove label counts which have been
* mentioned "in passing".
*/
private static class Mapper2 extends MapReduceBase
implements Mapper<WritableComparable<Text>,
LongWritable,Text,Text> {
private static Float LABEL_CUTOFF_SCORE = null;
@Override
public void configure(JobConf job) {
if (LABEL_CUTOFF_SCORE == null) {
LABEL_CUTOFF_SCORE = new Float(job.get("label.cutoff.score"));
}
}
@Override
public void map(WritableComparable<Text> key, LongWritable value,
OutputCollector<Text,Text> output,
Reporter reporter) throws IOException {
String[] urlTermPair = StringUtils.split(key.toString(), ",");
long count = value.get();
if (count > LABEL_CUTOFF_SCORE) {
output.collect(new Text(urlTermPair[0]),
new Text(StringUtils.join(new String[] {
urlTermPair[1], String.valueOf(count)}, ":")));
}
}
}
/**
* Input: <url,List(term:count)>
* Output: List(<url,CSV(term:count)>)
* Processing: flattens multiple terms and their associated aggregate
* counts back to the same URL key.
*/
private static class Reducer2 extends MapReduceBase
implements Reducer<WritableComparable<Text>,Text,
WritableComparable<Text>,Text> {
@Override
public void reduce(WritableComparable<Text> key,
Iterator<Text> values,
OutputCollector<WritableComparable<Text>,Text> output,
Reporter reporter) throws IOException {
StringBuilder termCounts = new StringBuilder();
int i = 0;
while (values.hasNext()) {
Text value = values.next();
if (i > 0) {
termCounts.append(",");
}
termCounts.append(value);
i++;
}
output.collect(key, new Text(termCounts.toString()));
}
}
// ==========================
// **** STAGE 3: Merge ****
// ==========================
/**
* Input: <url,CSV(term:count)>
* Output:<digest(url,term),NutchDocument>
* Processing: the url is used to lookup the record in the index built by
* Nutch as part of its normal cycle. A NutchDocument is created for each
* (url,term) combination with this information, and the corresponding term
* and term count. A new unique key is generated for this output record
* using an MD5 hash of the url and term.
*/
private static class Mapper3 extends MapReduceBase
implements Mapper<WritableComparable<Text>,Text,
Text,NutchDocument> {
private static Map<String,Integer> URL_DOCID_MAP = null;
private static IndexReader NUTCH_INDEX_READER = null;
@Override
public void configure(JobConf job) {
try {
if (URL_DOCID_MAP == null) {
URL_DOCID_MAP = new HashMap<String,Integer>();
NUTCH_INDEX_READER =
IndexReader.open(job.get("nutch.index.dir"));
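int maxDoc = NUTCH_INDEX_READER.maxDoc();
for (int i = 0; i < maxDoc; i++) {
// skip deleted documents, which cannot be retrieved by docId
if (NUTCH_INDEX_READER.isDeleted(i)) {
continue;
}
Document doc = NUTCH_INDEX_READER.document(i);
String url = doc.get("url");
URL_DOCID_MAP.put(url, i);
}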
}
} catch (Exception e) {
LOGGER.error("Cannot open index reader on nutch index for lookup");
throw new RuntimeException(e);
}
}
@Override
public void map(WritableComparable<Text> key, Text value,
OutputCollector<Text,NutchDocument> output, Reporter reporter)
throws IOException {
Integer docId = URL_DOCID_MAP.get(key.toString());
if (docId != null) {
Document doc = NUTCH_INDEX_READER.document(docId);
if (doc != null) {
String termCounts = value.toString();
String[] termCountPairs = StringUtils.split(termCounts, ",");
for (String termCountPair : termCountPairs) {
String[] components = StringUtils.split(termCountPair, ":");
NutchDocument nutchDoc = new NutchDocument();
String label = components[0];
String url = doc.get("url");
nutchDoc.add("label", label);
nutchDoc.add("label_count", components[1]);
nutchDoc.add("url", url);
nutchDoc.add("title", doc.get("title"));
// generate a new unique key based on url and label
String newKey = DigestUtils.md5Hex(
StringUtils.join(new String[] {url, label}, ","));
output.collect(new Text(newKey), nutchDoc);
}
}
}
}
@Override
public void close() {
if (NUTCH_INDEX_READER != null) {
try {
NUTCH_INDEX_READER.close();
} catch (Exception e) {
LOGGER.warn("Could not close Nutch index reader", e);
}
}
}
}
/**
* Input: List(<digest(url,term),NutchDocument>)
* Output: none
* Processing: A new Lucene index is created (path to the index passed in).
* For each NutchDocument, a corresponding record is written to the Lucene
* index.
*/
private static class Reducer3 extends MapReduceBase
implements Reducer<WritableComparable<Text>,NutchDocument,
WritableComparable<Text>,NutchDocument> {
private static IndexWriter INDEX2_WRITER = null;
@Override
public void configure(JobConf job) {
if (INDEX2_WRITER == null) {
String indexOutputDir = job.get("index2.output.dir");
try {
INDEX2_WRITER = new IndexWriter(indexOutputDir,
new StandardAnalyzer(), MaxFieldLength.UNLIMITED);
} catch (Exception e) {
LOGGER.error("Could not open index [" + indexOutputDir +
"] for writing");
throw new RuntimeException(e);
}
}
}
@Override
public void reduce(WritableComparable<Text> key,
Iterator<NutchDocument> values,
OutputCollector<WritableComparable<Text>,
NutchDocument> output,
Reporter reporter) throws IOException {
while (values.hasNext()) {
NutchDocument nutchDoc = values.next();
Document doc = new Document();
doc.add(new Field("url", nutchDoc.getFieldValue("url"),
Store.YES, Index.NOT_ANALYZED));
doc.add(new Field("label", nutchDoc.getFieldValue("label"),
Store.YES, Index.NOT_ANALYZED));
doc.add(new Field("label_count",
nutchDoc.getFieldValue("label_count"),
Store.YES, Index.NOT_ANALYZED));
doc.add(new Field("title", nutchDoc.getFieldValue("title"),
Store.YES, Index.ANALYZED));
INDEX2_WRITER.addDocument(doc);
}
}
@Override
public void close() {
if (INDEX2_WRITER != null) {
try {
INDEX2_WRITER.optimize();
INDEX2_WRITER.close();
} catch (Exception e) {
LOGGER.error("Could not optimize and close index2 writer", e);
}
}
}
}
// ==========================
// **** Calling code ****
// ==========================
private void analyze(Path indexDir, String dictFile,
List<Path> segments, int titleBoost) throws IOException {
LOGGER.info("Stage 1 (analyze)");
final JobConf job1 = new NutchJob(getConf());
job1.set("index2.dictfile", dictFile);
job1.set("title.boost", String.valueOf(titleBoost));
job1.setJobName("index2-analyze " + segments);
// inputs for this mapper (parse_data contains the title which we
// want to analyze, and parse_text contains the body of the document,
// so we add paths to both in our mapper
for (Path segment : segments) {
FileInputFormat.addInputPath(
job1, new Path(segment, ParseData.DIR_NAME));
FileInputFormat.addInputPath(
job1, new Path(segment, ParseText.DIR_NAME));
}
FileOutputFormat.setOutputPath(job1, new Path(indexDir, "stage1"));
job1.setMapperClass(Mapper1.class);
job1.setReducerClass(LongSumReducer.class);
job1.setInputFormat(SequenceFileInputFormat.class);
job1.setOutputFormat(SequenceFileOutputFormat.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(LongWritable.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(LongWritable.class);
JobClient.runJob(job1);
}
private void flatten(Path indexDir, int labelCutoffScore)
throws IOException {
LOGGER.info("Stage 2 (flatten)");
final JobConf job2 = new NutchJob(getConf());
job2.set("label.cutoff.score", String.valueOf(labelCutoffScore));
job2.setJobName("index2-flatten");
FileInputFormat.addInputPath(job2, new Path(indexDir, "stage1"));
FileOutputFormat.setOutputPath(job2, new Path(indexDir, "stage2"));
job2.setMapperClass(Mapper2.class);
job2.setReducerClass(Reducer2.class);
job2.setInputFormat(SequenceFileInputFormat.class);
job2.setOutputFormat(SequenceFileOutputFormat.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
JobClient.runJob(job2);
}
private void merge(Path indexDir, String nutchIndexDir)
throws IOException {
LOGGER.info("Stage 3 (merge)");
final JobConf job3 = new NutchJob(getConf());
job3.set("nutch.index.dir", nutchIndexDir);
job3.set("index2.output.dir",
new Path(indexDir.getParent(), "index2").toString());
job3.setJobName("index2-merge");
FileInputFormat.addInputPath(job3, new Path(indexDir, "stage2"));
FileOutputFormat.setOutputPath(job3, new Path(indexDir, "stage3"));
job3.setMapperClass(Mapper3.class);
job3.setReducerClass(Reducer3.class);
job3.setInputFormat(SequenceFileInputFormat.class);
job3.setOutputFormat(SequenceFileOutputFormat.class);
job3.setMapOutputKeyClass(Text.class);
job3.setMapOutputValueClass(NutchDocument.class);
job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(NutchDocument.class);
JobClient.runJob(job3);
}
private void index(Path indexDir, String dictFile,
String nutchIndexDir, List<Path> segments) throws IOException {
LOGGER.info("Starting index2");
analyze(indexDir, dictFile, segments, TITLE_BOOST);
flatten(indexDir, LABEL_CUTOFF_SCORE);
merge(indexDir, nutchIndexDir);
LOGGER.info("Indexer2: done");
}
public int run(String[] args) throws Exception {
if (args.length < 4) {
System.err.println(
"Usage: Indexer2 <index> <dictfile> " +
"<nutch_index_dir> <segment> ...");
return -1;
}
Path indexDir = new Path(args[0]);
String dictFile = args[1];
String nutchIndexDir = args[2];
final List<Path> segments = new ArrayList<Path>();
for (int i = 3; i < args.length; i++) {
segments.add(new Path(args[i]));
}
try {
index(indexDir, dictFile, nutchIndexDir, segments);
} catch (Exception e) {
LOGGER.fatal("Indexer2 failed", e);
return -1;
}
return 0;
}
public static void main(String[] args) throws Exception {
int result = ToolRunner.run(NutchConfiguration.create(),
new Indexer2(), args);
System.exit(result);
}
}
Compiling and Running
As I have mentioned in previous posts, I keep my development and runtime Nutch installations separate. The former lives under my home directory, and the latter lives in /opt/nutch-1.0. So I compile my code with "ant compile jar", and copy the resulting build/nutch-1.0.jar file over to /opt/nutch-1.0/lib.
One thing I haven't figured out - the default Nutch installation contained nutch-1.0.jar in the $NUTCH_HOME directory, ie, /opt/nutch-1.0. When I overwrote it with my custom nutch-1.0.jar file, bin/nutch could not find my new Indexer2 class. The CLASSPATH defined in bin/nutch does not include the $NUTCH_HOME/nutch-1.0.jar but does include $NUTCH_HOME/lib/*.jar, so I didn't think about it too much and just copied my custom JAR file into $NUTCH_HOME/lib, which worked fine.
My custom indexer is meant to be run after Nutch is done creating its default index. So after that was done, I ran my custom indexer with the following command:
sujit@sirocco:/opt/nutch-1.0$ export CRAWL_DIR=/home/sujit/tmp
sujit@sirocco:/opt/nutch-1.0$ bin/nutch \
  org.apache.nutch.indexer.custom.Indexer2 \
  $CRAWL_DIR/data/indexes2 \
  $CRAWL_DIR/dictionary.txt \
  $CRAWL_DIR/data/index \
  $CRAWL_DIR/data/segments/*
The logging output is written to $NUTCH_HOME/logs/hadoop.log, which looks like this. No surprises here, it simply reports that it does the three stages and completes.
2009-07-28 17:22:30,806 INFO custom.Indexer2 - Starting index2
2009-07-28 17:22:30,843 INFO custom.Indexer2 - Stage 1 (analyze)
2009-07-28 17:22:49,658 INFO custom.Indexer2 - Stage 2 (flatten)
2009-07-28 17:22:57,996 INFO custom.Indexer2 - Stage 3 (merge)
2009-07-28 17:23:04,921 INFO custom.Indexer2 - Indexer2: done
Using the new Index
Unfortunately, testing the index is not just a matter of hitting it with Luke, since I want my results ordered by count. So I wrote this small test program in Jython to search the index with an additional Sort parameter.
#!/usr/bin/env /opt/jython-2.2.1/jython
# Source: src/main/scripts/python/index2_searcher.py
import string
import sys
import traceback
from org.apache.lucene.search import IndexSearcher, Sort, SortField, TermQuery
from org.apache.lucene.index import Term

def usage():
  print "Usage: %s path_to_index" % (sys.argv[0])

def runQuery(index, label):
  try:
    query = TermQuery(Term("label", label))
    sort = Sort(SortField("label_count", SortField.INT, True))
    hits = index.search(query, sort)
    numResults = hits.length()
    print "%s search results for label:[%s]" % (str(numResults), label)
    for i in range(0, numResults):
      doc = hits.doc(i)
      title = doc.get("title")
      count = doc.get("label_count")
      print "%s (%s)" % (title, count)
  except Exception:
    traceback.print_exc()

def main():
  if (len(sys.argv) != 2):
    usage()
    sys.exit(-1)
  index = IndexSearcher(sys.argv[1])
  try:
    while (True):
      label = raw_input("Enter label: ")
      if (len(string.rstrip(label)) == 0):
        break
      runQuery(index, label)
  finally:
    index.close()

if __name__ == "__main__":
  main()
Running this against the new index produces results like these. The number in parentheses at the end of the title is the label_count value.
Enter label: ror
4 search results for label:[ror]
Salmon Run: Book Reviews with Ruby On Rails (23)
Salmon Run: RoR style URLs with Spring MVC (16)
Salmon Run: First Steps with Pylons (7)
Salmon Run: Django : First Impressions (3)
Enter label: webservice
3 search results for label:[webservice]
Salmon Run: HTTP GET your Web Services here (14)
Salmon Run: SOAP Client for Amazon ECS with XFire (10)
Salmon Run: Spring Remoting Strategies compared (5)
Closing Thoughts
The code ran fine on my laptop, which is a non-distributed environment. I haven't run it in a distributed environment so far, so I can't tell for sure, but I would probably need to make the following changes for that.
- Wrap the dictionary lookup logic inside a server, and pass in the URL of the service to the first Mapper. That way there is no more dependency on there being a dictionary.txt file on the local file system for all the slave nodes.
- Replace the Reducer in the third stage with an IdentityReducer, then use the output of the third Map-Reduce job to build the index on the master node, i.e. the index writing itself can be outside the Map-Reduce framework.
I should also look again at the contents of dictionary.txt, since it determines the quality of the results. I did the "manual enhancement" one weekend afternoon, and I was just trying to build something so I could feed it to my code, so it probably needs more work before I can think of using this data as the basis for my Tag Cloud.
Hi. I've been wondering, can you actually achieve the same task you describe in this article by using a plugin instead?
Hi Imaizumi, yes, but the logic would have to be split between different plugins, so in this case, the approach of building a new command seemed cleaner to me.