Sunday, September 09, 2012

Learning Mahout : Classification

The final part covered in the MIA book is Classification. The popular algorithms available are Stochastic Gradient Descent (SGD), Naive Bayes and Complementary Naive Bayes, Random Forests and Online Passive Aggressive. There are other algorithms in the pipeline, as seen from the Classification section of the Mahout wiki page.

The MIA book has generic classification information and advice that is useful for any algorithm, but it specifically covers SGD, Naive Bayes and Complementary Naive Bayes (the last two via Mahout scripts). Of these, SGD and Random Forest are good for classification problems involving continuous variables and small to medium datasets, while the Naive Bayes family is good for problems involving text-like features and medium to large datasets.

In general, a solution to a classification problem involves choosing appropriate features, choosing the algorithm, generating the feature vectors (vectorization), training the model and evaluating the results, all in a loop. You keep tweaking each of these steps until you get results with the desired accuracy.
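
To make the loop concrete, here is a minimal sketch (mine, not from the book) of a vectorize/train/evaluate cycle using Mahout's OnlineLogisticRegression; the toy vectors, dimensions and settings below are made up purely for illustration.

// A sketch of the train/evaluate cycle on made-up data (not from the book)
import org.apache.mahout.classifier.sgd.{L1, OnlineLogisticRegression}
import org.apache.mahout.math.{DenseVector, Vector}

object TrainEvalLoopSketch extends App {
  // toy (label, feature vector) pairs; in practice these come out of
  // the vectorization step
  val data : Seq[(Int, Vector)] = Seq(
    (0, new DenseVector(Array(1.0, 0.2, 0.1))),
    (1, new DenseVector(Array(1.0, 0.9, 0.8))))

  // 2 categories, 3 features, L1 prior
  val lr = new OnlineLogisticRegression(2, 3, new L1()).lambda(1e-4)

  // train: several passes over the data
  for (pass <- 1 to 10; (label, vector) <- data) lr.train(label, vector)

  // evaluate: count examples whose top-scoring class matches the label
  val ncorrect = data.count { case (label, vector) =>
    lr.classifyFull(vector).maxValueIndex() == label }
  println("correct: " + ncorrect + "/" + data.size)
}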

If the training data is provided in (or can be reduced to) tabular form, then we can use Mahout subcommands to generate a logistic regression (SGD) model and test it, as shown below:

hduser@cyclone:mahout$ bin/mahout trainlogistic \
  --input /tmp/donut.csv \
  --output /tmp/model \
  --target color \               # target variable
  --categories 2 \               # number of categories
  --predictors x y \             # predictor variables
  --types numeric \              # predictor variable types
  --features 20 \                # size of internal feature vector
  --passes 100 \                 # number of passes over input
  --rate 50                      # initial learning rate
hduser@cyclone:mahout$ bin/mahout runlogistic \
  --input /tmp/test-donut.csv \
  --model /tmp/model \
  --auc \                 # report area under curve
  --confusion             # report confusion matrix
AUC = 0.57
confusion: [[27.0, 13.0], [0.0, 0.0]]
entropy: [[-0.4, -0.3], [-1.2, -0.7]]
...

The AUC (area under the ROC curve) measures how well the classifier separates the classes: 0 indicates a classifier that is always wrong, 1 one that is always correct, and 0.5 one that is no better than random. The confusion matrix shows expected vs. actual classification counts, so for a good classifier we should expect the numbers along the diagonal to predominate. For SGD, AUC and log-likelihood are good measures. For NB and CNB, the percent correct and the confusion matrix are good measures.
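
As a quick sanity check (plain arithmetic, not Mahout API), percent correct can be read off the confusion matrix as the diagonal sum divided by the total count; for the matrix above that works out to 27/40 = 67.5%, a reminder that percent correct and AUC can tell different stories.

// Percent correct = diagonal sum / total count of the confusion matrix
object ConfusionMatrixCheck extends App {
  val confusion = Array(Array(27.0, 13.0), Array(0.0, 0.0))
  val total = confusion.map(_.sum).sum
  val onDiagonal = confusion.indices.map(i => confusion(i)(i)).sum
  println("percent correct = " + (100.0 * onDiagonal / total)) // 67.5
}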

Most classification problems involve a mix of continuous, categorical, word-like and text-like features. The input to a (Mahout) classification algorithm is a set of feature vectors. Vectorization approaches include one cell per word, bag of words, and feature hashing (similar in concept to Bloom filters).
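
Here is a minimal sketch (not from the book) of what these approaches look like with Mahout's hashing encoders; the field names, values and vector size are made up for illustration.

// One cell/word, continuous and text-like features hashed into one vector
import org.apache.mahout.math.RandomAccessSparseVector
import org.apache.mahout.vectorizer.encoders.{ContinuousValueEncoder, StaticWordValueEncoder, TextValueEncoder}

object EncoderSketch extends App {
  val vector = new RandomAccessSparseVector(100)
  // one cell/word: a categorical value hashed into the vector
  new StaticWordValueEncoder("country").addToVector("canada", vector)
  // a continuous value, passed as a string and used as the feature weight
  new ContinuousValueEncoder("age").addToVector("42", vector)
  // text-like feature: each token is hashed into multiple locations (probes)
  val bodyEncoder = new TextValueEncoder("body")
  bodyEncoder.setProbes(2)
  bodyEncoder.addToVector("the quick brown fox", vector)
  println(vector)
}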

The data set used for most of the examples in the MIA book is the 20 Newsgroups DataSet. The data is a bunch of newsgroup postings, so apart from the text body, candidates for features could be their header metadata.

So the first step is to see which headers are available and how many there are of each. The following Unix command lists the headers and their counts. Based on this analysis, the Subject, From, Keywords and Summary headers are used as additional metadata to assist in classification.

sujit@cyclone:20news-bydate-train$ export LC_ALL='C'; \
  for file in */*; do \
    sed -E -e '/^$/,$d' -e 's/:.*//' -e '/^[[:space:]]/d' $file; \
  done | sort | uniq -c | sort -nr

The following code uses the AdaptiveLogisticRegression algorithm (which runs multiple SGD learners and automatically chooses the best one) to train a classifier on the 20 Newsgroups training set and then evaluate it against the 20 Newsgroups test set. The code demonstrates building a feature vector for each document using multiple hashing encoders.

// Source: src/main/scala/com/mycompany/mia/classify/SGD20NewsgroupsClassifier.scala
package com.mycompany.mia.classify

import java.io.{StringReader, PrintWriter, FileInputStream, File}
import java.util.{HashMap, Collections, ArrayList}

import scala.collection.JavaConversions.{collectionAsScalaIterable, asScalaBuffer}
import scala.io.Source

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.util.Version
import org.apache.mahout.classifier.sgd.OnlineLogisticRegression
import org.apache.mahout.classifier.sgd.{ModelSerializer, L1, AdaptiveLogisticRegression}
import org.apache.mahout.common.RandomUtils
import org.apache.mahout.math.{Vector, RandomAccessSparseVector}
import org.apache.mahout.vectorizer.encoders.{TextValueEncoder, Dictionary, ConstantValueEncoder}

import com.google.common.collect.ConcurrentHashMultiset

object SGD20NewsgroupsClassifier extends App {

  val features = 10000
  val analyzer = new StandardAnalyzer(Version.LUCENE_32)
  val encoder = new TextValueEncoder("body")
  encoder.setProbes(2)
  val lines = new ConstantValueEncoder("line")
  val loglines = new ConstantValueEncoder("log(line)")
  val bias = new ConstantValueEncoder("intercept")
  val rand = RandomUtils.getRandom()

  // Usage: either
  // SGD20NewsgroupsClassifier train input_dir model_file dict_file, or
  // SGD20NewsgroupsClassifier test model_file dict_file test_dir
  args(0) match {
    case "train" => train(args(1), args(2), args(3))
    case "test" => test(args(1), args(2), args(3))
  }
  
  def train(trainDir : String, 
      modelFile : String, 
      dictFile : String) : Unit = {
    val newsgroups = new Dictionary()
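    // 20 target classes, hashed feature vectors of size `features`, L1 prior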
    val learningAlgorithm = new AdaptiveLogisticRegression(
      20, features, new L1())
    learningAlgorithm.setInterval(800)
    learningAlgorithm.setAveragingWindow(500)
    // prepare data
    val files = new ArrayList[File]()
    val dirs = new File(trainDir).listFiles()
    for (dir <- dirs) {
      if (dir.isDirectory()) {
        newsgroups.intern(dir.getName())
        for (file <- dir.listFiles()) {
          files.add(file)
        }
      }
    }
    Collections.shuffle(files)
    println(files.size() + " training files in " + dirs.length + " classes")
  
    var k = 0
    var step = 0D
    var bumps = Array(1, 2, 5)

    for (file <- files) {
      val ng = file.getParentFile().getName()
      val actualClass = newsgroups.intern(ng)
      val vector = encodeFeatureVector(file)
      learningAlgorithm.train(actualClass, vector)
    }
    learningAlgorithm.close()
    // evaluate model
    val learner = learningAlgorithm.getBest().getPayload().getLearner()
    println("AUC=" + learner.auc() + ", %-correct=" + learner.percentCorrect())
    ModelSerializer.writeBinary(modelFile, learner.getModels().get(0))
    val serializedDict = new PrintWriter(dictFile)
    for (newsgroup <- newsgroups.values()) {
      serializedDict.println(newsgroup)
    }
    serializedDict.flush()
    serializedDict.close()
  }
  
  def encodeFeatureVector(file : File) : Vector = {
    
    val vector = new RandomAccessSparseVector(features)
    val words : ConcurrentHashMultiset[String] = 
      ConcurrentHashMultiset.create()
    var numlines = 0
    var startBody = false
    var prevLine = ""
    for (line <- Source.fromFile(file).getLines()) {
      if (line.startsWith("From:") ||
          line.startsWith("Subject:") ||
          line.startsWith("Keywords:") ||
          line.startsWith("Summary:")) {
        countWords(line.replaceAll(".*:", ""), words)
      }
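      // treat the text after two consecutive blank lines (i.e. after the
      // header block) as the message body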
      if (! startBody &&
          line.trim().length() == 0 &&
          prevLine.trim().length() == 0) {
        startBody = true
      }
      if (startBody) {
        countWords(line, words)
      }
      numlines += 1
      prevLine = line
    }
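    // add bias, line-count and log(line-count) features, then each word
    // weighted by log(1 + term frequency)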
    bias.addToVector(null, 1, vector)
    lines.addToVector(null, numlines / 30, vector)
    loglines.addToVector(null, Math.log(numlines + 1), vector)
    for (word <- words) {
      encoder.addToVector(word, Math.log(1 + words.count(word)), vector)
    }
    vector
  }
  
  def countWords(line : String,
      words : ConcurrentHashMultiset[String]) : Unit = {
    // tokenize the line with the Lucene analyzer and add each token
    // to the multiset passed in by the caller
    val tokenStream = analyzer.tokenStream("text", new StringReader(line))
    val attr = tokenStream.addAttribute(classOf[CharTermAttribute])
    while (tokenStream.incrementToken()) {
      words.add(new String(attr.buffer(), 0, attr.length()))
    }
  }
  
  def test(modelFile : String, 
      dictFile : String, 
      testDir : String) : Unit = {
    
    val model = ModelSerializer.readBinary(
      new FileInputStream(modelFile), 
      classOf[OnlineLogisticRegression])
    val newsgroups = getNewsgroups(dictFile)
    
    val dirs = new File(testDir).listFiles()
    var ncorrect = 0
    var ntotal = 0
    for (dir <- dirs) {
      if (dir.isDirectory()) {
        val expectedLabel = dir.getName()
        for (file <- dir.listFiles()) {
          val vector = encodeFeatureVector(file)
          val results = model.classify(vector)
          val actualLabel = newsgroups.get(results.maxValueIndex())
          println("file: " + file.getName() + 
            ", expected: " + expectedLabel + 
            ", actual: " + actualLabel)
          if (actualLabel.equals(expectedLabel)) {
            ncorrect += 1
          }
          ntotal += 1
        }
      }
    }
    println("Correct: " + ncorrect + "/" + ntotal)
  }
  
  def getNewsgroups(dictFile : String) : HashMap[Integer,String] = {
    val newsgroups = new HashMap[Integer,String]()
    var lno = 0
    for (line <- Source.fromFile(dictFile).getLines()) {
      newsgroups.put(lno, line)
      lno += 1
    }
    newsgroups
  }
}

To train and test it, we use the following commands. The accuracies are nothing to write home about (AUC=0.5), but at this time I am more concerned with the mechanics of getting a classifier working than the results.

sujit@cyclone:mia-scala-examples$ sbt 'run-main \
  com.mycompany.mia.classify.SGD20NewsgroupsClassifier train \
  /path/to/20news-bydate-train \
  /path/to/model.file /path/to/dict.file'
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
  com.mycompany.mia.classify.SGD20NewsgroupsClassifier test \
  /path/to/model.file /path/to/dict.file \
  /path/to/20news-bydate-test'

As you can see, the above is not run on Hadoop. This is because SGD is a sequential algorithm, so there is no point in parallelizing it. The Naive Bayes family, on the other hand, does benefit from parallelism, so it is run on Hadoop. The book covers running it via scripts, so that's what I did, figuring I would go back and check out the code later (the class names are available in "driver.classes.props" in the Mahout source) if and when I need to build integrated solutions.

The book specifies the prepare20newsgroups subcommand, but that is deprecated in the current Mahout distribution, which instead provides a shell script to run both the Naive Bayes and SGD versions. I believe the part that converts the training files into a sequence file (one record per input file) is incorrect, so I wrote my own code to do this:

// Source: src/main/scala/com/mycompany/mia/classify/NaiveBayes20NewsgroupDataPreparer.scala
package com.mycompany.mia.classify

import java.io.File

import scala.io.Source

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{Text, SequenceFile}

object NaiveBayes20NewsgroupDataPreparer extends App {

  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val path = new Path(args(1))
  val writer = new SequenceFile.Writer(fs, conf, path, 
    classOf[Text], classOf[Text])
  val dirs = new File(args(0)).listFiles()
  var n = 0
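  // write one (/label, full document text) record per training file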
  for (dir <- dirs) {
    val label = dir.getName()
    for (file <- dir.listFiles()) {
      val text = Source.fromFile(file).
        getLines().
        foldLeft("") (_ + " " + _)
      // extra slash added to key to get around AAOOB thrown
      // by BayesUtils.writeLabelIndex
      writer.append(new Text("/" + label), new Text(text))
      n += 1
    }
    println(label + ": " + n + " files loaded so far")
  }
  writer.close()
  
  // self-test to see that everything loaded okay...
  val reader = new SequenceFile.Reader(fs, path, conf)
  val key = new Text()
  val value = new Text()
  var rec = 0
  while (reader.next(key, value)) {
    if (rec < 10) {
      println(key.toString() + " => " + value.toString())
    }
    rec += 1
  }
  println("...")
  println("#=records written: " + rec)
  reader.close()
}

The following sequence of scripts prepares the data from the input files, trains a Naive Bayes classifier and runs it against the test set. Running the complementary Naive Bayes just involves passing an extra parameter to the trainnb and testnb subcommands.

sujit@cyclone:mia-scala-examples$ sbt 'run-main \
  com.mycompany.mia.classify.NaiveBayes20NewsgroupDataPreparer \
  /path/to/20news-bydate-train /tmp/20news-seq'
hduser@cyclone:mahout$ hadoop fs -put /tmp/20news-seq .
hduser@cyclone:mahout$ bin/mahout seq2sparse \
  -i 20news-seq -o 20news-vectors -lnorm -nv -wt tfidf
hduser@cyclone:mahout$ bin/mahout split \
  -i 20news-vectors/tfidf-vectors \
  --trainingOutput 20news-train-vectors \
  --testOutput 20news-test-vectors \
  --randomSelectionPct 20 \
  --overwrite --sequenceFiles -xm sequential
hduser@cyclone:mahout$ bin/mahout trainnb \
  -i 20news-train-vectors -el -o model -li labelindex -ow 
hduser@cyclone:mahout$ bin/mahout testnb \
  -i 20news-test-vectors \
  -o 20news-testing \
  -m model -l labelindex -ow 

To improve the output of the classifier, one should investigate target leaks (which produce results that are too good to be true), broken feature extraction (which produces bad results), eliminating, adding and combining features (e.g. brand, gender, brand+gender), normalizing feature dimensions, trying out different algorithms, and so on.
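
For example, the "combining features" idea can be sketched (hypothetically, with made-up field names and vector size) by encoding the concatenation of two categorical values as an extra feature, so the model can pick up their interaction:

// Encode brand, gender and the brand+gender combination as separate features
import org.apache.mahout.math.RandomAccessSparseVector
import org.apache.mahout.vectorizer.encoders.StaticWordValueEncoder

object FeatureCombinationSketch extends App {
  val vector = new RandomAccessSparseVector(1000)
  val brandEncoder = new StaticWordValueEncoder("brand")
  val genderEncoder = new StaticWordValueEncoder("gender")
  val comboEncoder = new StaticWordValueEncoder("brand+gender")

  val (brand, gender) = ("acme", "f")
  brandEncoder.addToVector(brand, vector)
  genderEncoder.addToVector(gender, vector)
  // the interaction feature gets its own hashed slot(s)
  comboEncoder.addToVector(brand + ":" + gender, vector)
}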

Mahout is optimized for situations where you have lots of training examples (internet scale, typically generated by user clicks, etc.), rather than the much smaller training sets created by domain experts in more traditional settings. Compared to the other parts, the classification code base seems more fluid and under heavier development, so it may make sense to look in the Mahout source code or the Mahout newsgroup for answers rather than depending on the book.

16 comments (moderated to prevent spam):

Priyadarshan raj said...

hi sujit,

can you please tell me how to use the model created from mahout(0.7) after this command:-
bin/mahout trainnb \
-i 20news-train-vectors -el -o model -li labelindex -ow

Rajesh Nikam said...

I am also trying to build model with mahout sgd. similar to your results

AUC = 0.57
confusion: [[27.0, 13.0], [0.0, 0.0]]
entropy: [[-0.4, -0.3], [-1.2, -0.7]]

Please closely see confusion matrix.
I think it means all instances are classified as category 1: 27 + 13.

Is such kind of model useful.

Sujit Pal said...

Hi Rajesh, yes you are right, the model is useless, it's not predicting anything, it's like a broken clock that's right twice a day :-). It classifies everything as category 1, 27 of which are right and 13 of which are wrong, which gives it the AUC of 0.57. Based on a thread on the Mahout ML that I read a couple of weeks ago, I believe this is because the model's feature vectors haven't been created correctly. I suspect that this is probably a consequence of some change between Mahout's version for the MIA book and 0.7, perhaps a parameter that no longer provides an appropriate default or something.

Sujit Pal said...

@Priyadarshan: sorry about the delay in responding, looks like I missed your comment. Once you train the model, you would write some code that would load up the model and use it to predict classes for unseen cases, similar to the SGD20NewsgroupsClassifier.test() method.

Mahout Newbie said...

Hi Sujit,

Can you please help to solve this issue,

http://stackoverflow.com/questions/14151877/error-while-creating-mahout-model

Sujit Pal said...

Hi, it's difficult to do without more context. I looked at the line in BayesUtils and the problem is that the code depends on a certain format for the label and is not finding it, but that does not tie it back to why it's failing for you. Perhaps look at the code pointed to by the /entire/ stack trace (not just the last 2 lines); that may provide you a better answer.

Unknown said...

Hi Sujit, I am trying to run logistic regression in mahout. I am getting the following error message though I have the /user/as7784/Pharma folder in Hadoop. Appreciate your help.

$ mahout trainlogistic --input /user/as7784/Pharma --output ./model --target MOM_POPS --categories 2 --predictors Devaluation Seasonality_YE P_Debt --types numeric
Warning: $HADOOP_HOME is deprecated.

Running on hadoop, using /gridapps/hadoop/bin/hadoop and HADOOP_CONF_DIR=
MAHOUT-JOB: /gridapps/mahout/mahout-examples-0.8-job.jar
Warning: $HADOOP_HOME is deprecated.

Exception in thread "main" java.io.FileNotFoundException: /user/as7784/Pharma (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:106)
at org.apache.mahout.classifier.sgd.TrainLogistic.open(TrainLogistic.java:315)
at org.apache.mahout.classifier.sgd.TrainLogistic.mainToOutput(TrainLogistic.java:76)
at org.apache.mahout.classifier.sgd.TrainLogistic.main(TrainLogistic.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:68)
at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:139)
at org.apache.mahout.driver.MahoutDriver.main(MahoutDriver.java:194)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

Sujit Pal said...

Hi Anitha, I suspect that /user/as7784/Pharma is a local (non HDFS) directory but your mahout script expects to see it in HDFS (or vice versa). Take a look at the MAHOUT_LOCAL environment variable (comments inside the bin/mahout script) - if you set it to some value, it forces mahout to work off the local file system.

VRK said...

Hi, Sujit, can you send me the complete code for classifying 20newsgroup dataset using SGD and NB. I need from pre processing to classification of files. Its urgent.
my mail: vrk_nitw@yahoo.com

Sujit Pal said...

Hi VRK, you can find it on GitHub here.

Rajarshi Roy said...

Hi Sujit,
I have been following your blog for the last one month and I find it very informative. I am rather new to Data Mining. I wanted to ask you this question regarding clustering of documents.
Mahout or Carrot2 does an excellent job with clustering my documents. Now, I want to make this process incremental. Let's say I have a set of documents S1 and by the time I finish processing them using a batch job/Map Reduce, I have another set already waiting in the queue, say S2. Clustering has to be done on the whole now, i.e. S1+S2. I can very well do that. But is there some way I can make this process incremental, so that next time I process only S2 and still get the centroids for S1 and S2? It does not really have to be realtime. But it should be incremental, otherwise the data-set will continuously go on increasing and at some point my processing resources will not be able to give me a result within the desired time interval. Some guidance here will be very much helpful.

Sujit Pal said...

Hi Rajarshi, thanks for the kind words, glad you find it helpful. To do incremental clustering, assuming a k-means like environment, you could maintain the centroids of your clusters (basically an averaged term vector of all documents in the cluster; I believe Mahout k-means dumps this information out already) and then assign an incoming document based on which centroid it is closest to. You could update the centroids after each batch, or periodically correct the centroids with a batch run over the full dataset.

rachana said...

hello any way to use confusion matrix in program...can i get source code

Sujit Pal said...

Hi Rachana, the confusion matrix is the output of the classification evaluation, and tells you how well the classifier performed across the different classes. Some classifiers take as input the classification flags made by upstream (less accurate) classifiers; perhaps in that case we could also use the probability of a correct answer for a class from the confusion matrix as a feature? But in the case described, that would be incorrect and may well lead to overfitting. If you are looking for source code for the code I describe in this blog post, it's on GitHub here.

chiru said...

please anyone send the algorithm in mahout classification which already implemented (naive)

Sujit Pal said...

Hi Chiru, I believe Mahout still ships with the Naive Bayes Classifier - here is a List of supported algorithms in Mahout 0.10 according to the Mahout website.