Friday, July 10, 2009

Running a Hadoop Job on Amazon EC2

Sometime early last year, a colleague went to the WWW2008 conference in Beijing. One of the ideas he brought back was that of identifying the common phrases used in your vertical by extracting them from the documents in your corpus. The paper it came from was not even one of the major ones, but the idea stuck with me because of its simplicity.

I didn't know anything about Hadoop at the time, so although I had worked out an implementation shortly after the talk, I did not write any code, since I had no way to run it on a sufficiently large volume of text. Lately, however, I've been looking at Hadoop again, with a view to running jobs on Amazon's Elastic Compute Cloud (EC2) service, so I figured this would be a good thing to try out.

The plan was to generate 2- to 5-word grams from the text, one sentence at a time, and then aggregate them. As an example, the text:

First, she tried to look down and make out what she was coming to,...

is decomposed into the following subsequences, which are then passed to a Hadoop MapReduce job that counts how many times each phrase occurs. Downstream code will presumably treat the most frequent phrases as "common" in some way.

(first she)
(first she tried)
(first she tried to)
(first she tried to look)
(she tried)
(she tried to)
(she tried to look)
... etc.
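The decomposition above is a sliding window over the words of a sentence. Here is a minimal plain-Java sketch of it; it is not the WordNGramGenerator used later in this post, and it assumes a simplified tokenizer (lowercase, split on anything that is not a letter, digit or apostrophe) instead of BreakIterator. The class name NGramSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch: emit every gram of minGram..maxGram words,
// grouped by starting word, as in the example above.
public class NGramSketch {

  public static List<String> ngrams(String sentence, int minGram, int maxGram) {
    // assumed tokenizer: lowercase, split on runs of non-word characters
    List<String> words = new ArrayList<String>();
    for (String w : sentence.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}']+")) {
      if (!w.isEmpty()) {
        words.add(w);
      }
    }
    List<String> grams = new ArrayList<String>();
    for (int start = 0; start < words.size(); start++) {
      // stop early when the sentence runs out of words
      for (int n = minGram; n <= maxGram && start + n <= words.size(); n++) {
        grams.add(String.join(" ", words.subList(start, start + n)));
      }
    }
    return grams;
  }

  public static void main(String[] args) {
    for (String gram : ngrams("First, she tried to look down", 2, 5)) {
      System.out.println("(" + gram + ")");
    }
  }
}
```

Running main prints the fourteen grams of "First, she tried to look down", starting with (first she) and ending with (look down).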

The books I used as my "corpus" for this test are Alice in Wonderland, Moby Dick and The Adventures of Sherlock Holmes, all from Project Gutenberg's collection of e-books.

Amazon EC2 Setup

Setting up to work with Amazon's EC2 service is easy if you know how. There are lots of resources on the Internet about this, including Amazon's own EC2 documentation pages. Chuck Lam's Hadoop in Action (Early Access) book has an entire chapter devoted to it; I followed it step by step, and it worked. In a nutshell, here is what I needed to do.

  1. From Amazon's site, create and download the private key file (pk-*) and certificate (cert-*), and copy them to your ~/.ec2 directory.
  2. Download and install Hadoop (if not installed already).
  3. Download and install Amazon's EC2 API Tools.
  4. From Amazon's site, get your account number, AWS Access Key and AWS Secret Access Key, and put them in the appropriate places in your $HADOOP_HOME/src/contrib/ec2/bin/hadoop-ec2-env.sh file.
  5. Figure out what instance type you want (I chose m1.medium), and update the hadoop-ec2-env.sh file.
  6. Add the EC2 settings to your .bash_profile. The snippet from my .bash_profile is shown below. It puts the EC2 API tools and the hadoop-ec2 tools in your PATH, and tells the tools where to find your private key and certificate.
  7. Source your .bash_profile.
  8. Generate your keypair (ec2-add-keypair gsg-keypair) and save the private part of the generated RSA key to ~/.ec2/id_rsa-gsg-keypair with permissions 600. The tool stores the public part of the keypair with Amazon, so you get passphraseless ssh connectivity to your instances.
# Snippet from my .bash_profile file (the EC2_PRIVATE_KEY and
# EC2_CERT values are set to dummy values).
...
# Hadoop
export HADOOP_HOME=/opt/hadoop-0.18.1

# EC2 Access
export EC2_HOME=/opt/ec2-api-tools-1.3-36506
export PATH=$PATH:$EC2_HOME/bin:$HADOOP_HOME/src/contrib/ec2/bin
export EC2_PRIVATE_KEY=$HOME/.ec2/pk-ABCD1234EFGH5678.pem
export EC2_CERT=$HOME/.ec2/cert-ABCD1234EFGH5678.pem
...
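Before launching anything, it can save a paid round trip to check that the variables from the snippet above point at real files, and that the keypair file from step 8 has permissions 600 (OpenSSH refuses private keys that other users can read). A minimal sketch, assuming a POSIX file system and Java 7 or later; Ec2EnvCheck is a hypothetical helper, not part of the EC2 or Hadoop tools.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reports problems with the EC2 setup described above.
public class Ec2EnvCheck {

  public static List<String> check(Map<String, String> env, Path keypair)
      throws IOException {
    List<String> problems = new ArrayList<String>();
    // each variable must be set and name an existing file or directory
    for (String name : new String[] {"EC2_HOME", "EC2_PRIVATE_KEY", "EC2_CERT"}) {
      String value = env.get(name);
      if (value == null) {
        problems.add(name + " is not set");
      } else if (!Files.exists(Paths.get(value))) {
        problems.add(name + " points to missing " + value);
      }
    }
    // the private half of the keypair must be readable by the owner only
    if (!Files.exists(keypair)) {
      problems.add("missing keypair file " + keypair);
    } else {
      String perms = PosixFilePermissions.toString(
          Files.getPosixFilePermissions(keypair));
      if (!perms.equals("rw-------")) {
        problems.add(keypair + " has permissions " + perms + ", expected 600");
      }
    }
    return problems;
  }

  public static void main(String[] args) throws IOException {
    Path keypair = Paths.get(System.getProperty("user.home"),
        ".ec2", "id_rsa-gsg-keypair");
    List<String> problems = check(System.getenv(), keypair);
    for (String problem : problems) {
      System.out.println(problem);
    }
    System.out.println(problems.isEmpty() ? "EC2 setup looks OK" : "fix the above first");
  }
}
```

Passing the environment in as a Map, rather than calling System.getenv() inside check, keeps the helper easy to test with fake values.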

The code

The book text is first broken up into sentences, which are written to one large file, one sentence per line. This runs offline, as a data preparation step. There is no Hadoop code here: the class below reads each of the files downloaded from the Gutenberg site, splits the content into sentences, and writes them to the output file.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/OfflineSentenceWriter.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.text.BreakIterator;

import org.apache.commons.io.FileUtils;

/**
 * Preprocesses the Gutenberg books into sentences, one sentence
 * per line.
 */
public class OfflineSentenceWriter {

  private String inputDirectoryPath;
  private String outputFilePath;
  
  public void setInputDirectory(String inputDirectoryPath) {
    this.inputDirectoryPath = inputDirectoryPath;
  }
  
  public void setOutputFile(String outputFilePath) {
    this.outputFilePath = outputFilePath;
  }
  
  public void convertToSentencePerLineFormat() throws Exception {
    File[] inputs = new File(inputDirectoryPath).listFiles();
    PrintWriter output = new PrintWriter(
      new FileWriter(outputFilePath), true);
    try {
      for (File input : inputs) {
        if (! input.isFile()) {
          continue; // skip subdirectories
        }
        BreakIterator sentenceIterator = BreakIterator.getSentenceInstance();
        String text = FileUtils.readFileToString(input, "UTF-8");
        // line breaks may be CRLF, so collapse all whitespace (including
        // the \r that replacing only \n would leave behind) into spaces
        text = text.replaceAll("\\s+", " ");
        sentenceIterator.setText(text);
        int current = sentenceIterator.first();
        for (;;) {
          int end = sentenceIterator.next();
          if (end == BreakIterator.DONE) {
            break;
          }
          // trim the space left between sentences and skip empty ones
          String sentence = text.substring(current, end).trim();
          if (sentence.length() > 0) {
            output.println(sentence);
          }
          current = end;
        }
      }
    } finally {
      output.close();
    }
  }
}
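The heart of OfflineSentenceWriter is the BreakIterator loop. The sketch below isolates it so it can be tried on a string without any files. It collapses all whitespace first so that hard-wrapped lines read as continuous text, and it pins the locale to Locale.US for reproducibility, where the original uses the default locale. SentenceSplitter is a hypothetical name.

```java
import java.text.BreakIterator;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-alone version of the sentence loop in OfflineSentenceWriter.
public class SentenceSplitter {

  public static List<String> split(String text) {
    // join hard-wrapped lines (LF or CRLF) and squeeze runs of spaces
    String flat = text.replaceAll("\\s+", " ").trim();
    BreakIterator it = BreakIterator.getSentenceInstance(Locale.US);
    it.setText(flat);
    List<String> sentences = new ArrayList<String>();
    int start = it.first();
    for (int end = it.next(); end != BreakIterator.DONE; start = end, end = it.next()) {
      // each boundary pair delimits one sentence plus its trailing space
      String sentence = flat.substring(start, end).trim();
      if (!sentence.isEmpty()) {
        sentences.add(sentence);
      }
    }
    return sentences;
  }

  public static void main(String[] args) {
    String text = "Alice was beginning to get very tired of sitting\r\n"
        + "by her sister on the bank. So she was considering\r\n"
        + "in her own mind what to do.";
    for (String sentence : split(text)) {
      System.out.println(sentence);
    }
  }
}
```

The main method prints two lines, one per sentence, with the mid-sentence line breaks replaced by spaces.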

Converting a sentence into a series of word grams is handled by the WordNGramGenerator class shown below. It takes an input string (a sentence, in our case) and the minimum and maximum size of the word grams to produce. I find it helpful to pull the complex parts out into their own classes and call them from the MapReduce job, rather than building them into the MapReduce code directly, because that way they are easier to test.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/WordNGramGenerator.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.IOException;
import java.text.BreakIterator;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;

import org.apache.commons.lang.StringUtils;

/**
 * Given a sentence, generates the specified word N-grams from it and
 * returns it as a List of String.
 */
public class WordNGramGenerator {

  public List<String> generate(String input, int minGram, int maxGram) 
      throws IOException {
    List<String> wordgrams = new ArrayList<String>();
    List<String> tokens = new LinkedList<String>();
    BreakIterator wordBreakIterator = 
      BreakIterator.getWordInstance(Locale.getDefault());
    wordBreakIterator.setText(input);
    int current = 0;
    int gindex = 0;
    for (;;) {
      int end = wordBreakIterator.next();
      if (end == BreakIterator.DONE) {
        break;
      }
      String nextWord = input.substring(current, end);
      current = end;
      if (StringUtils.isBlank(nextWord) ||
          nextWord.length() == 1 && nextWord.matches("\\p{Punct}")) {
        continue;
      }
      gindex++;
      tokens.add(StringUtils.lowerCase(nextWord));
      if (gindex == maxGram) {
        for (int i = minGram; i <= maxGram; i++) {
          wordgrams.add(StringUtils.join(
            tokens.subList(0, i).iterator(), " "));
        }
        gindex--;
        tokens.remove(0);
      }
    }
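    // the loop above only emits grams once the window holds maxGram
    // tokens, so emit the grams that start at the remaining tokens here
    // (this also covers sentences shorter than maxGram words)
    while (! tokens.isEmpty()) {
      int max = Math.min(maxGram, tokens.size());
      for (int i = minGram; i <= max; i++) {
        wordgrams.add(StringUtils.join(
          tokens.subList(0, i).iterator(), " "));
      }
      tokens.remove(0);
    }
    return wordgrams;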
  }
}
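A quick count shows how much data the map step will emit. Assuming every gram of k words, for k from 2 to 5, is emitted once per starting position (the intent of the example at the top of this post), a sentence of n >= 5 words yields:

```latex
\sum_{k=2}^{5} (n - k + 1) = (n-1) + (n-2) + (n-3) + (n-4) = 4n - 10
```

For the five-word sentence "first she tried to look" that is 4 x 5 - 10 = 10 grams: four starting at "first", three at "she", two at "tried" and one at "to". So the mapper writes roughly four (gram, 1) pairs per word of input, and a combiner that sums counts on the map side before the shuffle can cut down considerably on the data sent to the reducers.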

And finally, the MapReduce job to do the phrase extraction and aggregation. The Map class reads one sentence at a time, calls the WordNGramGenerator to produce the word n-grams, and writes each one out with a count of 1. On the Reduce side, Hadoop already has a convenience Reducer (the LongSumReducer) for what I am doing, so I use that, both as the combiner and as the reducer.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/PhraseExtractor.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.LongSumReducer;

/**
 * Reads the preprocessed input (one sentence per line), generates the
 * 2-5 word grams of each sentence, and counts how often each gram occurs.
 */
public class PhraseExtractor {

  public static class MapClass extends MapReduceBase 
      implements Mapper<LongWritable,Text,Text,LongWritable> {

    private static final LongWritable ONE = new LongWritable(1);
    private final WordNGramGenerator ngramGenerator = 
      new WordNGramGenerator();
    
    // with the default TextInputFormat, the key is the byte offset of
    // the line and the value is the line itself, i.e. one sentence
    public void map(LongWritable key, Text value,
        OutputCollector<Text,LongWritable> output,
        Reporter reporter) throws IOException {
      List<String> grams = ngramGenerator.generate(value.toString(), 2, 5);
      for (String gram : grams) {
        output.collect(new Text(gram), ONE);
      }
    }
  }

  public static void main(String[] argv) throws IOException {
    if (argv.length != 2) {
      System.err.println("Usage: PhraseExtractor input_path output_path");
      System.exit(-1);
    }
    JobConf conf = new JobConf(PhraseExtractor.class);
    conf.setJobName("phrase-extractor");
    
    FileInputFormat.addInputPath(conf, new Path(argv[0]));
    FileOutputFormat.setOutputPath(conf, new Path(argv[1]));
    
    // the map output types are the same as the final output types
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);
    
    conf.setMapperClass(MapClass.class);
    // summing is associative and commutative, so the reducer can
    // also be used as the combiner
    conf.setCombinerClass(LongSumReducer.class);
    conf.setReducerClass(LongSumReducer.class);
    conf.setNumReduceTasks(2);
    
    JobClient.runJob(conf);
  }
}
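What happens between the map step and the output files can be simulated in memory, which is handy for checking counts on a small sample before paying for EC2. The sketch below is a simplified model, not Hadoop code: each "mapper" emits (gram, 1) pairs and sums them locally, roughly what the LongSumReducer combiner does; the pairs are then assigned to one of two reducers using the formula of Hadoop's default HashPartitioner, and each reducer sums again. It assumes a simplified tokenizer in place of WordNGramGenerator, and it uses String.hashCode() rather than Text.hashCode(), so which partition a phrase lands in will differ from a real run. LocalJobSimulation is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of map -> combine -> partition -> reduce.
public class LocalJobSimulation {

  static final int NUM_REDUCE_TASKS = 2; // as in conf.setNumReduceTasks(2)

  // simplified stand-in for WordNGramGenerator: lowercase, split on non-letters
  static List<String> grams(String sentence, int minGram, int maxGram) {
    List<String> words = new ArrayList<String>();
    for (String w : sentence.toLowerCase(Locale.ROOT).split("[^a-z']+")) {
      if (!w.isEmpty()) words.add(w);
    }
    List<String> out = new ArrayList<String>();
    for (int i = 0; i < words.size(); i++) {
      for (int n = minGram; n <= maxGram && i + n <= words.size(); n++) {
        out.add(String.join(" ", words.subList(i, i + n)));
      }
    }
    return out;
  }

  // each inner list is the input split of one mapper; returns one
  // key-sorted map per reduce task, like the part-0000N files
  public static List<Map<String, Long>> run(List<List<String>> splits) {
    List<Map<String, Long>> reducers = new ArrayList<Map<String, Long>>();
    for (int r = 0; r < NUM_REDUCE_TASKS; r++) {
      reducers.add(new TreeMap<String, Long>()); // reduce input is sorted by key
    }
    for (List<String> split : splits) {
      // map + combine: sum the 1s locally before the shuffle
      Map<String, Long> combined = new HashMap<String, Long>();
      for (String sentence : split) {
        for (String gram : grams(sentence, 2, 5)) {
          combined.merge(gram, 1L, Long::sum);
        }
      }
      // partition with the HashPartitioner formula, then reduce by summing
      for (Map.Entry<String, Long> e : combined.entrySet()) {
        int r = (e.getKey().hashCode() & Integer.MAX_VALUE) % NUM_REDUCE_TASKS;
        reducers.get(r).merge(e.getKey(), e.getValue(), Long::sum);
      }
    }
    return reducers;
  }

  public static void main(String[] args) {
    List<List<String>> splits = new ArrayList<List<String>>();
    splits.add(Arrays.asList("She tried to look down.", "It was dark."));
    splits.add(Arrays.asList("She tried to look up."));
    List<Map<String, Long>> parts = run(splits);
    for (int r = 0; r < parts.size(); r++) {
      System.out.println("part-0000" + r + ": " + parts.get(r));
    }
  }
}
```

Because every occurrence of a given key goes to the same partition, each phrase appears in exactly one of the output maps with its full count, which is also why the real job produces one part file per reduce task.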

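The code above needs to be packaged into a JAR file. Here is the Ant target that does this. Hadoop adds any JAR files found in the lib/ directory of the job JAR to the classpath, which is why the commons-lang and commons-io dependencies are copied into lib/ first.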

  <target name="build-hadoop-phrase-extractor" 
      depends="_init" description="Build Phrase Extractor job on Hadoop">
    <!-- create new directory target/lib and copy required runtime
         dependencies for the hadoop job into it -->
    <delete dir="${maven.build.directory}/jars"/>
    <mkdir dir="${maven.build.directory}/jars/lib"/>
    <copy todir="${maven.build.directory}/jars/lib" flatten="true">
      <fileset dir="${maven.repo.local}">
        <include name="commons-lang/commons-lang/2.1/commons-lang-2.1.jar"/>
        <include name="commons-io/commons-io/1.2/commons-io-1.2.jar"/>
      </fileset>
    </copy>
    <!-- create jar file for phrase-extractor -->
    <jar jarfile="${maven.build.directory}/phrase-extractor.jar">
      <fileset dir="${maven.build.directory}/classes"/>
      <fileset dir="${maven.build.directory}/jars"/>
      <manifest>
        <attribute name="Main-Class"
          value="net.sf.jtmt.concurrent.hadoop.phraseextractor.PhraseExtractor"/>
      </manifest>
    </jar>
  </target>
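Before copying the JAR to EC2, it is cheap to confirm locally that the manifest names the main class and that the dependencies ended up under lib/. A small sketch using java.util.jar; JarCheck is a hypothetical name, and its default path target/phrase-extractor.jar assumes ${maven.build.directory} resolves to target.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical pre-flight check for the job JAR built by the Ant target.
public class JarCheck {

  // returns the Main-Class attribute, or null if the JAR has no manifest
  public static String mainClass(File jar) throws IOException {
    try (JarFile jarFile = new JarFile(jar)) {
      Manifest manifest = jarFile.getManifest();
      return manifest == null ? null
          : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
    }
  }

  // returns the names of the JARs bundled under lib/
  public static List<String> libJars(File jar) throws IOException {
    List<String> libs = new ArrayList<String>();
    try (JarFile jarFile = new JarFile(jar)) {
      for (Enumeration<JarEntry> e = jarFile.entries(); e.hasMoreElements();) {
        String name = e.nextElement().getName();
        if (name.startsWith("lib/") && name.endsWith(".jar")) {
          libs.add(name);
        }
      }
    }
    return libs;
  }

  public static void main(String[] args) throws IOException {
    File jar = new File(args.length > 0 ? args[0] : "target/phrase-extractor.jar");
    System.out.println("Main-Class: " + mainClass(jar));
    System.out.println("lib/: " + libJars(jar));
  }
}
```

For the target above, the output should list the two Commons JARs under lib/ and the PhraseExtractor class as Main-Class.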

Running the Code in EC2

Caution: you are now entering the paid area! If you are playing along, from this point on Amazon will charge your credit card for the machine time you use. First we launch our EC2 cluster (a master plus four slave nodes) with the following command:

sujit@sirocco:~/src/jtmt$ hadoop-ec2 launch-cluster sujit 4
Testing for existing master in group: sujit
...
Adding sujit node(s) to cluster group sujit with AMI ami-fe37d397
i-21ebda48
i-23ebda4a
i-25ebda4c
i-27ebda4e

Next we log in to our master node. We will run our jobs from the command line on the EC2 master node.

sujit@sirocco:~/src/jtmt$ hadoop-ec2 login sujit
...
[root@domU ~]# 

We then copy our JAR file and input file over to the EC2 master node. The input file is the output of OfflineSentenceWriter, with one sentence per line. If you source the hadoop-ec2-env.sh file, you get access to the SSH_OPTS environment variable, which holds the ssh options (including the path to your keypair's private key) that scp needs. So:

sujit@sirocco:~/src/jtmt$ . $HADOOP_HOME/src/contrib/ec2/bin/hadoop-ec2-env.sh 
sujit@sirocco:~/src/jtmt$ scp $SSH_OPTS target/phrase-extractor.jar \
root@ec2-123-456-789-01.compute-1.amazonaws.com:/root
sujit@sirocco:~/src/jtmt$ scp $SSH_OPTS books.txt \
root@ec2-123-456-789-01.compute-1.amazonaws.com:/root

On the EC2 master node, we create an HDFS directory and put the input file into it. You can verify that the file was written using bin/hadoop dfs -lsr /.

[root@domU ~]# cd /usr/local/hadoop-0.18.1/
[root@domU hadoop-0.18.1]# bin/hadoop fs -mkdir /usr/root/inputs
[root@domU hadoop-0.18.1]# bin/hadoop dfs -put ~/books.txt /usr/root/inputs

When I first ran the job, the output of my reduce step came back as compressed files. Because I didn't want an extra decompression step, I changed the configuration (in conf/hadoop-site.xml) to write uncompressed output, and reran the job.

<property>
  <name>mapred.output.compress</name>
  <value>false</value> <!-- was "true" -->
</property>
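If changing the cluster configuration is not an option, the compressed output can instead be read after downloading it. This sketch assumes the cluster writes gzip output (GzipCodec); Hadoop's DefaultCodec writes .deflate files instead, which it does not handle. It sniffs the two-byte gzip magic number, so it reads uncompressed part files as well. PartFileReader is a hypothetical name.

```java
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

// Hypothetical reader for reduce output that may or may not be gzip-compressed.
public class PartFileReader {

  public static List<String> readLines(String path) throws IOException {
    List<String> lines = new ArrayList<String>();
    try (InputStream raw = new BufferedInputStream(new FileInputStream(path))) {
      // peek at the first two bytes: gzip streams start with 0x1f 0x8b
      raw.mark(2);
      int b1 = raw.read();
      int b2 = raw.read();
      raw.reset();
      InputStream in = (b1 == 0x1f && b2 == 0x8b) ? new GZIPInputStream(raw) : raw;
      try (BufferedReader reader = new BufferedReader(
          new InputStreamReader(in, StandardCharsets.UTF_8))) {
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {
          lines.add(line);
        }
      }
    }
    return lines;
  }

  public static void main(String[] args) throws IOException {
    for (String line : readLines(args[0])) {
      System.out.println(line);
    }
  }
}
```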

Here is the command to run the Hadoop job.

[root@domU hadoop-0.18.1]# bin/hadoop jar /root/phrase-extractor.jar \
hdfs://domU.compute-1.internal:50001/usr/root/inputs/books.txt \
hdfs://domU.compute-1.internal:50001/usr/root/outputs

While the code runs, you can also monitor the job through a web interface on port 50030 on the master node. Here are some screenshots.

The job wrote two part-nnnnn files (one per reduce task) into the output directory in HDFS. I first copied these to the regular file system on the master node:

[root@domU hadoop-0.18.1]# bin/hadoop dfs -get /usr/root/outputs/part-00000 ~/part-00000
...

then back to my local box using scp.

sujit@sirocco:~/src/jtmt$ scp $SSH_OPTS root@ec2-123-456-789-01.compute-1.amazonaws.com:/root/part-* .

Once done, the cluster can be terminated with this command. At that point, you will exit the Amazon EC2 paid area.

sujit@sirocco:~/src/jtmt$ hadoop-ec2 terminate-cluster sujit
...
Terminate all instances? [yes or no]: yes
INSTANCE i-d7e8d9be running shutting-down
INSTANCE i-21ebda48 running shutting-down
INSTANCE i-23ebda4a running shutting-down
INSTANCE i-25ebda4c running shutting-down
INSTANCE i-27ebda4e running shutting-down

The part-nnnnn files are not sorted by aggregated count, and contain more information than I need. I guess the correct approach is to run another MapReduce job to filter and sort the data, but since the files are not too large, a few Unix command line tools will do:

sujit@sirocco:~/src/jtmt$ cat part-00000 part-00001 | \
awk -F"\t" '{if ($2 != 1) print $0}' | \
sed -e 's/\t/:/' | \
sort -n -r -t ':' -k2 - > sorted
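The same filtering and sorting can be done in a few lines of Java if the Unix tools are not at hand. This sketch assumes the reduce output format of phrase, tab, count (the default separator of Hadoop's TextOutputFormat), drops phrases seen only once as the awk step does, and optionally keeps only grams of a given word count, for example 3. PhraseRanker and its word-count option are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java version of the cat | awk | sed | sort pipeline.
public class PhraseRanker {

  // lines are "phrase<TAB>count"; words <= 0 means any gram size
  public static List<String> rank(List<String> lines, int words) {
    List<String[]> kept = new ArrayList<String[]>();
    for (String line : lines) {
      String[] fields = line.split("\t");
      if (fields.length != 2 || Long.parseLong(fields[1]) == 1) {
        continue; // skip malformed lines and phrases seen only once
      }
      if (words > 0 && fields[0].split(" ").length != words) {
        continue; // keep only grams of the requested size
      }
      kept.add(fields);
    }
    // highest count first, like sort -n -r
    kept.sort((a, b) -> Long.compare(Long.parseLong(b[1]), Long.parseLong(a[1])));
    List<String> ranked = new ArrayList<String>();
    for (String[] fields : kept) {
      ranked.add(fields[0] + ":" + fields[1]); // same format as the sed step
    }
    return ranked;
  }

  public static void main(String[] args) throws IOException {
    List<String> lines = new ArrayList<String>();
    for (String file : new String[] {"part-00000", "part-00001"}) {
      lines.addAll(Files.readAllLines(Paths.get(file), StandardCharsets.UTF_8));
    }
    int words = args.length > 0 ? Integer.parseInt(args[0]) : 0;
    for (String phrase : rank(lines, words)) {
      System.out.println(phrase);
    }
  }
}
```

Ties keep their input order here, so phrases with equal counts may be listed in a different order than the sort command produces.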

The shell pipeline returns the expected results (sort of). As the top of the list below shows, the most frequent 2-grams are mostly pairs of common words such as "of the", so I realize now that using 2-grams to find phrases was a bit ambitious, and I should have considered only 3- to 5-grams. If I look only at 3-grams, I find quite a few good phrases, such as "as much as".

of the:2201
in the:1426
to the:892
it was:587
and the:571
it is:562
at the:532
to be:482
from the:478
on the:452
...

The job runs pretty quickly on my local machine and even faster on the EC2 cluster, so I probably did not need to run it on EC2, and in that sense it was a waste of money. However, my main aim with this exercise was to set myself up on Amazon EC2 for future processing, so in that sense the expense was justified. I hope you found this useful.