Friday, September 25, 2009

Hierarchical Agglomerative Clustering with Hadoop

Hierarchical Agglomerative Clustering is a bottom up clustering approach where at each stage, we find the closest two documents (or document clusters) and merge them into a new cluster. The process continues until some convergence criteria is satisfied.

Last week, I described a Map-Reduce job to generate a TD Matrix from the articles on my blog. This week I use the TD Matrix data to cluster the articles using the Hierarchical Agglomerative Clustering algorithm. The code models a 1 + (n * 3) stage pipeline of Map-Reduce jobs, where n is the number of stages (number of documents - number of clusters).

The distance measure to compute the closeness between two documents (or document clusters) is cosine similarity. At each stage, two items are merged into one, so one of the criteria for convergence is when the remaining number of document (or document clusters) are less than a threshold value. The other convergence criterion is when merging a document into a document or cluster would result in a cluster that is "too large", ie over a similarity threshold value.

Driver Code

Here is the driver program. As you can see, the very first step is to normalize the term frequencies across the document - that way the document coordinates in the term space are comparable to each other. Then the code executes in a loop, where 3 Map-Reduce jobs are run in a pipeline until the convergence criteria is satisfied, then terminates. There is some description for each job in the comments, and I also describe each job in more detail below.

I have been using the "recommended" strategy of using inner public static classes so far, but that typically results in very long code, so I switched to using separate classes for the Mapper and Reducer classes. I think this style is easier to read and maintain, especially if you use an IDE, since you can follow [CTRL+Click] hyperlinks.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/HierarchicalAgglomerativeClusterer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Hadoop Job to do Hierarchical Agglomerative (bottom up) clustering.
 * We start with a normalized set of term frequencies for each document.
 * At each stage, we find the two documents with the highest similarity
 * and merge them into a cluster "document". We stop when the similarity
 * between two documents is lower than a predefined threshold or when
 * the document set is clustered into a predefined threshold of clusters.
 */
public class HierarchicalAgglomerativeClusterer {

  // configuration:
  /** Don't cluster if similarity is below this threshold */
  public static final Float SIMILARITY_THRESHOLD = 0.05F;
  /** Maximum number of clusters to be created */
  public static final Long MAX_CLUSTERS = 10L;
  
  // keys: used by Mappers/Reducers
  public static final String INPUT_DIR_KEY = "input.dir";
  public static final String INPUT_SIMILARITY_MAP_DIR_KEY =
    "input.sim.mapdir";
  public static final String SIMILARITY_THRESHOLD_KEY =
    "similarity.threshold";

  // reporting:
  public enum Counters {REMAINING_RECORDS};

