My Mahout in Action (MIA) book has been collecting dust for a while now, waiting for me to get around to learning about Mahout. Mahout is evolving quite rapidly, so the book is a bit dated now, but I decided to use it as a guide anyway as I work through the various modules in the (currently GA) 0.7 distribution.
My objective is to learn about Mahout, initially from a client perspective, ie, find out what ML modules (eg, clustering, logistic regression, etc) are available, which algorithms are supported within each module, and how to use them from my own code. Although Mahout provides non-Hadoop implementations for almost all its features, I am primarily interested in the Hadoop implementations. Initially I just want to figure out how to use it (with custom code to tweak behavior). Later, I would like to understand how each algorithm is represented as a (possibly multi-stage) M/R job so I can build similar implementations.
I am going to write about my progress, mainly in order to populate my cheat sheet in the sky (ie, for future reference). Any code I write will be available in this GitHub (Scala) project.
The first module covered in the book is Collaborative Filtering. Essentially, it is a technique for predicting a user's preferences given the preferences of others in the group. There are two main approaches - user-based and item-based. In the case of user-based filtering, the objective is to look for users similar to the given user, then use the ratings from these similar users to predict a preference for the given user. In the case of item-based recommendation, similarities between pairs of items are computed, then preferences are predicted for the given user using a combination of the user's current item preferences and the similarity matrix.
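To make the user-based idea concrete, here is a toy Scala sketch (mine, not Mahout's exact formula) that predicts a rating as the similarity-weighted average of the ratings similar users gave the item; the neighborhood and numbers are made up.

// Toy illustration of user-based CF prediction (not Mahout code).
object UserBasedCFToy extends App {
  // (similarity of neighbor to target user, neighbor's rating for the item)
  val neighbors = List((0.9, 4.0), (0.7, 5.0), (0.4, 2.0))
  // predicted rating = sum(sim * rating) / sum(sim)
  val predicted = neighbors.map { case (sim, rating) => sim * rating }.sum /
    neighbors.map(_._1).sum
  println("predicted rating: " + predicted) // 3.95
}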
Anatomy of a Mahout Recommender
The input to such a system is either a 3-tuple of (UserID, ItemID, Rating) or a 2-tuple of (UserID, ItemID). In the latter case, the preferences are assumed to be boolean (ie, 1 where the (UserID, ItemID) pair exists, 0 otherwise). In Mahout, this input is represented by a DataModel class, which can be created from a file of these tuples (either CSV or TSV), one per line. Other ways to populate the DataModel also exist, for example, programmatically or from a database.
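A minimal sketch of building a DataModel from such a file (the file path is just an example):

import java.io.File
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel

// each line of intro.csv is UserID,ItemID,Rating
val model = new FileDataModel(new File("data/intro.csv"))
println(model.getNumUsers + " users, " + model.getNumItems + " items")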
A user-based Recommender is built out of a DataModel, a UserNeighborhood and a UserSimilarity. A UserNeighborhood defines the concept of a group of users similar to the current user - the two available implementations are Nearest and Threshold. The nearest neighborhood consists of the nearest N users for the given user, where nearness is defined by the similarity implementation. The threshold neighborhood consists of all users whose similarity to the given user, as computed by the similarity implementation, meets or exceeds a threshold. The UserSimilarity defines the similarity between two users - implementations include Euclidean Distance, Pearson Correlation, Uncentered Cosine, Caching, City Block, Dummy, Generic User, Log Likelihood, Spearman Correlation and Tanimoto Coefficient similarity.
An item-based Recommender is built out of a DataModel and an ItemSimilarity. Implementations of ItemSimilarity include Euclidean Distance, Pearson Correlation, Uncentered Cosine, City Block, Dummy, Log Likelihood, Tanimoto Coefficient, Caching Item, File Item, and Generic Item similarity. The MIA book describes each algorithm in greater detail.
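Wiring these pieces together is mostly just constructor calls. Here is a minimal sketch of both flavors; the neighborhood size of 10 and the choice of Pearson similarity are arbitrary choices on my part, not recommendations.

import java.io.File
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood
import org.apache.mahout.cf.taste.impl.recommender.{GenericItemBasedRecommender, GenericUserBasedRecommender}
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity

val model = new FileDataModel(new File("data/intro.csv"))

// user-based: DataModel + UserNeighborhood + UserSimilarity
val userSim = new PearsonCorrelationSimilarity(model)
val neighborhood = new NearestNUserNeighborhood(10, userSim, model)
val userReco = new GenericUserBasedRecommender(model, neighborhood, userSim)

// item-based: DataModel + ItemSimilarity (PearsonCorrelationSimilarity
// implements both UserSimilarity and ItemSimilarity)
val itemReco = new GenericItemBasedRecommender(
  model, new PearsonCorrelationSimilarity(model))

// top 5 recommendations for user 1 from each recommender
println(userReco.recommend(1L, 5))
println(itemReco.recommend(1L, 5))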
In both cases, an IDRescorer object can be used to modify the recommendations with some domain logic, either by filtering out some of the recommendations (using isFiltered(itemID : Long) : Boolean) or by boosting/deboosting the recommendation score (using rescore(itemID : Long, originalScore : Double) : Double).
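Continuing the sketch above, a hypothetical rescorer that hides one item and boosts a range of items might look like this (the item IDs and boost factor are made up):

import org.apache.mahout.cf.taste.recommender.IDRescorer

val rescorer = new IDRescorer() {
  // filter item 101 out of the recommendations entirely
  def isFiltered(itemID : Long) : Boolean = itemID == 101L
  // boost items with IDs above 200 by 20%
  def rescore(itemID : Long, originalScore : Double) : Double =
    if (itemID > 200L) originalScore * 1.2 else originalScore
}
// pass the rescorer along with the recommendation request
val items = userReco.recommend(1L, 5, rescorer)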
Evaluating Recommenders
A Mahout user would build a Recommender using the "right" mix of the components described above. What constitutes the right mix is not readily apparent, so we can just ask the computer. Mahout provides three evaluation metrics: Average Absolute Difference, Root Mean Square Difference, and IR Stats (which provides precision and recall at N). Here is some code that uses the IRStats evaluator to run a sample of the input against various combinations of components and report the precision and recall at a given point.
package com.mycompany.mia.cf
import java.io.File
import scala.collection.mutable.HashMap
import org.apache.mahout.cf.taste.common.Weighting
import org.apache.mahout.cf.taste.eval.{RecommenderBuilder, IRStatistics}
import org.apache.mahout.cf.taste.impl.eval.GenericRecommenderIRStatsEvaluator
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel
import org.apache.mahout.cf.taste.impl.model.{GenericDataModel, GenericBooleanPrefDataModel}
import org.apache.mahout.cf.taste.impl.neighborhood.{ThresholdUserNeighborhood, NearestNUserNeighborhood}
import org.apache.mahout.cf.taste.impl.recommender.{GenericUserBasedRecommender, GenericItemBasedRecommender}
import org.apache.mahout.cf.taste.impl.similarity.{UncenteredCosineSimilarity, TanimotoCoefficientSimilarity, PearsonCorrelationSimilarity, LogLikelihoodSimilarity, EuclideanDistanceSimilarity, CityBlockSimilarity}
import org.apache.mahout.cf.taste.model.DataModel
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood
import org.apache.mahout.cf.taste.recommender.Recommender
import org.apache.mahout.cf.taste.similarity.{UserSimilarity, ItemSimilarity}
import scala.io.Source
import scala.util.Random
import java.io.PrintWriter
object RecommenderEvaluator extends App {
val neighborhoods = Array("nearest", "threshold")
val similarities = Array("euclidean", "pearson", "pearson_w",
"cosine", "cosine_w", "manhattan",
"llr", "tanimoto")
val simThreshold = 0.5
val sampleFileName = "/tmp/recommender-evaluator-sample.tmp.csv"
val argmap = parseArgs(args)
val filename = if (argmap.contains("sample_pct")) {
val nlines = Source.fromFile(argmap("filename")).getLines.size
val sampleSize = nlines * (argmap("sample_pct").toFloat / 100.0)
val rand = new Random(System.currentTimeMillis())
var sampleLineNos = Set[Int]()
do {
sampleLineNos += rand.nextInt(nlines)
} while (sampleLineNos.size < sampleSize)
val out = new PrintWriter(sampleFileName)
var currline = 0
for (line <- Source.fromFile(argmap("filename")).getLines) {
if (sampleLineNos.contains(currline)) {
out.println(line)
}
currline += 1
}
out.close()
sampleFileName
} else {
argmap("filename")
}
val model = argmap("bool") match {
case "false" => new GenericDataModel(
GenericDataModel.toDataMap(new FileDataModel(
new File(filename))))
case "true" => new GenericBooleanPrefDataModel(
GenericBooleanPrefDataModel.toDataMap(new FileDataModel(
new File(filename))))
case _ => throw new IllegalArgumentException(
invalidValue("bool", argmap("bool")))
}
val evaluator = new GenericRecommenderIRStatsEvaluator()
argmap("type") match {
case "user" => {
for (neighborhood <- neighborhoods;
similarity <- similarities) {
println("Processing " + neighborhood + " / " + similarity)
try {
val recommenderBuilder = userRecoBuilder(
neighborhood, similarity, model)
val stats = evaluator.evaluate(recommenderBuilder,
null, model,
null, argmap("precision_point").toInt,
GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD,
argmap("eval_fract").toDouble)
printResult(neighborhood, similarity, stats)
} catch {
case e : Exception => {
println("Exception caught: " + e.getMessage)
}
}
}
}
case "item" => {
for (similarity <- similarities) {
println("Processing " + similarity)
try {
val recommenderBuilder = itemRecoBuilder(similarity, model)
val stats = evaluator.evaluate(recommenderBuilder, null,
model, null, argmap("precision_point").toInt,
GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD,
argmap("eval_fract").toDouble)
printResult(null, similarity, stats)
} catch {
case e : Exception => {
println("Exception caught: " + e.getMessage)
}
}
}
}
case _ => throw new IllegalArgumentException(
invalidValue("type", argmap("type")))
}
def usage() : Unit = {
println("Usage:")
println("com.mycompany.mia.cf.RecommenderEvaluator [-key=value...]")
println("where:")
println("sample_pct=0-100 (use sample_pct of original input)")
println("type=user|item (the type of recommender to build)")
println("bool=true|false (whether to use boolean or actual preferences)")
println("precision_point=n (the precision at n desired)")
println("eval_fract=n (fraction of data to use for evaluation)")
System.exit(1)
}
def parseArgs(args : Array[String]) : HashMap[String,String] = {
val argmap = new HashMap[String,String]()
for (arg <- args) {
val nvp = arg.split("=")
if (nvp.length != 2) usage()
argmap(nvp(0)) = nvp(1)
}
argmap
}
def invalidValue(key : String, value : String) : String = {
"Invalid value for '" + key + "': " + value
}
def itemRecoBuilder(similarity : String,
model : DataModel) : RecommenderBuilder = {
val s : ItemSimilarity = similarity match {
case "euclidean" => new EuclideanDistanceSimilarity(model)
case "pearson" => new PearsonCorrelationSimilarity(model)
case "pearson_w" => new PearsonCorrelationSimilarity(
model, Weighting.WEIGHTED)
case "cosine" => new UncenteredCosineSimilarity(model)
case "cosine_w" => new UncenteredCosineSimilarity(
model, Weighting.WEIGHTED)
case "manhattan" => new CityBlockSimilarity(model)
case "llr" => new LogLikelihoodSimilarity(model)
case "tanimoto" => new TanimotoCoefficientSimilarity(model)
case _ => throw new IllegalArgumentException(
invalidValue("similarity", similarity))
}
new RecommenderBuilder() {
override def buildRecommender(model : DataModel) : Recommender = {
new GenericItemBasedRecommender(model, s)
}
}
}
def userRecoBuilder(neighborhood : String,
similarity : String,
model : DataModel) : RecommenderBuilder = {
val s : UserSimilarity = similarity match {
case "euclidean" => new EuclideanDistanceSimilarity(model)
case "pearson" => new PearsonCorrelationSimilarity(model)
case "pearson_w" => new PearsonCorrelationSimilarity(
model, Weighting.WEIGHTED)
case "cosine" => new UncenteredCosineSimilarity(model)
case "cosine_w" => new UncenteredCosineSimilarity(
model, Weighting.WEIGHTED)
case "manhattan" => new CityBlockSimilarity(model)
case "llr" => new LogLikelihoodSimilarity(model)
case "tanimoto" => new TanimotoCoefficientSimilarity(model)
case _ => throw new IllegalArgumentException(
invalidValue("similarity", similarity))
}
val neighborhoodSize = if (model.getNumUsers > 10)
(model.getNumUsers / 10) else (model.getNumUsers)
val n : UserNeighborhood = neighborhood match {
case "nearest" => new NearestNUserNeighborhood(
neighborhoodSize, s, model)
case "threshold" => new ThresholdUserNeighborhood(
simThreshold, s, model)
case _ => throw new IllegalArgumentException(
invalidValue("neighborhood", neighborhood))
}
new RecommenderBuilder() {
override def buildRecommender(model : DataModel) : Recommender = {
new GenericUserBasedRecommender(model, n, s)
}
}
}
def printResult(neighborhood : String,
similarity : String,
stats : IRStatistics) : Unit = {
println(">>> " +
(if (neighborhood != null) neighborhood else "") +
"\t" + similarity +
"\t" + stats.getPrecision.toString +
"\t" + stats.getRecall.toString)
}
}
The command below runs the evaluator for all the item-based recommenders using 10% of the MovieLens 1M ratings file, and reports the precision and recall at 2 for each recommender. Note that some recommenders may not give you results because there is insufficient data. This evaluator is a work in progress, so I may change it to use one of the other evaluation metrics.
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
com.mycompany.mia.cf.RecommenderEvaluator \
filename=data/ml-ratings.dat \
type=item bool=false \
precision_point=2 sample_pct=10 eval_fract=1'
Running the Recommender
As you can see, Mahout provides nice building blocks to build Recommenders with. Recommenders built using this framework can be run on a single machine (local mode) or on Hadoop via the so-called pseudo-distributed RecommenderJob that splits the input file across multiple Recommender reducers. To run in the latter mode, the recommender must have a constructor that takes a DataModel. Presumably if I am building a recommender with this framework, I would want it to scale up in this manner, so it makes sense to build it as the framework requires. Here is a custom Item-based recommender that uses the PearsonCorrelation similarity. The file also contains a runner that can be used for calling/testing in local mode.
package com.mycompany.mia.cf
import java.io.File
import java.util.List
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.mutable.HashSet
import scala.io.Source
import org.apache.mahout.cf.taste.common.Refreshable
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel
import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity
import org.apache.mahout.cf.taste.model.DataModel
import org.apache.mahout.cf.taste.recommender.{Recommender, RecommendedItem, IDRescorer}
object MovieLensRecommenderRunner extends App {
// grab the input file name
val filename = if (args.length == 1) args(0) else "unknown"
if ("unknown".equals(filename)) {
println("Please specify input file")
System.exit(-1)
}
// train recommender
val recommender = new MovieLensRecommender(
new FileDataModel(new File(filename)))
// test recommender
val alreadySeen = new HashSet[Long]()
val lines = Source.fromFile(filename).getLines
for (line <- lines) {
val user = line.split(",")(0).toLong
if (! alreadySeen.contains(user)) {
val items = recommender.recommend(user, 100)
println(user + " =>" + items.map(x => x.getItemID).
foldLeft("")(_ + " " + _))
}
alreadySeen += user
}
}
class MovieLensRecommender(model : DataModel) extends Recommender {
val similarity = new PearsonCorrelationSimilarity(model)
val delegate = new GenericItemBasedRecommender(model, similarity)
// everything below this is boilerplate. We could use the
// RecommenderWrapper if it were part of mahout-core, but it is part
// of mahout-integration (for the webapp)
def recommend(userID: Long, howMany: Int): List[RecommendedItem] = {
delegate.recommend(userID, howMany)
}
def recommend(userID: Long, howMany: Int, rescorer: IDRescorer): List[RecommendedItem] = {
delegate.recommend(userID, howMany, rescorer)
}
def estimatePreference(userID: Long, itemID: Long): Float = {
delegate.estimatePreference(userID, itemID)
}
def setPreference(userID: Long, itemID: Long, value: Float): Unit = {
delegate.setPreference(userID, itemID, value)
}
def removePreference(userID: Long, itemID: Long): Unit = {
delegate.removePreference(userID, itemID)
}
def getDataModel(): DataModel = {
delegate.getDataModel()
}
def refresh(alreadyRefreshed: java.util.Collection[Refreshable]): Unit = {
delegate.refresh(alreadyRefreshed)
}
}
We can run this in local mode on the command line using sbt as shown below:
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
com.mycompany.mia.cf.MovieLensRecommenderRunner data/intro.csv'
1 => 104
2 =>
3 => 103 102
4 => 102
5 =>
For running within the Hadoop pseudo-distributed RecommenderJob, it may be nice to have a little more data. We can use the MovieLens 1M ratings file, with a small Unix transformation to make it look like a larger version of the intro.csv file. (You could also use a custom DataModel that reads the format directly; according to the MIA book, one is available in Mahout.)
sujit@cyclone:data$ cat ~/Downloads/ml-ratings.dat | \
awk 'BEGIN{FS="::"; OFS=",";} {print $1, $2, $3}' > ml-ratings.dat
Next, you should have Hadoop set up. I have it running in pseudo-distributed mode, ie, a cluster of one (not to be confused with the Mahout pseudo-distributed RecommenderJob).
Finally, make sure that the $MAHOUT_HOME/bin/mahout script works. It runs in the context of the Mahout distribution, so it can barf if it does not find the JARs in the places it expects them to be. To remedy this, run mvn -DskipTests install to create all the required JAR files. Then make sure that the HADOOP_HOME and HADOOP_CONF_DIR environment variables point to your Hadoop installation. Granted, the script is not very useful right now (see below for why not), but it will come in handy when trying to use standard Mahout functionality without having to worry about pesky classpath issues, so making it work is time well spent.
Once I got the bin/mahout script working, I discovered that it wouldn't find my custom Recommender class, even though the JAR appeared in the script's classpath. Ultimately, based on advice on this Stack Overflow page and error messages from Hadoop, I ended up un-jarring all of mahout-core-0.7-job.jar, scala-library.jar and the classes under my project, and jarring them back together into one big fat JAR. Apparently sbt has an assembly plugin you can customize to do this, but my sbt-fu is weak to non-existent, so I used a shell script instead[1]. Once the fat JAR is built, you can invoke the pseudo-distributed RecommenderJob like so:
hduser@cyclone:mahout-distribution-0.7$ hadoop jar \
my-mahout-fatjar.jar \
org.apache.mahout.cf.taste.hadoop.pseudo.RecommenderJob \
--recommenderClassName com.mycompany.mia.cf.MovieLensRecommender \
--numRecommendations 100 \
--input input/ml-ratings.dat --output output \
--usersFile input/users.txt
Mahout also provides an item-based RecommenderJob that is not based on the framework described above. Rather, it is a sequence of Hadoop M/R jobs that use an item-item similarity matrix to calculate the recommendations. For customization, it only allows you to pass in a similarity measure (which implements VectorSimilarityMeasure). Mahout already provides quite a few to choose from (City Block, Co-occurrence, Cosine, Euclidean, Log Likelihood, Pearson, and Tanimoto). I did not investigate writing my own custom implementation. Here is the command to run this job using bin/mahout (since no customization was done):
hduser@cyclone:mahout-distribution-0.7$ bin/mahout \
org.apache.mahout.cf.taste.hadoop.item.RecommenderJob \
--input input/ml-ratings.dat --output output \
--numRecommendations 10 --usersFile input/users.txt \
--similarityClassname SIMILARITY_PEARSON_CORRELATION
Alternatively, if you did need to do customizations, you could just invoke the fat JAR using the bin/hadoop script instead:
hduser@cyclone:mahout-distribution-0.7$ hadoop jar \
my-mahout-fatjar.jar \
org.apache.mahout.cf.taste.hadoop.item.RecommenderJob \
--input input/ml-ratings.dat --output output \
--numRecommendations 10 --usersFile input/users.txt \
--similarityClassname SIMILARITY_PEARSON_CORRELATION
Finally, Mahout provides an ALS (Alternating Least Squares) implementation of RecommenderJob. Like the previous RecommenderJob, this one does not use the Recommender framework either. It uses an algorithm that was used in one of the entries for the Netflix Prize. I did not try it because it needs feature vectors built first, which I don't yet know how to do. You can find more information in this Mahout JIRA.
So anyway, that's all I have for today. Next up on my list is Clustering with Mahout.
[1] Update 2012-08-31 - Copying scala-libs.jar into $HADOOP_HOME/lib eliminates the need to repackage it into the fat JAR, but you will have to remember to do so (once) whenever your Hadoop version changes. I have decided to follow this strategy, and have made changes to the fatjar.sh script on GitHub.