So far I had been using Hadoop 0.18 for my (admittedly rather minimal) Map-Reduce coding. Recently, while trying to find some random feature which I ended up never using, I discovered that it was only available from 0.19 onwards, and by the time I went to download it, Hadoop was already up to 0.20, so I downloaded that instead. Now, Hadoop 0.20 came with some pretty massive API changes, which I did not know about when I downloaded it, and soon the code in my Eclipse IDE was awash in yellow squigglies and strikethroughs.
The yellow squigglies and strikethroughs are, of course, Eclipse telling me that I am looking at/using deprecated method calls. Hadoop 0.20 still supports the old API, but it feels wrong to write new code against a deprecated API knowing that it will disappear within a few releases. There is not much available by way of examples for the new API, so I figured that writing my next Map-Reduce job against it would help me figure it out, and that is what I did.
Algorithm
The job was to create a term-document matrix from the terms in 150 or so of my blog posts. I want to use the matrix for some experiments which I will describe later. The matrix is built in three steps, as described in the table below:
Job-# | Phase                   | Input                       | Output
1     | Map                     | <docId,Post>                | List(<(docId,word),count>)
1     | Reduce                  | <(docId,word),List(count)>  | List(<(docId,word),count>)
2     | Map                     | <(docId,word),count>        | List(<word,count>)
2     | Reduce (LongSumReducer) | <word,List(count)>          | List(<word,count>)
3     | Map                     | <(docId,word),count>        | List(<docId,(word,count)>)
3     | Reduce                  | <docId,List((word,count))>  | List(<docId,CSV(count)>)
The input to this process is a sequence file of blog posts. The key is the document id (a Text), and the value is a MapWritable containing the body, keyword and title. The output of the first phase is a list of (docId:word) => occurrence pairs.
The second phase takes the output of the first phase, and just counts the word occurrences across the documents. This is used later to map words to their respective positions in the matrix.
The third stage uses the outputs of the first and second phases. The output of the first phase is converted to a List of docId => dense comma-separated list of positional occurrences. This can now be used as input to a matrix building component.
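To make the flow concrete, here is a small made-up trace for a single term (the document ids and counts below are invented for illustration):

Phase 1 output:  doc1:hadoop => 4,  doc2:hadoop => 3
Phase 2 output:  hadoop => 2   (the line at which a word appears in this output becomes its column position)
Phase 3 output:  doc1 => 0,4,0,...,0
                 doc2 => 0,3,0,...,0   (each document's counts laid out at the words' column positions)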
Code
Here is the code. The main method contains the driver code, and the three Mapper and three Reducer classes are built as public static inner classes. The new API seems cleaner than the old one, at least to me, but then I don't have a huge amount of unlearning to do, unlike people who use it more heavily.
// Source: src/main/java/net/sf/jtmt/indexers/hadoop/TermDocumentMatrixGenerator.java
package net.sf.jtmt.indexers.hadoop;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import net.sf.jtmt.tokenizers.lucene.NumericTokenFilter;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LowerCaseFilter;
import org.apache.lucene.analysis.PorterStemFilter;
import org.apache.lucene.analysis.StopFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardFilter;
import org.apache.lucene.analysis.standard.StandardTokenizer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.Field.TermVector;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.TermFreqVector;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.store.RAMDirectory;
/**
* A 3-stage Hadoop job that reads a sequence file of blog articles,
* and produces a term document vector of the article collection as
* its output. The phases are as follows:
* (1) Analyze each article using a custom Lucene analyzer and use
* Lucene's Term API to write out (docId:word) => occurrence.
* (2) Read output of previous step and write out word => occurrence.
* (3) Read output of step 2 to create a word to position map. Read
* output of step 1 to map docId => (word:occurrence) in the map
* phase, and docId => position,... in the reduce phase.
* @author Sujit Pal
* @version $Revision: 26 $
*/
public class TermDocumentMatrixGenerator {
/** Minimum occurrence for a given word in a document to be counted */
private static final int MIN_OCCUR_THRESHOLD = 2;
/** Key for the stop word file */
private static final String STOPWORD_FILE_LOCATION_KEY = "stf.loc.key";
/** Key for the terms directory generated in phase 2 */
private static final String TERMS_DIR_KEY = "terms.dir.key";
/* =============== Phase 1 ============== */
/**
* Map phase converts an input document into a set of (docId:word)
* => occurrence pairs. Stop words are removed, words are stemmed,
* etc, using a custom analyzer. Words which occur below a threshold
* are removed from consideration.
*/
public static class Mapper1 extends
Mapper<Text,MapWritable,Text,LongWritable> {
private Analyzer analyzer;
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
FileSystem hdfs = FileSystem.get(conf);
FSDataInputStream fis = hdfs.open(
new Path(conf.get(STOPWORD_FILE_LOCATION_KEY)));
final Set<String> stopset = new HashSet<String>();
BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
String line = null;
while ((line = reader.readLine()) != null) {
stopset.add(line);
}
reader.close();
fis.close();
analyzer = new Analyzer() {
public TokenStream tokenStream(String fieldName, Reader reader) {
return new PorterStemFilter(
new StopFilter(
new LowerCaseFilter(
new NumericTokenFilter(
new StandardFilter(
new StandardTokenizer(reader)))), stopset));
}
};
}
@Override
public void map(Text key, MapWritable value, Context context)
throws IOException, InterruptedException {
Text content = (Text) value.get(new Text("content"));
TermFreqVector contentTf = getTermFrequencyVector(content.toString());
String[] terms = contentTf.getTerms();
int[] frequencies = contentTf.getTermFrequencies();
for (int i = 0; i < terms.length; i++) {
Text outputKey = new Text(StringUtils.join(
new String[] {key.toString(), terms[i]}, ":"));
context.write(outputKey, new LongWritable(frequencies[i]));
}
}
private TermFreqVector getTermFrequencyVector(String content)
throws IOException {
RAMDirectory ramdir = new RAMDirectory();
IndexWriter writer = new IndexWriter(ramdir, analyzer,
MaxFieldLength.UNLIMITED);
Document doc = new Document();
doc.add(new Field("text", content.toString(),
Store.YES, Index.ANALYZED, TermVector.YES));
writer.addDocument(doc);
writer.commit();
writer.close();
IndexReader reader = IndexReader.open(ramdir);
TermFreqVector termFreqVector = reader.getTermFreqVector(0, "text");
reader.close();
return termFreqVector;
}
}
/**
* Sums up the counts for each (docId:word) key and removes those
* that occur infrequently.
*/
public static class Reducer1 extends
Reducer<Text,LongWritable,Text,LongWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
long sum = 0L;
for (LongWritable value : values) {
sum += value.get();
}
if (sum > MIN_OCCUR_THRESHOLD) {
context.write(key, new LongWritable(sum));
}
}
}
/* =============== Phase 2 ============== */
/**
* Extracts the word from each key of the previous output and emits
* it. The output will be used to build a word-to-position map.
*/
public static class Mapper2 extends
Mapper<Text,LongWritable,Text,LongWritable> {
private static final LongWritable ONE = new LongWritable(1);
@Override
public void map(Text key, LongWritable value, Context context)
throws IOException, InterruptedException {
String word = StringUtils.split(key.toString(), ":")[1];
context.write(new Text(word), ONE);
}
}
/**
* Counts the number of documents in which each word survived the
* threshold; phase 3 uses only the word's line position in this output.
*/
public static class Reducer2 extends
Reducer<Text,LongWritable,Text,LongWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
long occurrences = 0;
for (LongWritable value : values) {
occurrences++;
}
context.write(key, new LongWritable(occurrences));
}
}
/* =============== Phase 3 ============== */
/**
* Reads the output of step 1 and emits docId => (word:occurrence)
* pairs.
*/
public static class Mapper3 extends Mapper<Text,LongWritable,Text,Text> {
@Override
public void map(Text key, LongWritable value, Context context)
throws IOException, InterruptedException {
String[] pair = StringUtils.split(key.toString(), ":");
context.write(new Text(pair[0]),
new Text(StringUtils.join(
new String[] {pair[1], String.valueOf(value.get())}, ":")));
}
}
/**
* Converts the output of step 2 into a map of word and position.
* Flattens the docId => (word:occurrence) pairs to docId => dense
* positional list of occurrences.
*/
public static class Reducer3 extends Reducer<Text,Text,Text,Text> {
private Map<String,Integer> terms = new HashMap<String,Integer>();
@Override
public void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
FileSystem hdfs = FileSystem.get(conf);
FileStatus[] partFiles = hdfs.listStatus(
new Path(conf.get(TERMS_DIR_KEY)));
// the position counter must span all part files, otherwise term
// positions from different part files would collide
int i = 0;
for (FileStatus partFile : partFiles) {
if (! partFile.getPath().getName().startsWith("part-r")) {
continue;
}
FSDataInputStream fis = hdfs.open(partFile.getPath());
BufferedReader reader = new BufferedReader(
new InputStreamReader(fis));
String line = null;
while ((line = reader.readLine()) != null) {
String term = StringUtils.split(line, "\t")[0];
terms.put(term, i);
i++;
}
reader.close();
fis.close();
}
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
SortedMap<String,Long> frequencies = new TreeMap<String,Long>();
for (Text value : values) {
String[] parts = StringUtils.split(value.toString(), ":");
String word = parts[0];
Long occurrence = new Long(parts[1]);
frequencies.put(word, occurrence);
}
context.write(key, flatten(frequencies));
}
private Text flatten(SortedMap<String,Long> frequencies) {
long[] array = new long[terms.size()];
for (String word : frequencies.keySet()) {
int pos = terms.get(word);
array[pos] = frequencies.get(word);
}
StringBuilder buf = new StringBuilder();
for (int i = 0; i < array.length; i++) {
if (i > 0) {
buf.append(",");
}
buf.append(String.valueOf(array[i]));
}
return new Text(buf.toString());
}
}
/**
* Calling method.
* @param argv command line args.
* @throws Exception if thrown.
*/
public static void main(String[] argv) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(
conf, argv).getRemainingArgs();
if (otherArgs.length != 4) {
System.err.println("Usage: tdmg <prefix> <stopwords> <indir> <outdir>");
System.exit(-1);
}
Path basedir = new Path(otherArgs[0] + otherArgs[2]).getParent();
/* phase 1: convert doc file to (docId:word) => occurrence */
Job job1 = new Job(conf, "phase-1");
job1.getConfiguration().set(STOPWORD_FILE_LOCATION_KEY,
otherArgs[0] + otherArgs[1]);
job1.getConfiguration().set("foo", "bar");
FileInputFormat.addInputPath(job1,
new Path(otherArgs[0] + otherArgs[2]));
FileOutputFormat.setOutputPath(job1, new Path(basedir, "temp1"));
job1.setJarByClass(TermDocumentMatrixGenerator.class);
job1.setMapperClass(Mapper1.class);
job1.setReducerClass(Reducer1.class);
job1.setInputFormatClass(SequenceFileInputFormat.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(LongWritable.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(LongWritable.class);
job1.setNumReduceTasks(2);
boolean job1Success = job1.waitForCompletion(true);
if (! job1Success) {
System.err.println("Job1 failed, exiting");
System.exit(-1);
}
/* phase 2: write out unique terms with indexes to terms directory */
Job job2 = new Job(conf, "phase-2");
FileInputFormat.addInputPath(job2, new Path(basedir, "temp1"));
FileOutputFormat.setOutputPath(job2, new Path(basedir, "temp2"));
job2.setJarByClass(TermDocumentMatrixGenerator.class);
job2.setMapperClass(Mapper2.class);
job2.setReducerClass(Reducer2.class);
job2.setInputFormatClass(SequenceFileInputFormat.class);
job2.setOutputFormatClass(TextOutputFormat.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(LongWritable.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(LongWritable.class);
job2.setNumReduceTasks(2);
boolean job2Success = job2.waitForCompletion(true);
if (! job2Success) {
System.err.println("Job2 failed, exiting");
System.exit(-1);
}
/* phase 3: flatten to docId => occurrence,... */
Job job3 = new Job(conf, "phase-3");
job3.getConfiguration().set(TERMS_DIR_KEY, basedir + "/temp2");
FileInputFormat.addInputPath(job3, new Path(basedir, "temp1"));
FileOutputFormat.setOutputPath(job3,
new Path(otherArgs[0] + otherArgs[3]));
job3.setJarByClass(TermDocumentMatrixGenerator.class);
job3.setMapperClass(Mapper3.class);
job3.setReducerClass(Reducer3.class);
job3.setInputFormatClass(SequenceFileInputFormat.class);
job3.setOutputFormatClass(TextOutputFormat.class);
job3.setMapOutputKeyClass(Text.class);
job3.setMapOutputValueClass(Text.class);
job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(Text.class);
job3.setNumReduceTasks(2);
System.exit(job3.waitForCompletion(true) ? 0 : 1);
}
}
I ran this in local mode and in pseudo-distributed mode. I had to change the code a bit for pseudo-distributed mode (mostly the file paths, I think), and here is the script I used to run it.
#!/bin/bash
# Source: ./src/main/scripts/hadoop/run_tdmg.sh
# Script to call Term Document Matrix Generator
## CONFIG ##
M2_REPO=/home/sujit/.m2/repository
HADOOP_HOME=/opt/hadoop-0.20.0
PROJECT_BASEDIR=/home/sujit/src/jtmt
MODE=p # mode can be (l)ocal or (p)seudo-distributed
if [ $MODE == "l" ]; then
PROTOCOL_PREFIX=""
STOPWORD_FILE=$PROJECT_BASEDIR/src/main/resources/stopwords.txt
INPUT_DIR=$PROJECT_BASEDIR/src/test/resources/hac/inputs
OUTPUT_DIR=$PROJECT_BASEDIR/src/test/resources/hac/outputs
TEMP_DIRS=$PROJECT_BASEDIR/src/test/resources/hac/temp*
else
PROTOCOL_PREFIX=hdfs://localhost:54310
STOPWORD_FILE=/user/sujit/tdmg/resources/stopwords.txt
INPUT_DIR=/user/sujit/tdmg/inputs
OUTPUT_DIR=/user/sujit/tdmg/outputs
TEMP_DIRS=/user/sujit/tdmg/temp*
fi
## CONFIG ##
# for local mode
if [ $MODE == "l" ]; then
export HADOOP_CLASSPATH=$CLASSPATH:\
$M2_REPO/org/apache/lucene/lucene-core/2.4.0/lucene-core-2.4.0.jar:\
$M2_REPO/org/apache/lucene-analyzers/2.3.0/lucene-analyzers-2.3.0.jar:\
$M2_REPO/commons-lang/commons-lang/2.1/commons-lang-2.1.jar:\
$PROJECT_BASEDIR/target/jtmt-1.0-SNAPSHOT.jar
fi
cd $HADOOP_HOME
if [ $MODE == "l" ]; then
rm -rf $OUTPUT_DIR
rm -rf $TEMP_DIRS
# no special packaging required for local mode
bin/hadoop jar $PROJECT_BASEDIR/target/jtmt-1.0-SNAPSHOT.jar net.sf.jtmt.indexers.hadoop.TermDocumentMatrixGenerator "" $STOPWORD_FILE $INPUT_DIR $OUTPUT_DIR
else
bin/hadoop fs -rmr $OUTPUT_DIR
bin/hadoop fs -rmr $TEMP_DIRS
bin/hadoop jar $PROJECT_BASEDIR/target/tdm-generator.jar $PROTOCOL_PREFIX $STOPWORD_FILE $INPUT_DIR $OUTPUT_DIR
fi
cd -
unset HADOOP_CLASSPATH
Similarities and Differences
The main difference, of course, is that you extend mapreduce.Mapper and mapreduce.Reducer instead of extending MapReduceBase and implementing mapred.Mapper or mapred.Reducer. The Context provides a more unified way to pass things around, although I had some trouble (which I later fixed) passing parameters to the Mapper and Reducer classes. Another difference seems to be that ToolRunner is no longer needed.
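To make the difference concrete, here is a minimal sketch of an identity-style mapper written against both APIs (the class names are just for illustration, not part of the code above):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old (mapred) API: extend MapReduceBase and implement the Mapper interface;
// output goes through an OutputCollector, status through a Reporter.
class OldStyleMapper extends MapReduceBase
    implements org.apache.hadoop.mapred.Mapper<Text,LongWritable,Text,LongWritable> {
  public void map(Text key, LongWritable value,
      OutputCollector<Text,LongWritable> output, Reporter reporter)
      throws IOException {
    output.collect(key, value);
  }
}

// New (mapreduce) API: extend the Mapper class directly; output, configuration
// and status reporting all go through the single Context object.
class NewStyleMapper
    extends org.apache.hadoop.mapreduce.Mapper<Text,LongWritable,Text,LongWritable> {
  @Override
  protected void map(Text key, LongWritable value, Context context)
      throws IOException, InterruptedException {
    context.write(key, value);
  }
}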
I did miss the "convenience" Mappers and Reducers (such as LongSumReducer), but I am guessing that they will make their way back in once the old API goes away.
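Until then, rolling your own is only a few lines. Here is a minimal sketch of a LongSumReducer equivalent against the new API (the class name is mine, not from the code above):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Minimal stand-in for the old API's LongSumReducer: sums the LongWritable
// values for each key and writes out (key, sum).
public class NewApiLongSumReducer<K> extends Reducer<K,LongWritable,K,LongWritable> {
  private final LongWritable result = new LongWritable();
  @Override
  protected void reduce(K key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0L;
    for (LongWritable value : values) {
      sum += value.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}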
Among the similarities, you still have to package your application code and third party dependencies in the same way as before, something I was hoping would go away once I learned about HADOOP_CLASSPATH, but I guess it makes sense to keep it the way it is.
37 comments (moderated to prevent spam):
I am unable to find lucene.NumericTokenFilter.
Please help me out.
Hi Pintoo, this is part of my JTMT project. You can find the source for this filter here.
Hi Salmon, I would try to run it on the Reuters-21578 Test Collection: http://www.daviddlewis.com/resources/testcollections/reuters21578/
Sincerely yours,
C.C. Yang, Data miner; t9599010@yahoo.com.tw
Thanks for the link, Chih-Chin, and best of luck.
Dear Sujit: would you please tell me how to use your code in Hadoop? thanks, CC Yang
Isn't this post sufficient? I have the code and the shell scripts here, maybe I missed something? In any case, let me know what exactly you are looking for and I can try to help.
Thanks Sujit:
We got your code to build after restoring some earlier functions, but we have only been able to run it on random values. We would like to know what your code can be put to work on, so if you could show the resulting form or table, it would help us explore it.
Chih-Chin Yang (Taiwan)
* I am a teacher, trying to introduce exemplar applications of Hadoop.
Oh, you mean how I used the data I generated? Actually, now that I think about it, I had planned to use it for larger (in the data sense) experiments on similarity and clustering (based on the comparatively tiny ones I did and described before), but never got around to it (got interested in something else :-)).
So great! But I want to see the photo of the TD Matrix. Can you take a photograph of the TD Matrix result?
Thank you very much!!!
Hi, each line in the output of the third job would be in this form
{docId: [term1_count, term2_count, ....]}
The same job also creates the terms file, something like this:
{term: termId}
so you can relate the two outputs into a term-document matrix of the form:
[[term1_count term2_count ...], // document_1 row
[term1_count term2_count ...], // document_2 row
...]
Hope this makes sense?
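If it helps, here is a minimal sketch (not part of the original job; it assumes the default TextOutputFormat layout of docId, a tab, then the comma-separated counts) of loading the job 3 output into an in-memory matrix:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class TdMatrixLoader {
  // Reads one part file of job 3 output and returns a docId => dense row map.
  public static Map<String,long[]> load(String partFile) throws IOException {
    Map<String,long[]> matrix = new LinkedHashMap<String,long[]>();
    BufferedReader reader = new BufferedReader(new FileReader(partFile));
    String line;
    while ((line = reader.readLine()) != null) {
      String[] kv = line.split("\t");           // docId TAB csv-of-counts
      String[] counts = kv[1].split(",");
      long[] row = new long[counts.length];
      for (int i = 0; i < counts.length; i++) {
        row[i] = Long.parseLong(counts[i]);
      }
      matrix.put(kv[0], row);                   // one dense row per document
    }
    reader.close();
    return matrix;
  }
}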
Hi Sujit,
Actually I need help. I am giving a seminar on the implementation of hierarchical clustering and I want to implement this code. I have already installed hadoop-1.0.4 on a single node, so please tell me whether this code will work on hadoop-1.0.4. If yes, please give me some steps on how to proceed with this code. Please reply as soon as possible.
Hi Ankur, /this/ code should be fine, it worked for me, but please don't attempt the hierarchical clustering implementation I posted in another post - that is very slow and just not worth doing that way.
OK, that's fine, but can you please tell me the steps I should follow to implement agglomerative hierarchical clustering on a single node? I just want to show my guide how to run this code.
I don't really know. I stopped looking at HAC shortly after I wrote the poorly performing code, my aim is to classify, so any algorithm works as long as it performs within reasonable bounds.
OK, then is there any other option or resource from which I can get some help, any website or material?
You may want to check out Mining of Massive Datasets by Rajaraman, Leskovec and Ullman, which has a chapter on Clustering with a section on Hierarchical Clustering (a free PDF is available). Since MOMD is all map-reduce based, this may be helpful for you. The naive HAC that I have is O(n^3), but they describe an approach that is slightly better at O(n^2 log n).
Hi Sujit,
I am a student who has just started to learn map-reduce programming.
I am trying to implement the TextRank algorithm for a single document.
So far I have built an inverted index of the document.
After that I believe I need to build a document matrix, but I am not getting an idea of how to proceed further.
Can you please help me with an algorithm for TextRank in MapReduce?
Thanks,
Priya
Hi Krishna Priya, I am assuming that by inverted index you mean something like a Lucene index? And your objective is to create a taxonomy (weighted graph) of words from a corpus? If so, one way could be to get all bigrams from the text, then use the bigrams (t1, t2) to search for document pairs (d1, d2) with these terms. The number of incoming links into (t2) would be the text rank of the term t2. You may want to filter the terms you want to use, perhaps remove stopwords and/or non-nouns. To get the bigram pairs, you could use MR, and perhaps lookup a Solr instance to get the corresponding (t2,d2) pairs and reduce over all d2 to get the text rank for terms t2. Just thinking out loud, so hopefully this should work, please don't blame me if this doesn't :-).
Hi Sujit,
Thanks for the reply.
Can you give me sample code for how to write a customized RecordReader that reads a complete sentence, i.e. up to a period?
You're welcome. Rather than a custom RecordReader, it may be preferable to go a slightly different route, given that HDFS is optimized to work with a few large files, while a corpus is typically a collection of many small files. If you wrap the file system with an HTTP server, and then initiate your Job with a text file of URLs for the documents, your mapper could tokenize the incoming document into sentences and emit (filename, sentence) tuples. If you went that route, you could use something as simple as Java's BreakIterator (example code is available in the JavaDocs), a more model-based approach using OpenNLP (here is some example code that includes sentence tokenization), or some other sentence tokenization approach that you may prefer.
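For reference, here is a minimal sketch of the BreakIterator approach mentioned above (the class and method names are just for illustration):

import java.text.BreakIterator;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class SentenceSplitter {
  // Splits a block of text into sentences using the JDK's built-in
  // locale-aware sentence boundary detector.
  public static List<String> split(String text) {
    List<String> sentences = new ArrayList<String>();
    BreakIterator it = BreakIterator.getSentenceInstance(Locale.US);
    it.setText(text);
    int start = it.first();
    for (int end = it.next(); end != BreakIterator.DONE;
         start = end, end = it.next()) {
      sentences.add(text.substring(start, end).trim());
    }
    return sentences;
  }
}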
When I run the script with Eclipse Europa 3.3.2 and Hadoop 0.19.1, I get the following output:
"Usage: tdmg "
My input is a text file containing reviews.
How do I give the input file?
I think Blogger is eating up the rest of your error message because of the angle brackets - it's basically the message in line 295 of the TermDocumentMatrixGenerator code. Your inputs will depend on whether you are running this in local mode (lines 11-15, 39) or hdfs mode (lines 17-21, 43) in the run_tdmg.sh block.
Please suggest which versions should be used for lucene-core and lucene-analyzers. You have mentioned different versions in your source lib folder and in the script file you used to run the code.
In addition, we get the following error.
14/01/08 15:30:48 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
14/01/08 15:30:49 INFO input.FileInputFormat: Total input paths to process : 5
14/01/08 15:30:51 INFO mapred.JobClient: Running job: job_201401081158_0022
14/01/08 15:30:52 INFO mapred.JobClient: map 0% reduce 0%
14/01/08 15:31:03 INFO mapred.JobClient: Task Id : attempt_201401081158_0022_m_000000_0, Status : FAILED
java.lang.NoClassDefFoundError: org/apache/lucene/analysis/Analyzer
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:247)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:761)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:806)
at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:157)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:467)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.lang.ClassNotFoundException: org.apache.lucene.analysis.Analyzer
at java.net.URLClassLoader$1.run(URLClassLoader.java:200)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:188)
at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:252)
at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:320)
... 8 more
14/01/08 15:31:03 INFO mapred.JobClient: Task Id : attempt_201401081158_0022_m_000001_0, Status : FAILED
java.lang.NoClassDefFoundError: org/apache/lucene/analysis/Analyzer
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:247)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:761)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:806)
at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:157)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:467)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.lang.ClassNotFoundException: org.apache.lucene.analysis.Analyzer
at java.net.URLClassLoader$1.run(URLClassLoader.java:200)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:188)
at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:252)
at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:320)
... 8 more
Thankz in advance sir :)
Yes, sorry, this was because I kept adding stuff to the project as the Lucene versions changed. The script should be changed to point to lucene-core-3.0.2 and lucene-analyzers-3.0.2 respectively (the code compiles against them so it should also run). One reason for the CNFE you are seeing may be that you don't have the M2_REPO environment variable defined, or your local ~/.m2/repository does not have the lucene jars mentioned (Analyzer is in lucene-core, BTW). So you should change the paths in lines 28-31 of run_tdmg.sh to point to the jars in lib instead of M2_REPO. I did attempt to update the repository, but it won't let me in (haven't updated sourceforge in years, don't blame them :-)). I have asked for a password reset; still waiting. Once I get access, I will update it.
Thank you sir!
I don't have a .m2 repo directory, so I removed line 6 and changed lines 28-30 to
$HADOOP_HOME/lib/lucene-core-2.4.0.jar:\
How do I generate jtmt-1.0-SNAPSHOT.jar?
Where should I run your script, the Cygwin terminal or Eclipse?
How do I run it in pseudo-distributed mode?
I also need the code to run in pseudo-distributed mode.
You are welcome, although I would suggest updating to lucene-core and lucene-analyzers 3.0.2 (the latest available in the application). To generate the jar file you can run "ant jar" from the command line in the project directory. You should run the script from the project directory also (since your jars are in lib/*). I prefer running everything from the command line, but I am sure there are other ways; choose what you like. To set up pseudo-distributed mode, check out Michael Noll's tutorial - this is what I learned from, and I think it's still valid for current versions of Hadoop. The same code and script can run in pseudo-distributed mode also (that's why there is an if condition).
Thank you sir. We generated the snapshot in Eclipse itself, and it finds the jar now.
But sir, I am getting the following exception when I run this program:
14/01/12 14:44:43 INFO input.FileInputFormat: Total input paths to process : 1
14/01/12 14:44:45 INFO mapred.JobClient: Running job: job_201401121442_0002
14/01/12 14:44:46 INFO mapred.JobClient: map 0% reduce 0%
14/01/12 14:45:02 INFO mapred.JobClient: Task Id : attempt_201401121442_0002_m_000000_0, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
at net.sf.jtmt.indexers.hadoop.TermDocumentMatrixGenerator$Mapper1.map(TermDocumentMatrixGenerator.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:518)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
14/01/12 14:45:08 INFO mapred.JobClient: Task Id : attempt_201401121442_0002_m_000000_1, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
at net.sf.jtmt.indexers.hadoop.TermDocumentMatrixGenerator$Mapper1.map(TermDocumentMatrixGenerator.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:518)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
14/01/12 14:45:14 INFO mapred.JobClient: Task Id : attempt_201401121442_0002_m_000000_2, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
at net.sf.jtmt.indexers.hadoop.TermDocumentMatrixGenerator$Mapper1.map(TermDocumentMatrixGenerator.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:518)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:303)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
14/01/12 14:45:23 INFO mapred.JobClient: Job complete: job_201401121442_0002
14/01/12 14:45:23 INFO mapred.JobClient: Counters: 3
14/01/12 14:45:23 INFO mapred.JobClient: Job Counters
14/01/12 14:45:23 INFO mapred.JobClient: Launched map tasks=4
14/01/12 14:45:23 INFO mapred.JobClient: Data-local map tasks=4
14/01/12 14:45:23 INFO mapred.JobClient: Failed map tasks=1
Job1 failed, exiting
Is something wrong with my input format?
I have extracted the reviews in text format and converted them to a sequence file using a program.
Can you give a sample sequence file to run it?
Thanks in advance :-)
Hi Banu, it looks like your input file is in [Long,Text] format (line:content) and the code expects to see [Text,Map] format. There is a BlogSequenceFileWriter class in the same package which was presumably used to build the input sequence file; it may be a good idea to try to adapt it to your needs. In fact, given that it's been a while since I wrote the code and I don't remember any of it, looking at the available code and trying to debug it locally is generally likely to be more fruitful than asking me :-).
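For anyone hitting the same ClassCastException, here is a minimal sketch (the class name is hypothetical, and only the "content" field is shown since that is all Mapper1 reads) of writing a sequence file in the (Text, MapWritable) layout the first job expects:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class TinyBlogSequenceFileWriter {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Key class is Text (the document id), value class is MapWritable.
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, new Path(args[0]), Text.class, MapWritable.class);
    MapWritable value = new MapWritable();
    value.put(new Text("content"), new Text("body of the blog post ..."));
    writer.append(new Text("doc1"), value);  // the key is a docId, not a line number
    writer.close();
  }
}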
Sir, thank you so much..
We got the output sir...
:-):-)
Thank you for your help sir
:-):-):-):-):-)
Cool! and you're welcome.
Sir!!
We have the following issues, please do help us :( :( :(
* The third job creates two files, but one file is empty.
i.e., it doesn't create the terms file, something like this:
{term: termId}
What should we do?
* The first file of the 3rd job is of this form:
{docId: [term1_count, term2_count, ....]}
but it contains so many 0's at the end. Why?
* How do we relate the two outputs into a term-document matrix of the form below?
[[term1_count term2_count ...], // document_1 row
[term1_count term2_count ...], // document_2 row
...]
Any code, sir?
* We have two seq files in the input directory, but the output of all three phases is generated for only one document, i.e. docId[1].
Going by the description, the empty file should be a dictionary of {term:index}; it's created in Reducer3.setup. Not sure of the reason it is empty - one guess is that maybe I never checked and it has always been empty. This is because the actual output of Reducer3 is {docId: [{term: count},...]} and not {docId: [{index: count},...]}, so this may just be dead code that I forgot to remove. The output of Reducer3 is the TD matrix; each value corresponds to the sparse term vector for that docId. I don't think the vector should contain 0 counts as we explicitly remove them earlier, and I am not sure why it's returning results for a single sequence file.
I need an input txt file for the BlogSequenceFileWriter code. The txt files (/resources/hac/inputs) and (/resources/data/blog) are not available. Please provide them soon, sir. :( :(
In lines 195-196, we get a warning: the local variable "value" is never used.
for (LongWritable value : values) {
occurrences++;
}
Will this cause any problem, sir?
In the terms file, we get term:docId instead of term:termId. What is the problem?
Thanks in advance :)
@Anonymous: this is the collection of my blog posts from around 2009. I no longer have the data handy, you can try using some other data such as wikipedia dump or something similar.
@Banu: no this warning will not cause a problem. Regarding why term:docid and not term:termid, I have no idea, but the code should tell you...
Hi Sujit, kindly help me out with this error. Thanks in advance.
14/10/11 20:08:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/10/11 20:08:47 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/10/11 20:08:47 INFO input.FileInputFormat: Total input paths to process : 10
14/10/11 20:08:47 INFO mapred.JobClient: Running job: job_local158039148_0001
14/10/11 20:08:47 INFO mapred.LocalJobRunner: Waiting for map tasks
14/10/11 20:08:47 INFO mapred.LocalJobRunner: Starting task: attempt_local158039148_0001_m_000000_0
14/10/11 20:08:47 INFO util.ProcessTree: setsid exited with exit code 0
14/10/11 20:08:47 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@534153
14/10/11 20:08:47 INFO mapred.MapTask: Processing split: file:/home/muthu/workspace_2/td matrix/sample/7:0+26479
14/10/11 20:08:47 INFO mapred.MapTask: io.sort.mb = 100
14/10/11 20:08:47 INFO mapred.MapTask: data buffer = 79691776/99614720
14/10/11 20:08:47 INFO mapred.MapTask: record buffer = 262144/327680
14/10/11 20:08:47 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader@129c5d3
java.lang.NullPointerException
at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.close(SequenceFileRecordReader.java:101)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.close(MapTask.java:496)
at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1776)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:778)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Hi Muthuraman, the error does not indicate much except that there is an NPE thrown somewhere (very likely in your code). I think perhaps the console has more of the stack trace than what you provided? If so, you will find a place where your code is mentioned, and that will point to where the NPE is happening.