  /**
   * The input to this job is a text file of document ids mapped 
   * to a stringified list of raw term occurrences for each 
   * qualifying term in the document set. This method will do 
   * a self-join on the input file, and pass it to the reducer,
   * which will calculate and output the inter-document cosine
   * similarities.
   * @param conf the global Configuration object.
   * @param indir the input directory for the raw TFs.
   * @param outdir the output directory for the normalized TFs.
   * @throws Exception if thrown.
   */
  private static void normalizeFrequencies(
      Configuration conf, Path indir, Path outdir) throws Exception {
    Job job = new Job(conf, "normalize-freqs");
    job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
    FileInputFormat.addInputPath(job, indir);
    FileOutputFormat.setOutputPath(job, outdir);
    job.setMapperClass(TfNormalizerMapper.class);
    job.setReducerClass(TfNormalizerReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setNumReduceTasks(2);
    boolean jobStatus = job.waitForCompletion(true);
    if (! jobStatus) {
      throw new Exception(job.getJobName() + " failed");
    }
  }

  /**
   * Does a self-join on the normalized TF file (for the current run),
   * and passes each pair to the reducer, which computes the inter-document
   * cosine similarity, and writes out the interdocument cosine similarities
   * as the output.
   * @param conf the global Configuration object.
   * @param dataInput the directory containing the normalized TF file.
   * @param simInput the directory containing the similarity data from the
   *        previous run (null in case of the first run).
   * @param simOutput the output directory where the new similarity data
   *        is written. At each stage, only the similarity data that was
   *        not computed previously is computed.
   * @param iteration the run number (0-based).
   * @throws Exception if thrown.
   */
  private static void computeSimilarity(Configuration conf, Path dataInput,
      Path simInput, Path simOutput, int iteration) throws Exception {
    Job job = new Job(conf, "compute-similarity/" + iteration);
    job.getConfiguration().set(INPUT_DIR_KEY, dataInput.toString());
    if (iteration > 0) {
      job.getConfiguration().set(
        INPUT_SIMILARITY_MAP_DIR_KEY, simInput.toString());
    }
    FileInputFormat.addInputPath(job, dataInput);
    FileOutputFormat.setOutputPath(job, simOutput);
    job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
    job.setMapperClass(SelfJoinMapper.class);
    job.setReducerClass(SimilarityCalculatorReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setNumReduceTasks(2);
    boolean jobStatus = job.waitForCompletion(true);
    if (! jobStatus) {
      throw new Exception(job.getJobName() + " failed");
    }
  }

  /**
   * Runs through the similarity data, finds the pair with the highest
   * similarity. For documents that are part of that pair, the key is
   * set to the paired key (key1+key2) and sent to the reducer, where 
   * the coordinates of the merged cluster is computed by adding each
   * coordinate position and dividing by 2. Other records which are 
   * not part of the pair are passed through unchanged. The merging 
   * will happen only if the similarity is above the similarity threshold.
   * @param conf the global configuration object.
   * @param dataInput the input data file for this run, containing the
   *        normalized TFs.
   * @param dataOutput the output directory, which contains the new document
   *        set, after merging of the pair with the highest similarity.
   * @param simOutput the similarity output of the previous run, to 
   *        compute the most similar document/cluster pairs.
   * @param iteration the run number (0-based).
   * @throws Exception if thrown.
   */
  private static void clusterDocs(Configuration conf, Path dataInput,
      Path dataOutput, Path simOutput, int iteration) throws Exception {
    Job job = new Job(conf, "add-remove-docs/" + iteration);
    job.getConfiguration().setFloat(
      SIMILARITY_THRESHOLD_KEY, SIMILARITY_THRESHOLD);
    job.getConfiguration().set(
      INPUT_SIMILARITY_MAP_DIR_KEY, simOutput.toString());
    FileInputFormat.addInputPath(job, dataInput);
    FileOutputFormat.setOutputPath(job, dataOutput);
    job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
    job.setMapperClass(DocClusteringMapper.class);
    job.setReducerClass(DocClusteringReducer.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(2);
    boolean jobStatus = job.waitForCompletion(true);
    if (! jobStatus) {
      throw new Exception(job.getJobName() + " failed");
    }
  }

  private static long countRemainingRecords(Configuration conf, 
      Path dataOutput, Path countOutput) throws Exception {
    Job job = new Job(conf, "count-remaining-records");
    FileInputFormat.addInputPath(job, dataOutput);
    FileOutputFormat.setOutputPath(job, countOutput);
    job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
    job.setMapperClass(RecordCountMapper.class);
    job.setReducerClass(RecordCountReducer.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    boolean jobStatus = job.waitForCompletion(true);
    if (jobStatus) {
      FileSystem fs = FileSystem.get(conf);
      FileStatus[] fstatuses = fs.listStatus(countOutput);
      for (FileStatus fstatus : fstatuses) {
        Path path = fstatus.getPath();
        if (! path.getName().startsWith("part-r")) {
          continue;
        }
        FSDataInputStream fis = fs.open(path);
        BufferedReader reader = 
          new BufferedReader(new InputStreamReader(fis));
        String line = reader.readLine();
        reader.close();
        fis.close();
        return Long.valueOf(StringUtils.split(line, "\t")[1]);
      }
    } else {
      throw new Exception(job.getJobName() + " failed");
    }
    return 0L;
  }
  
  /**
   * This is how we are called.
   * @param argv the input directory containing the raw TFs.
   * @throws Exception if thrown.
   */
  public static void main(String[] argv) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = 
      new GenericOptionsParser(conf, argv).getRemainingArgs();
    if (otherArgs.length != 1) {
      System.err.println("Usage hac <indir>");
      System.exit(-1);
    }
    Path indir = new Path(otherArgs[0]);
    Path basedir = indir.getParent();
    
    // phase 1: normalize the term frequency across each document
    normalizeFrequencies(conf, indir, new Path(basedir, "temp0"));

    int iteration = 0;
    long previousRemainingRecords = 0L;
    for (;;) {
      // set up constants for current iteration
      Path dataInput = new Path(basedir, "temp" + iteration);
      Path dataOutput = new Path(basedir, "temp" + (iteration + 1));
      Path simInput = new Path(basedir, "temp_sim" + iteration);
      Path simOutput = new Path(basedir, "temp_sim" + (iteration + 1));
      Path countOutput = new Path(basedir, "temp_count" + (iteration + 1));

      // phase 2: do self-join on input file and compute similarity matrix
      // inputs:  self-join on files from temp_${iteration}
      // reference: similarity matrix file from temp_sim_${iteration},
      //            null if iteration=0.
      // outputs: similarity matrix file into temp_sim_${iteration+1}
      computeSimilarity(conf, dataInput, simInput, simOutput, iteration);

      // phase 3: find most similar pair, add pair, remove components 
      // input: files from temp_${iteration}
      // reference: files from temp_sim_${iteration} to create matrix
      // output: files into temp_${iteration+1}
      clusterDocs(conf, dataInput, dataOutput, simOutput, iteration);

      // check for termination criteria: either our pre-set maximum 
      // clusters for the document set has been reached, or clustering
      // has converged, so any cluster that will be created is "too large".
      // This is checked for in the DocClusteringReducer and it will 
      // not merge the rows in that case.
      long numRemainingRecords = 
        countRemainingRecords(conf, dataOutput, countOutput);
      if (numRemainingRecords <= MAX_CLUSTERS ||
          numRemainingRecords == previousRemainingRecords) {
        break;
      }
      previousRemainingRecords = numRemainingRecords;
      iteration++;
    }
    System.out.println("Output in " + new Path(basedir, "temp" + iteration));
  }
}

Phase 1: TF Normalization

The first stage is to normalize the term frequencies across the document. This is done by simply dividing each element of the document vector by the sum of all the elements. The mapper parses the comma-separated list of term frequencies into a SparseVector, sums over all the elements, divides each element by the sum, then converts it back to the comma-separated list.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/TfNormalizerMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Normalizes each document vector by dividing each element by the
 * sum of all the elements. Passes to the Reducer.
 */
public class TfNormalizerMapper 
    extends Mapper<LongWritable,Text,Text,Text> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] kvp = StringUtils.split(value.toString(), "\t");
    String[] frequencies = StringUtils.split(kvp[1], ",");
    SparseRealVector tf = new OpenMapRealVector(frequencies.length);
    for (int i = 0; i < frequencies.length; i++) {
      tf.setEntry(i, Double.valueOf(frequencies[i]));
    }
    double sum = tf.getL1Norm();
    SparseRealVector normalizedTfs = 
      new OpenMapRealVector(tf.mapDivide(sum));
    StringBuilder nbuf = new StringBuilder();
    int len = normalizedTfs.getDimension();
    for (int i = 0; i < len; i++) {
      if (i > 0) {
        nbuf.append(",");
      }
      nbuf.append(String.valueOf(normalizedTfs.getEntry(i)));
    }
    context.write(new Text(kvp[0]), new Text(nbuf.toString()));
  }
}

