I've been doing the Introduction to Data Science course on Coursera, and one of the assignments involved writing and running some Pig scripts on Amazon Elastic Map Reduce (EMR). I've used EMR in the past, but have avoided it ever since I got burned pretty badly for leaving it on. Being required to use it was a good thing, since I got over the inertia and also saw how much nicer the user interface had become since I last saw it.
I was doing another (this time Python based) project for the same class, and figured it would be educational to figure out how to run Python code on EMR. From a quick search on the Internet, mrjob from Yelp appeared to be the one to use on EMR, so I wrote my code using mrjob.
The code reads an input file of sentences, and builds up trigram, bigram and unigram counts of the words in the sentences. It also normalizes the text: lowercasing it, replacing numbers and stopwords with placeholder tokens, and Porter-stemming the remaining words. Here's the code; as you can see, it's fairly straightforward:
from __future__ import division

import string

import nltk
from mrjob.job import MRJob


class NGramCountingJob(MRJob):

    def mapper_init(self):
        # self.stopwords = nltk.corpus.stopwords.words("english")
        self.stopwords = set(['i', 'me', 'my', 'myself', 'we', 'our', 'ours',
            'ourselves', 'you', 'your', 'yours', 'yourself', 'yourselves',
            'he', 'him', 'his', 'himself', 'she', 'her', 'hers', 'herself',
            'it', 'its', 'itself', 'they', 'them', 'their', 'theirs',
            'themselves', 'what', 'which', 'who', 'whom', 'this', 'that',
            'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been',
            'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did',
            'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because',
            'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about',
            'against', 'between', 'into', 'through', 'during', 'before',
            'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in',
            'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then',
            'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all',
            'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some',
            'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than',
            'too', 'very', 's', 't', 'can', 'will', 'just', 'don', 'should',
            'now'])
        self.porter = nltk.PorterStemmer()

    def mapper(self, key, value):

        def normalize_numeric(x):
            # strip punctuation (eg "1,000") before testing for a number
            xc = x.translate(string.maketrans("", ""), string.punctuation)
            return "_NNN_" if xc.isdigit() else x

        def normalize_stopword(x):
            return "_SSS_" if str(x) in self.stopwords else x

        cols = value.split("|")
        # tokenize the sentence text (assumes it is the last |-delimited column)
        words = nltk.word_tokenize(cols[-1])
        # normalize numbers and stopwords, and stem the remaining words
        words = [word.lower() for word in words]
        words = [normalize_numeric(word) for word in words]
        words = [normalize_stopword(word) for word in words]
        words = [self.porter.stem(word) for word in words]
        trigrams = nltk.trigrams(words)
        for trigram in trigrams:
            yield (trigram, 1)
            bigram = trigram[1:]
            yield (bigram, 1)
            unigram = bigram[1:]
            yield (unigram, 1)

    def reducer(self, key, values):
        yield (key, sum(values))

if __name__ == "__main__":
    NGramCountingJob.run()
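The trigram-tail trick in the mapper can be isolated in plain Python (no NLTK or mrjob) to see exactly what gets emitted; `emit_ngrams` is a name I made up for this sketch, not part of any library:

```python
# Sketch of the n-gram emission scheme used in the mapper above:
# each trigram also yields its 2-word and 1-word tails, so a single
# pass over the trigrams produces bigram and unigram counts too.
def emit_ngrams(words):
    for i in range(len(words) - 2):
        trigram = tuple(words[i:i + 3])
        yield (trigram, 1)
        yield (trigram[1:], 1)  # bigram tail
        yield (trigram[2:], 1)  # unigram tail
```

Note that this scheme counts only the tail bigram and unigram of each trigram, so the bigrams and unigrams at the very start of a sentence are not counted.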
The class must extend MRJob, and its run() method is called when the script is invoked from the shell. MRJob defines a sequence of methods that are called during the job (and can be overridden), so we just override the ones we need. The mrjob framework runs over Hadoop Streaming, but offers many convenience features on top of it.
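Since mrjob ultimately drives Hadoop Streaming, it may help to see what a bare Streaming mapper and reducer look like without the framework. This is a minimal sketch (the function names are mine, for illustration only); in a real Streaming job each function would read sys.stdin and print its output:

```python
# A bare Hadoop Streaming word-count pair, showing what mrjob abstracts
# away: plain line-oriented I/O with tab-separated key/value records.
def streaming_mapper(lines):
    # emit "word<TAB>1" for every word, as Hadoop Streaming expects
    for line in lines:
        for word in line.split():
            yield "%s\t%d" % (word, 1)

def streaming_reducer(lines):
    # Hadoop sorts mapper output by key before the reducer sees it,
    # so equal keys arrive in contiguous runs
    current, total = None, 0
    for line in lines:
        key, value = line.rstrip("\n").split("\t")
        if key != current:
            if current is not None:
                yield "%s\t%d" % (current, total)
            current, total = key, 0
        total += int(value)
    if current is not None:
        yield "%s\t%d" % (current, total)
```

mrjob hides all of this plumbing (plus the serialization of structured keys like our trigram tuples) behind the mapper() and reducer() methods.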
I first tried using the EMR console to create a job flow with mrjob, but the closest I found was "Streaming Jobs". Streaming jobs require the mapper and reducer scripts and the input files to reside on Amazon's S3 storage. Output is also written to S3. However, I was not able to make this setup work with the mrjob code above.
Reading some more, it turns out that mrjob jobs can be started from your local shell with the "-r emr" switch: mrjob will copy your input and scripts to S3, create a job flow, run your job, write the output to S3, and then stream it back to STDOUT of your local shell, where you can capture it. The first thing you need is a .mrjob.conf file. Mine looks like this (with the secret bits appropriately sanitized).
# Source: $HOME/.mrjob.conf
runners:
  emr:
    aws_access_key_id: 53CR3T53CR3T53CR3T53
    aws_region: us-west-1
    aws_secret_access_key: SuperSecretAccessKeyIfITellYouGottaKillU
    ec2_key_pair: EMR
    ec2_key_pair_file: /path/to/pem/file.pem
    bootstrap_cmds:
    - sudo easy_install http://nltk.googlecode.com/files/nltk-2.0b5-py2.6.egg
    ec2_instance_type: m1.small
    num_ec2_core_instances: 4
    cmdenv:
      TZ: America/Los_Angeles
  local:
    base_tmp_dir: /tmp/$USER
The configuration will start up an EC2 m1.small master node with 4 slave nodes of the same type. The bootstrap_cmds entry installs NLTK on all the worker nodes, since my code uses it and it doesn't come standard with Python installs. I also had a call to nltk.corpus to read the English stopwords, but I changed the code to declare the list explicitly since I didn't want to install the full corpus on the cluster.
You can run the code locally (for testing, generally on a subset of the data) as follows.
sujit@cyclone:parw$ python ngram_counting_job.py input.txt > output.txt
Or you can run it on EMR by adding the "-r emr" switch, or on your own Hadoop cluster by adding the "-r hadoop" switch to the command. The EMR version is shown below.
sujit@cyclone:parw$ python ngram_counting_job.py input.txt -r emr > output.txt
Of course, you can monitor your job from the EMR console as it is running. This is all I've done with mrjob so far, but I hope to do much more with it.