## Sunday, September 09, 2012

### Learning Mahout : Classification

The final part covered in the MIA book is Classification. The popular algorithms available are Stochastic Gradient Descent (SGD), Naive Bayes and Complementary Naive Bayes, Random Forests and Online Passive Aggressive. There are other algorithms in the pipeline, as seen from the Classification section of the Mahout wiki page.

The MIA book has generic classification information and advice that will be useful for any algorithm, but it specifically covers SGD, Bayes and Naive Bayes (the last two via Mahout scripts). Of these SGD and Random Forest are good for classification problems involving continuous variables and small to medium datasets, and the Naive Bayes family is good for problems involving text like variables and medium to large datasets.

In general, a solution to a classification problem involves choosing the appropriate features for classification, choosing the algorithm, generating the feature vectors (vectorization), training the model and evaluating the results in a loop. You continue to tweak stuff in each of these steps until you get the results with the desired accuracy.

If training data is provided (or if it can be reduced to) tabular form, then we can use Mahout subcommands to generate a logistic regression (SGD) model and test it, as shown below:

 ``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19``` ```hduser@cyclone:mahout\$ bin/mahout trainlogistic \ --input /tmp/donut.csv \ --output /tmp/model \ --target color \ # target variable --categories 2 \ # number of categories --predictors x y \ # predictor variables --types numeric \ # predictor variable types --features 20 \ # size of internal feature vector --passes 100 \ # number of passes over input --rate 50 # initial learning rate hduser@cyclone:mahout\$ bin/mahout runlogistic \ --input /tmp/test-donut.csv \ --model /tmp/model \ --auc \ # report area under curve --confusion # report confusion matrix AUC = 0.57 confusion: [[27.0, 13.0], [0.0, 0.0]] entropy: [[-0.4, -0.3], [-1.2, -0.7]] ... ```

The AUC (area under the curve) indicates the number of correct classifications, so 0 indicates a classifier thats always wrong and 1 indicates one thats always correct. The confusion matrix indicates the expected vs actual classification numbers (so for a good classifier, we should expect the numbers along the diagonal to predominate). For SGD, AUC and Log Likelihood are good measures. For NB and CNB, the percent correct and confusion matrix are good measures.

Most classification problems involve a mix of continuous, categorical, word like and text-like features. The input to a (Mahout) classification algorithm is in the form of vectors. Vectorizing approaches can be one cell/word, bag of words, and feature hashing (similar in concept to Bloom filters).

The data set used for most of the examples in the MIA book is the 20 Newsgroups DataSet. The data is a bunch of newsgroup postings, so apart from the text body, candidates for features could be their header metadata.

So the first step is to see what headers are available and how many there are. The following Unix command will list the headers and their counts. Based on the analysis, the decision is made to use the Subject, From, Keywords and Summary as additional metadata to assist in classification.

 ```1 2 3 4``` ```sujit@cyclone:20news-bydate-train\$ export LC_ALL='C'; \ for file in */*; do \ sed -E -e '/^\$/,\$d' -e 's/:.*//' -e '/^[[:space:]]/d' \$file; \ done | sort | uniq -c | sort -nr ```

