Last week, I described a Phrase Spelling corrector that depended on Word Collocation Probabilities. The probabilities (from the occur_a and occur_ab tables) were based on data that were in the ballpark of what they should be, but the fact remains that I cooked them up. So this week, I describe a Hadoop Map-Reduce class that pulls this information out of real user-entered search terms from Apache access logs.
Background
I learned about the existence of the Map-Reduce programming style about a year ago, when a colleague told us about a free Hadoop seminar hosted by Yahoo!. Unfortunately, by the time I got around to enroll, all the seats were taken. I did read the original Map-Reduce paper by Jeffrey Dean and Sanjay Ghemawat of Google at the time, and while it seemed a neat idea, I wasn't sure where or how I would be able to use it in my work. So not being able to go to the seminar didn't seem like a huge loss.
More recently, however, as I spend more time working in and around text mining, I find myself spending more time waiting for programs to finish executing than I spend writing them. I had made a few half-hearted attempts to try to pick up Hadoop on my own, but it is a fairly steep learning curve, and I was never able to find the time to get to a point where I could model my existing algorithm as an equivalent Map-Reduce program, before having to move on to other things.
On a somewhat unrelated note, I recently also joined East Bay IT Group (EBIG) in an attempt to meet other like-minded tech professionals and to enhance my chances of landing a job closer to home. Just kidding about that last one, since nobody at EBIG seems to be working anywhere east of Oakland. So in any case, the first talk (since my joining) on the Java SIG was on Hadoop, by Owen O'Malley of Yahoo, so I figured that attending it would be a good way to quickly ramp up on Hadoop. I am happy to say that the talk was very informative (thanks Owen!) and I did get enough out of it to be able to write my own code soon after.
Specifications
The structure of our search URL is as follows:
1 2 | http://.../search/q1=term[&name=value...]
where term: a single or multi-word query term
|
The idea is to run through all the access logs and count unique occurrences of single words and unique word pairs from the q1 values. These counts will later be fed into the occur_a and occur_ab tables described in my previous post.
For development, I just use the access_log files that are in my /var/log/httpd directory (just 4 of them), but in the final run on a cluster (not described in this post) will use a years worth of log files.
The MapReduce Class
Here's the code for the MapReduce class. As you can see, the Map and Reduce classes are written as inner classes of the main class. This seems to be the preferred style so I went with it, but it may be more unit-testable if you put each job into its own package and put the Map and Reduce classes inside that package. I did test the supporting classes, and did a quick test run, and things came out ok, so...
The Map class is called MapClass and the Reduce class is called ReduceClass. In addition, there is a PartitionerClass that attempts to send the pair output from the Map to one Reducer and the singleton output to another, so they are "sorted" in the final output, but apparently you cannot have more than one reducer in a non-clustered environment, so you cannot have a Partioner partition Map output to a second Reducer (because it does not exist). That is why the PartitionerClass is defined but commented out in the JobConf settings.
Once the Map and Reduce classes are defined, the main method sets up a JobConf and sets the Map and Reduce classes into it. The framework takes care of the rest. Basically the input is read line by line and passed into a bank of available Map classes. The outputs are accumulated into temporary file(s). Once all Map classes are done, the pairs written by the Map classes are sent to a bank of Reducers, which accumulates them. Once all the Reducers are done, the program ends.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | // Source: src/main/java/com/mycompany/accessloganalyzer/AccessLogAnalyzer.java
package com.mycompany.accessloganalyzer;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import com.mycompany.accessloganalyzer.NcsaLogParser.NcsaLog;
public class AccessLogAnalyzer {
private static class MapClass extends MapReduceBase
implements Mapper<WritableComparable<Text>,Writable,
WritableComparable<Text>,Writable> {
public void map(WritableComparable<Text> key,Writable value,
OutputCollector<WritableComparable<Text>,Writable> output,
Reporter reporter) throws IOException {
String line = ((Text) value).toString();
EnumMap<NcsaLog,String> values = NcsaLogParser.parse(line);
String url = values.get(NcsaLog.REQUEST_URL);
if (url.startsWith("/search")) {
Map<String,String> parameters =
NcsaLogParser.getUrlParameters(url);
String searchTerms = parameters.get("q1");
String[] terms = StringUtils.split(searchTerms, " ");
for (String term : terms) {
output.collect(new Text(term), new LongWritable(1));
}
if (terms.length > 1) {
// need to have at least 2 words to generate pair-wise combinations
CombinationGenerator combinationGenerator =
new CombinationGenerator(terms.length, 2);
Set<Pair> combinations = new HashSet<Pair>();
while (combinationGenerator.hasMore()) {
int[] indices = combinationGenerator.getNext();
combinations.add(new Pair(terms[indices[0]], terms[indices[1]]));
}
for (Pair combination : combinations) {
output.collect(new Text(combination.toString()),
new LongWritable(1));
}
}
}
}
}
private static class ReduceClass extends MapReduceBase
implements Reducer<WritableComparable<Text>,Writable,
WritableComparable<Text>,Writable> {
public void reduce(WritableComparable<Text> key,
Iterator<Writable> values,
OutputCollector<WritableComparable<Text>,Writable> output,
Reporter reporter) throws IOException {
long occurs = 0;
while (values.hasNext()) {
occurs += ((LongWritable) values.next()).get();
}
output.collect(key, new LongWritable(occurs));
}
}
private static class PartitionerClass
implements Partitioner<WritableComparable<Text>,Writable> {
public void configure(JobConf conf) { /* NOOP */ }
public int getPartition(WritableComparable<Text> key, Writable value,
int numReduceTasks) {
if (numReduceTasks > 1) {
String k = ((Text) key).toString();
return (k.contains(",") ? 1 : 0);
}
return 0;
}
}
static class Pair {
public String first;
public String second;
public Pair(String first, String second) {
String[] pair = new String[] {first, second};
Arrays.sort(pair);
this.first = pair[0];
this.second = pair[1];
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Pair)) {
return false;
}
Pair that = (Pair) obj;
return (this.first.equals(that.first) &&
this.second.equals(that.second));
}
@Override
public String toString() {
return StringUtils.join(new String[] {first, second}, ",");
}
}
public static void main(String[] argv) throws IOException {
if (argv.length != 2) {
System.err.println("Usage: calc input_path output_path");
System.exit(-1);
}
JobConf jobConf = new JobConf(AccessLogAnalyzer.class);
FileInputFormat.addInputPath(jobConf, new Path(argv[0]));
FileOutputFormat.setOutputPath(jobConf, new Path(argv[1]));
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(LongWritable.class);
jobConf.setMapperClass(MapClass.class);
jobConf.setCombinerClass(ReduceClass.class);
jobConf.setReducerClass(ReduceClass.class);
// jobConf.setPartitionerClass(PartitionerClass.class);
jobConf.setNumReduceTasks(2);
JobClient.runJob(jobConf);
}
}
|
Supporting Classes
NCSA Log Parser
I went looking for a NCSA Log Parser but couldn't find one, so I wrote my own. I tried to make it generic, since I will probably be re-using this parser to pull other stuff out of the logs in the future. The parser described below parses the NCSA Common Log file format, which is smallest of the three NCSA Log formats. Here is the code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | // Source: src/main/java/com/mycompany/accessloganalyzer/NcsaLogParser.java
package com.mycompany.accessloganalyzer;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.commons.lang.StringUtils;
public class NcsaLogParser {
public enum NcsaLog {
HOST,
PROTOCOL,
USERNAME,
DATE,
TIME,
TIMEZONE,
REQUEST_METHOD,
REQUEST_URL,
REQUEST_PROTOCOL,
STATUS_CODE,
BYTE_COUNT
};
public static EnumMap<NcsaLog,String> parse(String logline) {
EnumMap<NcsaLog,String> values =
new EnumMap<NcsaLog, String>(NcsaLog.class);
StringTokenizer tok = new StringTokenizer(logline, " ");
if (tok.hasMoreTokens()) {
values.put(NcsaLog.HOST, tok.nextToken());
values.put(NcsaLog.PROTOCOL, tok.nextToken());
values.put(NcsaLog.USERNAME, tok.nextToken());
String dttm = tok.nextToken();
values.put(NcsaLog.DATE, dttm.substring(1, dttm.indexOf(':')));
values.put(NcsaLog.TIME, dttm.substring(dttm.indexOf(':') + 1));
String tz = tok.nextToken();
values.put(NcsaLog.TIMEZONE, tz.substring(0, tz.length() - 1));
String requestMethod = tok.nextToken();
values.put(NcsaLog.REQUEST_METHOD, requestMethod.substring(1));
values.put(NcsaLog.REQUEST_URL, tok.nextToken());
String requestProtocol = tok.nextToken();
values.put(NcsaLog.REQUEST_PROTOCOL,
requestProtocol.substring(0, requestProtocol.length() - 1));
values.put(NcsaLog.STATUS_CODE, tok.nextToken());
values.put(NcsaLog.BYTE_COUNT, tok.nextToken());
}
return values;
}
public static Map<String,String> getUrlParameters(String url) throws IOException {
Map<String,String> parameters = new HashMap<String,String>();
int pos = url.indexOf('?');
if (pos == -1) {
return parameters;
}
String queryString = url.substring(pos + 1);
String[] nvps = queryString.split("&");
for (String nvp : nvps) {
String[] pair = nvp.split("=");
if (pair.length != 2) {
continue;
}
String key = pair[0];
String value = pair[1];
// URL decode the value, replacing + and %20 etc chars with their
// non-encoded equivalents.
try {
value = URLDecoder.decode(value, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IOException("Unsupported encoding", e);
}
// replace all punctuation by space
value = value.replaceAll("\\p{Punct}", " ");
// lowercase it
value = StringUtils.lowerCase(value);
parameters.put(key, value);
}
return parameters;
}
}
|
Combination Generator
I needed a way to enumerate all pairs of words I find in a multi-word phrase. Michael Gilleland has already written one that works great, so all I did was to just copy this into my own package structure and use it. You can read/snag the code from Michael's site.
Packaging
Hadoop needs the classes packaged a certain way. Along with the compiled classes, you also want to add in any runtime dependency JAR files in a lib/ directory. You can optionally supply a MANIFEST.MF file specifying the Main-Class if you want to use the java -jar calling style. Since I use Maven, but don't really know of an easy way to write ad-hoc scripts to build new Maven goals, I decided to generate an Ant build.xml using Maven, then writing a new target.
- To generate the Ant build.xml, run mvn ant:ant.
- Add hadoop.jar to build.classpath
- Add definitions for input and output directories for the job
- Add the hadoop build and run target (shown below).
My target to package and run the jar are shown below. In the future, when I run this on a remote cluster, I will decouple it.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | <target name="hadoop-log-analyzer"
description="Launch AccessLogAnalyzer job on Hadoop" depends="compile">
<!-- create new directory target/lib and copy required runtime
dependencies for the hadoop job into it -->
<delete dir="${maven.build.directory}/jars/lib"/>
<mkdir dir="${maven.build.directory}/jars/lib"/>
<copy todir="${maven.build.directory}/jars/lib" flatten="true">
<fileset dir="${maven.repo.local}">
<include name="commons-lang/commons-lang/2.1/commons-lang-2.1.jar"/>
</fileset>
</copy>
<!-- create jar file with classes and libraries -->
<jar jarfile="${maven.build.directory}/log-analyzer.jar">
<fileset dir="${maven.build.directory}/classes"/>
<fileset dir="${maven.build.directory}/jars"/>
<manifest>
<attribute name="Main-Class"
value="com/healthline/accessloganalyzer/AccessLogAnalyzer"/>
</manifest>
</jar>
<!-- clean up output directory -->
<delete dir="${basedir}/src/main/resources/access_log_outputs"/>
<!-- run jar in hadoop -->
<exec executable="bin/hadoop" dir="/opt/hadoop-0.18.1">
<arg value="jar"/>
<arg value="${basedir}/target/log-analyzer.jar"/>
<arg value="${basedir}/src/main/resources/access_logs"/>
<arg value="${basedir}/src/main/resources/access_log_outputs"/>
</exec>
</target>
|
My only runtime dependency was commons-lang-2.3.jar which I provide to the package in the target above.
(Local) Dev Testing
Unlike Owen, my other computer is not a data center. In fact, unless you count my work desktop, my laptop is the only computer I have. So I need to be able to test the job on my laptop first. Here is what I had to do.
- Explode the Hadoop tarball into a local directory. My hadoop directory (HADOOP_HOME) is /opt/hadoop-0.18.1.
- In the $HADOOP_HOME/conf/hadoop-env.sh file, update JAVA_HOME to point to whatever it is on your machine. The export is commented out, uncomment and update.
- Run the package using the Ant target
Here is the output of running ant hadoop-log-analyzer from the command line (edited for readability by removing dates from the logs).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | hadoop-log-analyzer:
[exec] jvm.JvmMetrics: Initializing JVM Metrics with \
processName=JobTracker, sessionId=
[exec] mapred.FileInputFormat: Total input paths to process : 5
[exec] mapred.JobClient: Running job: job_local_1
[exec] mapred.MapTask: numReduceTasks: 1
[exec] mapred.LocalJobRunner: \
file:~/myapp/src/main/resources/access_logs/access_log.3:0+118794
[exec] mapred.TaskRunner: Task 'map_0000' done.
[exec] mapred.MapTask: numReduceTasks: 1
[exec] mapred.LocalJobRunner: \
file:~/myapp/src/main/resources/access_logs/access_log.4:0+3811
[exec] mapred.TaskRunner: Task 'map_0001' done.
[exec] mapred.MapTask: numReduceTasks: 1
[exec] mapred.LocalJobRunner: \
file:~/myapp/src/main/resources/access_logs/access_log:0+446816
[exec] mapred.TaskRunner: Task 'map_0002' done.
[exec] mapred.MapTask: numReduceTasks: 1
[exec] mapred.LocalJobRunner: \
file:~/myapp/src/main/resources/access_logs/access_log.2:0+99752
[exec] mapred.TaskRunner: Task 'map_0003' done.
[exec] mapred.MapTask: numReduceTasks: 1
[exec] mapred.LocalJobRunner: \
file:~/myapp/src/main/resources/access_logs/access_log.1:0+36810
[exec] mapred.TaskRunner: Task 'map_0004' done.
[exec] mapred.LocalJobRunner: reduce > reduce
[exec] mapred.TaskRunner: Task 'reduce_mglk8q' done.
[exec] mapred.TaskRunner: Saved output of task \
'reduce_mglk8q' to file:~/myapp/src/main/resources/access_log_outputs
[exec] mapred.JobClient: Job complete: job_local_1
[exec] mapred.JobClient: Counters: 9
[exec] mapred.JobClient: Map-Reduce Framework
[exec] mapred.JobClient: Map input records=2661
[exec] mapred.JobClient: Map output records=186
[exec] mapred.JobClient: Map input bytes=705983
[exec] mapred.JobClient: Map output bytes=3381
[exec] mapred.JobClient: Combine input records=186
[exec] mapred.JobClient: Combine output records=34
[exec] mapred.JobClient: Reduce input groups=31
[exec] mapred.JobClient: Reduce input records=34
[exec] mapred.JobClient: Reduce output records=31
|
Output is in the output directory in a file called part-0000, and here are a few lines from it to show what it looks like. In a full clustered system, we would be able to use the Partitioner to partition the pairs and the singletons into two Reducers so they will be distinct chunks.
1 2 3 4 5 6 7 8 9 | asthma 6
breast 7
breast,cancer 7
cancer 18
diabetes 3
diaries 1
diaries,headache 1
disease 6
...
|
So thats pretty much it. The MapReduce style is a very powerful mechanism that allows average developers with domain expertise to write code that can be run within a framework, such as Hadoop, on large clusters to parallelize the computation. So its worth knowing, especially if you need to write batch programs that run on large data sets. I believe that I now understand enough about Hadoop to be able to use it effectively, and have reached a stage where I can pick up what I don't know. I hope this article has helped you in a similar way as well.
Hi,
ReplyDeleteWhat about use Hadoop with a POS Tagger (like Stanford or Snowball)? Do you test it?
Hi Luis, no I haven't, sorry. But your comment gave me an idea (of using Stanford and Hadoop together), thanks for that :-).
ReplyDeletehiiiii...
ReplyDeletei did hadoop with stanford lemmatization. Its running fine with small inputs but as soon as I give 1gb of data, its throwing me some kind of child error: task exists with non-zero status 255..
i guess it has to do with memory issue.tried adding -Xmx4096m in mapred-site.xml.but its not working..can u suggest a way to solve this issue?
Hi Aisha, if you are using the standard approach of sending the entire text to CoreNLP and let it annotate the whole thing, then query the annotations, its probably the problem you are seeing. One way to work around this would be to use a streaming sentence splitter like Stanford's DocumentPreprocessor to convert your 1GB input into a list of sentences, then run lemmatizer against each sentence.
ReplyDelete