The reducer in this stage is a simple identity reducer. The end product of this transformation is the normalized TFs for each document.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/TfNormalizerReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * This is a simple identity reducer. It does no reduction, the record
 * is already created in the mapper, it simply picks the first (and only)
 * mapped record in the Iterable and writes it.
 */
public class TfNormalizerReducer extends Reducer<Text,Text,Text,Text> {

  @Override
  public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException {
    Text value = values.iterator().next();
    context.write(key, value);
  }
}

Phase 2: Self-Join and Similarity Calculation

The mapper in this phase does a self join of each record in the input with all the other records, and passes this on to the reducer. The self-join is done by passing in the data file(s) from the previous phase as the input data and also as a configuration parameter. For each record in the input data, the file is opened, read, joined with the input data and closed. This does result in a lot of disk IO - perhaps a better way performance-wise would be to read the file(s) from the configuration parameter into an in-memory list, since I only have about 170 documents to consider. But I wanted to figure out a way to work with arbitarily large input files, so this seemed to be a more general way to do this.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/SelfJoinMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import net.sf.jtmt.clustering.hadoop.agglomerative.HierarchicalAgglomerativeClusterer.Counters;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Does a map-side replicated join to produce a self-join of the 
 * input data files of normalized TFs. I could probably have built
 * a mapper which uses a in-memory structure for one of the sides 
 * of the join given the size of the data, but I am trying to do a 
 * POC, so the "real" data is likely to be larger. So the join 
 * repeatedly opens and closes one copy of the file that is joined 
 * with the main input file. The output is the self-joined file.
 */
