## Sunday, September 09, 2012

### Learning Mahout : Classification

The final part covered in the MIA book is Classification. The popular algorithms available are Stochastic Gradient Descent (SGD), Naive Bayes and Complementary Naive Bayes, Random Forests and Online Passive Aggressive. There are other algorithms in the pipeline, as seen from the Classification section of the Mahout wiki page.

The MIA book has generic classification information and advice that is useful for any algorithm, but it specifically covers SGD, Naive Bayes and Complementary Naive Bayes (the last two via Mahout scripts). Of the available algorithms, SGD and Random Forests are good for classification problems involving continuous variables and small to medium datasets, and the Naive Bayes family is good for problems involving text-like variables and medium to large datasets.

In general, a solution to a classification problem involves choosing the appropriate features for classification, choosing the algorithm, generating the feature vectors (vectorization), training the model and evaluating the results in a loop. You continue to tweak stuff in each of these steps until you get the results with the desired accuracy.

If training data is provided in (or can be reduced to) tabular form, then we can use Mahout subcommands to generate a logistic regression (SGD) model and test it, as shown below:

```
hduser@cyclone:mahout$ # --target: target variable, --categories: number of categories
hduser@cyclone:mahout$ # --predictors: predictor variables, --types: predictor variable types
hduser@cyclone:mahout$ # --features: size of internal feature vector
hduser@cyclone:mahout$ # --passes: number of passes over input, --rate: initial learning rate
hduser@cyclone:mahout$ bin/mahout trainlogistic \
    --input /tmp/donut.csv \
    --output /tmp/model \
    --target color \
    --categories 2 \
    --predictors x y \
    --types numeric \
    --features 20 \
    --passes 100 \
    --rate 50
hduser@cyclone:mahout$ # --auc: report area under curve, --confusion: report confusion matrix
hduser@cyclone:mahout$ bin/mahout runlogistic \
    --input /tmp/test-donut.csv \
    --model /tmp/model \
    --auc \
    --confusion
AUC = 0.57
confusion: [[27.0, 13.0], [0.0, 0.0]]
entropy: [[-0.4, -0.3], [-1.2, -0.7]]
...
```

The AUC (area under the ROC curve) measures how well the classifier ranks examples: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one. So 1 indicates a classifier that always ranks correctly, 0.5 is no better than random guessing, and 0 indicates one that is always wrong. The confusion matrix tabulates the expected vs actual classification counts (so for a good classifier, we should expect the numbers along the diagonal to predominate). For SGD, AUC and log likelihood are good measures. For NB and CNB, the percent correct and confusion matrix are good measures.
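To make the two measures concrete, here is a minimal sketch in plain Scala (no Mahout dependency). The object and method names are my own for illustration, and the row/column orientation of the confusion matrix is an assumption, not necessarily what `runlogistic` prints.

```scala
object EvalSketch {
  // AUC as the fraction of (positive, negative) pairs in which the positive
  // example gets the higher score; ties count as half a win.
  def auc(scores: Seq[Double], labels: Seq[Int]): Double = {
    val pos = scores.zip(labels).collect { case (s, 1) => s }
    val neg = scores.zip(labels).collect { case (s, 0) => s }
    require(pos.nonEmpty && neg.nonEmpty, "need examples of both classes")
    val wins = for (p <- pos; n <- neg) yield {
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    }
    wins.sum / (pos.size.toLong * neg.size)
  }

  // m(i)(j) = number of examples whose actual class is i and predicted class is j
  // (orientation chosen for this sketch).
  def confusion(actual: Seq[Int], predicted: Seq[Int], numClasses: Int): Array[Array[Int]] = {
    val m = Array.ofDim[Int](numClasses, numClasses)
    for ((a, p) <- actual.zip(predicted)) m(a)(p) += 1
    m
  }
}
```

For example, `EvalSketch.auc(Seq(0.9, 0.8, 0.3, 0.1), Seq(1, 0, 1, 0))` compares four positive/negative pairs, three of which are ranked correctly.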

Most classification problems involve a mix of continuous, categorical, word-like and text-like features. The input to a (Mahout) classification algorithm is in the form of vectors. Vectorizing approaches include one cell per word, bag of words, and feature hashing (similar in concept to Bloom filters).
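The idea behind feature hashing can be sketched in a few lines of plain Scala. This is only an illustration under my own assumptions: the names are hypothetical, and it uses Scala's `MurmurHash3` rather than whatever hash Mahout's encoders use internally. Each word is hashed to a fixed number of positions ("probes") in a fixed-size vector, so no dictionary is needed, at the cost of occasional collisions.

```scala
import scala.util.hashing.MurmurHash3

object HashingSketch {
  // Adds `weight` for `word` at `probes` hashed positions of a fixed-size vector.
  // The feature name is mixed into the hash so that different fields
  // (e.g. "body" vs "subject") land in different places.
  def addToVector(name: String, word: String, weight: Double,
                  vector: Array[Double], probes: Int = 2): Unit = {
    for (probe <- 0 until probes) {
      val h = MurmurHash3.stringHash(name + ":" + word, probe)
      val index = java.lang.Math.floorMod(h, vector.length)
      vector(index) += weight
    }
  }

  // Encodes a bag of words using log(1 + count) as the weight of each word.
  def encode(name: String, words: Seq[String], size: Int): Array[Double] = {
    val vector = new Array[Double](size)
    val counts = words.groupBy(identity).map { case (w, ws) => (w, ws.size) }
    for ((w, c) <- counts) addToVector(name, w, math.log(1.0 + c), vector)
    vector
  }
}
```

Calling `HashingSketch.encode("body", Seq("a", "b", "a"), 16)` always yields a 16-element vector, however many distinct words the input contains.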

The data set used for most of the examples in the MIA book is the 20 Newsgroups dataset. The data is a collection of newsgroup postings, so apart from the text body, header metadata are also candidate features.

So the first step is to see what headers are available and how many there are. The following Unix command will list the headers and their counts. Based on the analysis, the decision is made to use the Subject, From, Keywords and Summary as additional metadata to assist in classification.

```
sujit@cyclone:20news-bydate-train$ export LC_ALL='C'; \
    for file in */*; do \
      sed -E -e '/^$/,$d' -e 's/:.*//' -e '/^[[:space:]]/d' $file; \
    done | sort | uniq -c | sort -nr
```

The following code uses the AdaptiveLogisticRegression algorithm (which runs multiple SGD learners and automatically chooses the best one) to classify the 20 Newsgroups training set, then tests the resulting model against the 20 Newsgroups test set. The code demonstrates the building of feature vectors for each document using multiple hashing encoders.

```
// Source: src/main/scala/com/mycompany/mia/classify/SGD20NewsgroupsClassifier.scala
package com.mycompany.mia.classify

import java.io.{StringReader, PrintWriter, FileInputStream, File}
import java.util.{HashMap, Collections, ArrayList}

import scala.collection.JavaConversions.{collectionAsScalaIterable, asScalaBuffer}
import scala.io.Source

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.util.Version
import org.apache.mahout.classifier.sgd.OnlineLogisticRegression
import org.apache.mahout.classifier.sgd.{ModelSerializer, L1, AdaptiveLogisticRegression}
import org.apache.mahout.common.RandomUtils
import org.apache.mahout.math.{Vector, RandomAccessSparseVector}
import org.apache.mahout.vectorizer.encoders.{TextValueEncoder, Dictionary, ConstantValueEncoder}

import com.google.common.collect.ConcurrentHashMultiset

object SGD20NewsgroupsClassifier extends App {

  val features = 10000
  val analyzer = new StandardAnalyzer(Version.LUCENE_32)
  val encoder = new TextValueEncoder("body")
  encoder.setProbes(2)
  val lines = new ConstantValueEncoder("line")
  val loglines = new ConstantValueEncoder("log(line)")
  val bias = new ConstantValueEncoder("intercept")
  val rand = RandomUtils.getRandom()

  // Usage: either
  // SGD20NewsgroupsClassifier train input_dir model_file dict_file, or
  // SGD20NewsgroupsClassifier test model_file dict_file test_dir
  args(0) match {
    case "train" => train(args(1), args(2), args(3))
    case "test" => test(args(1), args(2), args(3))
  }

  def train(trainDir : String, modelFile : String, dictFile : String) : Unit = {
    val newsgroups = new Dictionary()
    val learningAlgorithm = new AdaptiveLogisticRegression(
      20, features, new L1())
    learningAlgorithm.setInterval(800)
    learningAlgorithm.setAveragingWindow(500)

    // prepare data: one subdirectory per newsgroup (class)
    val files = new ArrayList[File]()
    val dirs = new File(trainDir).listFiles()
    for (dir <- dirs) {
      if (dir.isDirectory()) {
        newsgroups.intern(dir.getName())
        for (file <- dir.listFiles()) {
          files.add(file)
        }
      }
    }
    // shuffle so the learner does not see one class at a time
    Collections.shuffle(files)
    println(files.size() + " training files in " + dirs.length + " classes")

    for (file <- files) {
      val ng = file.getParentFile().getName()
      val actualClass = newsgroups.intern(ng)
      val vector = encodeFeatureVector(file)
      learningAlgorithm.train(actualClass, vector)
    }
    learningAlgorithm.close()

    // evaluate model
    val learner = learningAlgorithm.getBest().getPayload().getLearner()
    println("AUC=" + learner.auc() + ", %-correct=" + learner.percentCorrect())
    ModelSerializer.writeBinary(modelFile, learner.getModels().get(0))

    // save the class labels, one per line, in class-index order
    val serializedDict = new PrintWriter(dictFile)
    for (newsgroup <- newsgroups.values()) {
      serializedDict.println(newsgroup)
    }
    serializedDict.flush()
    serializedDict.close()
  }

  def encodeFeatureVector(file : File) : Vector = {
    val vector = new RandomAccessSparseVector(features)
    val words : ConcurrentHashMultiset[String] =
      ConcurrentHashMultiset.create()
    var numlines = 0
    var startBody = false
    var prevLine = ""
    for (line <- Source.fromFile(file).getLines()) {
      // selected headers contribute their values as words
      if (line.startsWith("From:") ||
          line.startsWith("Subject:") ||
          line.startsWith("Keywords:") ||
          line.startsWith("Summary:")) {
        countWords(line.replaceAll(".*:", ""), words)
      }
      // the body is taken to start after two consecutive blank lines
      if (! startBody &&
          line.trim().length() == 0 &&
          prevLine.trim().length() == 0) {
        startBody = true
      }
      if (startBody) {
        countWords(line, words)
      }
      numlines += 1
      prevLine = line
    }
    bias.addToVector(null, 1, vector)
    // 30.0 forces floating-point division
    lines.addToVector(null, numlines / 30.0, vector)
    loglines.addToVector(null, Math.log(numlines + 1), vector)
    for (word <- words) {
      encoder.addToVector(word, Math.log(1 + words.count(word)), vector)
    }
    vector
  }

  // Tokenizes the line and adds each token to the words multiset.
  // (The tokens must go into the multiset that was passed in; collecting
  // them in a local list of the same name would leave it empty.)
  def countWords(line : String,
      words : ConcurrentHashMultiset[String]) : Unit = {
    val tokenStream = analyzer.tokenStream("text", new StringReader(line))
    tokenStream.addAttribute(classOf[CharTermAttribute])
    while (tokenStream.incrementToken()) {
      val attr = tokenStream.getAttribute(classOf[CharTermAttribute])
      words.add(new String(attr.buffer(), 0, attr.length()))
    }
  }

  def test(modelFile : String, dictFile : String, testDir : String) : Unit = {
    val model = ModelSerializer.readBinary(
      new FileInputStream(modelFile),
      classOf[OnlineLogisticRegression])
    val newsgroups = getNewsgroups(dictFile)
    val dirs = new File(testDir).listFiles()
    var ncorrect = 0
    var ntotal = 0
    for (dir <- dirs) {
      if (dir.isDirectory()) {
        val expectedLabel = dir.getName()
        for (file <- dir.listFiles()) {
          val vector = encodeFeatureVector(file)
          val results = model.classify(vector)
          val actualLabel = newsgroups.get(results.maxValueIndex())
          println("file: " + file.getName() +
            ", expected: " + expectedLabel +
            ", actual: " + actualLabel)
          if (actualLabel.equals(expectedLabel)) {
            ncorrect += 1
          }
          ntotal += 1
        }
      }
    }
    println("Correct: " + ncorrect + "/" + ntotal)
  }

  // Reads the label file back into a {class index => label} map.
  def getNewsgroups(dictFile : String) : HashMap[Integer,String] = {
    val newsgroups = new HashMap[Integer,String]()
    var lno = 0
    for (line <- Source.fromFile(dictFile).getLines()) {
      newsgroups.put(lno, line)
      lno += 1
    }
    newsgroups
  }
}
```

To train and test it, we use the following commands. The accuracies are nothing to write home about (AUC=0.5), but at this time I am more concerned with the mechanics of getting a classifier working than the results.

```
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
    com.mycompany.mia.classify.SGD20NewsgroupsClassifier train \
    /path/to/20news-bydate-train \
    /path/to/model.file /path/to/dict.file'
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
    com.mycompany.mia.classify.SGD20NewsgroupsClassifier test \
    /path/to/model.file /path/to/dict.file \
    /path/to/20news-bydate-test'
```

As you can see, the above is not run on Hadoop. This is because SGD is a sequential algorithm, so there is no point in parallelizing it. The Naive Bayes family, on the other hand, does benefit from parallelism, so it is run on Hadoop. The book covers running via scripts, so that's what I did, figuring I will go back and check out the code later (the class names are available in "driver.classes.props" in the Mahout source) if and when I need to build integrated solutions.
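To see why SGD is inherently sequential, here is a minimal sketch of online logistic regression in plain Scala. It is not Mahout's OnlineLogisticRegression (no regularization, no learning rate decay, binary target only), and all names are made up for illustration. The point is that every update reads the weights left behind by the previous one.

```scala
object SgdSketch {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Predicted probability that x belongs to class 1.
  def predict(w: Array[Double], x: Array[Double]): Double =
    sigmoid(w.indices.map(i => w(i) * x(i)).sum)

  // One SGD step on a single example: move the weights along the gradient
  // of the log likelihood. The weights are updated in place.
  def step(w: Array[Double], x: Array[Double], y: Int, rate: Double): Unit = {
    val err = y - predict(w, x)
    for (i <- w.indices) w(i) += rate * err * x(i)
  }

  // Several passes over the data, one example at a time, in order.
  def train(data: Seq[(Array[Double], Int)], dims: Int,
            passes: Int, rate: Double): Array[Double] = {
    val w = new Array[Double](dims)
    for (_ <- 0 until passes; (x, y) <- data) step(w, x, y, rate)
    w
  }
}
```

Here the first element of each input vector would typically be a constant 1.0 acting as the intercept, much like the `bias` encoder in the classifier above.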

The book specifies the prepare20newsgroups subcommand, but that is deprecated in the current Mahout distribution, which instead ships a shell script to run both the Naive Bayes and SGD versions. I believe the part where the training files are converted to a sequence file with one record per input file is incorrect, so I wrote my own code to do this:

```
// Source: src/main/scala/com/mycompany/mia/classify/NaiveBayes20NewsgroupDataPreparer.scala
package com.mycompany.mia.classify

import java.io.File

import scala.io.Source

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{Text, SequenceFile}

object NaiveBayes20NewsgroupDataPreparer extends App {

  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val path = new Path(args(1))
  val writer = new SequenceFile.Writer(fs, conf, path,
    classOf[Text], classOf[Text])
  val dirs = new File(args(0)).listFiles()
  var n = 0
  for (dir <- dirs) {
    val label = dir.getName()
    for (file <- dir.listFiles()) {
      // flatten each file into a single space-separated string
      val text = Source.fromFile(file).
        getLines().
        foldLeft("") (_ + " " + _)
      // extra slash added to key to get around AAOOB thrown
      // by BayesUtils.writeLabelIndex
      writer.append(new Text("/" + label), new Text(text))
      n += 1
    }
    println(label + ": " + n + " files loaded")
  }
  writer.close()

  // self-test to see that everything loaded okay...
  val reader = new SequenceFile.Reader(fs, path, conf)
  val key = new Text()
  val value = new Text()
  var rec = 0
  while (reader.next(key, value)) {
    if (rec < 10) {
      println(key.toString() + " => " + value.toString())
    }
    rec += 1
  }
  println("...")
  println("#=records written: " + rec)
  reader.close()
}
```

The following sequence of commands prepares the data from the input files, trains a Naive Bayes classifier and runs it against the test set. Running the Complementary Naive Bayes classifier just involves passing an extra parameter to the trainnb and testnb subcommands.

```
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
    com.mycompany.mia.classify.NaiveBayes20NewsgroupDataPreparer \
    /path/to/20news-bydate-train /tmp/20news-seq'
hduser@cyclone:mahout$ hadoop fs -put /tmp/20news-seq .
hduser@cyclone:mahout$ bin/mahout seq2sparse \
    -i 20news-seq -o 20news-vectors -lnorm -nv -wt tfidf
hduser@cyclone:mahout$ bin/mahout split \
    -i 20news-vectors/tfidf-vectors \
    --trainingOutput 20news-train-vectors \
    --testOutput 20news-test-vectors \
    --randomSelectionPct 20 \
    --overwrite --sequenceFiles -xm sequential
hduser@cyclone:mahout$ bin/mahout trainnb \
    -i 20news-train-vectors -el -o model -li labelindex -ow
hduser@cyclone:mahout$ bin/mahout testnb \
    -i 20news-test-vectors \
    -o 20news-testing \
    -m model -l labelindex -ow
```
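For intuition about what the Naive Bayes family does with text, here is a small sketch of a textbook multinomial Naive Bayes classifier with add-one smoothing, in plain Scala. It is an assumption-laden illustration, not what trainnb and testnb implement (those work on the TF-IDF vectors produced above), and the names are invented for this example.

```scala
object NaiveBayesSketch {
  final case class Model(
      logPrior: Map[String, Double],             // label -> log P(label)
      logProb: Map[String, Map[String, Double]], // label -> word -> log P(word | label)
      logUnseen: Map[String, Double])            // label -> log P(unseen word | label)

  // docs: (label, tokens) pairs.
  def train(docs: Seq[(String, Seq[String])]): Model = {
    val vocabSize = docs.flatMap(_._2).distinct.size
    val byLabel = docs.groupBy(_._1)
    val logPrior = byLabel.map { case (label, ds) =>
      (label, math.log(ds.size.toDouble / docs.size))
    }
    // per-label word counts
    val counts = byLabel.map { case (label, ds) =>
      (label, ds.flatMap(_._2).groupBy(identity).map { case (w, ws) => (w, ws.size) })
    }
    // add-one (Laplace) smoothing over the whole vocabulary
    val logProb = counts.map { case (label, wc) =>
      val total = wc.values.sum + vocabSize
      (label, wc.map { case (w, c) => (w, math.log((c + 1.0) / total)) })
    }
    val logUnseen = counts.map { case (label, wc) =>
      (label, math.log(1.0 / (wc.values.sum + vocabSize)))
    }
    Model(logPrior, logProb, logUnseen)
  }

  // Picks the label with the highest log posterior (up to a constant).
  def classify(model: Model, words: Seq[String]): String =
    model.logPrior.keys.maxBy { label =>
      model.logPrior(label) + words.map { w =>
        model.logProb(label).getOrElse(w, model.logUnseen(label))
      }.sum
    }
}
```

Training is just counting, which is why this family of algorithms parallelizes so naturally: the per-label counts can be accumulated independently and summed.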

To improve the output of the classifier, one should investigate target leaks (resulting in "too good" results) and broken feature extraction (resulting in bad results), and try eliminating, adding and combining features (e.g. brand, gender, brand+gender, etc), normalizing feature dimensions, trying out different algorithms, etc.

Mahout is optimized for situations where you get lots of training examples (internet scale, typically generated by user clicks, etc), rather than the much smaller training sets created by domain experts in more traditional settings. Compared to the other parts, the classification code base seems to be more fluid and under heavier development, so it may make sense to look in the Mahout source code or the Mahout Newsgroup for answers rather than depending on the book.

## Saturday, September 01, 2012

### Learning Mahout : Clustering

The next section in the MIA book is Clustering. As with Recommenders, Mahout provides both in-memory and map-reduce versions of various clustering algorithms. However, unlike Recommenders, there are quite a few toolkits (like Weka or Mallet for example) which are more comprehensive than Mahout for small or medium sized datasets, so I decided to concentrate on the M/R implementations.

The full list of clustering algorithms available in Mahout at the moment can be found on its Wiki Page under the Clustering section. The ones covered in the book are K-Means, Canopy, Fuzzy K-Means, LDA and Dirichlet. All these algorithms expect data in the form of vectors, so the first step is to convert the input data into this format, a process known as vectorization. Essentially, clustering is the process of finding nearby points in n-dimensional space, where each vector represents a point in this space, and each element of a vector represents a dimension in this space.

It is important to choose the right vector format for the clustering algorithm. For example, one should use the SequentialAccessSparseVector for K-Means, since there is a lot of sequential access in the algorithm. Other possibilities are the DenseVector and the RandomAccessSparseVector formats. The input to a clustering algorithm is a SequenceFile containing key-value pairs of {IntWritable, VectorWritable} objects. Since the implementations are given, Mahout users would spend most of their time vectorizing the input (and thinking about what feature vectors to use, of course).

Once vectorized, one can invoke the appropriate algorithm either by calling the appropriate bin/mahout subcommand from the command line, or through a program by calling the appropriate Driver's run method. All the algorithms require the initial centroids to be provided, and the algorithm iteratively perturbs the centroids until they converge. One can either guess randomly or use the Canopy clusterer to generate the initial centroids.
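The "perturb until convergence" loop is easiest to see in K-Means. Below is a minimal in-memory sketch in plain Scala with dense arrays and squared Euclidean distance. It is not Mahout's KMeansDriver, and the names and the exact convergence test are assumptions made for this illustration.

```scala
object KMeansSketch {
  type Point = Array[Double]

  def squaredDistance(a: Point, b: Point): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  // Index of the centroid closest to p.
  def nearest(p: Point, centroids: Seq[Point]): Int =
    centroids.indices.minBy(i => squaredDistance(p, centroids(i)))

  // Component-wise mean of a non-empty set of points.
  def mean(points: Seq[Point]): Point =
    points.head.indices.map(i => points.map(_(i)).sum / points.size).toArray

  // Repeats assign/recompute until no centroid moves by more than `threshold`
  // (squared distance) or `maxIterations` is reached.
  def cluster(points: Seq[Point], initial: Seq[Point],
              threshold: Double, maxIterations: Int): Seq[Point] = {
    var centroids = initial
    var converged = false
    var iteration = 0
    while (!converged && iteration < maxIterations) {
      val assigned = points.groupBy(p => nearest(p, centroids))
      val updated = centroids.indices.map { i =>
        // a centroid with no points assigned stays where it is
        assigned.get(i).map(mean).getOrElse(centroids(i))
      }
      converged = centroids.zip(updated).forall {
        case (c, u) => squaredDistance(c, u) <= threshold
      }
      centroids = updated
      iteration += 1
    }
    centroids
  }
}
```

The `threshold` and `maxIterations` parameters play the same role as the convergence threshold and maximum iterations passed to the clustering subcommands.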

Finally, the output of the clustering algorithm can be read using the Mahout cluster dumper subcommand. To check the quality, take a look at the top terms in each cluster to see how "believable" they are. Another way to measure the quality of clusters is to measure the inter-cluster and intra-cluster distances. Generally, "good" clusters are tight (small intra-cluster distances) and well separated (large inter-cluster distances). Here is code to calculate inter-cluster distance based on code from the MIA book.
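As a rough illustration of the two measures (written independently of the book's code, with hypothetical names and plain Euclidean distance), the averages can be computed like this:

```scala
object ClusterQualitySketch {
  type Point = Array[Double]

  def distance(a: Point, b: Point): Double =
    math.sqrt(a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum)

  // Average distance between every pair of distinct centroids
  // (needs at least two centroids).
  def interClusterDistance(centroids: Seq[Point]): Double = {
    val pairs = for {
      i <- centroids.indices
      j <- centroids.indices if i < j
    } yield distance(centroids(i), centroids(j))
    pairs.sum / pairs.size
  }

  // Average distance from each point to the centroid of its own cluster.
  // clusters: (centroid, members) pairs.
  def intraClusterDistance(clusters: Seq[(Point, Seq[Point])]): Double = {
    val ds = for ((centroid, members) <- clusters; p <- members)
      yield distance(centroid, p)
    ds.sum / ds.size
  }
}
```

Comparing the two numbers gives a quick sanity check: if the average intra-cluster distance is close to the average inter-cluster distance, the clusters overlap heavily.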

There are various ways of improving clustering quality. The first, of course, is knowing your data well enough so you can choose good features to cluster on. For clustering text or real world (dirty) data, vector generation can be improved by removing noise and using a good weighting technique. Mahout allows you to specify your custom Lucene analyzers to its clustering subcommands for this. Cluster quality is also dependent on the measure used to calculate similarity between two feature vectors. Once again, Mahout supplies a large number of Distance Measure implementations (Chebyshev, Cosine, Mahalanobis, Manhattan, Minkowski, SquaredEuclidean, Euclidean, Tanimoto, Weighted Euclidean and Weighted Manhattan) and also allows you to specify your own if these don't suit your purposes. Within each dimension, points can be normalized to remove the effect of outliers - the normalization p-norm should match the p-norm used by the distance measure. Finally, if the dimensions are not comparable, such as number of bedrooms and price in dollars for a house, then one should normalize across dimensions, a process known as weighting (this should be done during the vectorization process, which you control fully).
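The p-norm normalization mentioned above amounts to dividing a vector by its p-norm. A minimal sketch in plain Scala follows; the names are invented for this example, and in the Mahout pipeline this is handled by the `-n` flag of seq2sparse rather than by hand.

```scala
object NormalizeSketch {
  // p-norm of a vector: (sum |x_i|^p)^(1/p); p = 1 is Manhattan, p = 2 is Euclidean.
  def pNorm(v: Array[Double], p: Double): Double =
    math.pow(v.map(x => math.pow(math.abs(x), p)).sum, 1.0 / p)

  // Scales the vector so that its p-norm is 1 (zero vectors are returned unchanged).
  def normalize(v: Array[Double], p: Double): Array[Double] = {
    val norm = pNorm(v, p)
    if (norm == 0.0) v.clone() else v.map(_ / norm)
  }
}
```

So a 2-norm normalized vector pairs naturally with a Euclidean style distance measure, and a 1-norm normalized vector with the Manhattan distance measure.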

### Case Study - Reuters

The dataset used for this is the Reuters-21578 collection. I basically followed along with the book, trying out commands and making them work against a pseudo-distributed Hadoop installation on my notebook.

Vectors can be created using the following Mahout subcommands.

```
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # create sequence file from directory of input files
hduser@cyclone:mahout$ #   -i: input directory, -o: output directory
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout seqdirectory \
    -c UTF-8 \
    -i reuters \
    -o reuters-seqfiles
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # create vectors from sequence file (no normalization)
hduser@cyclone:mahout$ #   -i: input (sequence files directory), -o: output (vectors)
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout seq2sparse \
    -i reuters-seqfiles \
    -o reuters-vectors \
    -ow
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # this creates 7 sub directories under output
hduser@cyclone:mahout$ # df-count, dictionary.file*, frequency.file-*,
hduser@cyclone:mahout$ # tf-vectors, and tfidf-vectors
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # create vectors from sequence file (with normalization)
hduser@cyclone:mahout$ #   -a: analyzer, -chunk: chunk size (MB),
hduser@cyclone:mahout$ #   -wt: weighting scheme, -s: minimum support,
hduser@cyclone:mahout$ #   -md: minimum document frequency,
hduser@cyclone:mahout$ #   -x: maximum document frequency percentage,
hduser@cyclone:mahout$ #   -ng: ngram size, -ml: minimum log likelihood ratio,
hduser@cyclone:mahout$ #   -seq: create sequential access sparse vectors,
hduser@cyclone:mahout$ #   -n: normalization - use 2-norm (aka Euclidean norm),
hduser@cyclone:mahout$ #       should be paired with similar distance measure
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout seq2sparse \
    -i reuters-seqfiles \
    -o reuters-normalized-bigram -ow \
    -a org.apache.lucene.analysis.WhitespaceAnalyzer \
    -chunk 200 \
    -wt tfidf \
    -s 5 \
    -md 3 \
    -x 90 \
    -ng 2 \
    -ml 50 \
    -seq \
    -n 2
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # two additional directories created,
hduser@cyclone:mahout$ # tokenized-documents and wordcount.
hduser@cyclone:mahout$ #######################################################
```
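For reference, the TF-IDF weighting requested by `-wt tfidf` can be illustrated with the textbook formula, weight = tf × log(N / df). This sketch is plain Scala with names of my own choosing; Mahout's exact formula may differ in its damping and smoothing details, so treat it as an approximation of the idea only.

```scala
object TfIdfSketch {
  // docs: tokenized documents. Returns one sparse {term => weight} map per
  // document, using weight = tf * log(N / df).
  def tfidf(docs: Seq[Seq[String]]): Seq[Map[String, Double]] = {
    val n = docs.size
    // document frequency: number of documents each term occurs in
    val df = docs.flatMap(_.distinct).groupBy(identity).map {
      case (t, ts) => (t, ts.size)
    }
    docs.map { doc =>
      doc.groupBy(identity).map { case (t, ts) =>
        (t, ts.size * math.log(n.toDouble / df(t)))
      }
    }
  }
}
```

A term that occurs in every document gets weight 0, which is the effect the maximum document frequency cutoff (`-x`) approximates more bluntly.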

To run the K-Means clusterer, here is the sequence of Mahout subcommands.

```
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # invoke kmeans
hduser@cyclone:mahout$ #   -i: input, -c: generate initial clusters, -o: output,
hduser@cyclone:mahout$ #   -cd: convergence threshold, -k: number of clusters,
hduser@cyclone:mahout$ #   -x: max iterations, -cl: run clustering after iterations
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout kmeans \
    -i reuters-vectors/tfidf-vectors \
    -c reuters-initial-clusters \
    -o reuters-kmeans-clusters \
    -dm org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure \
    -cd 1.0 \
    -k 20 \
    -x 20 \
    -cl
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # in the output directory, there is a clusteredPoints
hduser@cyclone:mahout$ # directory, and directories corresponding to each
hduser@cyclone:mahout$ # clustering iteration named clusters-*. The last
hduser@cyclone:mahout$ # cluster is named clusters-*-final.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # dump clusters to local filesystem
hduser@cyclone:mahout$ #   -dt: format: {Integer => String},
hduser@cyclone:mahout$ #   -d: dictionary: {id => word}, -i: input,
hduser@cyclone:mahout$ #   -o: output (local filesystem), -b: format length,
hduser@cyclone:mahout$ #   -n: number of top terms to print
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
    -dt sequencefile \
    -d reuters-vectors/dictionary.file-* \
    -i reuters-kmeans-clusters/clusters-3-final \
    -o clusters.txt \
    -b 10 \
    -n 10
```

The -dm flag specifies the distance measure. In general, it is better to use either the Cosine or the Tanimoto distance measure for clustering text rather than the SquaredEuclidean measure.
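To compare the three measures mentioned here, a minimal sketch in plain Scala over dense arrays follows. The formulas are the standard ones (cosine distance as 1 minus cosine similarity, Tanimoto distance as 1 minus the Tanimoto coefficient); the object and method names are mine, not Mahout's, and zero vectors are not handled.

```scala
object DistanceSketch {
  private def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => a(i) * b(i)).sum

  def squaredEuclidean(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  // 1 - cos(angle between a and b): ignores vector length entirely.
  def cosine(a: Array[Double], b: Array[Double]): Double =
    1.0 - dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

  // 1 - (a.b / (|a|^2 + |b|^2 - a.b)): takes both angle and length into account.
  def tanimoto(a: Array[Double], b: Array[Double]): Double =
    1.0 - dot(a, b) / (dot(a, a) + dot(b, b) - dot(a, b))
}
```

For two documents with the same word proportions but different lengths, say `Array(1.0, 0.0)` and `Array(2.0, 0.0)`, the cosine distance is 0 while the squared Euclidean distance is 1, which is one reason the angle-based measures tend to suit text better.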

The value of k (number of clusters) is provided by the caller based on their knowledge of the data. You can eliminate the guesswork by using the Canopy clusterer with appropriate distance thresholds to indicate the size of clusters as shown below.

```
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # generate initial clusters with canopy
hduser@cyclone:mahout$ #   -t1: points within t1 of a canopy center are
hduser@cyclone:mahout$ #        included in that canopy
hduser@cyclone:mahout$ #   -t2: points within t2 of a canopy center are
hduser@cyclone:mahout$ #        removed from further consideration
hduser@cyclone:mahout$ #   (canopy generally expects t1 > t2)
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout canopy \
    -i reuters-vectors/tfidf-vectors \
    -o reuters-canopy-centroids \
    -dm org.apache.mahout.common.distance.EuclideanDistanceMeasure \
    -t1 1500 \
    -t2 2000
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # run kmeans with canopy cluster centroids.
hduser@cyclone:mahout$ # in this case, since -c points to a populated path,
hduser@cyclone:mahout$ # the algo doesn't have to guess initial centroids.
hduser@cyclone:mahout$ #   -cd: convergence threshold, -ow: overwrite,
hduser@cyclone:mahout$ #   -x: max iterations, -cl: run clustering after iterations
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout kmeans \
    -i reuters-vectors/tfidf-vectors \
    -o reuters-kmeans-clusters \
    -c reuters-canopy-centroids/clusters-0-final \
    -dm org.apache.mahout.common.distance.TanimotoDistanceMeasure \
    -cd 0.1 \
    -ow \
    -x 20 \
    -cl
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # view the clusters generated
hduser@cyclone:mahout$ #   -dt: format {Integer => String},
hduser@cyclone:mahout$ #   -d: dictionary (id => word), -i: input directory,
hduser@cyclone:mahout$ #   -o: output (local filesystem),
hduser@cyclone:mahout$ #   -b: max size of word to print, -n: number of top terms
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
    -dt sequencefile \
    -d reuters-vectors/dictionary.file-* \
    -i reuters-kmeans-clusters/clusters-1-final \
    -o clusters.txt \
    -b 10 \
    -n 10
```
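The role of the two Canopy thresholds is perhaps clearer in code. Here is a minimal single-pass sketch of the classic canopy algorithm in plain Scala, assuming a loose threshold t1 that is larger than the tight threshold t2. It is an in-memory illustration with made-up names, not Mahout's CanopyDriver.

```scala
import scala.collection.mutable.ArrayBuffer

object CanopySketch {
  type Point = Array[Double]

  def distance(a: Point, b: Point): Double =
    math.sqrt(a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum)

  // Returns (center, members) pairs. Points within t1 of a center join its
  // canopy; points within t2 are removed from the candidate list, so they
  // cannot become centers (or members) of later canopies.
  def canopies(points: Seq[Point], t1: Double, t2: Double): Seq[(Point, Seq[Point])] = {
    require(t1 > t2, "t1 (loose) must exceed t2 (tight)")
    var remaining = points.toList
    val result = ArrayBuffer.empty[(Point, Seq[Point])]
    while (remaining.nonEmpty) {
      val center = remaining.head
      val members = remaining.filter(p => distance(center, p) < t1)
      result += ((center, members))
      remaining = remaining.tail.filter(p => distance(center, p) >= t2)
    }
    result.toSeq
  }
}
```

The number of canopies falls out of the thresholds rather than being supplied up front, which is what makes the canopy centers useful as initial centroids for K-Means.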

You can also write your own custom clusterer. I suppose this would be useful when you want to deploy an end-to-end solution but as you can see, there is not much difference from using a pipeline of commands as shown above. Here is an example of a custom clusterer that works against the Reuters dataset, using Canopy to compute the initial centroids, then using KMeans (or optionally Fuzzy KMeans) for clustering.

```
// Source: src/main/scala/com/mycompany/mia/cluster/ReutersClusterer.scala
package com.mycompany.mia.cluster

import java.io.{StringReader, Reader}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{SequenceFile, IntWritable}
import org.apache.lucene.analysis.standard.{StandardTokenizer, StandardFilter, StandardAnalyzer}
import org.apache.lucene.analysis.tokenattributes.TermAttribute
import org.apache.lucene.analysis.{WhitespaceTokenizer, TokenStream, StopFilter, LowerCaseFilter, Analyzer}
import org.apache.lucene.util.Version
import org.apache.mahout.clustering.canopy.CanopyDriver
import org.apache.mahout.clustering.classify.WeightedVectorWritable
import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver
import org.apache.mahout.clustering.kmeans.KMeansDriver
import org.apache.mahout.clustering.Cluster
import org.apache.mahout.common.distance.{TanimotoDistanceMeasure, EuclideanDistanceMeasure}
import org.apache.mahout.common.HadoopUtil
import org.apache.mahout.vectorizer.tfidf.TFIDFConverter
import org.apache.mahout.vectorizer.{DocumentProcessor, DictionaryVectorizer}

object ReutersClusterer extends App {

  // parameters
  val minSupport = 2
  val minDf = 5
  val maxDfPercent = 95
  val maxNGramSize = 2
  val minLLRValue = 50
  val reduceTasks = 1
  val chunkSize = 200
  val norm = 2
  val sequentialAccessOutput = true

  val inputDir = args(0)  // directory of doc sequence file(s)
  val outputDir = args(1) // directory where clusters will be written
  val algo = args(2)      // "kmeans" or "fkmeans"

  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  HadoopUtil.delete(conf, new Path(outputDir))

  // converts input docs in sequence file format in input_dir
  // into token array in output_dir/tokenized-documents
  val inputPath = new Path(inputDir)
  val tokenizedDocPath = new Path(outputDir,
    DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER)
  DocumentProcessor.tokenizeDocuments(inputPath,
    classOf[ReutersAnalyzer], tokenizedDocPath, conf)

  // reads token array in output_dir/tokenized-documents and
  // writes term frequency vectors in output_dir (under tf-vectors)
  DictionaryVectorizer.createTermFrequencyVectors(
    tokenizedDocPath,
    new Path(outputDir),
    DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER,
    conf, minSupport, maxNGramSize, minLLRValue, 2, true, reduceTasks,
    chunkSize, sequentialAccessOutput, false)

  // converts term frequency vectors in output_dir/tf-vectors
  // to TF-IDF vectors in output_dir (under tfidf-vectors)
  val tfVectorPath = new Path(outputDir,
    DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER)
  val outputPath = new Path(outputDir)
  val docFreqs = TFIDFConverter.calculateDF(
    tfVectorPath, outputPath, conf, chunkSize)
  TFIDFConverter.processTfIdf(tfVectorPath, outputPath,
    conf, docFreqs, minDf, maxDfPercent, norm, true,
    sequentialAccessOutput, false, reduceTasks)

  // reads tfidf-vectors from output_dir/tfidf-vectors
  // and writes out Canopy centroids at output_dir/canopy-centroids
  val tfidfVectorPath = new Path(outputDir, "tfidf-vectors")
  val canopyCentroidPath = new Path(outputDir, "canopy-centroids")
  CanopyDriver.run(conf, tfidfVectorPath, canopyCentroidPath,
    new EuclideanDistanceMeasure(), 250, 120, false, 0.01, false)

  // reads tfidf-vectors from output_dir/tfidf-vectors and
  // refers to directory path for initial clusters, and
  // writes out clusters to output_dir/clusters
  val clusterPath = new Path(outputDir, "clusters")
  algo match {
    case "kmeans" => KMeansDriver.run(conf, tfidfVectorPath,
      new Path(canopyCentroidPath, "clusters-0-final"),
      clusterPath, new TanimotoDistanceMeasure(),
      0.01, 20, true, 0.01, false)
    case "fkmeans" => FuzzyKMeansDriver.run(conf, tfidfVectorPath,
      new Path(canopyCentroidPath, "clusters-0-final"),
      clusterPath, new TanimotoDistanceMeasure(),
      0.01, 20, 2.0f, true, true, 0.0, false)
    case _ => throw new IllegalArgumentException(
      "algo can be either kmeans or fkmeans")
  }

  // read clusters and output
  val reader = new SequenceFile.Reader(fs,
    new Path(clusterPath, Cluster.CLUSTERED_POINTS_DIR + "/part-m-00000"),
    conf)
  val key = new IntWritable()
  val value = new WeightedVectorWritable()
  while (reader.next(key, value)) {
    println(key.toString + " belongs to " + value.toString)
  }
  reader.close()
}

class ReutersAnalyzer extends Analyzer {

  val ALPHA_PATTERN = """[a-z]+""".r

  override def tokenStream(fieldName : String, reader : Reader) :
      TokenStream = {
    // tokenize input string by standard tokenizer
    var result : TokenStream =
      new StandardTokenizer(Version.LUCENE_CURRENT, reader)
    result = new StandardFilter(result)
    // lowercase all words
    result = new LowerCaseFilter(result)
    // remove stop words
    result = new StopFilter(true, result,
      StandardAnalyzer.STOP_WORDS_SET)
    val termAttr = result.addAttribute(classOf[TermAttribute]).
      asInstanceOf[TermAttribute]
    val buf = new StringBuilder()
    while (result.incrementToken()) {
      // remove words < 3 chars long
      if (termAttr.termLength() >= 3) {
        val word = new String(
          termAttr.termBuffer(), 0, termAttr.termLength())
        // remove words with non-alpha chars in them
        if (ALPHA_PATTERN.pattern.matcher(word).matches) {
          buf.append(word).append(" ")
        }
      }
    }
    // return the remaining tokens
    new WhitespaceTokenizer(new StringReader(buf.toString))
  }
}
```

Just like K-Means, the other clustering algorithms can also be run either from a script or via code as shown above. Here are the commands for the other major clustering algorithms.

```
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # run fuzzy kmeans with random initial centroids
hduser@cyclone:mahout$ #   -cd: convergence threshold, -x: maxiterations,
hduser@cyclone:mahout$ #   -m: fuzzification factor
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout fkmeans \
    -i reuters-vectors/tfidf-vectors \
    -c reuters-fkmeans-centroids \
    -o reuters-fkmeans-clusters \
    -cd 1.0 \
    -k 21 \
    -ow \
    -x 10 \
    -m 2.0 \
    -dm org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # view generated clusters.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
    -dt sequencefile \
    -d reuters-vectors/dictionary.file-* \
    -i reuters-fkmeans-clusters/clusters-2-final \
    -o clusters.txt \
    -b 10 \
    -n 10
```
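The fuzzification factor decides how softly a point is shared between clusters. In the standard fuzzy c-means formulation, the membership of a point in cluster i is 1 / Σₖ (dᵢ / dₖ)^(2 / (m − 1)), where dᵢ is the distance to centroid i. Here is a minimal sketch of that formula in plain Scala; the names are hypothetical, and I am assuming Mahout's `-m` follows the standard definition.

```scala
object FuzzySketch {
  // Degree of membership of one point in each cluster, given its distances
  // to the cluster centroids and the fuzzification factor m (> 1).
  // The memberships always sum to 1.
  def memberships(distances: Seq[Double], m: Double): Seq[Double] = {
    require(m > 1.0, "m must be greater than 1")
    val exponent = 2.0 / (m - 1.0)
    if (distances.exists(_ == 0.0)) {
      // the point sits exactly on one or more centroids: split membership among them
      val zeros = distances.count(_ == 0.0)
      distances.map(d => if (d == 0.0) 1.0 / zeros else 0.0)
    } else {
      distances.map { di =>
        1.0 / distances.map(dk => math.pow(di / dk, exponent)).sum
      }
    }
  }
}
```

As m approaches 1 the memberships approach the hard assignments of K-Means, and larger values of m spread each point more evenly across clusters.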

Dirichlet clustering is based on a probabilistic model that attempts to predict the distribution of points around a cluster. Model distributions provided by Mahout are the DistanceMeasureClusterDistribution and GaussianClusterDistribution, and of course users can implement ModelDistribution to create their own custom one as well. The command line invocation of the Dirichlet clusterer is shown below:

```
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # invoke dirichlet clustering
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout dirichlet \
  -i reuters-vectors/tfidf-vectors \
  -o reuters-dirichlet-clusters \
  -k 60 \ # number of clusters
  -x 10 \ # number of iterations
  -a0 1.0 \ # alpha0 value
  -md org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution \
  -mp org.apache.mahout.math.SequentialAccessSparseVector #default vector type
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # view generated clusters.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -dt sequencefile \
  -d reuters-vectors/dictionary.file-* \
  -i reuters-dirichlet-clusters/clusters-0 \
  -o clusters.txt \
  -b 10 \
  -n 10
```

The book covers another algorithm called LDA (Latent Dirichlet Allocation), which is useful for topic modeling, but the subcommand has been deprecated in version 0.7 and replaced with two other subcommands, neither of which I could get working. I guess I will come back to this one later when I actually need it, and when I am a bit more familiar with Mahout.

### Case Study - LastFM

The Last.fm dataset contains (music) group names tagged with words or phrases, along with the respective tag counts. The objective is to suggest tags for artists given their existing tags, i.e., to cluster tags using the artists as the feature vector and the tag counts as feature weights. Unlike the Reuters case, the approach here is to write just enough code to generate the dictionary (for clusterdump) and the feature vectors (for the clusterer). For the actual clustering and for viewing the clusters, we depend on the appropriate Mahout subcommands. Here is the code for the vectorizer.

```
// Source: src/main/scala/com/mycompany/mia/cluster/LastFMVectorizer.scala
package com.mycompany.mia.cluster

import scala.collection.JavaConversions.iterableAsScalaIterable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{IntWritable, DefaultStringifier, Text, SequenceFile, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
import org.apache.hadoop.mapreduce.{Mapper, Reducer, Job}
import org.apache.hadoop.util.{GenericsUtil, GenericOptionsParser}
import org.apache.mahout.math.{VectorWritable, Vector, SequentialAccessSparseVector, NamedVector}

/**
 * We need to read a file of the following format:
 * UUID<sep>Artist Name<sep>Tag<sep>Count
 * into a sequence file of [Text,VectorWritable] as follows:
 * Tag => VectorWritable(Artist:Count)
 */
object LastFMVectorizer {

  def main(args : Array[String]) : Int = {
    val conf = new Configuration()
    val otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs
    if (otherArgs.length != 2) {
      println("Usage: LastFMVectorizer input_file output_dir")
      return -1
    }
    // Dictionary Mapper/Reducer. Extract unique artists
    val job1 = new Job(conf, "Dictionary Mapper")
    job1.setJarByClass(classOf[DictionaryMapper])
    job1.setMapperClass(classOf[DictionaryMapper])
    job1.setReducerClass(classOf[DictionaryReducer])
    job1.setOutputKeyClass(classOf[Text])
    job1.setOutputValueClass(classOf[IntWritable])
    job1.setInputFormatClass(classOf[TextInputFormat])
    job1.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text,IntWritable]])
    FileInputFormat.addInputPath(job1, new Path(otherArgs(0)))
    val dictOutput = new Path(otherArgs(1), "dictionary")
    FileOutputFormat.setOutputPath(job1, dictOutput)
    var succ = (job1.waitForCompletion(true))

    if (succ) {
      // get a mapping of unique ids to artist and tag for converting
      // to Mahout vectors
      val fs = FileSystem.get(dictOutput.toUri(), conf)
      val dictfiles = fs.globStatus(new Path(dictOutput, "part-*"))
      var i = 0
      val dictGlob = new Path(otherArgs(1), "dict-glob")
      val writer = new SequenceFile.Writer(fs, conf, dictGlob,
        classOf[Text], classOf[IntWritable])
      for (dictfile <- dictfiles) {
        val path = dictfile.getPath()
        val reader = new SequenceFile.Reader(fs, path, conf)
        val key = new Text()
        val value = new IntWritable()
        while (reader.next(key, value)) {
          writer.append(key, new IntWritable(i))
          i += 1
        }
        reader.close()
      }
      writer.close()

      conf.set("dictpath", dictGlob.toString())
      val job2 = new Job(conf, "Dictionary Vectorizer")
      job2.setJarByClass(classOf[VectorMapper])
      job2.setMapperClass(classOf[VectorMapper])
      job2.setReducerClass(classOf[VectorReducer])
      job2.setOutputKeyClass(classOf[Text])
      job2.setOutputValueClass(classOf[VectorWritable])
      job2.setInputFormatClass(classOf[TextInputFormat])
      job2.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text,VectorWritable]])
      FileInputFormat.addInputPath(job2, new Path(otherArgs(0)))
      FileOutputFormat.setOutputPath(job2, new Path(otherArgs(1), "vectors"))
      succ = (job2.waitForCompletion(true))
    }
    if (succ) 0 else 1
  }
}

/////////////////////////////////////////////////////////////////
// Assigns a unique ID to each artist. Needed by clusterdump
/////////////////////////////////////////////////////////////////

class DictionaryMapper
    extends Mapper[LongWritable,Text,Text,IntWritable] {

  // fields in ArtistTags.dat are delimited by the literal string <sep>
  val pattern = """<sep>""".r
  val zero = new IntWritable(0)

  override def map(key : LongWritable, value : Text,
      context : Mapper[LongWritable,Text,Text,IntWritable]#Context) = {
    val fields = pattern.split(value.toString)
    if (fields.length != 4) {
      context.getCounter("Map", "LinesWithErrors").increment(1)
    } else {
      context.write(new Text(fields(1)), zero)
    }
  }
}

class DictionaryReducer
    extends Reducer[Text,IntWritable,Text,IntWritable] {

  val zero = new IntWritable(0)

  override def reduce(key : Text, values : java.lang.Iterable[IntWritable],
      context : Reducer[Text,IntWritable,Text,IntWritable]#Context) = {
    // emit each distinct artist exactly once
    context.write(key, zero)
  }
}

/////////////////////////////////////////////////////////////////
// For each tag, creates feature vectors of artists
/////////////////////////////////////////////////////////////////

class VectorMapper
    extends Mapper[LongWritable,Text,Text,VectorWritable] {

  val pattern = """<sep>""".r
  var dict = new java.util.HashMap[String,Integer]()
  var vecwritable = new VectorWritable()

  override def setup(
      context : Mapper[LongWritable,Text,Text,VectorWritable]#Context) = {
    super.setup(context)
    // load the artist => id dictionary written by the driver
    val conf = context.getConfiguration()
    val dictpath = new Path(conf.get("dictpath"))
    val fs = FileSystem.get(dictpath.toUri(), conf)
    val reader = new SequenceFile.Reader(fs, dictpath, conf)
    val key = new Text()
    val value = new IntWritable()
    while (reader.next(key, value)) {
      dict.put(key.toString(), value.get())
    }
    reader.close()
  }

  override def map(key : LongWritable, value : Text,
      context : Mapper[LongWritable,Text,Text,VectorWritable]#Context) = {
    val fields = pattern.split(value.toString)
    if (fields.length != 4) {
      context.getCounter("Map", "LinesWithErrors").increment(1)
    } else {
      val artist = fields(1)
      val tag = fields(2)
      val weight = java.lang.Double.parseDouble(fields(3))
      // one-element partial vector: the count for this (tag, artist) pair
      val vector = new NamedVector(
        new SequentialAccessSparseVector(dict.size()), tag)
      vector.set(dict.get(artist), weight)
      vecwritable.set(vector)
      context.write(new Text(tag), vecwritable)
    }
  }
}

class VectorReducer
    extends Reducer[Text,VectorWritable,Text,VectorWritable] {

  var vecwritable = new VectorWritable()

  override def reduce(key : Text, values : java.lang.Iterable[VectorWritable],
      context : Reducer[Text,VectorWritable,Text,VectorWritable]#Context) = {
    // sum the partial vectors for this tag into a single vector
    var vector : Vector = null
    for (partialVector <- values) {
      if (vector == null) {
        // start from an empty vector of the same type and size
        vector = partialVector.get().like()
      }
      // plus() returns a new vector, so keep the result
      vector = vector.plus(partialVector.get())
    }
    val artistVector = new NamedVector(vector, key.toString())
    vecwritable.set(artistVector)
    context.write(key, vecwritable)
  }
}
```
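Stripped of the Hadoop plumbing, the two MapReduce jobs above do something fairly simple, which can be sketched with plain Scala collections. This is only an in-memory illustration under a few assumptions: the object `TagVectorSketch` and its method names are made up for this example, the input fields are assumed to be separated by the literal string `<sep>`, and a sparse vector is represented as a `Map[Int, Double]` rather than a Mahout `Vector`.

```scala
// Hypothetical in-memory sketch; not the MapReduce code and not a Mahout API.
object TagVectorSketch {

  // Assumption: fields are separated by the literal string "<sep>".
  val Sep = "<sep>"

  /** Parses "id<sep>artist<sep>tag<sep>count"; returns None for malformed lines. */
  def parse(line: String): Option[(String, String, Double)] =
    line.split(Sep) match {
      case Array(_, artist, tag, count) =>
        scala.util.Try(count.toDouble).toOption.map(c => (artist, tag, c))
      case _ => None
    }

  /** Step 1 (dictionary job): give each distinct artist an integer index. */
  def dictionary(records: Seq[(String, String, Double)]): Map[String, Int] =
    records.map(_._1).distinct.sorted.zipWithIndex.toMap

  /** Step 2 (vector job): for each tag, sum the counts per artist index. */
  def vectors(records: Seq[(String, String, Double)],
              dict: Map[String, Int]): Map[String, Map[Int, Double]] =
    records.groupBy(_._2).map { case (tag, recs) =>
      val sparse = recs.groupBy(r => dict(r._1)).map {
        case (idx, rs) => idx -> rs.map(_._3).sum
      }
      tag -> sparse
    }
}
```

Each tag ends up with one sparse vector whose dimensions are artists and whose values are tag counts, which is the shape of input the clustering subcommands expect. The dictionary is kept separately so that the cluster dump can translate dimension indexes back into artist names.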

Once the vectors are generated, we just use Mahout subcommands to complete the clustering work.

```
hduser@cyclone:mahout$ hadoop jar /tmp/my-mahout-fatjar.jar \
  com.mycompany.mia.cluster.LastFMVectorizer \
  input/ArtistTags.dat \
  output
hduser@cyclone:mahout$ bin/mahout kmeans \
  -i output/vectors \
  -o output/topics \
  -c output/centroids \
  -k 200 \
  -ow \
  -dm org.apache.mahout.common.distance.CosineDistanceMeasure \
  -cd 0.01 \
  -x 20 \
  -cl
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -i output/topics/clusters-20-final \
  -d output/dict-glob \
  -dt sequencefile \
  -n 10 \
  -o clusters.txt
```
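The k-means run above uses the cosine distance measure, which is one minus the cosine of the angle between two vectors, so it depends on the direction of the vectors rather than their magnitude. Here is a minimal sketch of that calculation over sparse vectors represented as `Map[Int, Double]`. The object name `CosineSketch` and the handling of all-zero vectors are assumptions for this example; Mahout's `CosineDistanceMeasure` may treat edge cases differently.

```scala
// Hypothetical helper for illustration only; not Mahout's implementation.
object CosineSketch {

  /** Cosine distance between two sparse vectors keyed by feature index. */
  def cosineDistance(a: Map[Int, Double], b: Map[Int, Double]): Double = {
    // only indexes present in both vectors contribute to the dot product
    val dot = a.iterator.map { case (i, x) => x * b.getOrElse(i, 0.0) }.sum
    val normA = math.sqrt(a.values.map(x => x * x).sum)
    val normB = math.sqrt(b.values.map(x => x * x).sum)
    if (normA == 0.0 || normB == 0.0) 1.0 // convention chosen for this sketch only
    else 1.0 - dot / (normA * normB)
  }
}
```

Two tags applied to the same artists in the same proportions come out at a distance of about 0 even if one tag is used far more often than the other, while two tags that share no artists come out at a distance of 1.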

There are two other case studies in the MIA book which I didn't do; maybe I will come back and do them once I finish the other stuff in the book. I also skipped over LDA (because it's now deprecated in favor of two other algorithms and I couldn't make either of them work). However, I now have a decent understanding of how to do clustering with Mahout.

If you've been reading this post with the intent of learning about Mahout clustering, you may have found the pace a little too fast. That's because there is a lot of material to cover in a relatively short post. For a gentler introduction, please refer to the MIA book and the companion code repository on GitHub.