The following code uses the AdaptiveLogisticRegression algorithm (which runs multiple SGD algorithms and automatically chooses the best one) to classify the 20 Newsgroups training set, then test the algorithm with the 20 Newsgroups test set. The code demonstrates the building of feature vectors for each document using multiple hashing encoders.

 ``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172``` ```// Source: src/main/scala/com/mycompany/mia/classify/SGD20NewsgroupsClassifier.scala package com.mycompany.mia.classify import java.io.{StringReader, PrintWriter, FileInputStream, File} import java.util.{HashMap, Collections, ArrayList} import scala.collection.JavaConversions.{collectionAsScalaIterable, asScalaBuffer} import scala.io.Source import org.apache.lucene.analysis.standard.StandardAnalyzer import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.lucene.util.Version import org.apache.mahout.classifier.sgd.OnlineLogisticRegression import org.apache.mahout.classifier.sgd.{ModelSerializer, L1, AdaptiveLogisticRegression} import org.apache.mahout.common.RandomUtils import org.apache.mahout.math.{Vector, RandomAccessSparseVector} import org.apache.mahout.vectorizer.encoders.{TextValueEncoder, Dictionary, ConstantValueEncoder} import com.google.common.collect.ConcurrentHashMultiset object SGD20NewsgroupsClassifier extends App { val features = 10000 val analyzer = new StandardAnalyzer(Version.LUCENE_32) val encoder = new TextValueEncoder("body") encoder.setProbes(2) val lines = new ConstantValueEncoder("line") val loglines = new ConstantValueEncoder("log(line)") val bias = new ConstantValueEncoder("intercept") val rand = RandomUtils.getRandom() // Usage: either // SGD20NewsgroupsClassifier train input_dir model_file dict_file, or // SGD20NewsgroupsClassifier test model_file dict_file test_dir args(0) match { case "train" => train(args(1), args(2), args(3)) case "test" => test(args(1), args(2), args(3)) } def train(trainDir : String, modelFile : String, dictFile : String) : Unit = { val newsgroups = new Dictionary() val learningAlgorithm = new AdaptiveLogisticRegression( 20, features, new L1()) learningAlgorithm.setInterval(800) learningAlgorithm.setAveragingWindow(500) // prepare data val files = new ArrayList[File]() val dirs = new File(trainDir).listFiles() for (dir <- dirs) { if (dir.isDirectory()) { newsgroups.intern(dir.getName()) for (file <- dir.listFiles()) { files.add(file) } } } Collections.shuffle(files) println(files.size() + " training files in " + dirs.length + " classes") var k = 0 var step = 0D var bumps = Array(1, 2, 5) for (file <- files) { val ng = file.getParentFile().getName() val actualClass = newsgroups.intern(ng) val vector = encodeFeatureVector(file) learningAlgorithm.train(actualClass, vector) } learningAlgorithm.close() // evaluate model val learner = learningAlgorithm.getBest().getPayload().getLearner() println("AUC=" + learner.auc() + ", %-correct=" + learner.percentCorrect()) ModelSerializer.writeBinary(modelFile, learner.getModels().get(0)) val serializedDict = new PrintWriter(dictFile) for (newsgroup <- newsgroups.values()) { serializedDict.println(newsgroup) } serializedDict.flush() serializedDict.close() } def encodeFeatureVector(file : File) : Vector = { val vector = new RandomAccessSparseVector(features) val words : ConcurrentHashMultiset[String] = ConcurrentHashMultiset.create() var numlines = 0 var startBody = false var prevLine = "" for (line <- Source.fromFile(file).getLines()) { if (line.startsWith("From:") || line.startsWith("Subject:") || line.startsWith("Keywords:") || line.startsWith("Summary:")) { countWords(line.replaceAll(".*:", ""), words) } if (! startBody && line.trim().length() == 0 && prevLine.trim().length() == 0) { startBody = true } if (startBody) { countWords(line, words) } numlines += 1 prevLine = line } bias.addToVector(null, 1, vector) lines.addToVector(null, numlines / 30, vector) loglines.addToVector(null, Math.log(numlines + 1), vector) for (word <- words) { encoder.addToVector(word, Math.log(1 + words.count(word)), vector) } vector } def countWords(line : String, words : ConcurrentHashMultiset[String]) : Unit = { val words = new ArrayList[String]() val tokenStream = analyzer.tokenStream("text", new StringReader(line)) tokenStream.addAttribute(classOf[CharTermAttribute]) while (tokenStream.incrementToken()) { val attr = tokenStream.getAttribute(classOf[CharTermAttribute]) words.add(new String(attr.buffer(), 0, attr.length())) } } def test(modelFile : String, dictFile : String, testDir : String) : Unit = { val model = ModelSerializer.readBinary( new FileInputStream(modelFile), classOf[OnlineLogisticRegression]) val newsgroups = getNewsgroups(dictFile) val dirs = new File(testDir).listFiles() var ncorrect = 0 var ntotal = 0 for (dir <- dirs) { if (dir.isDirectory()) { val expectedLabel = dir.getName() for (file <- dir.listFiles()) { val vector = encodeFeatureVector(file) val results = model.classify(vector) val actualLabel = newsgroups.get(results.maxValueIndex()) println("file: " + file.getName() + ", expected: " + expectedLabel + ", actual: " + actualLabel) if (actualLabel.equals(expectedLabel)) { ncorrect += 1 } ntotal += 1 } } } println("Correct: " + ncorrect + "/" + ntotal) } def getNewsgroups(dictFile : String) : HashMap[Integer,String] = { val newsgroups = new HashMap[Integer,String]() var lno = 0 for (line <- Source.fromFile(dictFile).getLines()) { newsgroups.put(lno, line) lno += 1 } newsgroups } } ```

To train and test it, we use the following commands. The accuracies are nothing to write home about (AUC=0.5), but at this time I am more concerned with the mechanics of getting a classifier working than the results.

 ```1 2 3 4 5 6 7 8``` ```sujit@cyclone:mia-scala-examples\$ sbt 'run-main \ com.mycompany.mia.classify.SGD20NewsgroupsClassifier train \ /path/to/20news-bydate-train \ /path/to/model.file /path/to/dict.file' sujit@cyclone:mia-scala-examples\$ sbt 'run-main \ com.mycompany.mia.classify.SGD20NewsgroupsClassifier test \ /path/to/model.file /path/to/dict.file \ /path/to/20news-bydate-test' ```