public class SelfJoinMapper 
    extends Mapper<LongWritable,Text,Text,Text> {

  private String inputDir;
  private FileSystem fs;
  private int numRecords = 0;

  @Override
  public void setup(Context context) 
  throws IOException, InterruptedException {
    inputDir = context.getConfiguration().get(
      HierarchicalAgglomerativeClusterer.INPUT_DIR_KEY);
    if (inputDir == null) {
      System.err.println("Cant get value for key:" + 
        HierarchicalAgglomerativeClusterer.INPUT_DIR_KEY + ", abort");
      System.exit(-1);
    }
    fs = FileSystem.get(context.getConfiguration());
  }

  @Override
  public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
    numRecords++;
    String[] lhsPair = StringUtils.split(value.toString(), "\t");
    FileStatus[] fstatuses = fs.listStatus(new Path(inputDir));
    for (FileStatus fstatus : fstatuses) {
      Path path = fstatus.getPath();
      if (! path.getName().startsWith("part-r")) {
        continue;
      }
      FSDataInputStream fis = fs.open(path);
      BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
      String line = null;
      while ((line = reader.readLine()) != null) {
        String[] rhsPair = StringUtils.split(line, "\t");
        Text newKey = new Text(
            StringUtils.join(new String[] {lhsPair[0], rhsPair[0]}, "/"));
        Text newValue = new Text(
            StringUtils.join(new String[] {lhsPair[1], rhsPair[1]}, "/"));
        context.write(newKey, newValue);
      }
      reader.close();
      fis.close();
    }
    // report back to the framework how many records are remaining
    // for debugging purposes.
    context.getCounter(Counters.REMAINING_RECORDS).increment(1L);
  }

  @Override
  public void cleanup(Context context) 
      throws IOException, InterruptedException {
  }
}

On the reducer side, the joined records are converted into term vectors and the cosine similarity calculated, and the output is the similarity matrix for the document-set, ie, a key-value pair containing the keys of the original documents and the similarity value. The similarity matrix from a previous run, if it exists, is fed into the reducer, so it does not have to recalculate the values that have already been calculated.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/SimilarityCalculatorReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Takes as input the self-join of the normalized TF data, and the 
 * similarity matrix data from the previous run (if it exists). Looks
 * up the similarity between a pair of keys (if it exists) and writes
 * it out to the output. If it does not exist, then it computes it.
 * The first time (when there is no previous run), the number of 
 * similarity computations is O(n**2). In later runs, it is O(n).
 */
