The final part covered in the MIA book is Classification. The popular algorithms available are Stochastic Gradient Descent (SGD), Naive Bayes and Complementary Naive Bayes, Random Forests and Online Passive Aggressive. There are other algorithms in the pipeline, as seen from the Classification section of the Mahout wiki page.
The MIA book has generic classification information and advice that is useful for any algorithm, but it specifically covers SGD, Naive Bayes and Complementary Naive Bayes (the last two via Mahout scripts). Of the available algorithms, SGD and Random Forests are a good fit for classification problems involving continuous variables and small to medium datasets, while the Naive Bayes family is better suited to problems involving text-like variables and medium to large datasets.
In general, a solution to a classification problem involves choosing the appropriate features for classification, choosing the algorithm, generating the feature vectors (vectorization), training the model and evaluating the results in a loop. You continue to tweak stuff in each of these steps until you get the results with the desired accuracy.
If the training data is provided in (or can be reduced to) tabular form, we can use Mahout subcommands to generate a logistic regression (SGD) model and test it, as shown below:
hduser@cyclone:mahout$ bin/mahout trainlogistic \
--input /tmp/donut.csv \
--output /tmp/model \
--target color \ # target variable
--categories 2 \ # number of categories
--predictors x y \ # predictor variables
--types numeric \ # predictor variable types
--features 20 \ # size of internal feature vector
--passes 100 \ # number of passes over input
--rate 50 # initial learning rate
hduser@cyclone:mahout$ bin/mahout runlogistic \
--input /tmp/test-donut.csv \
--model /tmp/model \
--auc \ # report area under curve
--confusion # report confusion matrix
AUC = 0.57
confusion: [[27.0, 13.0], [0.0, 0.0]]
entropy: [[-0.4, -0.3], [-1.2, -0.7]]
...
The AUC (area under the curve) measures the quality of the classifier on a scale from 0 to 1, where 0 indicates a classifier that's always wrong and 1 indicates one that's always correct. The confusion matrix shows the expected vs. actual classification counts (so for a good classifier, we should expect the numbers along the diagonal to predominate). For SGD, AUC and log likelihood are good measures; for NB and CNB, percent correct and the confusion matrix are good measures.
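For instance, percent correct can be read straight off a confusion matrix by comparing the diagonal (correctly classified) entries against the total. A minimal Scala sketch, with the matrix reported above hard-coded purely for illustration (27 correct out of 40, or 67.5%):
// minimal sketch (not from the book or my repo): percent correct from a
// confusion matrix; the diagonal holds the correctly classified counts
object ConfusionMatrixStats extends App {
  // the matrix reported by runlogistic above
  val confusion = Array(Array(27.0, 13.0), Array(0.0, 0.0))
  val total = confusion.map(_.sum).sum
  val correct = confusion.indices.map(i => confusion(i)(i)).sum
  println("%-correct = " + (100.0 * correct / total)) // 67.5
}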
Most classification problems involve a mix of continuous, categorical, word-like and text-like features. The input to a (Mahout) classification algorithm is in the form of vectors, so the features must first be vectorized. Vectorization approaches include one cell per word, bag of words, and feature hashing (similar in concept to Bloom filters).
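To make the feature hashing idea concrete, here is a minimal sketch using Mahout's hashed encoders; the field names, vector size and probe count are just illustrative choices on my part, not values from the book:
// minimal sketch of feature hashing with Mahout's encoder classes
import org.apache.mahout.math.RandomAccessSparseVector
import org.apache.mahout.vectorizer.encoders.{ContinuousValueEncoder, StaticWordValueEncoder}

object FeatureHashingSketch extends App {
  // fixed-size hashed feature vector; hash collisions are tolerated,
  // which is why each value is probed into more than one location
  val vector = new RandomAccessSparseVector(1000)
  val word = new StaticWordValueEncoder("category")
  word.setProbes(2)
  word.addToVector("rec.autos", vector)   // word-like feature
  val num = new ContinuousValueEncoder("lines")
  num.addToVector("42", vector)           // continuous feature, passed as its string form
  println(vector)
}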
The data set used for most of the examples in the MIA book is the 20 Newsgroups DataSet. The data is a bunch of newsgroup postings, so apart from the text body, candidates for features could be their header metadata.
So the first step is to see which headers are available and how many of each there are. The following Unix command lists the headers and their counts. Based on this analysis, the decision was made to use the Subject, From, Keywords and Summary headers as additional metadata to assist in classification.
sujit@cyclone:20news-bydate-train$ export LC_ALL='C'; \
for file in */*; do \
sed -E -e '/^$/,$d' -e 's/:.*//' -e '/^[[:space:]]/d' $file; \
done | sort | uniq -c | sort -nr
The following code uses the AdaptiveLogisticRegression algorithm (which runs multiple SGD learners with different learning parameters and automatically picks the best one) to classify the 20 Newsgroups training set, then tests the resulting model against the 20 Newsgroups test set. The code demonstrates building a feature vector for each document using multiple hashing encoders.
// Source: src/main/scala/com/mycompany/mia/classify/SGD20NewsgroupsClassifier.scala
package com.mycompany.mia.classify

import java.io.{StringReader, PrintWriter, FileInputStream, File}
import java.util.{HashMap, Collections, ArrayList}
import scala.collection.JavaConversions.{collectionAsScalaIterable, asScalaBuffer}
import scala.io.Source
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.util.Version
import org.apache.mahout.classifier.sgd.OnlineLogisticRegression
import org.apache.mahout.classifier.sgd.{ModelSerializer, L1, AdaptiveLogisticRegression}
import org.apache.mahout.common.RandomUtils
import org.apache.mahout.math.{Vector, RandomAccessSparseVector}
import org.apache.mahout.vectorizer.encoders.{TextValueEncoder, Dictionary, ConstantValueEncoder}
import com.google.common.collect.ConcurrentHashMultiset

object SGD20NewsgroupsClassifier extends App {

  val features = 10000
  val analyzer = new StandardAnalyzer(Version.LUCENE_32)
  val encoder = new TextValueEncoder("body")
  encoder.setProbes(2)
  val lines = new ConstantValueEncoder("line")
  val loglines = new ConstantValueEncoder("log(line)")
  val bias = new ConstantValueEncoder("intercept")
  val rand = RandomUtils.getRandom()

  // Usage: either
  // SGD20NewsgroupsClassifier train input_dir model_file dict_file, or
  // SGD20NewsgroupsClassifier test model_file dict_file test_dir
  args(0) match {
    case "train" => train(args(1), args(2), args(3))
    case "test" => test(args(1), args(2), args(3))
  }

  def train(trainDir : String,
      modelFile : String,
      dictFile : String) : Unit = {
    val newsgroups = new Dictionary()
    val learningAlgorithm = new AdaptiveLogisticRegression(
      20, features, new L1())
    learningAlgorithm.setInterval(800)
    learningAlgorithm.setAveragingWindow(500)
    // prepare data
    val files = new ArrayList[File]()
    val dirs = new File(trainDir).listFiles()
    for (dir <- dirs) {
      if (dir.isDirectory()) {
        newsgroups.intern(dir.getName())
        for (file <- dir.listFiles()) {
          files.add(file)
        }
      }
    }
    Collections.shuffle(files)
    println(files.size() + " training files in " + dirs.length + " classes")
    var k = 0
    var step = 0D
    var bumps = Array(1, 2, 5)
    for (file <- files) {
      val ng = file.getParentFile().getName()
      val actualClass = newsgroups.intern(ng)
      val vector = encodeFeatureVector(file)
      learningAlgorithm.train(actualClass, vector)
    }
    learningAlgorithm.close()
    // evaluate model
    val learner = learningAlgorithm.getBest().getPayload().getLearner()
    println("AUC=" + learner.auc() + ", %-correct=" + learner.percentCorrect())
    ModelSerializer.writeBinary(modelFile, learner.getModels().get(0))
    val serializedDict = new PrintWriter(dictFile)
    for (newsgroup <- newsgroups.values()) {
      serializedDict.println(newsgroup)
    }
    serializedDict.flush()
    serializedDict.close()
  }

  def encodeFeatureVector(file : File) : Vector = {
    val vector = new RandomAccessSparseVector(features)
    val words : ConcurrentHashMultiset[String] =
      ConcurrentHashMultiset.create()
    var numlines = 0
    var startBody = false
    var prevLine = ""
    for (line <- Source.fromFile(file).getLines()) {
      if (line.startsWith("From:") ||
          line.startsWith("Subject:") ||
          line.startsWith("Keywords:") ||
          line.startsWith("Summary:")) {
        countWords(line.replaceAll(".*:", ""), words)
      }
      if (! startBody &&
          line.trim().length() == 0 &&
          prevLine.trim().length() == 0) {
        startBody = true
      }
      if (startBody) {
        countWords(line, words)
      }
      numlines += 1
      prevLine = line
    }
    bias.addToVector(null, 1, vector)
    lines.addToVector(null, numlines / 30, vector)
    loglines.addToVector(null, Math.log(numlines + 1), vector)
    // each distinct word is added once, weighted by log(1 + its count)
    for (word <- words.elementSet()) {
      encoder.addToVector(word, Math.log(1 + words.count(word)), vector)
    }
    vector
  }

  def countWords(line : String,
      words : ConcurrentHashMultiset[String]) : Unit = {
    val tokenStream = analyzer.tokenStream("text", new StringReader(line))
    tokenStream.addAttribute(classOf[CharTermAttribute])
    while (tokenStream.incrementToken()) {
      val attr = tokenStream.getAttribute(classOf[CharTermAttribute])
      words.add(new String(attr.buffer(), 0, attr.length()))
    }
  }

  def test(modelFile : String,
      dictFile : String,
      testDir : String) : Unit = {
    val model = ModelSerializer.readBinary(
      new FileInputStream(modelFile),
      classOf[OnlineLogisticRegression])
    val newsgroups = getNewsgroups(dictFile)
    val dirs = new File(testDir).listFiles()
    var ncorrect = 0
    var ntotal = 0
    for (dir <- dirs) {
      if (dir.isDirectory()) {
        val expectedLabel = dir.getName()
        for (file <- dir.listFiles()) {
          val vector = encodeFeatureVector(file)
          // classifyFull scores every category, so maxValueIndex lines up
          // with the dictionary index assigned during training
          val results = model.classifyFull(vector)
          val actualLabel = newsgroups.get(results.maxValueIndex())
println("file: " + file.getName() +
", expected: " + expectedLabel +
", actual: " + actualLabel)
if (actualLabel.equals(expectedLabel)) {
ncorrect += 1
}
ntotal += 1
}
}
}
println("Correct: " + ncorrect + "/" + ntotal)
}
def getNewsgroups(dictFile : String) : HashMap[Integer,String] = {
val newsgroups = new HashMap[Integer,String]()
var lno = 0
for (line <- Source.fromFile(dictFile).getLines()) {
newsgroups.put(lno, line)
lno += 1
}
newsgroups
}
}
To train and test it, we use the following commands. The accuracies are nothing to write home about (AUC=0.5), but at this time I am more concerned with the mechanics of getting a classifier working than the results.
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
com.mycompany.mia.classify.SGD20NewsgroupsClassifier train \
/path/to/20news-bydate-train \
/path/to/model.file /path/to/dict.file'
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
com.mycompany.mia.classify.SGD20NewsgroupsClassifier test \
/path/to/model.file /path/to/dict.file \
/path/to/20news-bydate-test'
As you can see, the above is not run on Hadoop. This is because SGD is a sequential algorithm, so there is no point in parallelizing it. The Naive Bayes family, on the other hand, does benefit from parallelism, so it is run on Hadoop. The book covers running these via scripts, so that's what I did, figuring I will go back and check out the code later (the class names are available in "driver.classes.props" in the Mahout source) if and when I need to build integrated solutions.
The book specifies the prepare20newsgroups subcommand, but that is deprecated in the current Mahout distribution, which instead provides a shell script to run both the Naive Bayes and SGD versions. I believe the part where the training files are converted to a sequence file (one record per training file) is incorrect, so I wrote my own code to do this:
// Source: src/main/scala/com/mycompany/mia/classify/NaiveBayes20NewsgroupDataPreparer.scala
package com.mycompany.mia.classify

import java.io.File
import scala.io.Source
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{Text, SequenceFile}

object NaiveBayes20NewsgroupDataPreparer extends App {

  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val path = new Path(args(1))
  val writer = new SequenceFile.Writer(fs, conf, path,
    classOf[Text], classOf[Text])
  val dirs = new File(args(0)).listFiles()
  var n = 0
  for (dir <- dirs) {
    val label = dir.getName()
    for (file <- dir.listFiles()) {
      val text = Source.fromFile(file).
        getLines().
        foldLeft("") (_ + " " + _)
      // extra slash added to the key to get around the ArrayIndexOutOfBoundsException
      // thrown by BayesUtils.writeLabelIndex
      writer.append(new Text("/" + label), new Text(text))
      n += 1
    }
    println(label + ": " + n + " files loaded")
  }
  writer.close()
  // self-test to see that everything loaded okay...
  val reader = new SequenceFile.Reader(fs, path, conf)
  val key = new Text()
  val value = new Text()
  var rec = 0
  while (reader.next(key, value)) {
    if (rec < 10) {
      println(key.toString() + " => " + value.toString())
    }
    rec += 1
  }
  println("...")
  println("#=records written: " + rec)
  reader.close()
}
The following sequence of commands prepares the data from the input files, trains a Naive Bayes classifier, and runs it against the test set. Running Complementary Naive Bayes just involves passing an extra parameter to the trainnb and testnb subcommands.
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
com.mycompany.mia.classify.NaiveBayes20NewsgroupDataPreparer \
/path/to/20news-bydate-train /tmp/20news-seq'
hduser@cyclone:mahout$ hadoop fs -put /tmp/20news-seq .
hduser@cyclone:mahout$ bin/mahout seq2sparse \
-i 20news-seq -o 20news-vectors -lnorm -nv -wt tfidf
hduser@cyclone:mahout$ bin/mahout split \
-i 20news-vectors/tfidf-vectors \
--trainingOutput 20news-train-vectors \
--testOutput 20news-test-vectors \
--randomSelectionPct 20 \
--overwrite --sequenceFiles -xm sequential
hduser@cyclone:mahout$ bin/mahout trainnb \
-i 20news-train-vectors -el -o model -li labelindex -ow
hduser@cyclone:mahout$ bin/mahout testnb \
-i 20news-test-vectors \
-o 20news-testing \
-m model -l labelindex -ow
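For reference, if you later want to fold the trained model into an integrated solution rather than drive it through the mahout script, my understanding is that the model and label index written by trainnb above can be loaded and used programmatically along the following lines. This is a sketch only: the local paths and the way you obtain a tf-idf vector to classify are assumptions on my part.
// minimal sketch (assumptions: "model" and "labelindex" are the outputs of
// the trainnb run above, and a tf-idf Vector to classify is available)
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.mahout.classifier.naivebayes.{BayesUtils, NaiveBayesModel, StandardNaiveBayesClassifier}
import org.apache.mahout.math.Vector

object NaiveBayesSketch extends App {
  val conf = new Configuration()
  val model = NaiveBayesModel.materialize(new Path("model"), conf)
  val classifier = new StandardNaiveBayesClassifier(model)
  val labels = BayesUtils.readLabelIndex(conf, new Path("labelindex"))

  // score a single tf-idf vector (reading one from 20news-test-vectors is
  // left out of this sketch) and report the winning label
  def bestLabel(v: Vector): String = {
    val scores = classifier.classifyFull(v)
    labels.get(scores.maxValueIndex())
  }
}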
To improve the output of the classifier, one should investigate target leaks (which lead to results that look too good to be true), broken feature extraction (which leads to bad results), eliminating, adding and combining features (e.g., brand, gender, brand+gender), normalizing feature dimensions, trying out different algorithms, etc.
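For the combined-features point specifically, if I remember the encoder API right, Mahout has an InteractionValueEncoder that hashes the cross of two features into the same vector; here is a minimal sketch, where the brand/gender fields are just the hypothetical example from the paragraph above:
// minimal sketch of encoding a combined brand+gender feature
import org.apache.mahout.math.RandomAccessSparseVector
import org.apache.mahout.vectorizer.encoders.{InteractionValueEncoder, StaticWordValueEncoder}

object FeatureCombinationSketch extends App {
  val vector = new RandomAccessSparseVector(1000)
  val brand = new StaticWordValueEncoder("brand")
  val gender = new StaticWordValueEncoder("gender")
  // the individual features...
  brand.addToVector("acme", vector)
  gender.addToVector("f", vector)
  // ...and their interaction, hashed in as a single combined feature
  val brandGender = new InteractionValueEncoder("brand+gender", brand, gender)
  brandGender.addInteractionToVector("acme", "f", 1.0, vector)
  println(vector)
}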
Mahout is optimized for situations where you have lots of training examples (internet scale, typically generated by user clicks, etc.), rather than the much smaller training sets created by domain experts in more traditional settings. Compared to the other parts, the classification code base seems more fluid and under heavier development, so it may make sense to look in the Mahout source code or the Mahout newsgroup for answers rather than depending on the book.