As you can see, the above is not run on Hadoop. This is because SGD is a sequential algorithm, so there is no point to parallelizing it. The Naive Bayes family, on the other hand, does benefit from parallelism, so it is run on Hadoop. The book covers running via scripts, so thats what I did, figuring I will go back and check out the code later (the class names are available in "driver.classes.props" in the Mahout source) if and when I need to build integrated solutions.

The book specifies the prepare20newsgroups subcommand, but that is deprecated in the current Mahout distribution, and they have a shell script to run both the Naive Bayes and SGD versions. I believe the part where the training files are converted to a 1 file per line sequence file is incorrect, so I wrote my own code to do this:

 ``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50``` ```// Source: src/main/scala/com/mycompany/mia/classify/NaiveBayes20NewsgroupDataPreparer.scala package com.mycompany.mia.classify import java.io.File import scala.io.Source import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.io.{Text, SequenceFile} object NaiveBayes20NewsgroupDataPreparer extends App { val conf = new Configuration() val fs = FileSystem.get(conf) val path = new Path(args(1)) val writer = new SequenceFile.Writer(fs, conf, path, classOf[Text], classOf[Text]) val dirs = new File(args(0)).listFiles() var n = 0 for (dir <- dirs) { val label = dir.getName() for (file <- dir.listFiles()) { val text = Source.fromFile(file). getLines(). foldLeft("") (_ + " " + _) // extra slash added to key to get around AAOOB thrown // by BayesUtils.writeLabelIndex writer.append(new Text("/" + label), new Text(text)) n += 1 } println(label + ": " + n + " files loaded") } writer.close() // self-test to see that everything loaded okay... val reader = new SequenceFile.Reader(fs, path, conf) val key = new Text() val value = new Text() var rec = 0 while (reader.next(key, value)) { if (rec < 10) { println(key.toString() + " => " + value.toString()) } rec += 1 } println("...") println("#=records written: " + rec) reader.close() } ```

The following sequence of scripts prepares the data from the input files, trains a Naive Bayes classifier and runs it against the test set. Running the complementary Naive Bayes just involves passing an extra parameter to the trainnb and testnb subcommands.

 ``` 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18``` ```sujit@cyclone:mia-scala-examples\$ sbt 'run-main \ com.mycompany.mia.classify.NaiveBayes20NewsgroupDataPreparer \ /path/to/20news-bydate-train /tmp/20news-seq' hduser@cyclone:mahout\$ hadoop fs -put /tmp/20news-seq . hduser@cyclone:mahout\$ bin/mahout seq2sparse \ -i 20news-seq -o 20news-vectors -lnorm -nv -wt tfidf hduser@cyclone:mahout\$ bin/mahout split \ -i 20news-vectors/tfidf-vectors \ --trainingOutput 20news-train-vectors \ --testOutput 20news-test-vectors \ --randomSelectionPct 20 \ --overwrite --sequenceFiles -xm sequential hduser@cyclone:mahout\$ bin/mahout trainnb \ -i 20news-train-vectors -el -o model -li labelindex -ow hduser@cyclone:mahout\$ bin/mahout testnb \ -i 20news-train-vectors \ -o 20news-testing \ -m model -l labelindex -ow ```

To improve the output of the classifier, one should investigate target leaks (resulting to "too good" results), broken feature extraction (results in bad results), eliminating, adding and combining features (eg, brand, gender, brand+gender, etc), normalizing feature dimensions, trying out different algorithms, etc.

Mahout is optimized for situations where you get lots of training examples (internet scale, typically generated by user clicks, etc), rather than the much smaller training sets created by domain experts in more traditional settings. More than the other parts, the classification code base seems to be more fluid and under heavier development, so it may make sense to look in the Mahout source code or the Mahout Newsgroup for answers rather than depending on the book.

#### 16 comments (moderated to prevent spam):

hi sujit,

can you please tell me how to use the model created from mahout(0.7) after this command:-
bin/mahout trainnb \
-i 20news-train-vectors -el -o model -li labelindex -ow

Rajesh Nikam said...

I am also trying to build model with mahout sgd. similar to your results

AUC = 0.57
confusion: [[27.0, 13.0], [0.0, 0.0]]
entropy: [[-0.4, -0.3], [-1.2, -0.7]]

I think it means all instances are classified as category 1: 27 + 13.

Is such kind of model useful.

Sujit Pal said...