public class SimilarityCalculatorReducer 
    extends Reducer<Text,Text,Text,DoubleWritable> {

  private String inputSimilarityMapDir;
  private FileSystem fs;
  private Map<String,Double> similarityMatrix = 
    new HashMap<String,Double>();

  @Override
  public void setup(Context context) 
      throws IOException, InterruptedException {
    inputSimilarityMapDir = context.getConfiguration().get(
      HierarchicalAgglomerativeClusterer.INPUT_SIMILARITY_MAP_DIR_KEY);
    if (inputSimilarityMapDir == null) {
      System.err.println("Warning: no input similarity map dir, ignoring");
    } else {
      fs = FileSystem.get(context.getConfiguration());
      FileStatus[] fstatuses = fs.listStatus(
        new Path(inputSimilarityMapDir));
      for (FileStatus fstatus : fstatuses) {
        Path path = fstatus.getPath();
        if (! path.getName().startsWith("part-r")) {
          continue;
        }
        FSDataInputStream fis = fs.open(path);
        BufferedReader reader = new BufferedReader(
          new InputStreamReader(fis));
        String line = null;
        while ((line = reader.readLine()) != null) {
          String[] kvpairs = StringUtils.split(line, "\t");
          List<String> docIds = Arrays.asList(
            StringUtils.split(kvpairs[0], "/"));
          Collections.sort(docIds);
          String mapKey = StringUtils.join(docIds.iterator(), "/");
          if (! similarityMatrix.containsKey(mapKey)) {
            similarityMatrix.put(mapKey, Double.valueOf(kvpairs[1]));
          }
        }
        reader.close();
        fis.close();
      }
    }
  }

  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
  throws IOException, InterruptedException {
    Text value = values.iterator().next();
    List<String> docIds = Arrays.asList(
      StringUtils.split(key.toString(), "/"));
    Collections.sort(docIds);
    String mapKey = StringUtils.join(docIds.iterator(), "/");
    if (! similarityMatrix.containsKey(mapKey)) {
      // compute the cosine similarity between the two documents
      String[] termFreqs = StringUtils.split(value.toString(), "/");
      SparseRealVector doc1 = buildDocVector(termFreqs[0]);
      SparseRealVector doc2 = buildDocVector(termFreqs[1]);
      double cosim = 
        doc1.dotProduct(doc2) / (doc1.getNorm() * doc2.getNorm());
      similarityMatrix.put(mapKey, cosim);
    }
    context.write(new Text(mapKey), 
      new DoubleWritable(similarityMatrix.get(mapKey)));
  }

  private SparseRealVector buildDocVector(String flist) {
    String[] freqs = StringUtils.split(flist, ",");
    SparseRealVector doc = new OpenMapRealVector(freqs.length);
    for (int i = 0; i < freqs.length; i++) {
      doc.setEntry(i, Double.valueOf(freqs[i]));
    }
    return doc;
  }
}

Phase 3: Clustering

The Mapper part of this job finds the document pair that has the maximum similarity from the similarity map generated in the previous run. This pair corresponds to the document (or document cluster) pair that will be merged in this iteration. The input to the Mapper is the normalized TF data generated in Phase 1 (or the output of the previous phase 3 job in the previous iteration). For each of these two documents, the key is replaced by the composite key containing both document ids in the pair and passed to the reducer. All other rows are passed through as is.

The mapper also checks to see if the maximum similarity found is greater than the similarity threshold, otherwise, it will not do any special handling. As a consequence, the number of documents in the clustered document set will be unchanged from the previous run. This fact is used in our termination criteria.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/DocClusteringMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Uses the similarity map and the normalized TF data from the previous
 * run, and computes the pair with the maximum similarity. For the docs
 * that are part of this pair, it will change the key to be the combined
 * key for the cluster to be formed and pass it to the reducer. This will
 * only happen for 2 documents in each iteration. The other docs are passed
 * through unchanged. 
 */
public class DocClusteringMapper extends Mapper<LongWritable,Text,Text,Text> {

  private double similarityThreshold = 0.0D;
  private double maxSim = 0.0D;
  private String[] maxSimKeys = null;
  
  public void setup(Context context) 
      throws IOException, InterruptedException {
    similarityThreshold = context.getConfiguration().getFloat(
      HierarchicalAgglomerativeClusterer.SIMILARITY_THRESHOLD_KEY, 0.0F); 
    String simDir = context.getConfiguration().get(
      HierarchicalAgglomerativeClusterer.INPUT_SIMILARITY_MAP_DIR_KEY);
    if (simDir == null) {
      System.err.println("Cant get value for key: " + 
        HierarchicalAgglomerativeClusterer.INPUT_SIMILARITY_MAP_DIR_KEY + 
        ", abort");
      System.exit(-1);
    }
    FileSystem fs = FileSystem.get(context.getConfiguration());
    FileStatus[] fstatuses = fs.listStatus(new Path(simDir));
    for (FileStatus fstatus : fstatuses) {
      Path path = fstatus.getPath();
      if (! path.getName().startsWith("part-r")) {
        continue;
      }
      FSDataInputStream fis = fs.open(path);
      BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
      String line = null;
      while ((line = reader.readLine()) != null) {
        String[] kvpair = StringUtils.split(line, "\t");
        String[] docIdPair = StringUtils.split(kvpair[0], "/");
        if (docIdPair[0].equals(docIdPair[1])) {
          // same doc, expected max, so skip
          continue;
        }
        if (Double.valueOf(kvpair[1]) > maxSim) {
          maxSim = Double.valueOf(kvpair[1]);
          maxSimKeys = docIdPair;
        }
      }
      reader.close();
      fis.close();
    }
  }
  
