Sunday, September 09, 2012

Learning Mahout : Classification

The final part covered in the MIA book is Classification. The popular algorithms available are Stochastic Gradient Descent (SGD), Naive Bayes and Complementary Naive Bayes, Random Forests and Online Passive Aggressive. There are other algorithms in the pipeline, as seen from the Classification section of the Mahout wiki page.

The MIA book has generic classification information and advice that will be useful for any algorithm, but it specifically covers SGD, Naive Bayes and Complementary Naive Bayes (the last two via Mahout scripts). Of these, SGD and Random Forest are good for classification problems involving continuous variables and small to medium datasets, while the Naive Bayes family is good for problems involving text-like variables and medium to large datasets.

In general, a solution to a classification problem involves choosing appropriate features, choosing the algorithm, generating the feature vectors (vectorization), training the model and evaluating the results in a loop. You continue to tweak things in each of these steps until you get results with the desired accuracy.

If the training data is provided in (or can be reduced to) tabular form, then we can use Mahout subcommands to generate a logistic regression (SGD) model and test it, as shown below:

hduser@cyclone:mahout$ bin/mahout trainlogistic \
  --input /tmp/donut.csv \
  --output /tmp/model \
  --target color \               # target variable
  --categories 2 \               # number of categories
  --predictors x y \             # predictor variables
  --types numeric \              # predictor variable types
  --features 20 \                # size of internal feature vector
  --passes 100 \                 # number of passes over input
  --rate 50                      # initial learning rate
hduser@cyclone:mahout$ bin/mahout runlogistic \
  --input /tmp/test-donut.csv \
  --model /tmp/model \
  --auc \                 # report area under curve
  --confusion             # report confusion matrix
AUC = 0.57
confusion: [[27.0, 13.0], [0.0, 0.0]]
entropy: [[-0.4, -0.3], [-1.2, -0.7]]
...

The AUC (area under the curve) measures how often the classifier ranks a random positive example above a random negative one, so 0 indicates a classifier that's always wrong and 1 a classifier that's always correct. The confusion matrix shows expected vs actual classification counts (so for a good classifier, we expect the numbers along the diagonal to predominate). For SGD, AUC and log likelihood are good measures. For NB and CNB, percent correct and the confusion matrix are good measures.
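
To make the confusion matrix concrete, here is a minimal Scala snippet (the object name is made up for illustration) that reads percent-correct off the matrix printed above; correct classifications sit on the diagonal, errors everywhere else.

// Hypothetical helper, not part of the Mahout output above
object ConfusionMatrixStats extends App {
  val confusion = Array(Array(27.0, 13.0), Array(0.0, 0.0))
  val total = confusion.map(_.sum).sum
  val correct = confusion.indices.map(i => confusion(i)(i)).sum
  println("percent-correct = " + (100.0 * correct / total))  // 67.5
}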

Most classification problems involve a mix of continuous, categorical, word-like and text-like features. The input to a (Mahout) classification algorithm is in the form of vectors. Vectorization approaches include one value per cell/word, bag of words, and feature hashing (similar in concept to Bloom filters).
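
As an illustration of the feature hashing idea, here is a minimal sketch using Mahout's encoder classes (the field names, values and vector size are made up): each feature is hashed into one or more slots of a fixed-size vector, so no dictionary needs to be maintained.

// A hashed-encoding sketch, assuming the Mahout vectorizer classes are on the classpath
import org.apache.mahout.math.RandomAccessSparseVector
import org.apache.mahout.vectorizer.encoders.{ConstantValueEncoder, StaticWordValueEncoder}

object HashedEncodingSketch extends App {
  val vector = new RandomAccessSparseVector(100)       // fixed number of hashed features
  val wordEncoder = new StaticWordValueEncoder("color")
  val biasEncoder = new ConstantValueEncoder("intercept")
  wordEncoder.addToVector("red", vector)               // hashes "red" into the vector with weight 1
  biasEncoder.addToVector("", 1.0, vector)             // constant (bias) feature
  println(vector)
}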

The data set used for most of the examples in the MIA book is the 20 Newsgroups DataSet. The data is a bunch of newsgroup postings, so apart from the text body, candidates for features could be their header metadata.

So the first step is to see what headers are available and how many there are. The following Unix command lists the headers and their counts. Based on this analysis, the decision is made to use the Subject, From, Keywords and Summary headers as additional metadata to assist in classification.

sujit@cyclone:20news-bydate-train$ export LC_ALL='C'; \
  for file in */*; do \
    sed -E -e '/^$/,$d' -e 's/:.*//' -e '/^[[:space:]]/d' $file; \
  done | sort | uniq -c | sort -nr

The following code uses the AdaptiveLogisticRegression algorithm (which runs multiple SGD algorithms and automatically chooses the best one) to classify the 20 Newsgroups training set, then test the algorithm with the 20 Newsgroups test set. The code demonstrates the building of feature vectors for each document using multiple hashing encoders.

// Source: src/main/scala/com/mycompany/mia/classify/SGD20NewsgroupsClassifier.scala
package com.mycompany.mia.classify

import java.io.{StringReader, PrintWriter, FileInputStream, File}
import java.util.{HashMap, Collections, ArrayList}

import scala.collection.JavaConversions.{collectionAsScalaIterable, asScalaBuffer}
import scala.io.Source

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.util.Version
import org.apache.mahout.classifier.sgd.OnlineLogisticRegression
import org.apache.mahout.classifier.sgd.{ModelSerializer, L1, AdaptiveLogisticRegression}
import org.apache.mahout.common.RandomUtils
import org.apache.mahout.math.{Vector, RandomAccessSparseVector}
import org.apache.mahout.vectorizer.encoders.{TextValueEncoder, Dictionary, ConstantValueEncoder}

import com.google.common.collect.ConcurrentHashMultiset

object SGD20NewsgroupsClassifier extends App {

  val features = 10000
  val analyzer = new StandardAnalyzer(Version.LUCENE_32)
  val encoder = new TextValueEncoder("body")
  encoder.setProbes(2)
  val lines = new ConstantValueEncoder("line")
  val loglines = new ConstantValueEncoder("log(line)")
  val bias = new ConstantValueEncoder("intercept")
  val rand = RandomUtils.getRandom()

  // Usage: either
  // SGD20NewsgroupsClassifier train input_dir model_file dict_file, or
  // SGD20NewsgroupsClassifier test model_file dict_file test_dir
  args(0) match {
    case "train" => train(args(1), args(2), args(3))
    case "test" => test(args(1), args(2), args(3))
  }
  
  def train(trainDir : String, 
      modelFile : String, 
      dictFile : String) : Unit = {
    val newsgroups = new Dictionary()
    val learningAlgorithm = new AdaptiveLogisticRegression(
      20, features, new L1())
    learningAlgorithm.setInterval(800)
    learningAlgorithm.setAveragingWindow(500)
    // prepare data
    val files = new ArrayList[File]()
    val dirs = new File(trainDir).listFiles()
    for (dir <- dirs) {
      if (dir.isDirectory()) {
        newsgroups.intern(dir.getName())
        for (file <- dir.listFiles()) {
          files.add(file)
        }
      }
    }
    Collections.shuffle(files)
    println(files.size() + " training files in " + dirs.length + " classes")
  
    var k = 0
    var step = 0D
    var bumps = Array(1, 2, 5)

    for (file <- files) {
      val ng = file.getParentFile().getName()
      val actualClass = newsgroups.intern(ng)
      val vector = encodeFeatureVector(file)
      learningAlgorithm.train(actualClass, vector)
    }
    learningAlgorithm.close()
    // evaluate model
    val learner = learningAlgorithm.getBest().getPayload().getLearner()
    println("AUC=" + learner.auc() + ", %-correct=" + learner.percentCorrect())
    ModelSerializer.writeBinary(modelFile, learner.getModels().get(0))
    val serializedDict = new PrintWriter(dictFile)
    for (newsgroup <- newsgroups.values()) {
      serializedDict.println(newsgroup)
    }
    serializedDict.flush()
    serializedDict.close()
  }
  
  def encodeFeatureVector(file : File) : Vector = {
    
    val vector = new RandomAccessSparseVector(features)
    val words : ConcurrentHashMultiset[String] = 
      ConcurrentHashMultiset.create()
    var numlines = 0
    var startBody = false
    var prevLine = ""
    for (line <- Source.fromFile(file).getLines()) {
      if (line.startsWith("From:") ||
          line.startsWith("Subject:") ||
          line.startsWith("Keywords:") ||
          line.startsWith("Summary:")) {
        countWords(line.replaceAll(".*:", ""), words)
      }
      if (! startBody &&
          line.trim().length() == 0 &&
          prevLine.trim().length() == 0) {
        startBody = true
      }
      if (startBody) {
        countWords(line, words)
      }
      numlines += 1
      prevLine = line
    }
    bias.addToVector(null, 1, vector)
    lines.addToVector(null, numlines / 30, vector)
    loglines.addToVector(null, Math.log(numlines + 1), vector)
    for (word <- words) {
      encoder.addToVector(word, Math.log(1 + words.count(word)), vector)
    }
    vector
  }
  
  def countWords(line : String,
      words : ConcurrentHashMultiset[String]) : Unit = {
    // add each token in the line to the caller's multiset
    val tokenStream = analyzer.tokenStream("text", new StringReader(line))
    val termAttr = tokenStream.addAttribute(classOf[CharTermAttribute])
    while (tokenStream.incrementToken()) {
      words.add(new String(termAttr.buffer(), 0, termAttr.length()))
    }
  }
  
  def test(modelFile : String, 
      dictFile : String, 
      testDir : String) : Unit = {
    
    val model = ModelSerializer.readBinary(
      new FileInputStream(modelFile), 
      classOf[OnlineLogisticRegression])
    val newsgroups = getNewsgroups(dictFile)
    
    val dirs = new File(testDir).listFiles()
    var ncorrect = 0
    var ntotal = 0
    for (dir <- dirs) {
      if (dir.isDirectory()) {
        val expectedLabel = dir.getName()
        for (file <- dir.listFiles()) {
          val vector = encodeFeatureVector(file)
          val results = model.classify(vector)
          val actualLabel = newsgroups.get(results.maxValueIndex())
          println("file: " + file.getName() + 
            ", expected: " + expectedLabel + 
            ", actual: " + actualLabel)
          if (actualLabel.equals(expectedLabel)) {
            ncorrect += 1
          }
          ntotal += 1
        }
      }
    }
    println("Correct: " + ncorrect + "/" + ntotal)
  }
  
  def getNewsgroups(dictFile : String) : HashMap[Integer,String] = {
    val newsgroups = new HashMap[Integer,String]()
    var lno = 0
    for (line <- Source.fromFile(dictFile).getLines()) {
      newsgroups.put(lno, line)
      lno += 1
    }
    newsgroups
  }
}

To train and test it, we use the following commands. The accuracies are nothing to write home about (AUC=0.5), but at this time I am more concerned with the mechanics of getting a classifier working than the results.

sujit@cyclone:mia-scala-examples$ sbt 'run-main \
  com.mycompany.mia.classify.SGD20NewsgroupsClassifier train \
  /path/to/20news-bydate-train \
  /path/to/model.file /path/to/dict.file'
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
  com.mycompany.mia.classify.SGD20NewsgroupsClassifier test \
  /path/to/model.file /path/to/dict.file \
  /path/to/20news-bydate-test'

As you can see, the above is not run on Hadoop. This is because SGD is a sequential algorithm, so there is no point in parallelizing it. The Naive Bayes family, on the other hand, does benefit from parallelism, so it is run on Hadoop. The book covers running via scripts, so that's what I did, figuring I will go back and check out the code later (the class names are available in "driver.classes.props" in the Mahout source) if and when I need to build integrated solutions.

The book specifies the prepare20newsgroups subcommand, but that is deprecated in the current Mahout distribution, which instead provides a shell script to run both the Naive Bayes and SGD versions. I believe the part where the training files are converted to a one-file-per-line sequence file is incorrect, so I wrote my own code to do this:

// Source: src/main/scala/com/mycompany/mia/classify/NaiveBayes20NewsgroupDataPreparer.scala
package com.mycompany.mia.classify

import java.io.File

import scala.io.Source

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{Text, SequenceFile}

object NaiveBayes20NewsgroupDataPreparer extends App {

  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val path = new Path(args(1))
  val writer = new SequenceFile.Writer(fs, conf, path, 
    classOf[Text], classOf[Text])
  val dirs = new File(args(0)).listFiles()
  var n = 0
  for (dir <- dirs) {
    val label = dir.getName()
    for (file <- dir.listFiles()) {
      val text = Source.fromFile(file).
        getLines().
        foldLeft("") (_ + " " + _)
      // extra slash added to key to get around an ArrayIndexOutOfBoundsException
      // thrown by BayesUtils.writeLabelIndex
      writer.append(new Text("/" + label), new Text(text))
      n += 1
    }
    println(label + ": " + n + " files loaded")
  }
  writer.close()
  
  // self-test to see that everything loaded okay...
  val reader = new SequenceFile.Reader(fs, path, conf)
  val key = new Text()
  val value = new Text()
  var rec = 0
  while (reader.next(key, value)) {
    if (rec < 10) {
      println(key.toString() + " => " + value.toString())
    }
    rec += 1
  }
  println("...")
  println("#=records written: " + rec)
  reader.close()
}

The following sequence of scripts prepares the data from the input files, trains a Naive Bayes classifier and runs it against the test set. Running the complementary Naive Bayes just involves passing an extra parameter to the trainnb and testnb subcommands.

sujit@cyclone:mia-scala-examples$ sbt 'run-main \
  com.mycompany.mia.classify.NaiveBayes20NewsgroupDataPreparer \
  /path/to/20news-bydate-train /tmp/20news-seq'
hduser@cyclone:mahout$ hadoop fs -put /tmp/20news-seq .
hduser@cyclone:mahout$ bin/mahout seq2sparse \
  -i 20news-seq -o 20news-vectors -lnorm -nv -wt tfidf
hduser@cyclone:mahout$ bin/mahout split \
  -i 20news-vectors/tfidf-vectors \
  --trainingOutput 20news-train-vectors \
  --testOutput 20news-test-vectors \
  --randomSelectionPct 20 \
  --overwrite --sequenceFiles -xm sequential
hduser@cyclone:mahout$ bin/mahout trainnb \
  -i 20news-train-vectors -el -o model -li labelindex -ow 
hduser@cyclone:mahout$ bin/mahout testnb \
  -i 20news-test-vectors \
  -o 20news-testing \
  -m model -l labelindex -ow 

To improve the output of the classifier, one should investigate target leaks (which produce "too good" results) and broken feature extraction (which produces bad results), try eliminating, adding and combining features (e.g. brand, gender, brand+gender), normalize feature dimensions, try out different algorithms, and so on.

Mahout is optimized for situations where you have lots of training examples (internet scale, typically generated by user clicks, etc.), rather than the much smaller training sets created by domain experts in more traditional settings. The classification code base seems more fluid and under heavier development than the other parts, so it may make sense to look in the Mahout source code or the Mahout newsgroup for answers rather than depending on the book.

Saturday, September 01, 2012

Learning Mahout : Clustering

The next section in the MIA book is Clustering. As with Recommenders, Mahout provides both in-memory and map-reduce versions of various clustering algorithms. However, unlike Recommenders, there are quite a few toolkits (Weka or Mallet, for example) that are more comprehensive than Mahout for small or medium-sized datasets, so I decided to concentrate on the M/R implementations.

The full list of clustering algorithms available in Mahout at the moment can be found on its Wiki Page under the Clustering section. The ones covered in the book are K-Means, Canopy, Fuzzy K-Means, LDA and Dirichlet. All these algorithms expect data in the form of vectors, so the first step is to convert the input data into this format, a process known as vectorization. Essentially, clustering is the process of finding nearby points in n-dimensional space, where each vector represents a point in this space, and each element of a vector represents a dimension in this space.

It is important to choose the right vector format for the clustering algorithm. For example, one should use the SequentialAccessSparseVector for KMeans, since there is a lot of sequential access in the algorithm. Other possibilities are the DenseVector and RandomAccessSparseVector formats. The input to a clustering algorithm is a SequenceFile containing key-value pairs of {IntWritable, VectorWritable} objects. Since the implementations are given, Mahout users spend most of their time vectorizing the input (and thinking about what feature vectors to use, of course).
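
As a minimal illustration of that input format (paths and values are made up), vectorization essentially boils down to building Mahout vectors and writing them into a SequenceFile that the clustering drivers can read:

// A vectorization sketch, assuming Hadoop and Mahout are on the classpath
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IntWritable, SequenceFile}
import org.apache.mahout.math.{SequentialAccessSparseVector, VectorWritable}

object VectorWriterSketch extends App {
  val points = Array(Array(1.0, 1.0), Array(2.0, 1.0), Array(8.0, 8.0))
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val writer = new SequenceFile.Writer(fs, conf,
    new Path("/tmp/points/part-00000"), classOf[IntWritable], classOf[VectorWritable])
  for ((point, i) <- points.zipWithIndex) {
    val vec = new SequentialAccessSparseVector(point.length)
    point.zipWithIndex.foreach { case (value, j) => vec.set(j, value) }
    writer.append(new IntWritable(i), new VectorWritable(vec))
  }
  writer.close()
}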

Once vectorized, one can invoke the appropriate algorithm either by calling the appropriate bin/mahout subcommand from the command line, or programmatically by calling the appropriate Driver's run method. All the algorithms require the initial centroids to be provided, and the algorithm iteratively perturbs the centroids until they converge. One can either guess randomly or use the Canopy clusterer to generate the initial centroids.

Finally, the output of the clustering algorithm can be read using the Mahout clusterdump subcommand. To check the quality, take a look at the top terms in each cluster to see how "believable" they are. Another way to measure the quality of clusters is to compute the intercluster and intracluster distances; a lower spread of these distances generally implies "good" clusters. Here is code to calculate inter-cluster distances, adapted from the MIA book.
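
This is a minimal sketch rather than the book's exact code; it assumes the cluster centers have already been read into memory as Mahout Vectors.

// Inter-cluster distance sketch; assumes the centers are already in memory
import org.apache.mahout.common.distance.EuclideanDistanceMeasure
import org.apache.mahout.math.Vector

object InterClusterDistances {
  def stats(centers: Seq[Vector]): (Double, Double, Double) = {
    val measure = new EuclideanDistanceMeasure()
    // pairwise distances between all distinct cluster centers
    val distances = for {
      i <- 0 until centers.size
      j <- (i + 1) until centers.size
    } yield measure.distance(centers(i), centers(j))
    (distances.min, distances.max, distances.sum / distances.size)
  }
}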

There are various ways of improving clustering quality. The first, of course, is knowing your data well enough so you can choose good features to cluster on. For clustering text or real world (dirty) data, vector generation can be improved by removing noise and using a good weighting technique. Mahout allows you to specify your custom Lucene analyzers to its clustering subcommands for this. Cluster quality is also dependent on the measure used to calculate similarity between two feature vectors. Once again, Mahout supplies a large number of Distance Measure implementations (Chebyshev, Cosine, Mahalanobis, Manhattan, Minkowski, SquaredEuclidean, Euclidean, Tanimoto, Weighted Euclidean and Weighted Manhattan) and also allows you to specify your own if these don't suit your purposes. Within each dimension, points can be normalized to remove the effect of outliers - the normalization p-norm should match the p-norm used by the distance measure. Finally, if the dimensions are not comparable, such as number of bedrooms and price in dollars for a house, then one should normalize across dimensions, a process known as weighting (this should be done during the vectorization process, which you control fully).
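
For example, here is a minimal sketch (object name made up) of the p-norm normalization mentioned above; the power passed to normalize() should match the distance measure, e.g. the 2-norm for Euclidean-style measures.

// Normalization sketch using Mahout's math classes
import org.apache.mahout.math.{DenseVector, Vector}

object NormalizeSketch extends App {
  val v: Vector = new DenseVector(Array(3.0, 4.0))
  val normalized = v.normalize(2.0)   // divide by the L2 norm (5.0 here)
  println(normalized)                 // elements become 0.6 and 0.8
}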

Case Study - Reuters

The dataset used for this is the Reuters-21578 collection. I basically followed along with the book, trying out commands and making them work against a pseudo-distributed Hadoop installation on my notebook.

Vectors can be created using the following Mahout subcommands.

hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # create sequence file from directory of input files
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout seqdirectory \
  -c UTF-8 \
  -i reuters \          # input directory
  -o reuters-seqfiles   # output directory
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # create vectors from sequence file (no normalization)
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout seq2sparse \
  -i reuters-seqfiles \ # input: sequence files directory
  -o reuters-vectors \  # output: vectors
  -ow
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # this creates 7 sub directories under output
hduser@cyclone:mahout$ # df-count, dictionary.file*, frequency.file-*,
hduser@cyclone:mahout$ # tf-vectors, and tfidf-vectors
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # create vectors from sequence file (with normalization)
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout seq2sparse \
  -i reuters-seqfiles \
  -o reuters-normalized-bigram -ow \
  -a org.apache.lucene.analysis.WhitespaceAnalyzer \ # analyzer
  -chunk 200 \  # chunk size (MB)
  -wt tfidf \   # weighting scheme
  -s 5 \        # minimum support
  -md 3 \       # minimum document frequency
  -x 90 \       # maximum document frequency percentage
  -ng 2 \       # ngram size
  -ml 50 \      # minimum log likelihood ratio
  -seq \        # create sequential access sparse vectors
  -n 2          # normalization - use 2-norm (aka Euclidean norm)
                # should be paired with similar distance measure
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # two additional directories created,
hduser@cyclone:mahout$ # tokenized-documents and wordcount.
hduser@cyclone:mahout$ #######################################################

To run the K-Means clusterer, here is the sequence of Mahout subcommands.

hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # invoke kmeans
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout kmeans \
  -i reuters-vectors/tfidf-vectors \ # input
  -c reuters-initial-clusters \      # generate initial clusters
  -o reuters-kmeans-clusters \       # output
  -dm org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure \
  -cd 1.0 \ # convergence threshold
  -k 20 \   # number of clusters
  -x 20 \   # max iterations
  -cl       # run clustering after iterations
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # in the output directory, there is a clusteredPoints
hduser@cyclone:mahout$ # directory, and directories corresponding to each
hduser@cyclone:mahout$ # clustering iteration named clusters-*. The last
hduser@cyclone:mahout$ # cluster is named cluster-*-final.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # dump clusters to local filesystem
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -dt sequencefile \                            # format: {Integer => String}
  -d reuters-vectors/dictionary.file-* \        # dictionary: {id => word}
  -i reuters-kmeans-clusters/clusters-3-final \ # input
  -o clusters.txt \                             # output (local filesystem)
  -b 10 \                                       # format length
  -n 10                                         # number of top terms to print

The -dm flag specifies the distance measure. In general, it is better to use the Cosine or Tanimoto distance measures for clustering text, rather than the SquaredEuclidean measure.

The value of k (the number of clusters) is provided by the caller based on their knowledge of the data. You can eliminate the guesswork by using the Canopy clusterer with appropriate distance thresholds to indicate the size of clusters, as shown below.

hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # generate initial clusters with canopy
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout canopy \
  -i reuters-vectors/tfidf-vectors \
  -o reuters-canopy-centroids \
  -dm org.apache.mahout.common.distance.EuclideanDistanceMeasure \
  -t1 1500 \ # points within t1 of a canopy center are added to that canopy
  -t2 2000   # points within t2 of a canopy center are removed from further consideration
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # run kmeans with canopy cluster centroids.
hduser@cyclone:mahout$ # in this case, since -c points to a populated path,
hduser@cyclone:mahout$ # the algo doesn't have to guess initial centroids.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout kmeans \
  -i reuters-vectors/tfidf-vectors \
  -o reuters-kmeans-clusters \
  -c reuters-canopy-centroids/clusters-0-final \
  -dm org.apache.mahout.common.distance.TanimotoDistanceMeasure \
  -cd 0.1 \     # convergence threshold
  -ow \         # overwrite
  -x 20 \       # max iterations
  -cl           # run clustering after iterations
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # view the clusters generated
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -dt sequencefile \                     # format {Integer => String}
  -d reuters-vectors/dictionary.file-* \ # dictionary (id => word)
  -i reuters-kmeans-clusters/clusters-1-final \ # input directory
  -o clusters.txt \                      # output (local filesystem)
  -b 10 \                                # max size of word to print
  -n 10                                  # number of top terms

You can also write your own custom clusterer. I suppose this would be useful when you want to deploy an end-to-end solution but as you can see, there is not much difference from using a pipeline of commands as shown above. Here is an example of a custom clusterer that works against the Reuters dataset, using Canopy to compute the initial centroids, then using KMeans (or optionally Fuzzy KMeans) for clustering.

// Source: src/main/scala/com/mycompany/mia/cluster/ReutersClusterer.scala
package com.mycompany.mia.cluster

import java.io.{StringReader, Reader}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{SequenceFile, IntWritable}
import org.apache.lucene.analysis.standard.{StandardTokenizer, StandardFilter, StandardAnalyzer}
import org.apache.lucene.analysis.tokenattributes.TermAttribute
import org.apache.lucene.analysis.{WhitespaceTokenizer, TokenStream, StopFilter, LowerCaseFilter, Analyzer}
import org.apache.lucene.util.Version
import org.apache.mahout.clustering.canopy.CanopyDriver
import org.apache.mahout.clustering.classify.WeightedVectorWritable
import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver
import org.apache.mahout.clustering.kmeans.KMeansDriver
import org.apache.mahout.clustering.Cluster
import org.apache.mahout.common.distance.{TanimotoDistanceMeasure, EuclideanDistanceMeasure}
import org.apache.mahout.common.HadoopUtil
import org.apache.mahout.vectorizer.tfidf.TFIDFConverter
import org.apache.mahout.vectorizer.{DocumentProcessor, DictionaryVectorizer}

object ReutersClusterer extends App {
  
  // parameters
  val minSupport = 2
  val minDf = 5
  val maxDfPercent = 95
  val maxNGramSize = 2
  val minLLRValue = 50
  val reduceTasks = 1
  val chunkSize = 200
  val norm = 2
  val sequentialAccessOutput = true
  
  val inputDir = args(0) // directory of doc sequence file(s)
  val outputDir = args(1) // directory where clusters will be written
  val algo = args(2) // "kmeans" or "fkmeans"
  
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  HadoopUtil.delete(conf, new Path(outputDir))
  
  // converts input docs in sequence file format in input_dir 
  // into token array in output_dir/tokenized-documents
  val inputPath = new Path(inputDir)
  val tokenizedDocPath = new Path(outputDir, 
    DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER) 
  DocumentProcessor.tokenizeDocuments(inputPath, 
    classOf[ReutersAnalyzer], tokenizedDocPath, conf)
    
  // reads token array in output_dir/tokenized-documents and
  // writes term frequency vectors in output_dir (under tf-vectors)
  DictionaryVectorizer.createTermFrequencyVectors(
    tokenizedDocPath,
    new Path(outputDir),
    DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER,
    conf, minSupport, maxNGramSize, minLLRValue, 2, true, reduceTasks,
    chunkSize, sequentialAccessOutput, false)

  // converts term frequency vectors in output_dir/tf-vectors
  // to TF-IDF vectors in output_dir (under tfidf-vectors)
  val tfVectorPath = new Path(outputDir, 
    DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER)
  val outputPath = new Path(outputDir)
  val docFreqs = TFIDFConverter.calculateDF(
    tfVectorPath, outputPath, conf, chunkSize)
  TFIDFConverter.processTfIdf(tfVectorPath, outputPath,
    conf, docFreqs, minDf, maxDfPercent, norm, true, 
    sequentialAccessOutput, false, reduceTasks)
    
  // reads tfidf-vectors from output_dir/tfidf-vectors
  // and writes out Canopy centroids at output_dir/canopy-centroids
  val tfidfVectorPath = new Path(outputDir, "tfidf-vectors")
  val canopyCentroidPath = new Path(outputDir, "canopy-centroids")
  CanopyDriver.run(conf, tfidfVectorPath, canopyCentroidPath,
    new EuclideanDistanceMeasure(), 250, 120, false, 0.01, false)
    
  // reads tfidf-vectors from output_dir/tfidf-vectors and 
  // refers to directory path for initial clusters, and 
  // writes out clusters to output_dir/clusters
  val clusterPath = new Path(outputDir, "clusters")
  algo match {
    case "kmeans" => KMeansDriver.run(conf, tfidfVectorPath, 
      new Path(canopyCentroidPath, "clusters-0-final"),
      clusterPath, new TanimotoDistanceMeasure(), 
      0.01, 20, true, 0.01, false)
    case "fkmeans" => FuzzyKMeansDriver.run(conf, tfidfVectorPath,
      new Path(canopyCentroidPath, "clusters-0-final"),
      clusterPath, new TanimotoDistanceMeasure(), 
      0.01, 20, 2.0f, true, true, 0.0, false)
    case _ => throw new IllegalArgumentException(
      "algo can be either kmeans or fkmeans")
  }
  
  // read clusters and output
  val reader = new SequenceFile.Reader(fs, 
    new Path(clusterPath, Cluster.CLUSTERED_POINTS_DIR + "/part-m-00000"), 
    conf)
  val key = new IntWritable()
  val value = new WeightedVectorWritable()
  while (reader.next(key, value)) {
    println(key.toString + " belongs to " + value.toString)
  }
  reader.close()
}

class ReutersAnalyzer extends Analyzer {
  
  val ALPHA_PATTERN = """[a-z]+""".r
  
  override def tokenStream(fieldName : String, reader : Reader) : 
   TokenStream = {
    // tokenize input string by standard tokenizer
    var result : TokenStream = 
      new StandardTokenizer(Version.LUCENE_CURRENT, reader)
    result = new StandardFilter(result)
    // lowercase all words
    result = new LowerCaseFilter(result)
    // remove stop words
    result = new StopFilter(true, result, 
      StandardAnalyzer.STOP_WORDS_SET)
    val termAttr = result.addAttribute(classOf[TermAttribute]).
      asInstanceOf[TermAttribute]
    val buf = new StringBuilder()
    while (result.incrementToken()) {
      // remove words < 3 chars long
      if (termAttr.termLength() >= 3) {
        val word = new String(
          termAttr.termBuffer(), 0, termAttr.termLength())
        // remove words with non-alpha chars in them
        if (ALPHA_PATTERN.pattern.matcher(word).matches) {
          buf.append(word).append(" ")
        }
      }
    }
    // return the remaining tokens
    new WhitespaceTokenizer(new StringReader(buf.toString))
  }
}

Just like K-Means, the other clustering algorithms can also be run either from a script or via code as shown above. Here are the commands for the other major clustering algorithms.

hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # run fuzzy kmeans with random initial centroids
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout fkmeans \
  -i reuters-vectors/tfidf-vectors \
  -c reuters-fkmeans-centroids \
  -o reuters-fkmeans-clusters \
  -cd 1.0 \ # convergence threshold
  -k 21 \
  -ow \
  -x 10 \ # maxiterations
  -m 2.0 \ # fuzzification factor
  -dm org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # view generated clusters.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -dt sequencefile \
  -d reuters-vectors/dictionary.file-* \
  -i reuters-fkmeans-clusters/clusters-2-final \
  -o clusters.txt \
  -b 10 \
  -n 10

Dirichlet clustering is based on a probabilistic model that attempts to describe the distribution of points around a cluster. Model distributions provided by Mahout are DistanceMeasureClusterDistribution and GaussianClusterDistribution, and of course users can implement ModelDistribution to create their own custom one as well. The command line invocation of the Dirichlet clusterer is shown below:

hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # invoke dirichlet clustering
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout dirichlet \
  -i reuters-vectors/tfidf-vectors \
  -o reuters-dirichlet-clusters \
  -k 60 \                          # number of clusters
  -x 10 \                          # number of iterations
  -a0 1.0 \                        # alpha0 value
  -md org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution \
  -mp org.apache.mahout.math.SequentialAccessSparseVector #default vector type
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # view generated clusters.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -dt sequencefile \
  -d reuters-vectors/dictionary.file-* \
  -i reuters-dirichlet-clusters/clusters-0 \
  -o clusters.txt \
  -b 10 \
  -n 10

The book covers another algorithm called LDA (Latent Dirichlet Allocation), which is useful for topic modeling, but that subcommand has been deprecated in version 0.7 and replaced with two other subcommands which I could not get working. I guess I will come back to this one later when I actually need it, and when I am a bit more familiar with Mahout.

Case Study - LastFM

The Last.fm dataset contains (music) group names tagged with words or phrases and the respective tag counts. The objective is to suggest tags for artists given their existing tags, i.e., to cluster tags using the artists as features and the tag counts as feature weights. Unlike the Reuters case, the approach here is to write just enough code to generate the dictionary (for clusterdump) and the feature vectors (for the clusterer). For the actual clustering and viewing the clusters, we depend on the appropriate Mahout subcommands. Here is the code for the vectorizer.

// Source: src/main/scala/com/mycompany/mia/cluster/LastFMVectorizer.scala
package com.mycompany.mia.cluster

import scala.collection.JavaConversions.iterableAsScalaIterable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{IntWritable, DefaultStringifier, Text, SequenceFile, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
import org.apache.hadoop.mapreduce.{Mapper, Reducer, Job}
import org.apache.hadoop.util.{GenericsUtil, GenericOptionsParser}
import org.apache.mahout.math.{VectorWritable, Vector, SequentialAccessSparseVector, NamedVector}

/**
 * We need to read a file of the following format:
 * UUID<sep>Artist Name<sep>Tag<sep>Count
 * into a sequence file of [Text,VectorWritable] as follows:
 * Tag => VectorWritable(Artist:Count)
 */
object LastFMVectorizer {

  def main(args : Array[String]) : Int = {
    val conf = new Configuration()

    val otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs
    if (otherArgs.length != 2) {
      println("Usage: LastFMVectorizer input_file output_dir")
      return -1
    }

    // Dictionary Mapper/Reducer. Extract unique artists
    val job1 = new Job(conf, "Dictionary Mapper")
    job1.setJarByClass(classOf[DictionaryMapper])
    job1.setMapperClass(classOf[DictionaryMapper])
    job1.setReducerClass(classOf[DictionaryReducer])
    job1.setOutputKeyClass(classOf[Text])
    job1.setOutputValueClass(classOf[IntWritable])
    job1.setInputFormatClass(classOf[TextInputFormat])
    job1.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text,IntWritable]])
    FileInputFormat.addInputPath(job1, new Path(args(0)))
    val dictOutput = new Path(args(1), "dictionary")
    FileOutputFormat.setOutputPath(job1, dictOutput)
    var succ = (job1.waitForCompletion(true))
    
    if (succ) {
      // get a mapping of unique ids to artist and tag for converting
      // to Mahout vectors
      val dictOutput = new Path(args(1), "dictionary")
      val fs = FileSystem.get(dictOutput.toUri(), conf)
      val dictfiles = fs.globStatus(new Path(dictOutput, "part-*"))
      var i = 0
      val dictGlob = new Path(args(1), "dict-glob")
      val writer = new SequenceFile.Writer(fs, conf, dictGlob, 
        classOf[Text], classOf[IntWritable])
      for (dictfile <- dictfiles) {
        val path = dictfile.getPath()
        val reader = new SequenceFile.Reader(fs, path, conf)
        val key = new Text()
        val value = new IntWritable()
        while (reader.next(key, value)) {
          writer.append(key, new IntWritable(i))
          i += 1
        }
        reader.close()
      }
      writer.close()
      conf.set("dictpath", dictGlob.toString())

      val job2 = new Job(conf, "Dictionary Vectorizer")
      job2.setJarByClass(classOf[VectorMapper])
      job2.setMapperClass(classOf[VectorMapper])
      job2.setReducerClass(classOf[VectorReducer])
      job2.setOutputKeyClass(classOf[Text])
      job2.setOutputValueClass(classOf[VectorWritable])
      job2.setInputFormatClass(classOf[TextInputFormat])
      job2.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text,VectorWritable]])
      FileInputFormat.addInputPath(job2, new Path(args(0)))
      FileOutputFormat.setOutputPath(job2, new Path(args(1), "vectors"))
      succ = (job2.waitForCompletion(true))
    }
    if (succ) 0 else 1
  }
}

/////////////////////////////////////////////////////////////////
// Assigns a unique ID to each artist. Needed by clusterdump
/////////////////////////////////////////////////////////////////

class DictionaryMapper 
    extends Mapper[LongWritable,Text,Text,IntWritable] {
  
  val pattern = """<sep>""".r
  val zero = new IntWritable(0)
  
  override def map(key : LongWritable, 
      value : Text, 
      context : Mapper[LongWritable,Text,Text,IntWritable]#Context) = { 
    val fields = pattern.split(value.toString)
    if (fields.length != 4) {
      context.getCounter("Map", "LinesWithErrors").increment(1)
    } else {
      context.write(new Text(fields(1)), zero)
    }
  }
}

class DictionaryReducer
    extends Reducer[Text,IntWritable,Text,IntWritable] {

  val zero = new IntWritable(0)

  override def reduce(key : Text, 
      values : java.lang.Iterable[IntWritable],
      context : Reducer[Text,IntWritable,Text,IntWritable]#Context) = {
    context.write(key, zero)
  }
}

/////////////////////////////////////////////////////////////////
// For each tag, creates feature vectors of artists
/////////////////////////////////////////////////////////////////

class VectorMapper
    extends Mapper[LongWritable,Text,Text,VectorWritable] {
  
  val pattern = """<sep>""".r
  var dict = new java.util.HashMap[String,Integer]()
  var vecwritable = new VectorWritable()
  
  override def setup(
      context : Mapper[LongWritable,Text,Text,VectorWritable]#Context) = {
    super.setup(context)
    val conf = context.getConfiguration()
    val dictpath = new Path(conf.get("dictpath"))
    val fs = FileSystem.get(dictpath.toUri(), conf)
    val reader = new SequenceFile.Reader(fs, dictpath, conf)
    val key = new Text()
    val value = new IntWritable()
    while (reader.next(key, value)) {
      dict.put(key.toString(), value.get())
    }
  }
  
  override def map(key : LongWritable, 
      value : Text,
      context : Mapper[LongWritable,Text,Text,VectorWritable]#Context) = {
    val fields = pattern.split(value.toString)
    if (fields.length != 4) {
      context.getCounter("Map", "LinesWithErrors").increment(1)
    } else {
      val artist = fields(1)
      val tag = fields(2)
      val weight = java.lang.Double.parseDouble(fields(3))
      val vector = new NamedVector(
        new SequentialAccessSparseVector(dict.size()), tag)
      vector.set(dict.get(artist), weight)
      vecwritable.set(vector)
      context.write(new Text(tag), vecwritable)
    }
  }
}

class VectorReducer
    extends Reducer[Text,VectorWritable,Text,VectorWritable] {
  
  var vecwritable = new VectorWritable()
  
  override def reduce(key : Text,
      values : java.lang.Iterable[VectorWritable],
      context : Reducer[Text,VectorWritable,Text,VectorWritable]#Context) = {
    // sum the partial (artist, weight) vectors emitted for this tag
    var vector : Vector = null
    for (partialVector <- values) {
      if (vector == null) {
        vector = partialVector.get().like()
      }
      vector = vector.plus(partialVector.get())
    }
    val tagVector = new NamedVector(vector, key.toString())
    vecwritable.set(tagVector)
    context.write(key, vecwritable)
  }
}

Once the vectors are generated, we just use Mahout subcommands to complete the clustering work.

hduser@cyclone:mahout$ hadoop jar /tmp/my-mahout-fatjar.jar \
  com.mycompany.mia.cluster.LastFMVectorizer \
  input/ArtistTags.dat \
  output
hduser@cyclone:mahout$ bin/mahout kmeans \
  -i output/vectors \
  -o output/topics \
  -c output/centroids \
  -k 200 \
  -ow \
  -dm org.apache.mahout.common.distance.CosineDistanceMeasure \
  -cd 0.01 \
  -x 20 \
  -cl
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -i output/topics/clusters-20-final \
  -d output/dict-glob \
  -dt sequencefile \
  -n 10 \
  -o clusters.txt

There are two other case studies in the MIA book which I didn't do; maybe I will come back and do them once I finish the other stuff in the book. I also skipped over LDA (because it's now deprecated in favor of two other algorithms, and I couldn't get either of them to work). However, I now have a decent understanding of how to do clustering with Mahout.

If you've been reading this post with the intent of learning about Mahout clustering, you may have found the pace a little too fast. That's because there is a lot of material to cover in a relatively short post. For a gentler introduction, please refer to the MIA book and the companion code repository on GitHub.