Hierarchical Agglomerative Clustering is a bottom up clustering approach where at each stage, we find the closest two documents (or document clusters) and merge them into a new cluster. The process continues until some convergence criteria is satisfied.
Last week, I described a Map-Reduce job to generate a TD Matrix from the articles on my blog. This week I use the TD Matrix data to cluster the articles using the Hierarchical Agglomerative Clustering algorithm. The code models a 1 + (n * 3) stage pipeline of Map-Reduce jobs, where n is the number of stages (number of documents - number of clusters).
The distance measure to compute the closeness between two documents (or document clusters) is cosine similarity. At each stage, two items are merged into one, so one of the criteria for convergence is when the remaining number of document (or document clusters) are less than a threshold value. The other convergence criterion is when merging a document into a document or cluster would result in a cluster that is "too large", ie over a similarity threshold value.
Driver Code
Here is the driver program. As you can see, the very first step is to normalize the term frequencies across the document - that way the document coordinates in the term space are comparable to each other. Then the code executes in a loop, where 3 Map-Reduce jobs are run in a pipeline until the convergence criteria is satisfied, then terminates. There is some description for each job in the comments, and I also describe each job in more detail below.
I have been using the "recommended" strategy of using inner public static classes so far, but that typically results in very long code, so I switched to using separate classes for the Mapper and Reducer classes. I think this style is easier to read and maintain, especially if you use an IDE, since you can follow [CTRL+Click] hyperlinks.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 | // Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/HierarchicalAgglomerativeClusterer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* Hadoop Job to do Hierarchical Agglomerative (bottom up) clustering.
* We start with a normalized set of term frequencies for each document.
* At each stage, we find the two documents with the highest similarity
* and merge them into a cluster "document". We stop when the similarity
* between two documents is lower than a predefined threshold or when
* the document set is clustered into a predefined threshold of clusters.
*/
public class HierarchicalAgglomerativeClusterer {
// configuration:
/** Don't cluster if similarity is below this threshold */
public static final Float SIMILARITY_THRESHOLD = 0.05F;
/** Maximum number of clusters to be created */
public static final Long MAX_CLUSTERS = 10L;
// keys: used by Mappers/Reducers
public static final String INPUT_DIR_KEY = "input.dir";
public static final String INPUT_SIMILARITY_MAP_DIR_KEY =
"input.sim.mapdir";
public static final String SIMILARITY_THRESHOLD_KEY =
"similarity.threshold";
// reporting:
public enum Counters {REMAINING_RECORDS};
/**
* The input to this job is a text file of document ids mapped
* to a stringified list of raw term occurrences for each
* qualifying term in the document set. This method will do
* a self-join on the input file, and pass it to the reducer,
* which will calculate and output the inter-document cosine
* similarities.
* @param conf the global Configuration object.
* @param indir the input directory for the raw TFs.
* @param outdir the output directory for the normalized TFs.
* @throws Exception if thrown.
*/
private static void normalizeFrequencies(
Configuration conf, Path indir, Path outdir) throws Exception {
Job job = new Job(conf, "normalize-freqs");
job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
FileInputFormat.addInputPath(job, indir);
FileOutputFormat.setOutputPath(job, outdir);
job.setMapperClass(TfNormalizerMapper.class);
job.setReducerClass(TfNormalizerReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(2);
boolean jobStatus = job.waitForCompletion(true);
if (! jobStatus) {
throw new Exception(job.getJobName() + " failed");
}
}
/**
* Does a self-join on the normalized TF file (for the current run),
* and passes each pair to the reducer, which computes the inter-document
* cosine similarity, and writes out the interdocument cosine similarities
* as the output.
* @param conf the global Configuration object.
* @param dataInput the directory containing the normalized TF file.
* @param simInput the directory containing the similarity data from the
* previous run (null in case of the first run).
* @param simOutput the output directory where the new similarity data
* is written. At each stage, only the similarity data that was
* not computed previously is computed.
* @param iteration the run number (0-based).
* @throws Exception if thrown.
*/
private static void computeSimilarity(Configuration conf, Path dataInput,
Path simInput, Path simOutput, int iteration) throws Exception {
Job job = new Job(conf, "compute-similarity/" + iteration);
job.getConfiguration().set(INPUT_DIR_KEY, dataInput.toString());
if (iteration > 0) {
job.getConfiguration().set(
INPUT_SIMILARITY_MAP_DIR_KEY, simInput.toString());
}
FileInputFormat.addInputPath(job, dataInput);
FileOutputFormat.setOutputPath(job, simOutput);
job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
job.setMapperClass(SelfJoinMapper.class);
job.setReducerClass(SimilarityCalculatorReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(2);
boolean jobStatus = job.waitForCompletion(true);
if (! jobStatus) {
throw new Exception(job.getJobName() + " failed");
}
}
/**
* Runs through the similarity data, finds the pair with the highest
* similarity. For documents that are part of that pair, the key is
* set to the paired key (key1+key2) and sent to the reducer, where
* the coordinates of the merged cluster is computed by adding each
* coordinate position and dividing by 2. Other records which are
* not part of the pair are passed through unchanged. The merging
* will happen only if the similarity is above the similarity threshold.
* @param conf the global configuration object.
* @param dataInput the input data file for this run, containing the
* normalized TFs.
* @param dataOutput the output directory, which contains the new document
* set, after merging of the pair with the highest similarity.
* @param simOutput the similarity output of the previous run, to
* compute the most similar document/cluster pairs.
* @param iteration the run number (0-based).
* @throws Exception if thrown.
*/
private static void clusterDocs(Configuration conf, Path dataInput,
Path dataOutput, Path simOutput, int iteration) throws Exception {
Job job = new Job(conf, "add-remove-docs/" + iteration);
job.getConfiguration().setFloat(
SIMILARITY_THRESHOLD_KEY, SIMILARITY_THRESHOLD);
job.getConfiguration().set(
INPUT_SIMILARITY_MAP_DIR_KEY, simOutput.toString());
FileInputFormat.addInputPath(job, dataInput);
FileOutputFormat.setOutputPath(job, dataOutput);
job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
job.setMapperClass(DocClusteringMapper.class);
job.setReducerClass(DocClusteringReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(2);
boolean jobStatus = job.waitForCompletion(true);
if (! jobStatus) {
throw new Exception(job.getJobName() + " failed");
}
}
private static long countRemainingRecords(Configuration conf,
Path dataOutput, Path countOutput) throws Exception {
Job job = new Job(conf, "count-remaining-records");
FileInputFormat.addInputPath(job, dataOutput);
FileOutputFormat.setOutputPath(job, countOutput);
job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
job.setMapperClass(RecordCountMapper.class);
job.setReducerClass(RecordCountReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
boolean jobStatus = job.waitForCompletion(true);
if (jobStatus) {
FileSystem fs = FileSystem.get(conf);
FileStatus[] fstatuses = fs.listStatus(countOutput);
for (FileStatus fstatus : fstatuses) {
Path path = fstatus.getPath();
if (! path.getName().startsWith("part-r")) {
continue;
}
FSDataInputStream fis = fs.open(path);
BufferedReader reader =
new BufferedReader(new InputStreamReader(fis));
String line = reader.readLine();
reader.close();
fis.close();
return Long.valueOf(StringUtils.split(line, "\t")[1]);
}
} else {
throw new Exception(job.getJobName() + " failed");
}
return 0L;
}
/**
* This is how we are called.
* @param argv the input directory containing the raw TFs.
* @throws Exception if thrown.
*/
public static void main(String[] argv) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs =
new GenericOptionsParser(conf, argv).getRemainingArgs();
if (otherArgs.length != 1) {
System.err.println("Usage hac <indir>");
System.exit(-1);
}
Path indir = new Path(otherArgs[0]);
Path basedir = indir.getParent();
// phase 1: normalize the term frequency across each document
normalizeFrequencies(conf, indir, new Path(basedir, "temp0"));
int iteration = 0;
long previousRemainingRecords = 0L;
for (;;) {
// set up constants for current iteration
Path dataInput = new Path(basedir, "temp" + iteration);
Path dataOutput = new Path(basedir, "temp" + (iteration + 1));
Path simInput = new Path(basedir, "temp_sim" + iteration);
Path simOutput = new Path(basedir, "temp_sim" + (iteration + 1));
Path countOutput = new Path(basedir, "temp_count" + (iteration + 1));
// phase 2: do self-join on input file and compute similarity matrix
// inputs: self-join on files from temp_${iteration}
// reference: similarity matrix file from temp_sim_${iteration},
// null if iteration=0.
// outputs: similarity matrix file into temp_sim_${iteration+1}
computeSimilarity(conf, dataInput, simInput, simOutput, iteration);
// phase 3: find most similar pair, add pair, remove components
// input: files from temp_${iteration}
// reference: files from temp_sim_${iteration} to create matrix
// output: files into temp_${iteration+1}
clusterDocs(conf, dataInput, dataOutput, simOutput, iteration);
// check for termination criteria: either our pre-set maximum
// clusters for the document set has been reached, or clustering
// has converged, so any cluster that will be created is "too large".
// This is checked for in the DocClusteringReducer and it will
// not merge the rows in that case.
long numRemainingRecords =
countRemainingRecords(conf, dataOutput, countOutput);
if (numRemainingRecords <= MAX_CLUSTERS ||
numRemainingRecords == previousRemainingRecords) {
break;
}
previousRemainingRecords = numRemainingRecords;
iteration++;
}
System.out.println("Output in " + new Path(basedir, "temp" + iteration));
}
}
|
Phase 1: TF Normalization
The first stage is to normalize the term frequencies across the document. This is done by simply dividing each element of the document vector by the sum of all the elements. The mapper parses the comma-separated list of term frequencies into a SparseVector, sums over all the elements, divides each element by the sum, then converts it back to the comma-separated list.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | // Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/TfNormalizerMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Normalizes each document vector by dividing each element by the
* sum of all the elements. Passes to the Reducer.
*/
public class TfNormalizerMapper
extends Mapper<LongWritable,Text,Text,Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] kvp = StringUtils.split(value.toString(), "\t");
String[] frequencies = StringUtils.split(kvp[1], ",");
SparseRealVector tf = new OpenMapRealVector(frequencies.length);
for (int i = 0; i < frequencies.length; i++) {
tf.setEntry(i, Double.valueOf(frequencies[i]));
}
double sum = tf.getL1Norm();
SparseRealVector normalizedTfs =
new OpenMapRealVector(tf.mapDivide(sum));
StringBuilder nbuf = new StringBuilder();
int len = normalizedTfs.getDimension();
for (int i = 0; i < len; i++) {
if (i > 0) {
nbuf.append(",");
}
nbuf.append(String.valueOf(normalizedTfs.getEntry(i)));
}
context.write(new Text(kvp[0]), new Text(nbuf.toString()));
}
}
|
The reducer in this stage is a simple identity reducer. The end product of this transformation is the normalized TFs for each document.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | // Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/TfNormalizerReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* This is a simple identity reducer. It does no reduction, the record
* is already created in the mapper, it simply picks the first (and only)
* mapped record in the Iterable and writes it.
*/
public class TfNormalizerReducer extends Reducer<Text,Text,Text,Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Text value = values.iterator().next();
context.write(key, value);
}
}
|
Phase 2: Self-Join and Similarity Calculation
The mapper in this phase does a self join of each record in the input with all the other records, and passes this on to the reducer. The self-join is done by passing in the data file(s) from the previous phase as the input data and also as a configuration parameter. For each record in the input data, the file is opened, read, joined with the input data and closed. This does result in a lot of disk IO - perhaps a better way performance-wise would be to read the file(s) from the configuration parameter into an in-memory list, since I only have about 170 documents to consider. But I wanted to figure out a way to work with arbitarily large input files, so this seemed to be a more general way to do this.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | // Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/SelfJoinMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import net.sf.jtmt.clustering.hadoop.agglomerative.HierarchicalAgglomerativeClusterer.Counters;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Does a map-side replicated join to produce a self-join of the
* input data files of normalized TFs. I could probably have built
* a mapper which uses a in-memory structure for one of the sides
* of the join given the size of the data, but I am trying to do a
* POC, so the "real" data is likely to be larger. So the join
* repeatedly opens and closes one copy of the file that is joined
* with the main input file. The output is the self-joined file.
*/
public class SelfJoinMapper
extends Mapper<LongWritable,Text,Text,Text> {
private String inputDir;
private FileSystem fs;
private int numRecords = 0;
@Override
public void setup(Context context)
throws IOException, InterruptedException {
inputDir = context.getConfiguration().get(
HierarchicalAgglomerativeClusterer.INPUT_DIR_KEY);
if (inputDir == null) {
System.err.println("Cant get value for key:" +
HierarchicalAgglomerativeClusterer.INPUT_DIR_KEY + ", abort");
System.exit(-1);
}
fs = FileSystem.get(context.getConfiguration());
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
numRecords++;
String[] lhsPair = StringUtils.split(value.toString(), "\t");
FileStatus[] fstatuses = fs.listStatus(new Path(inputDir));
for (FileStatus fstatus : fstatuses) {
Path path = fstatus.getPath();
if (! path.getName().startsWith("part-r")) {
continue;
}
FSDataInputStream fis = fs.open(path);
BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
String line = null;
while ((line = reader.readLine()) != null) {
String[] rhsPair = StringUtils.split(line, "\t");
Text newKey = new Text(
StringUtils.join(new String[] {lhsPair[0], rhsPair[0]}, "/"));
Text newValue = new Text(
StringUtils.join(new String[] {lhsPair[1], rhsPair[1]}, "/"));
context.write(newKey, newValue);
}
reader.close();
fis.close();
}
// report back to the framework how many records are remaining
// for debugging purposes.
context.getCounter(Counters.REMAINING_RECORDS).increment(1L);
}
@Override
public void cleanup(Context context)
throws IOException, InterruptedException {
}
}
|
On the reducer side, the joined records are converted into term vectors and the cosine similarity calculated, and the output is the similarity matrix for the document-set, ie, a key-value pair containing the keys of the original documents and the similarity value. The similarity matrix from a previous run, if it exists, is fed into the reducer, so it does not have to recalculate the values that have already been calculated.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | // Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/SimilarityCalculatorReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Takes as input the self-join of the normalized TF data, and the
* similarity matrix data from the previous run (if it exists). Looks
* up the similarity between a pair of keys (if it exists) and writes
* it out to the output. If it does not exist, then it computes it.
* The first time (when there is no previous run), the number of
* similarity computations is O(n**2). In later runs, it is O(n).
*/
public class SimilarityCalculatorReducer
extends Reducer<Text,Text,Text,DoubleWritable> {
private String inputSimilarityMapDir;
private FileSystem fs;
private Map<String,Double> similarityMatrix =
new HashMap<String,Double>();
@Override
public void setup(Context context)
throws IOException, InterruptedException {
inputSimilarityMapDir = context.getConfiguration().get(
HierarchicalAgglomerativeClusterer.INPUT_SIMILARITY_MAP_DIR_KEY);
if (inputSimilarityMapDir == null) {
System.err.println("Warning: no input similarity map dir, ignoring");
} else {
fs = FileSystem.get(context.getConfiguration());
FileStatus[] fstatuses = fs.listStatus(
new Path(inputSimilarityMapDir));
for (FileStatus fstatus : fstatuses) {
Path path = fstatus.getPath();
if (! path.getName().startsWith("part-r")) {
continue;
}
FSDataInputStream fis = fs.open(path);
BufferedReader reader = new BufferedReader(
new InputStreamReader(fis));
String line = null;
while ((line = reader.readLine()) != null) {
String[] kvpairs = StringUtils.split(line, "\t");
List<String> docIds = Arrays.asList(
StringUtils.split(kvpairs[0], "/"));
Collections.sort(docIds);
String mapKey = StringUtils.join(docIds.iterator(), "/");
if (! similarityMatrix.containsKey(mapKey)) {
similarityMatrix.put(mapKey, Double.valueOf(kvpairs[1]));
}
}
reader.close();
fis.close();
}
}
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Text value = values.iterator().next();
List<String> docIds = Arrays.asList(
StringUtils.split(key.toString(), "/"));
Collections.sort(docIds);
String mapKey = StringUtils.join(docIds.iterator(), "/");
if (! similarityMatrix.containsKey(mapKey)) {
// compute the cosine similarity between the two documents
String[] termFreqs = StringUtils.split(value.toString(), "/");
SparseRealVector doc1 = buildDocVector(termFreqs[0]);
SparseRealVector doc2 = buildDocVector(termFreqs[1]);
double cosim =
doc1.dotProduct(doc2) / (doc1.getNorm() * doc2.getNorm());
similarityMatrix.put(mapKey, cosim);
}
context.write(new Text(mapKey),
new DoubleWritable(similarityMatrix.get(mapKey)));
}
private SparseRealVector buildDocVector(String flist) {
String[] freqs = StringUtils.split(flist, ",");
SparseRealVector doc = new OpenMapRealVector(freqs.length);
for (int i = 0; i < freqs.length; i++) {
doc.setEntry(i, Double.valueOf(freqs[i]));
}
return doc;
}
}
|
Phase 3: Clustering
The Mapper part of this job finds the document pair that has the maximum similarity from the similarity map generated in the previous run. This pair corresponds to the document (or document cluster) pair that will be merged in this iteration. The input to the Mapper is the normalized TF data generated in Phase 1 (or the output of the previous phase 3 job in the previous iteration). For each of these two documents, the key is replaced by the composite key containing both document ids in the pair and passed to the reducer. All other rows are passed through as is.
The mapper also checks to see if the maximum similarity found is greater than the similarity threshold, otherwise, it will not do any special handling. As a consequence, the number of documents in the clustered document set will be unchanged from the previous run. This fact is used in our termination criteria.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | // Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/DocClusteringMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Uses the similarity map and the normalized TF data from the previous
* run, and computes the pair with the maximum similarity. For the docs
* that are part of this pair, it will change the key to be the combined
* key for the cluster to be formed and pass it to the reducer. This will
* only happen for 2 documents in each iteration. The other docs are passed
* through unchanged.
*/
public class DocClusteringMapper extends Mapper<LongWritable,Text,Text,Text> {
private double similarityThreshold = 0.0D;
private double maxSim = 0.0D;
private String[] maxSimKeys = null;
public void setup(Context context)
throws IOException, InterruptedException {
similarityThreshold = context.getConfiguration().getFloat(
HierarchicalAgglomerativeClusterer.SIMILARITY_THRESHOLD_KEY, 0.0F);
String simDir = context.getConfiguration().get(
HierarchicalAgglomerativeClusterer.INPUT_SIMILARITY_MAP_DIR_KEY);
if (simDir == null) {
System.err.println("Cant get value for key: " +
HierarchicalAgglomerativeClusterer.INPUT_SIMILARITY_MAP_DIR_KEY +
", abort");
System.exit(-1);
}
FileSystem fs = FileSystem.get(context.getConfiguration());
FileStatus[] fstatuses = fs.listStatus(new Path(simDir));
for (FileStatus fstatus : fstatuses) {
Path path = fstatus.getPath();
if (! path.getName().startsWith("part-r")) {
continue;
}
FSDataInputStream fis = fs.open(path);
BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
String line = null;
while ((line = reader.readLine()) != null) {
String[] kvpair = StringUtils.split(line, "\t");
String[] docIdPair = StringUtils.split(kvpair[0], "/");
if (docIdPair[0].equals(docIdPair[1])) {
// same doc, expected max, so skip
continue;
}
if (Double.valueOf(kvpair[1]) > maxSim) {
maxSim = Double.valueOf(kvpair[1]);
maxSimKeys = docIdPair;
}
}
reader.close();
fis.close();
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] kvpair = StringUtils.split(value.toString(), "\t");
if (maxSim > similarityThreshold &&
(kvpair[0].equals(maxSimKeys[0]) ||
kvpair[0].equals(maxSimKeys[1]))) {
// if either of the keys in maxSimKeys match the key in the
// record, then replace the key with the combo-key (this key
// represents the "cluster")
String newKey = StringUtils.join(maxSimKeys, "+");
context.write(new Text(newKey), new Text(kvpair[1]));
} else {
// pass record through unchanged
context.write(new Text(kvpair[0]), new Text(kvpair[1]));
}
}
}
|
On the reducer side, the two documents that are supposed to be merged are going to have the same composite key, so when this is encountered, the reduction is to average the coordinates of the two documents into one. All other documents are passed through unchanged.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | // Source: ./src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/DocClusteringReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Reduces the normalized TF data produced by the mapper. The mapper
* produced 2 documents with the same key, which are candidates for
* the new cluster. The reducer will compute the new cluster coordinates
* using the coordinates of its components. The other documents are
* written out unchanged.
*/
public class DocClusteringReducer extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
double numRecords = 0D;
SparseRealVector doc = null;
for (Text value : values) {
String[] tfs = StringUtils.split(value.toString(), ",");
if (doc == null) {
doc = new OpenMapRealVector(tfs.length);
}
for (int i = 0; i < tfs.length; i++) {
doc.setEntry(i, doc.getEntry(i) + Double.valueOf(tfs[i]));
}
numRecords++;
}
SparseRealVector cluster =
new OpenMapRealVector(doc.mapDivide(numRecords));
int numTerms = cluster.getDimension();
StringBuilder buf = new StringBuilder();
for (int i = 0; i < numTerms; i++) {
if (i > 0) {
buf.append(",");
}
buf.append(String.valueOf(cluster.getEntry(i)));
}
// replace the "+" in the key with "," since its done clustering
String newKey = StringUtils.replace(key.toString(), "+", ",");
context.write(new Text(newKey), new Text(buf.toString()));
}
}
|
Phase 4: Checking for termination
The last phase just checks for the number of clustered documents in the document set. If the number of clustered documents reach a certain (our MAX_CLUSTERS) threshold, or is unchanged from the previous run (because it hit our SIMILARITY_THRESHOLD), then the loop is terminated and the job ends. The mapper here is a simple one, it writes a constant key and a 1 for each line it sees.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | // Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/RecordCountMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Mapper to count number of lines in a file. For each line sent to the
* mapper, it sends a constant key and a 1 to the reducer to add.
*/
public class RecordCountMapper
extends Mapper<LongWritable,Text,Text,LongWritable> {
private static final Text COUNT_KEY = new Text("count");
private static final LongWritable ONE = new LongWritable(1L);
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(COUNT_KEY, ONE);
}
}
|
The reducer just counts it and returns the count. The code in the driver then reads the file and retrieves the count and returns it to the main method.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | // Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/RecordCountReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Sums up the number of line counts sent by the mapper and returns a
* single line of output.
*/
public class RecordCountReducer
extends Reducer<Text,LongWritable,Text,LongWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
sum++;
}
context.write(key, new LongWritable(sum));
}
}
|
Running
I've run this (for 3-5 iterations) on both a standalone and pseudo-distributed modes, and they run fine in both scenarios, although they take a fair amount of time. Here is the script I used to run it.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | #!/bin/bash
# Source: src/main/scripts/hadoop/run_hac.sh
# Script to call Hierarchical Agglomerative Clusterer.
## CONFIG ##
M2_REPO=/home/sujit/.m2/repository
HADOOP_HOME=/opt/hadoop-0.20.0
PROJECT_BASEDIR=/home/sujit/src/jtmt
MODE=p # mode can be (l)ocal or (p)seudo-distributed
if [ $MODE == "l" ]; then
PROTOCOL_PREFIX=$PROJECT_BASEDIR/src/test/resources
else
PROTOCOL_PREFIX=hdfs://localhost:54310
fi
INPUT_DIR=/hac/input
TEMP_DIRS=/hac/temp*
## CONFIG ##
# for local mode
if [ $MODE == "l" ]; then
export HADOOP_CLASSPATH=$CLASSPATH:\
$M2_REPO/commons-lang/commons-lang/2.1/commons-lang-2.1.jar:\
$M2_REPO/commons-math/commons-math/2.0/commons-math-2.0.jar
fi
cd $HADOOP_HOME
if [ $MODE == "l" ]; then
rm -rf $PROTOCOL_PREFIX$TEMP_DIRS
# no special packaging required for local mode
bin/hadoop jar $PROJECT_BASEDIR/target/jtmt-1.0-SNAPSHOT.jar \
net...HierarchicalAgglomerativeClusterer \
$PROTOCOL_PREFIX$INPUT_DIR
else
bin/hadoop fs -rmr $TEMP_DIRS
bin/hadoop jar $PROJECT_BASEDIR/target/ha-clusterer.jar \
$PROTOCOL_PREFIX$INPUT_DIR
fi
cd -
unset HADOOP_CLASSPATH
|
I wanted to run this on Amazon EC2, but at the time of writing this post, there were no freely available Amazon EC2 machine images that included Hadoop 0.20. Arun Jacob describes here how he created his own Hadoop-0.20 AMI. Chad Metcalfe of Cloudera pointed out another way to get around this situation by choosing a stock AMI and installing Hadoop/CDH2 on it. Ideally, I would probably just wait for the Hadoop 0.20 AMI to become available and then use it. However, in the event that it does not anytime soon, the second approach seems (to me anyway) the next best.