  @Override
  public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
    String[] kvpair = StringUtils.split(value.toString(), "\t");
    if (maxSim > similarityThreshold &&
       (kvpair[0].equals(maxSimKeys[0]) || 
        kvpair[0].equals(maxSimKeys[1]))) {
      // if either of the keys in maxSimKeys match the key in the 
      // record, then replace the key with the combo-key (this key
      // represents the "cluster")
      String newKey = StringUtils.join(maxSimKeys, "+");
      context.write(new Text(newKey), new Text(kvpair[1]));
    } else {
      // pass record through unchanged
      context.write(new Text(kvpair[0]), new Text(kvpair[1]));
    }
  }
}

On the reducer side, the two documents that are supposed to be merged are going to have the same composite key, so when this is encountered, the reduction is to average the coordinates of the two documents into one. All other documents are passed through unchanged.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Source: ./src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/DocClusteringReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduces the normalized TF data produced by the mapper. The mapper
 * produced 2 documents with the same key, which are candidates for 
 * the new cluster. The reducer will compute the new cluster coordinates
 * using the coordinates of its components. The other documents are
 * written out unchanged.
 */
public class DocClusteringReducer extends Reducer<Text,Text,Text,Text> {

  public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException {
    double numRecords = 0D;
    SparseRealVector doc = null;
    for (Text value : values) {
      String[] tfs = StringUtils.split(value.toString(), ",");
      if (doc == null) {
        doc = new OpenMapRealVector(tfs.length);
      }
      for (int i = 0; i < tfs.length; i++) {
        doc.setEntry(i, doc.getEntry(i) + Double.valueOf(tfs[i]));
      }
      numRecords++;
    }
    SparseRealVector cluster = 
      new OpenMapRealVector(doc.mapDivide(numRecords));
    int numTerms = cluster.getDimension();
    StringBuilder buf = new StringBuilder();
    for (int i = 0; i < numTerms; i++) {
      if (i > 0) {
        buf.append(",");
      }
      buf.append(String.valueOf(cluster.getEntry(i)));
    }
    // replace the "+" in the key with "," since its done clustering
    String newKey = StringUtils.replace(key.toString(), "+", ",");
    context.write(new Text(newKey), new Text(buf.toString()));
  }
}

Phase 4: Checking for termination

The last phase just checks for the number of clustered documents in the document set. If the number of clustered documents reach a certain (our MAX_CLUSTERS) threshold, or is unchanged from the previous run (because it hit our SIMILARITY_THRESHOLD), then the loop is terminated and the job ends. The mapper here is a simple one, it writes a constant key and a 1 for each line it sees.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/RecordCountMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper to count number of lines in a file. For each line sent to the
 * mapper, it sends a constant key and a 1 to the reducer to add.
 */
public class RecordCountMapper 
    extends Mapper<LongWritable,Text,Text,LongWritable> {

  private static final Text COUNT_KEY = new Text("count");
  private static final LongWritable ONE = new LongWritable(1L);
  
  @Override
  public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
    context.write(COUNT_KEY, ONE);
  }
}

The reducer just counts it and returns the count. The code in the driver then reads the file and retrieves the count and returns it to the main method.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/RecordCountReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Sums up the number of line counts sent by the mapper and returns a 
 * single line of output.
 */
public class RecordCountReducer 
    extends Reducer<Text,LongWritable,Text,LongWritable> {

  @Override
  public void reduce(Text key, Iterable<LongWritable> values, 
      Context context) throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      sum++;
    }
    context.write(key, new LongWritable(sum));
  }
}

