Friday, September 18, 2009

Generating a TD Matrix using Hadoop 0.20

So far, I had been using Hadoop 0.18 for my (admittedly rather minimal) Map-Reduce coding. Recently, when trying to find some random feature which I ended up never using, I discovered that it was only available in 0.19 upwards, and when I went to download, Hadoop was already up to 0.20, so I downloaded that instead. Now, Hadoop 0.20 came with some pretty massive API changes, which I did not know about when I downloaded it, and soon the code in my Eclipse IDE was awash in yellow squigglies and strikethroughs.

The yellow squigglies and strikethroughs are, of course, Eclipse telling me that I am using deprecated method calls. Hadoop 0.20 still supports the old API, but it feels kind of wrong to write new code against a deprecated API knowing that it will disappear within a few releases. There is not much available by way of examples, so I figured that writing my next Map-Reduce job against the new API would be a good way to learn it. So I did.

Algorithm

The job was to create a term-document matrix of the terms in about 150 of my blog posts. I want to use the matrix for some experiments which I will describe later. The matrix is built in three phases, as described in the table below:

Job # | Phase                   | Input                        | Output
------+-------------------------+------------------------------+-----------------------------
  1   | Map                     | <docId, post>                | List(<(docId,word), count>)
      | Reduce                  | <(docId,word), List(count)>  | List(<(docId,word), count>)
  2   | Map                     | <(docId,word), count>        | List(<word, count>)
      | Reduce (LongSumReducer) | <word, List(count)>          | List(<word, count>)
  3   | Map                     | <(docId,word), count>        | List(<docId, (word,count)>)
      | Reduce                  | <docId, List((word,count))>  | List(<docId, CSV(count)>)

The input to this process is a sequence file of blog posts. The key is the document id (a Text), and the value is a MapWritable containing the body, keyword and title fields (the first mapper reads the body from the "content" entry). The output of the first phase is a list of (docId:word) => occurrence.
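
For reference, here is a minimal sketch of how such an input sequence file could be written. The JTMT project has a BlogSequenceFileWriter class which presumably does this for real; the class name, output path and sample record below are purely illustrative assumptions, only the "content" entry is what the phase 1 Mapper actually expects.

// Sketch only: writes (docId => MapWritable) records in the format that the
// phase 1 Mapper below expects. Everything except the "content" field name
// is an illustrative assumption.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class InputSequenceFileWriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // e.g. /user/sujit/tdmg/inputs/blogs.seq (illustrative)
    Path output = new Path(args[0]);
    SequenceFile.Writer writer = SequenceFile.createWriter(
      fs, conf, output, Text.class, MapWritable.class);
    try {
      // one record per blog post: key = docId, value = map with a "content" entry
      MapWritable value = new MapWritable();
      value.put(new Text("content"), new Text("text of the blog post ..."));
      writer.append(new Text("1"), value);
    } finally {
      writer.close();
    }
  }
}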

The second phase takes the output of the first phase and counts, for each word, the number of documents it appears in. More importantly, this produces the list of unique words that is used later to map each word to its position in the matrix.

The third phase uses the outputs of the first and second phases. The output of the first phase is grouped by docId and, using the word-to-position map built from the second phase's output, flattened into a dense comma-separated list of positional occurrences per docId. This can now be used as input to a matrix building component.
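
For concreteness, here is a rough sketch of what such a matrix building component could look like. It assumes the default TextOutputFormat layout of the third job (docId, a tab, then the comma-separated counts); the class and method names are made up for illustration.

// Sketch only: parses the phase 3 output (docId TAB count,count,...) into a
// dense matrix, one row per document. Assumes TextOutputFormat's default
// tab separator between key and value.
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;

public class TermDocumentMatrixReaderSketch {
  public static double[][] readMatrix(String partFileName) throws Exception {
    List<double[]> rows = new ArrayList<double[]>();
    BufferedReader reader = new BufferedReader(new FileReader(partFileName));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        String[] keyValue = line.split("\t");      // keyValue[0] = docId
        String[] counts = keyValue[1].split(","); // keyValue[1] = CSV of counts
        double[] row = new double[counts.length];
        for (int i = 0; i < counts.length; i++) {
          row[i] = Double.parseDouble(counts[i]);
        }
        rows.add(row);
      }
    } finally {
      reader.close();
    }
    return rows.toArray(new double[rows.size()][]);
  }
}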

Code

Here is the code. The main method contains the driver code, and there are three Mapper and three Reducer classes built as public static inner classes. The new API seems to be cleaner than the old one, at least to me, but then I don't have a huge amount of unlearning to do, which may be the case for people who use it more heavily.

// Source: src/main/java/net/sf/jtmt/indexers/hadoop/TermDocumentMatrixGenerator.java
package net.sf.jtmt.indexers.hadoop;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

import net.sf.jtmt.tokenizers.lucene.NumericTokenFilter;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LowerCaseFilter;
import org.apache.lucene.analysis.PorterStemFilter;
import org.apache.lucene.analysis.StopFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardFilter;
import org.apache.lucene.analysis.standard.StandardTokenizer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.Field.TermVector;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.TermFreqVector;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.store.RAMDirectory;

/**
 * A 3-stage Hadoop job that reads a sequence file of blog articles,
 * and produces a term document vector of the article collection as
 * its output. The phases are as follows:
 * (1) Analyze each article using a custom Lucene analyzer and use
 *     Lucene's Term API to write out (docId:word) => occurrence.
 * (2) Read output of previous step and write out word => occurrence.
 * (3) Read output of step 2 to create a word to position map. Read
 *     output of step 1 to map docId => (word:occurrence) in the map
 *     phase, and docId => position,... in the reduce phase.
 * @author Sujit Pal
 * @version $Revision: 26 $
 */
public class TermDocumentMatrixGenerator { 

  /** Minimum occurrence for a given word in a document to be counted */
  private static final int MIN_OCCUR_THRESHOLD = 2;
  /** Key for the stop word file */
  private static final String STOPWORD_FILE_LOCATION_KEY = "stf.loc.key";
  /** Key for the terms directory generated in phase 2 */
  private static final String TERMS_DIR_KEY = "terms.dir.key";
  
  /* =============== Phase 1 ============== */
  
  /**
   * Map phase converts an input document into a set of (docId:word) 
   * => occurrence pairs. Stop words are removed, words are stemmed,
   * etc, using a custom analyzer. Words which occur below a threshold
   * are removed from consideration.
   */
  public static class Mapper1 extends 
      Mapper<Text,MapWritable,Text,LongWritable> {
    
    private Analyzer analyzer;
    
    protected void setup(Context context) 
        throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      FileSystem hdfs = FileSystem.get(conf);
      FSDataInputStream fis = hdfs.open(
        new Path(conf.get(STOPWORD_FILE_LOCATION_KEY)));
      final Set<String> stopset = new HashSet<String>();
      BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
      String line = null;
      while ((line = reader.readLine()) != null) {
        stopset.add(line);
      }
      reader.close();
      fis.close();
      analyzer = new Analyzer() {
        public TokenStream tokenStream(String fieldName, Reader reader) {
          return new PorterStemFilter(
            new StopFilter(
              new LowerCaseFilter(
                new NumericTokenFilter(
                  new StandardFilter(
                    new StandardTokenizer(reader)))), stopset));
        }
      };
    }
    
    @Override
    public void map(Text key, MapWritable value, Context context) 
        throws IOException, InterruptedException {
      Text content = (Text) value.get(new Text("content"));
      TermFreqVector contentTf = getTermFrequencyVector(content.toString());
      String[] terms = contentTf.getTerms();
      int[] frequencies = contentTf.getTermFrequencies();
      for (int i = 0; i < terms.length; i++) {
        Text outputKey = new Text(StringUtils.join(
          new String[] {key.toString(), terms[i]}, ":"));
        context.write(outputKey, new LongWritable(frequencies[i]));
      }
    }

    private TermFreqVector getTermFrequencyVector(String content) 
        throws IOException {
      RAMDirectory ramdir = new RAMDirectory();
      IndexWriter writer = new IndexWriter(ramdir, analyzer, 
        MaxFieldLength.UNLIMITED);
      Document doc = new Document();
      doc.add(new Field("text", content.toString(), 
        Store.YES, Index.ANALYZED, TermVector.YES));
      writer.addDocument(doc);
      writer.commit();
      writer.close();
      IndexReader reader = IndexReader.open(ramdir);
      TermFreqVector termFreqVector = reader.getTermFreqVector(0, "text");
      reader.close();
      return termFreqVector;
    }
  }
  
  /**
   * Sums up the occurrences of (docId:word) occurrences and removes
   * those which occur infrequently.
   */
  public static class Reducer1 extends 
      Reducer<Text,LongWritable,Text,LongWritable> {
    
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, 
        Context context) throws IOException, InterruptedException {
      long sum = 0L;
      for (LongWritable value : values) {
        sum += value.get();
      }
      if (sum > MIN_OCCUR_THRESHOLD) {
        context.write(key, new LongWritable(sum));
      }
    }
  }

  /* =============== Phase 2 ============== */

  /**
   * Extract the word from the key of the previous output, and emit
   * them. This will be used to build a word to position map.
   */
  public static class Mapper2 extends 
      Mapper<Text,LongWritable,Text,LongWritable> {
    
    private static final LongWritable ONE = new LongWritable(1);
    
    @Override
    public void map(Text key, LongWritable value, Context context) 
        throws IOException, InterruptedException {
      String word = StringUtils.split(key.toString(), ":")[1];
      context.write(new Text(word), ONE);
    }
  }
  
  /**
   * Aggregates the word count. 
   */
  public static class Reducer2 extends
      Reducer<Text,LongWritable,Text,LongWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, 
        Context context) throws IOException, InterruptedException {
      long occurrences = 0;
      for (LongWritable value : values) {
        occurrences++;
      }
      context.write(key, new LongWritable(occurrences));
    }
  }
  
  /* =============== Phase 3 ============== */

  /**
   * Reads the output of step 1 and emits docId => (word:occurrence)
   * pairs.
   */
  public static class Mapper3 extends Mapper<Text,LongWritable,Text,Text> {

    @Override
    public void map(Text key, LongWritable value, Context context) 
        throws IOException, InterruptedException {
      String[] pair = StringUtils.split(key.toString(), ":");
      context.write(new Text(pair[0]), 
        new Text(StringUtils.join(
        new String[] {pair[1], String.valueOf(value.get())}, ":")));
    }
  }

  /**
   * Converts the output of step 2 into a map of word and position.
   * Flattens the docId => (word:occurrence) pairs to docId => dense
   * positional list of occurrences. 
   */
  public static class Reducer3 extends Reducer<Text,Text,Text,Text> {

    private Map<String,Integer> terms = new HashMap<String,Integer>();
    
    @Override
    public void setup(Context context) 
        throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      FileSystem hdfs = FileSystem.get(conf);
      FileStatus[] partFiles = hdfs.listStatus(
        new Path(conf.get(TERMS_DIR_KEY)));
      for (FileStatus partFile : partFiles) {
        if (! partFile.getPath().getName().startsWith("part-r")) {
          continue;
        }
        FSDataInputStream fis = hdfs.open(partFile.getPath());
        BufferedReader reader = new BufferedReader(
          new InputStreamReader(fis));
        String line = null;
        int i = 0;
        while ((line = reader.readLine()) != null) {
          String term = StringUtils.split(line, "\t")[0];
          terms.put(term, i);
          i++;
        }
        reader.close();
        fis.close();
      }
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      SortedMap<String,Long> frequencies = new TreeMap<String,Long>();
      for (Text value : values) {
        String[] parts = StringUtils.split(value.toString(), ":");
        String word = parts[0];
        Long occurrence = new Long(parts[1]);
        frequencies.put(word, occurrence);
      }
      context.write(key, flatten(frequencies));
    }

    private Text flatten(SortedMap<String,Long> frequencies) {
      long[] array = new long[terms.size()];
      for (String word : frequencies.keySet()) {
        int pos = terms.get(word);
        array[pos] = frequencies.get(word);
      }
      StringBuilder buf = new StringBuilder();
      for (int i = 0; i < array.length; i++) {
        if (i > 0) {
          buf.append(",");
        }
        buf.append(String.valueOf(array[i]));
      }
      return new Text(buf.toString());
    }
  }
    
  /**
   * Calling method.
   * @param argv command line args.
   * @throws Exception if thrown.
   */
  public static void main(String[] argv) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(
      conf, argv).getRemainingArgs();
    if (otherArgs.length != 4) {
      System.err.println("Usage: tdmg <prefix> <stopwords> <indir> <outdir>");
      System.exit(-1);
    }
    Path basedir = new Path(otherArgs[0] + otherArgs[2]).getParent();
    /* phase 1: convert doc file to (docId:word) => occurrence */
    Job job1 = new Job(conf, "phase-1");
    job1.getConfiguration().set(STOPWORD_FILE_LOCATION_KEY, 
      otherArgs[0] + otherArgs[1]);
    job1.getConfiguration().set("foo", "bar");
    FileInputFormat.addInputPath(job1, 
      new Path(otherArgs[0] + otherArgs[2]));
    FileOutputFormat.setOutputPath(job1, new Path(basedir, "temp1"));
    job1.setJarByClass(TermDocumentMatrixGenerator.class);
    job1.setMapperClass(Mapper1.class);
    job1.setReducerClass(Reducer1.class);
    job1.setInputFormatClass(SequenceFileInputFormat.class);
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
    job1.setMapOutputKeyClass(Text.class);
    job1.setMapOutputValueClass(LongWritable.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(LongWritable.class);
    job1.setNumReduceTasks(2);
    boolean job1Success = job1.waitForCompletion(true);
    if (! job1Success) {
      System.err.println("Job1 failed, exiting");
      System.exit(-1);
    }
    /* phase 2: write out unique terms with indexes to terms directory */
    Job job2 = new Job(conf, "phase-2");
    FileInputFormat.addInputPath(job2, new Path(basedir, "temp1"));
    FileOutputFormat.setOutputPath(job2, new Path(basedir, "temp2"));
    job2.setJarByClass(TermDocumentMatrixGenerator.class);
    job2.setMapperClass(Mapper2.class);
    job2.setReducerClass(Reducer2.class);
    job2.setInputFormatClass(SequenceFileInputFormat.class);
    job2.setOutputFormatClass(TextOutputFormat.class);
    job2.setMapOutputKeyClass(Text.class);
    job2.setMapOutputValueClass(LongWritable.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(LongWritable.class);
    job2.setNumReduceTasks(2);
    boolean job2Success = job2.waitForCompletion(true);
    if (! job2Success) {
      System.err.println("Job2 failed, exiting");
      System.exit(-1);
    }
    /* phase 3: flatten to docId => occurrence,... */
    Job job3 = new Job(conf, "phase-3");
    job3.getConfiguration().set(TERMS_DIR_KEY, basedir + "/temp2"); 
    FileInputFormat.addInputPath(job3, new Path(basedir, "temp1"));
    FileOutputFormat.setOutputPath(job3, 
      new Path(otherArgs[0] + otherArgs[3]));
    job3.setJarByClass(TermDocumentMatrixGenerator.class);
    job3.setMapperClass(Mapper3.class);
    job3.setReducerClass(Reducer3.class);
    job3.setInputFormatClass(SequenceFileInputFormat.class);
    job3.setOutputFormatClass(TextOutputFormat.class);
    job3.setMapOutputKeyClass(Text.class);
    job3.setMapOutputValueClass(Text.class);
    job3.setOutputKeyClass(Text.class);
    job3.setOutputValueClass(Text.class);
    job3.setNumReduceTasks(2);
    System.exit(job3.waitForCompletion(true) ? 0 : 1);
  }
}

I ran this in both local mode and pseudo-distributed mode. I had to change the code a bit for pseudo-distributed mode (I think it was mostly the file names); here is the script I used to run it.

#!/bin/bash
# Source: ./src/main/scripts/hadoop/run_tdmg.sh
# Script to call Term Document Matrix Generator

## CONFIG ##
M2_REPO=/home/sujit/.m2/repository
HADOOP_HOME=/opt/hadoop-0.20.0
PROJECT_BASEDIR=/home/sujit/src/jtmt
MODE=p # mode can be (l)ocal or (p)seudo-distributed
if [ $MODE == "l" ]; then
  PROTOCOL_PREFIX=""
  STOPWORD_FILE=$PROJECT_BASEDIR/src/main/resources/stopwords.txt
  INPUT_DIR=$PROJECT_BASEDIR/src/test/resources/hac/inputs
  OUTPUT_DIR=$PROJECT_BASEDIR/src/test/resources/hac/outputs
  TEMP_DIRS=$PROJECT_BASEDIR/src/test/resources/hac/temp*
else
  PROTOCOL_PREFIX=hdfs://localhost:54310
  STOPWORD_FILE=/user/sujit/tdmg/resources/stopwords.txt
  INPUT_DIR=/user/sujit/tdmg/inputs
  OUTPUT_DIR=/user/sujit/tdmg/outputs
  TEMP_DIRS=/user/sujit/tdmg/temp*
fi
## CONFIG ##

# for local mode
if [ $MODE == "l" ]; then
  export HADOOP_CLASSPATH=$CLASSPATH:\
$M2_REPO/org/apache/lucene/lucene-core/2.4.0/lucene-core-2.4.0.jar:\
$M2_REPO/org/apache/lucene/lucene-analyzers/2.3.0/lucene-analyzers-2.3.0.jar:\
$M2_REPO/commons-lang/commons-lang/2.1/commons-lang-2.1.jar:\
$PROJECT_BASEDIR/target/jtmt-1.0-SNAPSHOT.jar
fi

cd $HADOOP_HOME
if [ $MODE == "l" ]; then
  rm -rf $OUTPUT_DIR
  rm -rf $TEMP_DIRS
  # no special packaging required for local mode
  bin/hadoop jar $PROJECT_BASEDIR/target/jtmt-1.0-SNAPSHOT.jar net.sf.jtmt.indexers.hadoop.TermDocumentMatrixGenerator "" $STOPWORD_FILE $INPUT_DIR $OUTPUT_DIR
else
  bin/hadoop fs -rmr $OUTPUT_DIR
  bin/hadoop fs -rmr $TEMP_DIRS
  bin/hadoop jar $PROJECT_BASEDIR/target/tdm-generator.jar $PROTOCOL_PREFIX $STOPWORD_FILE $INPUT_DIR $OUTPUT_DIR
fi
cd -
unset HADOOP_CLASSPATH

Similarities and Differences

The main difference of course is that you extend mapreduce.Mapper and mapreduce.Reducer instead of extending MapReduceBase and implementing either mapred.Mapper or mapred.Reducer. The Context provides a more unified way to pass stuff around, although I had some trouble (which I later fixed) passing parameters to the Mapper and Reducer classes. Another difference seems to be that the ToolRunner is no longer needed.
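
For the record, here is a minimal sketch of the parameter passing pattern that eventually worked for me, mirroring the STOPWORD_FILE_LOCATION_KEY handling in the code above: set the parameter on the Job's own Configuration (the Job appears to take a copy of the Configuration passed to its constructor, so setting it on the original conf afterwards has no effect), and read it back from the Context in setup(). The key and class names below are made up for illustration.

// Sketch only: passing a parameter from the driver to a Mapper under the
// new (org.apache.hadoop.mapreduce) API.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class ParameterPassingSketch {

  public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
    private String myParam;

    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      // read the parameter back from the job configuration
      myParam = context.getConfiguration().get("my.param.key");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // use the parameter; here we just tag every input line with it
      context.write(new Text(myParam), new LongWritable(1L));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "param-passing-sketch");
    // set on the Job's configuration, not on conf: the Job copies conf when
    // it is constructed, so later changes to conf are not seen by the tasks.
    job.getConfiguration().set("my.param.key", "some value");
    job.setJarByClass(ParameterPassingSketch.class);
    job.setMapperClass(MyMapper.class);
    // ... input/output formats, paths and reducer set as in the driver above ...
  }
}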

I did miss the "convenience" Mappers and Reducers (such as LongSumReducer), but I am guessing that they will make their way back in once the old API goes away.
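
In the meantime, a stand-in is only a few lines against the new API. Here is a sketch of what a LongSumReducer equivalent might look like; this is my own sketch, not an actual Hadoop class.

// Sketch only: a LongSumReducer equivalent for the new API, generic in the
// key type. Sums up all the LongWritable values seen for a key.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class LongSumReducerSketch<K>
    extends Reducer<K,LongWritable,K,LongWritable> {

  @Override
  protected void reduce(K key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0L;
    for (LongWritable value : values) {
      sum += value.get();
    }
    context.write(key, new LongWritable(sum));
  }
}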

Among the similarities: you still have to package your application code and third-party dependencies the same way as before. I was hoping this would go away once I learned about HADOOP_CLASSPATH, but I guess it makes sense to keep it the way it is.

37 comments (moderated to prevent spam):

Pintoo said...

i am unable to find lucene.NumericTokenFilter
please help me out

Sujit Pal said...

Hi Pintoo, this is part of my JTMT project. You can find the source for this filter here.

Unknown said...

Hi Sujit, would you try running this on the Reuters-21578 Test Collection? http://www.daviddlewis.com/resources/testcollections/reuters21578/

Sincerely yours.

C.C. Yang Data miner ; t9599010@yahoo.com.tw

Sujit Pal said...

Thanks for the link, Chih-Chin, and best of luck.

Anonymous said...

Dear Sujit: would you please tell me how to use your code in Hadoop? thanks, CC Yang

Sujit Pal said...

Isn't this post sufficient? I have the code and the shell scripts here, maybe I missed something? In any case, let me know what exactly you are looking for and I can try to help.

Anonymous said...

Thanks Sujit:
We have already built your code, after retrieving some earlier functions, but we can only run it on random values. We would like to know what your code can be used for in practice, so if you could show the resulting form or table, it would help us explore it.

Chih-Chin Yang (Taiwan)
* I am a teacher, trying to introduce exemplar applications in Hadoop.

Sujit Pal said...

Oh, you mean how I used the data I generated? Actually, now that I think about it, I had planned to use it for larger (in the data sense) experiments on similarity and clustering (based on the comparatively tiny ones I did and described before), but never got around to it (got interested in something else :-)).

Anonymous said...

So great! But I want to see the photo of the TD Matrix. Can you take a photograph of the TD Matrix result?
Thank you very much!!!

Sujit Pal said...

Hi, each line in the output of the third job would be in this form

{docId: [term1_count, term2_count, ....]}

The same job also creates the terms file, something like this:

{term: termId}

so you can relate the two outputs into a term-document matrix of the form:

[[term1_count term2_count ...], // document_1 row
[term1_count term2_count ...], // document_2 row
...]

Hope this makes sense?

Unknown said...

Hi Sujit,
Actually I need help. I am giving a seminar on the implementation of hierarchical clustering and I want to implement this code. I have already installed hadoop-1.0.4 on a single node, so please tell me whether this code will work on hadoop-1.0.4. If yes, please give me some steps on how to proceed with this code. Please reply as soon as possible.

Sujit Pal said...

Hi Ankur, /this/ code should be fine, it worked for me, but please don't attempt the hierarchical clustering implementation I posted in another post - that is very slow and just not worth doing that way.

Unknown said...

OK, that's fine, but can you please tell me the steps I should follow to implement agglomerative hierarchical clustering on a single node? I just want to show my guide how to run this code.

Sujit Pal said...

I don't really know. I stopped looking at HAC shortly after I wrote the poorly performing code; my aim is to classify, so any algorithm works as long as it performs within reasonable bounds.

Unknown said...

OK, then is there any other option or resource from where I can get some help? Any website or material?

Sujit Pal said...

You may want to check out Mining of Massive Datasets by Rajaraman, Leskovec and Ullman; it has a chapter on Clustering with a section on Hierarchical Clustering (a free PDF is available). Since MOMD is all map-reduce based, this may be helpful for you. The order of the naive HAC that I have is O(n^3), but they describe an approach that is slightly better at O(n^2 log n).

Unknown said...

Hi Sujit,

I am a student ,where i just started to learn map reduce programming.

i am trying to do Text Rank alogrthim for a single document.

Till now i have done inverted index of a document.

After that i belive that i need to do document matrix.

Not getting an idea how to proceed further

can you please help me to have a alogorthim for the Textrank in Mapreduce .

Thanks,
Priya


Sujit Pal said...

Hi Krishna Priya, I am assuming that by inverted index you mean something like a Lucene index? And your objective is to create a taxonomy (weighted graph) of words from a corpus? If so, one way could be to get all bigrams from the text, then use the bigrams (t1, t2) to search for document pairs (d1, d2) with these terms. The number of incoming links into (t2) would be the text rank of the term t2. You may want to filter the terms you want to use, perhaps remove stopwords and/or non-nouns. To get the bigram pairs, you could use MR, and perhaps lookup a Solr instance to get the corresponding (t2,d2) pairs and reduce over all d2 to get the text rank for terms t2. Just thinking out loud, so hopefully this should work, please don't blame me if this doesn't :-).

Unknown said...

Hi Sujit,
Thanks for the reply.

Can you give me sample code for writing a customized RecordReader that reads a complete sentence, i.e. up to a period?

Sujit Pal said...

You're welcome. Rather than a custom RecordReader, it may be preferable to go a slightly different route, given that HDFS is optimized to work with few large files, and a corpus is typically a collection of many small files. If you wrap the file system with an HTTP server and then initiate your Job with a text file of URLs for the documents, your mapper could tokenize the incoming document into sentences and emit (filename, sentence) tuples. If you went that route, you could use something as simple as Java's BreakIterator (example code is available in the JavaDocs), a more model-based approach using OpenNLP (here is some example code that includes sentence tokenization), or some other sentence tokenization approach that you may have a preference for.

Anonymous said...

When I run the script with Eclipse Europa 3.3.2 and Hadoop version 0.19.1, I get the following output:
"Usage: tdmg "
My input is a text file containing reviews.
How do I give the input file?

Sujit Pal said...

I think Blogger is eating up the rest of your error message because of the angle brackets - it's basically the message in line 295 of the TermDocumentMatrixGenerator code. Your inputs will depend on whether you are running this in local mode (lines 11-15, 39) or hdfs mode (lines 17-21, 43) of the run_tdmg.sh script.

Anonymous said...

Please suggest which versions to use for lucene-core and lucene-analyzers. You have mentioned different versions in your source lib folder and the script file you used to run the code.
In addition, we get the following error.

14/01/08 15:30:48 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
14/01/08 15:30:49 INFO input.FileInputFormat: Total input paths to process : 5
14/01/08 15:30:51 INFO mapred.JobClient: Running job: job_201401081158_0022
14/01/08 15:30:52 INFO mapred.JobClient: map 0% reduce 0%
14/01/08 15:31:03 INFO mapred.JobClient: Task Id : attempt_201401081158_0022_m_000000_0, Status : FAILED
java.lang.NoClassDefFoundError: org/apache/lucene/analysis/Analyzer
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:247)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:761)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:806)
at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:157)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:467)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.lang.ClassNotFoundException: org.apache.lucene.analysis.Analyzer
at java.net.URLClassLoader$1.run(URLClassLoader.java:200)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:188)
at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:252)
at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:320)
... 8 more

14/01/08 15:31:03 INFO mapred.JobClient: Task Id : attempt_201401081158_0022_m_000001_0, Status : FAILED
java.lang.NoClassDefFoundError: org/apache/lucene/analysis/Analyzer
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:247)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:761)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:806)
at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:157)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:467)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.lang.ClassNotFoundException: org.apache.lucene.analysis.Analyzer
at java.net.URLClassLoader$1.run(URLClassLoader.java:200)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:188)
at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:252)
at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:320)
... 8 more

Thankz in advance sir :)

Sujit Pal said...

Yes, sorry, this was because I kept adding stuff to the project as the Lucene versions changed. The script should be changed to point to lucene-core-3.0.2 and lucene-analyzers-3.0.2 respectively (the code compiles against them, so it should also run). One reason for the CNFE you are seeing may be that you don't have the M2_REPO environment variable defined, or your local ~/.m2/repository does not have the lucene jars mentioned (Analyzer is in lucene-core, BTW). So you should change the paths in lines 28-31 of run_tdmg.sh to point to the jars in lib instead of M2_REPO. I did attempt to update the repository, but it won't let me in (haven't updated sourceforge in years, don't blame them :-)). I have asked for a password reset; still waiting. Once I get access, I will update it.

Banu said...

Thank you sir.....
I don't have a .m2 repo directory, so I removed line 6 and changed lines 28-30 to:

$HADOOP_HOME/lib/lucene-core-2.4.0.jar:\

How do I generate jtmt-1.0-SNAPSHOT.jar?
Where should I run your script: a cygwin terminal or Eclipse?
How do I run it in pseudo-distributed mode?
I also need the code to run in pseudo-distributed mode.

Sujit Pal said...

You are welcome, although I would suggest updating to lucene-core and lucene-analyzers 3.0.2 (the latest available in the application). To generate the jar file you can run "ant jar" from the command line in the project directory. You should run the script from the project directory also (since your jars are in lib/*). I prefer running everything from the command line, but I am sure there are other ways; choose what you like. To set up pseudo-distributed mode, check out Michael Noll's tutorial - this is what I learned from, and I think it's still valid for current versions of Hadoop. The same code and script can run in pseudo-distributed mode also (that's why there is an if condition).

Banu said...

Thank you sir. We generated the snapshot in Eclipse itself, and it finds the jar now.
But sir, I am getting the following exception when I run this program:
14/01/12 14:44:43 INFO input.FileInputFormat: Total input paths to process : 1
14/01/12 14:44:45 INFO mapred.JobClient: Running job: job_201401121442_0002
14/01/12 14:44:46 INFO mapred.JobClient: map 0% reduce 0%
14/01/12 14:45:02 INFO mapred.JobClient: Task Id : attempt_201401121442_0002_m_000000_0, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
at net.sf.jtmt.indexers.hadoop.TermDocumentMatrixGenerator$Mapper1.map(TermDocumentMatrixGenerator.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:518)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)

14/01/12 14:45:08 INFO mapred.JobClient: Task Id : attempt_201401121442_0002_m_000000_1, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
at net.sf.jtmt.indexers.hadoop.TermDocumentMatrixGenerator$Mapper1.map(TermDocumentMatrixGenerator.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:518)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)

14/01/12 14:45:14 INFO mapred.JobClient: Task Id : attempt_201401121442_0002_m_000000_2, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
at net.sf.jtmt.indexers.hadoop.TermDocumentMatrixGenerator$Mapper1.map(TermDocumentMatrixGenerator.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:518)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)

14/01/12 14:45:23 INFO mapred.JobClient: Job complete: job_201401121442_0002
14/01/12 14:45:23 INFO mapred.JobClient: Counters: 3
14/01/12 14:45:23 INFO mapred.JobClient: Job Counters
14/01/12 14:45:23 INFO mapred.JobClient: Launched map tasks=4
14/01/12 14:45:23 INFO mapred.JobClient: Data-local map tasks=4
14/01/12 14:45:23 INFO mapred.JobClient: Failed map tasks=1
Job1 failed, exiting

Is something wrong with my input format?
I have extracted reviews in text format and converted them to a sequence file using a program.
Can you give a sample sequence file to run it?

Thanks in advance..:-)

Sujit Pal said...

Hi Banu, it looks like your input file is in [Long,Text] format (line:content) and the code expects to see [Text,Map] format. There is a BlogSequenceFileWriter class in the same package which was presumably used to build the input sequence file; it may be a good idea to try to adapt it to your needs. In fact, given that it's been a while since I wrote the code and I don't remember any of it, looking at the available code and trying to debug it locally is generally likely to be more fruitful than asking me :-).

Banu said...

Sir, thank you so much..
We got the output sir...
:-):-)
Thank you for your help sir
:-):-):-):-):-)

Sujit Pal said...

Cool! and you're welcome.

Banu said...

Sir!!!!!!
We have the following issues. Please do help us :( :( :(

* The third job creates two files, but one file is empty.
i.e., it doesn't create the terms file, something like this:

{term: termId}
What should we do?

* The first file of the 3rd job is in this form:
{docId: [term1_count, term2_count, ....]}
but it contains so many 0's at the end. Why?

* How do we relate the two outputs into a term-document matrix of the form below?

[[term1_count term2_count ...], // document_1 row
[term1_count term2_count ...], // document_2 row
...]
Any code, sir?

* We have two seq files in the input directory, but the output of all three phases is generated for only one document, i.e., docId[1].

Sujit Pal said...

Going by the description, the empty file should be a dictionary of {term: index}; it is created in Reducer3.setup. Not sure of the reason it is empty - one guess is that maybe I never checked and it has always been empty. This is because the actual output of Reducer3 is {docId: [{term: count},...]} and not {docId: [{index: count},...]}, so this may just be dead code that I forgot to remove. The output of Reducer3 is the TD matrix; each value corresponds to the sparse term vector for that docId. I don't think the vector should contain 0 counts, as we explicitly remove them earlier, and I am not sure why it is returning results for only a single sequence file.

Anonymous said...

I need an input txt file for the blogSequenceWriter code. The txt files (/resources/hac/inputs) and (/resources/data/blog) are not available. Please provide them soon, sir. :( :(

Banu said...

In lines 195-196, we get a warning: the local variable "value" is never used.

for (LongWritable value : values) {
occurrences++;
}

Will this cause any problem, sir?
In the terms file, we get term:docId instead of term:termId. What is the problem?
Thanks in advance:)

Sujit Pal said...

@Anonymous: this is the collection of my blog posts from around 2009. I no longer have the data handy; you can try using some other data, such as a Wikipedia dump or something similar.

@Banu: no, this warning will not cause a problem. Regarding why it is term:docId and not term:termId, I have no idea, but the code should tell you...

Unknown said...

Hi Sujit, Kindly help me out of this error.Thanks in advance.

14/10/11 20:08:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/10/11 20:08:47 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/10/11 20:08:47 INFO input.FileInputFormat: Total input paths to process : 10
14/10/11 20:08:47 INFO mapred.JobClient: Running job: job_local158039148_0001
14/10/11 20:08:47 INFO mapred.LocalJobRunner: Waiting for map tasks
14/10/11 20:08:47 INFO mapred.LocalJobRunner: Starting task: attempt_local158039148_0001_m_000000_0
14/10/11 20:08:47 INFO util.ProcessTree: setsid exited with exit code 0
14/10/11 20:08:47 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@534153
14/10/11 20:08:47 INFO mapred.MapTask: Processing split: file:/home/muthu/workspace_2/td matrix/sample/7:0+26479
14/10/11 20:08:47 INFO mapred.MapTask: io.sort.mb = 100
14/10/11 20:08:47 INFO mapred.MapTask: data buffer = 79691776/99614720
14/10/11 20:08:47 INFO mapred.MapTask: record buffer = 262144/327680
14/10/11 20:08:47 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader@129c5d3
java.lang.NullPointerException
at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.close(SequenceFileRecordReader.java:101)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.close(MapTask.java:496)
at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1776)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:778)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Sujit Pal said...

Hi Muthuraman, the error does not indicate much except that there is an NPE thrown somewhere (very likely in your code). I suspect the console has more of the stack trace than what you pasted here; if so, you will find a place where your code is mentioned, and that will point to where the NPE is happening.