Saturday, June 01, 2013

MapReduce with Python and mrjob on Amazon EMR


I've been doing the Introduction to Data Science course on Coursera, and one of the assignments involved writing and running some Pig scripts on Amazon Elastic MapReduce (EMR). I've used EMR in the past, but had avoided it ever since I got burned pretty badly for leaving a cluster running. Being required to use it was a good thing, since it got me over the inertia and also showed me how much nicer the user interface had become since I last saw it.

I was doing another project (this time Python-based) for the same class, and thought it would be educational to figure out how to run Python code on EMR. A quick search on the Internet suggested that mrjob from Yelp was the one to use on EMR, so I wrote my code using mrjob.

The code reads an input file of pipe-delimited lines, with the sentence in the second field, and builds up trigram, bigram and unigram counts of the words in the sentences. It also normalizes the text: it lowercases words, replaces numbers and stopwords with the placeholder tokens _NNN_ and _SSS_, and Porter-stems the remaining words. Here's the code; as you can see, it's fairly straightforward:

from __future__ import division
from mrjob.job import MRJob
import nltk
import string

class NGramCountingJob(MRJob):

  def mapper_init(self):
#    self.stopwords = nltk.corpus.stopwords.words("english")
    self.stopwords = set(['i', 'me', 'my', 'myself', 'we',
      'our', 'ours', 'ourselves', 'you', 'your', 'yours',
      'yourself', 'yourselves', 'he', 'him', 'his', 'himself',
      'she', 'her', 'hers', 'herself', 'it', 'its', 'itself',
      'they', 'them', 'their', 'theirs', 'themselves', 'what',
      'which', 'who', 'whom', 'this', 'that', 'these', 'those',
      'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
      'have', 'has', 'had', 'having', 'do', 'does', 'did',
      'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or',
      'because', 'as', 'until', 'while', 'of', 'at', 'by',
      'for', 'with', 'about', 'against', 'between', 'into',
      'through', 'during', 'before', 'after', 'above', 'below',
      'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off',
      'over', 'under', 'again', 'further', 'then', 'once',
      'here', 'there', 'when', 'where', 'why', 'how', 'all',
      'any', 'both', 'each', 'few', 'more', 'most', 'other',
      'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same',
      'so', 'than', 'too', 'very', 's', 't', 'can', 'will',
      'just', 'don', 'should', 'now'])
    self.porter = nltk.PorterStemmer()

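  def mapper(self, key, value):
    # with mrjob's default raw input protocol, key is None and value is one line

    def normalize_numeric(x):
      # strip punctuation first (Python 2 str.translate) so "1,000" counts as a number
      xc = x.translate(string.maketrans("", ""), string.punctuation)
      return "_NNN_" if xc.isdigit() else x

    def normalize_stopword(x):
      return "_SSS_" if str(x) in self.stopwords else x

    # input lines are pipe-delimited; the sentence is in the second field
    cols = value.split("|")
    words = nltk.word_tokenize(cols[1])
    # lowercase, normalize numbers and stopwords, then stem remaining words
    words = [word.lower() for word in words]
    words = [normalize_numeric(word) for word in words]
    words = [normalize_stopword(word) for word in words]
    words = [self.porter.stem(word) for word in words]
    # emit each trigram plus its trailing bigram and unigram, so the leading
    # bigram and the first two unigrams of each sentence are not counted
    for trigram in nltk.trigrams(words):
      yield (trigram, 1)
      bigram = trigram[1:]
      yield (bigram, 1)
      unigram = bigram[1:]
      yield (unigram, 1)

  def reducer(self, key, values):
    # add up the 1s emitted for this n-gram across all mappers
    yield (key, sum(values))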

if __name__ == "__main__":
  NGramCountingJob.run()
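
To make the mapper's output concrete, here is a minimal stdlib-only sketch of the same normalization and n-gram emission that runs without NLTK or Hadoop (Python 3). It assumes plain whitespace splitting in place of nltk.word_tokenize, skips Porter stemming, and uses a tiny illustrative stopword set; the helper names normalize and emit_ngrams are hypothetical.

```python
import string

# tiny illustrative subset of the stopword list used in the job
STOPWORDS = {"the", "a", "of", "is", "on"}

# Python 3 equivalent of string.maketrans("", "") plus deletechars
_STRIP_PUNCT = str.maketrans("", "", string.punctuation)


def normalize(word):
    """Lowercase a token, then map numbers to _NNN_ and stopwords to _SSS_."""
    word = word.lower()
    if word.translate(_STRIP_PUNCT).isdigit():
        return "_NNN_"
    if word in STOPWORDS:
        return "_SSS_"
    return word  # the real job would Porter-stem this word


def emit_ngrams(line):
    """Yield (ngram, 1) pairs the way the mapper does for one pipe-delimited line."""
    sentence = line.split("|")[1]
    words = [normalize(w) for w in sentence.split()]
    for trigram in zip(words, words[1:], words[2:]):
        yield (trigram, 1)
        bigram = trigram[1:]
        yield (bigram, 1)
        unigram = bigram[1:]
        yield (unigram, 1)


for pair in emit_ngrams("42|The cat sat on 3 mats"):
    print(pair)
```

Running it shows the emission pattern: each trigram contributes its last two words as a bigram and its last word as a unigram, so in this example "cat" never shows up as a unigram and ("_SSS_", "cat") never shows up as a bigram. Sentences shorter than three tokens emit nothing at all.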

The job class must extend MRJob, and the script calls its run() method when invoked from the shell. MRJob defines a sequence of methods (such as mapper_init, mapper and reducer) that the framework calls and that can be overridden, so we just override the ones we need. mrjob runs on top of Hadoop Streaming, but adds many convenience features.
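
The call order can be illustrated with a toy in-memory runner. This is a minimal sketch, not mrjob's actual implementation: it calls mapper_init once, mapper once per input line, sorts and groups the emitted pairs by key (the shuffle that Hadoop performs between the two phases), and then calls reducer once per distinct key. The ToyJob class and run_locally function are hypothetical, and only a single mapper/reducer step is handled.

```python
from itertools import groupby
from operator import itemgetter


class ToyJob:
    """Hypothetical stand-in for an MRJob subclass that counts words."""

    def mapper_init(self):
        # runs once per mapper before any input, like building the stopword set
        self.seen_lines = 0

    def mapper(self, key, value):
        self.seen_lines += 1
        for word in value.split():
            yield (word.lower(), 1)

    def reducer(self, key, values):
        yield (key, sum(values))


def run_locally(job, lines):
    """Simulate map -> shuffle/sort -> reduce for a single-step job."""
    job.mapper_init()
    # map phase: raw input lines arrive with key None
    mapped = [pair for line in lines for pair in job.mapper(None, line)]
    # shuffle phase: sort by key so equal keys are adjacent, then group them
    mapped.sort(key=itemgetter(0))
    results = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        # reduce phase: one reducer call per distinct key
        results.extend(job.reducer(key, (count for _, count in group)))
    return results


print(run_locally(ToyJob(), ["the cat", "The dog"]))
# [('cat', 1), ('dog', 1), ('the', 2)]
```

In a real run the mappers and reducers execute in separate processes on different nodes, and Hadoop does the sorting and grouping; the toy runner only makes the order of the overridden calls visible.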

I first tried using the EMR console to create a job flow with mrjob, but the closest I found was "Streaming Jobs". Streaming jobs require the mapper and reducer scripts and the input files to reside on Amazon's S3 storage. Output is also written to S3. However, I was not able to make this setup work with the mrjob code above.

Reading some more, I found that mrjob jobs can be started from your local shell with the "-r emr" switch. mrjob then copies your input and scripts to S3, creates a job flow, runs your job, writes the output to S3, and copies the output back to STDOUT of your local shell, where you can capture it. The first thing you need is a .mrjob.conf file. Mine looks like this (with the secret bits appropriately sanitized):

# Source: $HOME/.mrjob.conf
runners:
  emr:
    aws_access_key_id: 53CR3T53CR3T53CR3T53
    aws_region: us-west-1
    aws_secret_access_key: SuperSecretAccessKeyIfITellYouGottaKillU
    ec2_key_pair: EMR
    ec2_key_pair_file: /path/to/pem/file.pem
    bootstrap_cmds:
    - sudo easy_install http://nltk.googlecode.com/files/nltk-2.0b5-py2.6.egg
    ec2_instance_type: m1.small
    num_ec2_core_instances: 4
    cmdenv:
      TZ: America/Los_Angeles
  local:
    base_tmp_dir: /tmp/$USER

This configuration starts up an EC2 m1.small master node with 4 core nodes of the same type. The bootstrap_cmds entry installs NLTK on the nodes of the cluster, since my code uses it and it doesn't come with a standard Python install. My code originally called nltk.corpus to read the English stopwords, but I changed it to declare the list explicitly since I didn't want to install the full corpus.
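
One way to keep the NLTK stopword list without requiring the corpus on every node is to try loading it and fall back to an explicit list when it is missing. This is a minimal sketch, not what the job above does; load_stopwords and FALLBACK_STOPWORDS are hypothetical names, and the fallback shown is only a short excerpt of the full list in mapper_init.

```python
# short excerpt of the explicit list from mapper_init (hypothetical name)
FALLBACK_STOPWORDS = frozenset([
    "i", "me", "my", "we", "our", "you", "he", "she", "it", "they",
    "a", "an", "the", "and", "but", "if", "or", "of", "at", "by",
    "for", "with", "to", "from", "in", "on", "is", "are", "was", "were",
])


def load_stopwords():
    """Return NLTK's English stopwords if available, else the fallback set."""
    try:
        # raises ImportError if NLTK itself is not installed on the node
        from nltk.corpus import stopwords
        # raises LookupError if the stopwords corpus has not been downloaded
        return set(stopwords.words("english"))
    except (ImportError, LookupError):
        return set(FALLBACK_STOPWORDS)


STOPWORDS = load_stopwords()
print(len(STOPWORDS), "the" in STOPWORDS)
```

Called from mapper_init, this lets the same script run whether or not the corpus was installed on the nodes. The trade-off is that the two lists differ, so counts can shift slightly depending on which one was used.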

You can run the code locally (for testing, generally on a subset of the data) as follows.

sujit@cyclone:parw$ python ngram_counting_job.py input.txt > output.txt
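
Because the local run is typically done on a subset of the data, a quick way to carve one out is a seeded random sample of lines. This is a minimal sketch; sample_lines and the file names in the usage comment are hypothetical.

```python
import random


def sample_lines(lines, fraction, seed=42):
    """Lazily keep each line with probability `fraction`; the seed makes reruns repeatable."""
    rng = random.Random(seed)
    for line in lines:
        if rng.random() < fraction:
            yield line


# e.g. write roughly 1% of input.txt to sample.txt for a quick local test run:
# with open("input.txt") as src, open("sample.txt", "w") as dst:
#     dst.writelines(sample_lines(src, 0.01))
```

Using a generator keeps memory flat even when the full input is large, and a fixed seed means a bug found on the sample can be reproduced on exactly the same lines.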

To run on EMR instead, add a "-r emr" switch to the command; to run on your own Hadoop cluster, add a "-r hadoop" switch. The EMR version is shown below.

sujit@cyclone:parw$ python ngram_counting_job.py input.txt -r emr > output.txt

Of course, you can monitor your job from the EMR console as it is running. This is all I've done with mrjob so far, but I hope to do much more with it.

3 comments (moderated to prevent spam):

Shreyas FNU said...

Hi, I am trying to use nltk like you did, but alas it doesn't work for me. Here is my question: http://stackoverflow.com/questions/23440564/bootstrapping-libraries-on-emr-using-python-mrjob

Any comments would be wonderful

Sujit Pal said...

Your configuration may be a bit off, as someone observed on your SO question. The configuration I show worked for me, although I remember that it barfed on the stopwords list because it expected the stopwords corpus to be loaded. It had taken me a while to figure out how to bootstrap nltk, so instead of trying to load the corpus via bootstrap, I simply replaced it with an explicit list. I remember also that I tried various alternative ways to install nltk, and this was the one that worked.

Sujit Pal said...

Just came across this when looking for something else. Here is a script, hosted on AWS's S3, that installs nltk (and its dependencies). Maybe consider replacing the bootstrap_cmds with this one if the original does not work (it did for me, but Shreyas FNU indicated that it didn't for him):

s3://awsdocs/gettingstarted/latest/sentiment/config-nltk.sh

Inside the script are a bunch of easy_install and pip calls:

sudo easy_install pip
sudo pip install -U numpy
sudo pip install numpy
sudo easy_install -U distribute
sudo pip install pyyaml nltk
sudo pip install -e git://github.com/mdp-toolkit/mdp-toolkit#egg=MDP