Running

I've run this (for 3-5 iterations) on both a standalone and pseudo-distributed modes, and they run fine in both scenarios, although they take a fair amount of time. Here is the script I used to run it.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/bin/bash
# Source: src/main/scripts/hadoop/run_hac.sh
# Script to call Hierarchical Agglomerative Clusterer.

## CONFIG ##
M2_REPO=/home/sujit/.m2/repository
HADOOP_HOME=/opt/hadoop-0.20.0
PROJECT_BASEDIR=/home/sujit/src/jtmt
MODE=p # mode can be (l)ocal or (p)seudo-distributed
if [ $MODE == "l" ]; then
  PROTOCOL_PREFIX=$PROJECT_BASEDIR/src/test/resources
else
  PROTOCOL_PREFIX=hdfs://localhost:54310
fi
INPUT_DIR=/hac/input
TEMP_DIRS=/hac/temp*
## CONFIG ##

# for local mode
if [ $MODE == "l" ]; then
  export HADOOP_CLASSPATH=$CLASSPATH:\
$M2_REPO/commons-lang/commons-lang/2.1/commons-lang-2.1.jar:\
$M2_REPO/commons-math/commons-math/2.0/commons-math-2.0.jar
fi

cd $HADOOP_HOME
if [ $MODE == "l" ]; then
  rm -rf $PROTOCOL_PREFIX$TEMP_DIRS
  # no special packaging required for local mode
  bin/hadoop jar $PROJECT_BASEDIR/target/jtmt-1.0-SNAPSHOT.jar \
    net...HierarchicalAgglomerativeClusterer \
    $PROTOCOL_PREFIX$INPUT_DIR
else
  bin/hadoop fs -rmr $TEMP_DIRS
  bin/hadoop jar $PROJECT_BASEDIR/target/ha-clusterer.jar \
    $PROTOCOL_PREFIX$INPUT_DIR
fi
cd -
unset HADOOP_CLASSPATH

I wanted to run this on Amazon EC2, but at the time of writing this post, there were no freely available Amazon EC2 machine images that included Hadoop 0.20. Arun Jacob describes here how he created his own Hadoop-0.20 AMI. Chad Metcalfe of Cloudera pointed out another way to get around this situation by choosing a stock AMI and installing Hadoop/CDH2 on it. Ideally, I would probably just wait for the Hadoop 0.20 AMI to become available and then use it. However, in the event that it does not anytime soon, the second approach seems (to me anyway) the next best.

14 comments (moderated to prevent spam):

Alex Kamil said...

Excellent post, thanks a lot,
this is one of the most comprehensive examples of hadoop implementation I've seen

Sujit Pal said...

Thanks, Alex, and you are welcome.

Anonymous said...

hi sujit

how can i do incremental clustering using your code?


can i handle millions document using your code?

Sujit Pal said...

Hi,

Once you already have your most of your clusters (as would be the case for incremental indexing), you would have a good idea of how wide your cluster should be. At that point, I think it would be far simpler to just use that information to figure out if your new document should join an existing cluster or start a new one.

For N >>> k, where N is the number of docs and k is the number of clusters, you probably won't want to do it this way either, since there will be lots of stages you will have to run.

alpesh dhamelia said...

Hi, Sujit

i done as per your blog for Hirerarchical Clustering but it di not work for me.

i submited 5 txt files artical form wikipedia(mahatma gandhi.txt,ratan tata.txt , hitler.txt,microsoft.txt,apple.txt)

but when i run clustering algo it gave me output :

gandhi.txt 0.0015904572564612327,0.002584493041749503,5.964214711729623E

microsoft.txt
some figure as in gandhi.txt

so my question is that where are the clusters?

i was supposed to expect out like this

cluster1:
gandh.txt

cluster2:
apple
microsoft
cluster3:
hitler something like this.

so anything i am missing?

please help me out.........

Sujit Pal said...

Hi Alpesh, I am using Cosine similarity between the document vectors to compute the similarity between them. Perhaps they are very similar, ie below the threshold and hence clustering stops? I would suggest checking to see how many iterations it has run through, etc, in order to debug the issue.

anjana pai said...

