In my previous post, I described how I used GNU Parallel to read a fairly large Lucene index into a set of Hadoop SequenceFiles. The objective is to use the data in the index to build a Unigram and Bigram Language Model for a spelling corrector. Since the spelling correction code is going to be called from a web application, I figured a good place to store the unigrams and bigrams would be a MySQL database.
This is a fairly trivial task from the point of view of writing Map-Reduce code (the unigram writer is just a minor variation of the WordCount example), but this was the first time I was using Map-Reduce to crunch through a reasonably large dataset. I was also running Hadoop on a single large machine in pseudo-distributed mode, unlike before, when I had mostly used it in local mode to build little proofs of concept. So there were certain things I learned about running Hadoop, which I will mention as they come up. But first, the code.
Java Code
As stated above, both the UnigramCounter and the BigramCounter are fairly trivial examples of Map-Reduce code, but I include them anyway, for completeness.
UnigramCounter.java
The UnigramCounter Mapper splits the input text into words and writes them out to the context, where the Reducer picks them up, aggregates the counts, computes the Soundex and Metaphone values for each word, and writes the record out to a database table. The Soundex and Metaphone values are for finding sound-alikes - I am not sure which one will give the best results, so I compute both.
package com.mycompany.spell3.train;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.codec.language.Metaphone;
import org.apache.commons.codec.language.Soundex;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class UnigramCounter extends Configured implements Tool {

  private static final String PROP_DBNAME = "dbname";
  private static final String PROP_DBUSER = "dbuser";
  private static final String PROP_DBPASS = "dbpass";
  private static final String NULL_PATH = "/prod/hadoop/dummy";

  public static class MapClass extends
      Mapper<LongWritable,Text,Text,IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // lowercase the input text, split it into words on runs of
      // non-letter characters, and emit (word, 1) for each word
      String s = StringUtils.lowerCase(value.toString());
      String[] words = s.split("[^a-z]+");
      for (String word : words) {
        context.write(new Text(word), ONE);
      }
    }
  }

  public static class ReduceClass extends
      Reducer<Text,IntWritable,Text,IntWritable> {

    private String MYSQL_DB_DRIVER = "com.mysql.jdbc.Driver";

    private Connection conn;
    private PreparedStatement ps;
    private AtomicInteger counter = new AtomicInteger(0);
    private Soundex soundex;
    private Metaphone metaphone;

    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      try {
        Class.forName(MYSQL_DB_DRIVER);
        Configuration conf = context.getConfiguration();
        conn = DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/" + conf.get(PROP_DBNAME),
          conf.get(PROP_DBUSER), conf.get(PROP_DBPASS));
        conn.setAutoCommit(false);
        ps = conn.prepareStatement(
          "insert into unigram_counts(word,soundex,metaphone,cnt) " +
          "values (?,?,?,?)");
        soundex = new Soundex();
        metaphone = new Metaphone();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      insertToDb(key.toString(), sum);
    }

    private void insertToDb(String word, int count) throws IOException {
      try {
        ps.setString(1, word);
        ps.setString(2, soundex.soundex(word));
        ps.setString(3, metaphone.metaphone(word));
        ps.setInt(4, count);
        ps.execute();
        // commit every 1000 inserts
        int current = counter.incrementAndGet();
        if (current % 1000 == 0) {
          conn.commit();
        }
      } catch (SQLException e) {
        System.out.println("Failed to insert unigram: " + word);
        e.printStackTrace();
      }
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      if (ps != null) {
        try { ps.close(); } catch (SQLException e1) {}
      }
      if (conn != null) {
        try {
          conn.commit();
          conn.close();
        } catch (SQLException e) {
          throw new IOException(e);
        }
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Path input = new Path(args[0]);
    Path output = new Path(NULL_PATH);
    Configuration conf = getConf();
    conf.set(PROP_DBNAME, args[1]);
    conf.set(PROP_DBUSER, args[2]);
    conf.set(PROP_DBPASS, args[3]);
    Job job = new Job(conf, "Unigram-Counter");
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);
    job.setJarByClass(UnigramCounter.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(ReduceClass.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(5);
    boolean succ = job.waitForCompletion(true);
    if (! succ) {
      System.out.println("Job failed, exiting");
      return -1;
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 4) {
      System.out.println(
        "Usage: UnigramCounter path_to_seqfiles output_db db_user db_pass");
      System.exit(-1);
    }
    int res = ToolRunner.run(new Configuration(),
      new UnigramCounter(), args);
    System.exit(res);
  }
}
BigramCounter.java
The BigramCounter Mapper uses a sentence BreakIterator to break the input up into sentences, computes bigrams from adjacent word pairs within each sentence, and writes them out to the context. The Reducer picks them up, aggregates the counts, and writes each bigram and its count to another database table.
package com.mycompany.spell3.train;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.BreakIterator;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BigramCounter extends Configured implements Tool {

  private static final String PROP_DBNAME = "dbname";
  private static final String PROP_DBUSER = "dbuser";
  private static final String PROP_DBPASS = "dbpass";
  private static final String NULL_PATH = "/prod/hadoop/dummy";

  public static class MapClass extends
      Mapper<LongWritable,Text,Text,IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private static final String SENTENCE_START = "<s>";
    private static final String SENTENCE_END = "</s>";
    private static final String WORD_SEPARATOR = "__";

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String s = value.toString();
      // split the input into sentences, then emit (bigram, 1) pairs for
      // adjacent words, with sentence start/end markers
      BreakIterator sit = BreakIterator.getSentenceInstance();
      sit.setText(s);
      int start = sit.first();
      int end = -1;
      while ((end = sit.next()) != BreakIterator.DONE) {
        String sentence = StringUtils.lowerCase(s.substring(start, end));
        start = end;
        String[] words = sentence.split("[^a-z]+");
        String prevWord = null;
        for (int i = 0; i < words.length; i++) {
          String bigram = null;
          if (i == 0) {
            // begin sentence
            bigram = StringUtils.join(
              new String[] {SENTENCE_START, words[i]}, WORD_SEPARATOR);
          } else if (i == words.length - 1) {
            // end sentence
            bigram = StringUtils.join(
              new String[] {words[i], SENTENCE_END}, WORD_SEPARATOR);
          } else {
            // middle of sentence
            bigram = StringUtils.join(
              new String[] {prevWord, words[i]}, WORD_SEPARATOR);
          }
          context.write(new Text(bigram), ONE);
          prevWord = words[i];
        }
      }
    }
  }

  public static class ReduceClass extends
      Reducer<Text,IntWritable,Text,IntWritable> {

    private static final String MYSQL_DB_DRIVER = "com.mysql.jdbc.Driver";

    private Connection conn;
    private PreparedStatement ps;
    private AtomicInteger counter = new AtomicInteger(0);

    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      try {
        Class.forName(MYSQL_DB_DRIVER);
        Configuration conf = context.getConfiguration();
        conn = DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/" + conf.get(PROP_DBNAME),
          conf.get(PROP_DBUSER), conf.get(PROP_DBPASS));
        conn.setAutoCommit(false);
        ps = conn.prepareStatement(
          "insert into bigram_counts(bigram,cnt) values (?,?)");
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      insertToDb(key.toString(), sum);
    }

    private void insertToDb(String bigram, int sum) throws IOException {
      try {
        ps.setString(1, bigram);
        ps.setInt(2, sum);
        ps.execute();
        // commit every 1000 inserts
        int current = counter.incrementAndGet();
        if (current % 1000 == 0) {
          conn.commit();
        }
      } catch (SQLException e) {
        System.out.println("Failed to insert bigram: " + bigram);
        e.printStackTrace();
      }
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      if (ps != null) {
        try { ps.close(); } catch (SQLException e) {}
      }
      if (conn != null) {
        try {
          conn.commit();
          conn.close();
        } catch (SQLException e) {
          throw new IOException(e);
        }
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Path input = new Path(args[0]);
    Path output = new Path(NULL_PATH);
    Configuration conf = getConf();
    conf.set(PROP_DBNAME, args[1]);
    conf.set(PROP_DBUSER, args[2]);
    conf.set(PROP_DBPASS, args[3]);
    Job job = new Job(conf, "Bigram-Counter");
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);
    job.setJarByClass(BigramCounter.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(ReduceClass.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(5);
    boolean succ = job.waitForCompletion(true);
    if (! succ) {
      System.out.println("Job failed, exiting");
      return -1;
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 4) {
      System.out.println(
        "Usage: BigramCounter path_to_seqfiles output_db db_user db_pass");
      System.exit(-1);
    }
    int res = ToolRunner.run(new Configuration(),
      new BigramCounter(), args);
    System.exit(res);
  }
}
Hadoop Configuration Changes
Hadoop is built to run on clusters of many medium-sized machines. What I had instead was one large 16-CPU machine, so I wanted to make sure its processing power was used as fully as possible. So I made the following changes to mapred-site.xml, based on the advice in this StackOverflow page.
<!-- Source: $HADOOP_HOME/conf/mapred-site.xml -->
<configuration>
  ...
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>10</value>
    <description/>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>10</value>
    <description/>
  </property>
</configuration>
In core-site.xml, I changed the location of hadoop.tmp.dir to a large, relatively unused partition on the box instead of its default location. This was actually in response to a job failure where it ran out of HDFS space. Since at that point I had to rerun the job anyway, I shut down Hadoop, deleted the old hadoop.tmp.dir, reformatted the namenode, and restarted Hadoop.
<!-- Source: $HADOOP_HOME/conf/core-site.xml -->
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/prod/hadoop/tmp</value>
    <description>A base for other temporary directories.</description>
  </property>
  ...
</configuration>
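For reference, the shutdown-and-reformat sequence amounts to something like the following sketch. The old hadoop.tmp.dir location is an assumption here - the Hadoop 1.x default is /tmp/hadoop-${user.name}, so use whatever the property pointed to before the change:

$HADOOP_HOME/bin/stop-all.sh
rm -rf /tmp/hadoop-hduser            # old hadoop.tmp.dir (assumed default location)
$HADOOP_HOME/bin/hadoop namenode -format
$HADOOP_HOME/bin/start-all.sh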
Since I have only a single data node, I set the dfs.replication in hdfs-site.xml to 1.
<!-- Source: $HADOOP_HOME/conf/hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description/>
  </property>
</configuration>
MySQL Configuration Changes
The default location of the MySQL data directory was /var/lib/mysql, which sits in the "/" partition - too small for my purposes. I actually ran out of disk space in this partition while writing bigrams to MySQL (the job just hangs at a fixed map-reduce completion status). I had to kill the job, shut down MySQL, reconfigure the data directory and socket location, move the contents over to the new location, and restart MySQL. Here are the configuration changes:
# Source: /etc/my.cnf
[mysqld]
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
datadir=/prod/mysql_db
socket=/prod/mysql_db/mysql.sock
...
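The move itself is plain shell work; roughly the following sketch, where the service name (mysqld) and the init-script style are assumptions about the box:

service mysqld stop
mkdir -p /prod/mysql_db
cp -pR /var/lib/mysql/* /prod/mysql_db/
chown -R mysql:mysql /prod/mysql_db
# update datadir and socket in /etc/my.cnf as shown above
service mysqld start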
Deployment
Before this, I used to write shell scripts that set the JARs required by Hadoop and my application in the classpath, and then called Java directly. While doing this, I discovered that you can use the $HADOOP_HOME/bin/hadoop script to run your custom Map-Reduce jobs as well, so I decided to use that.
However, I needed to supply a few custom JAR files that Hadoop did not have (or need) in its classpath. I was using commons-codec, which provides the Soundex and Metaphone implementations, and I was writing to a MySQL database, for which I needed the JDBC driver JAR, plus a few others for functionality I was too lazy to implement on my own.
There are two ways to supply these extra JAR files to the bin/hadoop script. One is to specify their paths in the -libjars parameter. I thought this was nice, but it didn't work for me - for some reason the job could not see the parameters I was passing to it on the command line. The second way is to package your custom JARs in the lib subdirectory of your application's JAR file, a so-called fat JAR. The fat JAR approach is the one I took, creating it with the simple Ant target shown below:
<target name="fatjar" depends="compile" description="Build JAR to run in Hadoop">
  <mkdir dir="${maven.build.output}/lib"/>
  <copy todir="${maven.build.output}/lib">
    <fileset dir="${custom.jars.dir}">
      <include name="commons-lang3-3.0.1.jar"/>
      <include name="commons-codec-1.3.jar"/>
      <include name="mysql-connector-java-5.0.5-bin.jar"/>
      ...
    </fileset>
  </copy>
  <jar jarfile="${maven.build.directory}/${maven.build.final.name}-fatjar.jar"
       basedir="${maven.build.output}" excludes="**/package.html"/>
</target>
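For completeness, the -libjars variant I could not get to work would be invoked something like the sketch below; the JAR paths are placeholders, and the generic options have to appear before the job's own arguments for GenericOptionsParser to pick them up:

$HADOOP_HOME/bin/hadoop jar /path/to/my.jar \
  com.mycompany.spell3.train.BigramCounter \
  -libjars /path/to/commons-codec-1.3.jar,/path/to/mysql-connector-java-5.0.5-bin.jar \
  /prod/hadoop/spell spelldb spelluser spelluser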
Once the fat JAR is built, the script to run either job is quite simple. I show the script to run the BigramCounter below; simply replace BigramCounter with UnigramCounter for the other one.
#!/bin/bash
# Source: bin/bigram_counter.sh
HADOOP_HOME=/opt/hadoop-1.0.1
$HADOOP_HOME/bin/hadoop fs -rmr /prod/hadoop/dummy
$HADOOP_HOME/bin/hadoop jar /path/to/my-fatjar.jar \
  com.mycompany.spell3.train.BigramCounter $*
To run this script from the command line:
hduser@bigmac:spell3$ nohup ./bigram_counter.sh /prod/hadoop/spell \
  spelldb spelluser spelluser &
Job Killing
I needed to kill the job midway multiple times, either because I discovered I had goofed on some programming issue (incorrect database column names, etc.) and the job would start throwing all kinds of exceptions down the line, or because (as mentioned previously) MySQL ran out of disk space. To do this, you use the bin/hadoop job -kill command.
hduser@bigmac:hadoop-1.0.1$ # list out running jobs
hduser@bigmac:hadoop-1.0.1$ bin/hadoop job -list
hduser@bigmac:hadoop-1.0.1$ # kill specific job
hduser@bigmac:hadoop-1.0.1$ bin/hadoop job -kill ${job-id}
Even I had enough sense not to do a kill -9 on the Hadoop daemons themselves, but there was one time when I ran stop-all.sh and ended up having to throw away all my data because Hadoop got all choked up.
Another little tip is to avoid throwing exceptions from your Mapper or Reducer; a better option is to log the error. This is true for any batch job, of course, but I once had one of the jobs fail after about 2 days of processing because of too many exceptions thrown by the Reducer. In the code above, I just used a System.out.println() to log SQLExceptions if they occur, but it's better to use a real logger.
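For example, the catch block in insertToDb() could go through Commons Logging (which Hadoop already bundles) instead of stdout. A sketch of the changed method - the surrounding fields (ps, conn, counter, soundex, metaphone) are the ones declared in UnigramCounter.ReduceClass above:

import java.io.IOException;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// inside UnigramCounter.ReduceClass
private static final Log LOG = LogFactory.getLog(ReduceClass.class);

private void insertToDb(String word, int count) throws IOException {
  try {
    ps.setString(1, word);
    ps.setString(2, soundex.soundex(word));
    ps.setString(3, metaphone.metaphone(word));
    ps.setInt(4, count);
    ps.execute();
    if (counter.incrementAndGet() % 1000 == 0) {
      conn.commit();
    }
  } catch (SQLException e) {
    // log with the stack trace instead of printing to stdout or rethrowing,
    // so a single bad row does not eventually fail the whole reduce task
    LOG.error("Failed to insert unigram: " + word, e);
  }
}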
So anyway, after about a week and a half of processing (including all sorts of silly but expensive mistakes), I ended up with approximately 400 million unigrams and 600 million bigrams in the database. Now to figure out how to actually use this information :-).
Update - 2012-04-09: I had a bug in my bigram generation code, which caused bad results, so I reran it. This time the job failed two times in a row, caused by (I suspect) extremely high load on the MySQL database server. The first time, I discovered that the mysql.sock file had disappeared, so I terminated the job manually. The second time, I found that the mysql.sock file would disappear and then reappear after a while once the load came back down (this is the only place I have found another mention of this) - however, ultimately this job failed as well. I ended up writing the bigrams and counts to text files in HDFS instead, and the job completed in a fraction of the time it took before. So another lesson learned - avoid writing out to external datastores from within Hadoop jobs.
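The HDFS-only version essentially amounts to deleting all the JDBC code from the Reducer and writing the bigram and its count back to the context; run() then just needs a real output path and the output key/value classes set to Text and IntWritable, and the counts can be bulk-loaded into MySQL separately afterwards if needed. A sketch of the changed Reducer (imports as in BigramCounter above):

// no database connection at all - the (bigram, count) pairs go to HDFS
// through the job's output format
public static class ReduceClass extends
    Reducer<Text,IntWritable,Text,IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values,
      Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    // TextOutputFormat (the default) writes this out as
    // "bigram<TAB>count" lines under the job's output path
    context.write(key, new IntWritable(sum));
  }
}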
2 comments (moderated to prevent spam):
This was an interesting read. But I just cringed looking at what your Hadoop reduce job was doing -- committing to a database after each row! You ultimately concluded to not even write to a database, and sure, that's a possibility, but another is to optimize your reducer so that it uses JDBC batches. I'm not even sure if you need to have all the intermediate commits -- it depends on your database.
Hi David, it would indeed be cringeworthy if I did that :-). I am actually committing every 1000 rows (see inside the insertToDb() method in both cases). Although come to think of it, 1000 is probably a bit on the low side, I could probably increase it and see better performance. I like the idea of using JDBC batches, thanks for that, I will try it next time.
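(For reference, a batched version of insertToDb() would look roughly like the sketch below - untested; cleanup() would also need a final executeBatch() before the last commit.)

private void insertToDb(String word, int count) throws IOException {
  try {
    ps.setString(1, word);
    ps.setString(2, soundex.soundex(word));
    ps.setString(3, metaphone.metaphone(word));
    ps.setInt(4, count);
    ps.addBatch();                    // queue the row client-side
    if (counter.incrementAndGet() % 1000 == 0) {
      ps.executeBatch();              // send the batch in one round trip
      conn.commit();
    }
  } catch (SQLException e) {
    System.out.println("Failed to insert unigram: " + word);
    e.printStackTrace();
  }
}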