Hi Rajesh, yes you are right, the model is useless, its not predicting anything, its like a broken clock thats right twice a day :-). It classifies everything as category 1, 27 of which are right and 13 of which are wrong which gives it the AUC of 0.57. Based on a thread on the Mahout ML that I read couple of weeks ago, I believe this is because the model's feature vectors haven't been created correctly. I suspect that this is probably a consequence of some change between Mahout's version for the MIA book and 0.7, perhaps a parameter that no longer provides an appropriate default or something.

Sujit Pal said...

@Priyadarshan: sorry about the delay in respondng, looks like I missed your comment. Once you train the model, you would write some code that would load up the model and use it to predict classes for unseen cases, similar to the SGD20NewsgroupsClassifier.test() method.

Mahout Newbie said...

Hi Sujit,

http://stackoverflow.com/questions/14151877/error-while-creating-mahout-model

Sujit Pal said...

Hi, its difficult to do without more context. I looked at the line in BayesUtils and the problem is that the code depends on a certain format for the label and not finding it, but that does not tie it back to why its failing for you. Perhaps look at the code being pointed by the /entire/ stack trace (not the last 2 lines), that may provide you a better answer.

Unknown said...

Hi Sujit, I am trying to run logistic regression in mahout. I am getting the following error message though I have the /user/as7784/Pharma folder in Hadoop. Appreciate your help.

\$ mahout trainlogistic --input /user/as7784/Pharma --output ./model --target MOM_POPS --categories 2 --predictors Devaluation Seasonality_YE P_Debt --types numeric

MAHOUT-JOB: /gridapps/mahout/mahout-examples-0.8-job.jar

Exception in thread "main" java.io.FileNotFoundException: /user/as7784/Pharma (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.(FileInputStream.java:106)
at org.apache.mahout.classifier.sgd.TrainLogistic.open(TrainLogistic.java:315)
at org.apache.mahout.classifier.sgd.TrainLogistic.mainToOutput(TrainLogistic.java:76)
at org.apache.mahout.classifier.sgd.TrainLogistic.main(TrainLogistic.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.mahout.driver.MahoutDriver.main(MahoutDriver.java:194)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)

Sujit Pal said...

Hi Anitha, I suspect that /user/as7784/Pharma is a local (non HDFS) directory but your mahout script expects to see it in HDFS (or vice versa). Take a look at the MAHOUT_LOCAL environment variable (comments inside the bin/mahout script) - if you set it to some value, it forces mahout to work off the local file system.

VRK said...

Hi, Sujit, can you send me the complete code for classifying 20newsgroup dataset using SGD and NB. I need from pre processing to classification of files. Its urgent.
my mail: vrk_nitw@yahoo.com

Sujit Pal said...

Hi VRK, you can find it on GitHub here.

Rajarshi Roy said...

Hi Sujit,
I have been following your blog for last one month and I find it very informative.I am rather new to Data Mining. I wanted to ask you this question regarding clustering of documents.
Mahout or Carrot2 does and excellent job with clustering my documents. Now, I want to make this process incremental. Let's say I have a set of document S1 and by the time I finish processing them using a batch job/Map Reduce, I have another set already waiting in the queue, say S2. Clustering has to be done on the whole now, i.e. S1+S2. I can very well do that. But, is there some way where I can make this process incremental, so that, next time I process only S2 and still get the cetroid for S1 and S2 ? It does not really have to be realtime. But it should be incremental, otherwise, the data-set will continuously go on increasing and at one point of time, my processing resources will not be able to give me a result within a desired time-interval. Some guidance here will be very much helpful

Sujit Pal said...

Hi Rajarshi, thanks for the kind words, glad you find it helpful. To do incremental clustering, assuming a k-means like environment, you could maintain the centroids of your clusters (basically a averaged term vector of all documents in the cluster, I believe Mahout k-means dumps this information out already) and then assign an incoming document based on which centroid it is closest to. You could update the centroids after each batch, or periodically correct the centroids with a batch run over the full dataset.

rachana said...

hello any way to use confusion matrix in program...can i get source code

Sujit Pal said...

Hi Rachana, the confusion matrix is the output of the classification evaluation, and tells how well the classifier performed across different classes. Some classifiers take the classification flags made by upstream (less accurate) classifiers, perhaps in that case, we could also use the probability of a correct answer for a class from the confusion matrix as a feature? But in the case described, that would be incorrect and may well lead to overfitting. If you are looking for source code for the code I describe in this blog post, its on github here.

chiru said...

please anyone send the algorithm in mahout classification which already implemented (naive)

Sujit Pal said...

Hi Chiru, I believe Mahout still ships with the Naive Bayes Classifier - here is a List of supported algorithms in Mahout 0.10 according to the Mahout website.