hey i need a program to implement heirarachical agglomeritive algorithm to just cluster a small amount of sample datasets....can u plz help me out with it....also i want it to be as simple as possible.....

Sujit Pal said...

Hi Anjana, the HAC algorithm is very simple. You can find a simple explanation on Wikipedia - in fact, its the top hit on Google for this term.

abeppu said...

Hi,

I know this post is rather old, but I just saw it linked to from hacker news this week. I have to say I'm confused by several points of your approach. Perhaps you could shed some light on the choices you made in your implementation?

Why, for example, did you write your own identity reducer rather than using the one that comes with hadoop (http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/lib/IdentityReducer.html)?

Also, I'm confused about why your clustering stage only merges two documents/clusters per iteration. To my mind, it would be vastly more efficient to for each iteration, join each document or document cluster to its nearest neighbor -- because your similarity measure is symmetric, this should still get you the right results.

Further, even if there is some reason that you only want to merge together two documents (or clusters) per iteration, I'm confused why your mapper outputs all of the data, including stuff that doesn't get merged. Given that hadoop jobs are often IO bound, it seems like it would be much faster to only touch the documents/clusters that you're merging, and then just merge in the output from your reducer back to your mostly untouched data (and then delete out the corresponding pre-merge rows, which you should be able to do easily).

But mostly, why is every mapper an n^2 operation, looping over a copy that you're reading line by line -- that seems like it's missing the whole point of hadoop. Really, this seems like it's only feasible for small datasets, which can be crammed onto one machine (and processed by one machine) anyways -- whereas the whole advantage of Hadoop is the ability to distribute both processing and data. And the fact that for 170 documents this 'takes a fair amount of time' to me sounds like a significant red flag. I suggest you invest the effort to try running this on a sizable EC2 cluster, and with a much larger dataset (e.g. the wikipedia dataset that Amazon AWS makes available), and report your results. I think you may find that some significant changes are needed to make it work, and your readers could benefit from reading and updated version.

Sujit Pal said...

Hi abeppu, thanks for your detailed and thoughtful comment. I will try to answer to the best of my ability.

1) I wrote my own identity reducer because one was not available with the new API when Hadoop 0.20 just came out. Haven't looked at Hadoop lately, but I think its there now.
2) This is from the definition of HAC. At each stage a single item is merged into a cluster. As you point out, ignoring this "requirement" will probably not impact the results and will lead to vastly improved performance.
3) That is a good point, will have to think about how to implement that.
4) Its an n^2 operation because thats the only way I knew how to do it with the Distributed cache - perhaps I should use some sort of random access mechanism. Again, something to think about.
5) Yes, I /know/ the current implementation sucks. I am learning Hadoop, and frankly this was the best I could come up with at the time - the idea was to put it out there and invite comments such as yours.

I am not working on Hadoop at the moment, but I will revisit this when I am. Thanks again for your coment.

vijay dinanath chauhan said...
This comment has been removed by a blog administrator.
Sujit Pal said...

NOTE: the post above has been deleted because it contained incoherent expletive-ridden ramblings in Hinglish (Hindi using English alphabet). I deleted it because (a) it was offensive, (b) because it contained no useful information and (c) because it would be incomprehensible to most English speaking readers.

@Vijay: From your post, it appears that you could not make the code work. I can assure you that the code /does/ work, though not very efficiently (O(n**3)). If you had commented politely /and/ in a constructive way, I would have probably tried to help.

Another thing - I have said this before, and I will say it again. I write this stuff for myself, and the fact that you benefit (or not) is just a side effect to me. If you have problems with the code, either point them out and/or ask for help, or come up with a way that works for you.

I am replying to you in the hope that you are currently clueless about basic netiquette, and this would serve as a quick primer. In the future, I will delete your comment without explanation if they have the features listed in the first paragraph.

vitthal said...

HI HOW I CAN RUN THIS CODE. I AM NEW IN HADOOP. PLZ HELP

Sujit Pal said...

Hi vitthal, while the code works, it is quite slow. I would advise using something more efficient. Look around the net, you should find HAC being described with code in quite a few textbooks - I found a reference using the google search "hadoop hierarchical clustering" - its the book Hadoop Mapreduce Cookbook from PackT.