Wednesday, August 22, 2012

Learning Mahout : Collaborative Filtering

My Mahout in Action (MIA) book has been collecting dust for a while now, waiting for me to get around to learning about Mahout. Mahout is evolving quite rapidly, so the book is a bit dated now, but I decided to use it as a guide anyway as I work through the various modules in the currently GA) 0.7 distribution.

My objective is to learn about Mahout initially from a client perspective, ie, find out what ML modules (eg, clustering, logistic regression, etc) are available, and which algorithms are supported within each module, and how to use them from my own code. Although Mahout provides non-Hadoop implementations for almost all its features, I am primarily interested in the Hadoop implementations. Initially I just want to figure out how to use it (with custom code to tweak behavior). Later, I would like to understand how the algorithm is represented as a (possibly multi-stage) M/R job so I can build similar implementations.

I am going to write about my progress, mainly in order to populate my cheat sheet in the sky (ie, for future reference). Any code I write will be available in this GitHub (Scala) project.

The first module covered in the book is Collaborative Filtering. Essentially, it is a technique of predicting preferences given the preferences of others in the group. There are two main approaches - user based and item based. In case of user-based filtering, the objective is to look for users similar to the given user, then use the ratings from these similar users to predict a preference for the given user. In case of item-based recommendation, similarities between pairs of items are computed, then preferences predicted for the given user using a combination of the user's current item preferences and the similarity matrix.

Anatomy of a Mahout Recommender

The input to such a system is either a 3-tuple of (UserID, ItemID, Rating) or a 2-tuple of (UserID, ItemID). In the latter case, the preferences are assumed to be boolean (ie, 1 for where the (UserID, ItemID) pair exists, 0 otherwise). In Mahout, this input is represented by a DataModel class, which can be created from a file of these tuples (either CSV or TSV), one per line. Other ways to populate the DataModel also exist, for example, programatically or from database.

A user-based Recommender is built out of a DataModel, a UserNeighborhood and a UserSimilarity. A UserNeighborhood defines the concept of a group of users similar to the current user - the two available implementations are Nearest and Threshold. The nearest neighborhood consists of the nearest N users for the given user, where nearness is defined by the similarity implementation. The threshold neighborhood consists of users who are at least as similar to the given user as defined by the similarity implementation. The UserSimilarity defines the similarity between two users - implementations include EuclideanDistance, Pearson Correlation, Uncentered Cosine, Caching, City Block, Dummy, Generic User, Log Likelihood, Spearman Correlation and Tanimoto Coefficient similarity.

An item-based Recommender is built out of a DataModel and a ItemSimilarity. Implementations of ItemSimilarity include Euclidean Distance, Pearson Correlation, Uncentered Cosine, City Block, Dummy, Log Likelihood, Tanimoto Coefficient, Caching Item, File Item, and Generic Item similarity. The MIA book describes each algorithm in greater detail.

In both cases, an IDRescorer object can be used to modify the recommendations with some domain logic, either by filtering out some of the recommendations (using isFiltered(itemID : Long) : Boolean) or by boosting/deboosting the recommendation score (using rescore(itemID : Long, originalScore : Double) : Double).

Evaluating Recommenders

A Mahout user would build a Recommender using the "right" mix of the components described above. What is a right mix is not readily apparent, so we can just ask the computer. Mahout provides three evaluation metrics, the Average Absolute Difference, Root Mean Square Difference and the IR Stats (which provides precision and recall at N). Here is some some code that uses the IRStats evaluator to run a sample of the input against various combinations and reports the precision and recall at a certain point.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package com.mycompany.mia.cf

import java.io.File
import scala.collection.mutable.HashMap
import org.apache.mahout.cf.taste.common.Weighting
import org.apache.mahout.cf.taste.eval.{RecommenderBuilder, IRStatistics}
import org.apache.mahout.cf.taste.impl.eval.GenericRecommenderIRStatsEvaluator
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel
import org.apache.mahout.cf.taste.impl.model.{GenericDataModel, GenericBooleanPrefDataModel}
import org.apache.mahout.cf.taste.impl.neighborhood.{ThresholdUserNeighborhood, NearestNUserNeighborhood}
import org.apache.mahout.cf.taste.impl.recommender.{GenericUserBasedRecommender, GenericItemBasedRecommender}
import org.apache.mahout.cf.taste.impl.similarity.{UncenteredCosineSimilarity, TanimotoCoefficientSimilarity, PearsonCorrelationSimilarity, LogLikelihoodSimilarity, EuclideanDistanceSimilarity, CityBlockSimilarity}
import org.apache.mahout.cf.taste.model.DataModel
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood
import org.apache.mahout.cf.taste.recommender.Recommender
import org.apache.mahout.cf.taste.similarity.{UserSimilarity, ItemSimilarity}
import scala.io.Source
import scala.util.Random
import java.io.PrintWriter

object RecommenderEvaluator extends App {

  val neighborhoods = Array("nearest", "threshold")
  val similarities = Array("euclidean", "pearson", "pearson_w", 
                           "cosine", "cosine_w", "manhattan", 
                           "llr", "tanimoto")
  val simThreshold = 0.5
  val sampleFileName = "/tmp/recommender-evaluator-sample.tmp.csv"
    
  val argmap = parseArgs(args)
  
  val filename = if (argmap("sample_pct") != null) {
    val nlines = Source.fromFile(argmap("filename")).size
    val sampleSize = nlines * (argmap("sample_pct").toFloat / 100.0)
    val rand = new Random(System.currentTimeMillis())
    var sampleLineNos = Set[Int]()
    do {
      sampleLineNos += rand.nextInt(nlines)
    } while (sampleLineNos.size < sampleSize)
    val out = new PrintWriter(sampleFileName)
    var currline = 0
    for (line <- Source.fromFile(argmap("filename")).getLines) {
      if (sampleLineNos.contains(currline)) {
        out.println(line)
      }
      currline += 1
    }
    out.close()
    sampleFileName
  } else {
    argmap("filename")
  }
  
  val model = argmap("bool") match {
    case "false" => new GenericDataModel(
      GenericDataModel.toDataMap(new FileDataModel(
      new File(filename)))) 
    case "true" => new GenericBooleanPrefDataModel(
      GenericBooleanPrefDataModel.toDataMap(new FileDataModel(
      new File(filename))))
    case _ => throw new IllegalArgumentException(
      invalidValue("bool", argmap("bool")))
  }
  val evaluator = new GenericRecommenderIRStatsEvaluator()

  argmap("type") match {
    case "user" => {
      for (neighborhood <- neighborhoods;
           similarity <- similarities) {
        println("Processing " + neighborhood + " / " + similarity)
        try {
          val recommenderBuilder = userRecoBuilder(
            neighborhood, similarity, 
            model.asInstanceOf[GenericDataModel])
          val stats = evaluator.evaluate(recommenderBuilder, 
            null, model, 
            null, argmap("precision_point").toInt, 
            GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD, 
            argmap("eval_fract").toDouble)
            printResult(neighborhood, similarity, stats)
        } catch {
          case e => {
            println("Exception caught: " + e.getMessage)
          }
        }
      }
    }
    case "item" => {
      for (similarity <- similarities) {
        println("Processing " + similarity)
        try {
          val recommenderBuilder = itemRecoBuilder(similarity, 
            model.asInstanceOf[GenericDataModel])
          val stats = evaluator.evaluate(recommenderBuilder, null, 
            model, null, argmap("precision_point").toInt,
            GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD,
            argmap("eval_fract").toDouble)
          printResult(null, similarity, stats)
        } catch {
          case e => {
            println("Exception caught: " + e.getMessage)
          }
        }
      }
    }
    case _ => throw new IllegalArgumentException(
      invalidValue("type", argmap("type")))
  }

  def usage() : Unit = {
    println("Usage:")
    println("com.mycompany.mia.cf.RecommenderEvaluator [-key=value...]")
    println("where:")
    println("sample_pct=0-100 (use sample_pct of original input)")
    println("type=user|item (the type of recommender to build)")
    println("bool=true|false (whether to use boolean or actual preferences)")
    println("precision_point=n (the precision at n desired)")
    println("eval_fract=n (fraction of data to use for evaluation)")
    System.exit(1)
  }
  
  def parseArgs(args : Array[String]) : HashMap[String,String] = {
 val argmap = new HashMap[String,String]()
    for (arg <- args) {
      val nvp = arg.split("=")
      argmap(nvp(0)) = nvp(1)
    }
    argmap
  }

  def invalidValue(key : String, value : String) : String = {
    "Invalid value for '" + key + "': " + value
  }
  
  def itemRecoBuilder(similarity : String, 
      model : GenericDataModel) : RecommenderBuilder = {
    val s : ItemSimilarity = similarity match {
      case "euclidean" => new EuclideanDistanceSimilarity(model)
      case "pearson" => new PearsonCorrelationSimilarity(model)
      case "pearson_w" => new PearsonCorrelationSimilarity(
        model, Weighting.WEIGHTED)
      case "cosine" => new UncenteredCosineSimilarity(model)
      case "cosine_w" => new UncenteredCosineSimilarity(
        model, Weighting.WEIGHTED)
      case "manhattan" => new CityBlockSimilarity(model)
      case "llr" => new LogLikelihoodSimilarity(model)
      case "tanimoto" => new TanimotoCoefficientSimilarity(model)
      case _ => throw new IllegalArgumentException(
        invalidValue("similarity", similarity))
    }
    new RecommenderBuilder() {
      override def buildRecommender(model : DataModel) : Recommender = {
        new GenericItemBasedRecommender(model, s)
      }
    }
  }
  
  def userRecoBuilder(neighborhood : String, 
      similarity : String,
      model : GenericDataModel) : RecommenderBuilder = {
    val s : UserSimilarity = similarity match {
      case "euclidean" => new EuclideanDistanceSimilarity(model)
      case "pearson" => new PearsonCorrelationSimilarity(model)
      case "pearson_w" => new PearsonCorrelationSimilarity(
        model, Weighting.WEIGHTED)
      case "cosine" => new UncenteredCosineSimilarity(model)
      case "cosine_w" => new UncenteredCosineSimilarity(
        model, Weighting.WEIGHTED)
      case "manhattan" => new CityBlockSimilarity(model)
      case "llr" => new LogLikelihoodSimilarity(model)
      case "tanimoto" => new TanimotoCoefficientSimilarity(model)
      case _ => throw new IllegalArgumentException(
        invalidValue("similarity", similarity))
    }
    val neighborhoodSize = if (model.getNumUsers > 10) 
      (model.getNumUsers / 10) else (model.getNumUsers)
    val n : UserNeighborhood = neighborhood match {
      case "nearest" => new NearestNUserNeighborhood(
        neighborhoodSize, s, model) 
      case "threshold" => new ThresholdUserNeighborhood(
        simThreshold, s, model)
      case _ => throw new IllegalArgumentException(
        invalidValue("neighborhood", neighborhood))
    }
    new RecommenderBuilder() {
      override def buildRecommender(model : DataModel) : Recommender = {
        new GenericUserBasedRecommender(model, n, s)
      }
    }
  }
  
  def printResult(neighborhood : String, 
      similarity : String, 
      stats : IRStatistics) : Unit = {
    println(">>> " + 
      (if (neighborhood != null) neighborhood else "") + 
      "\t" + similarity +
      "\t" + stats.getPrecision.toString + 
      "\t" + stats.getRecall.toString)
  }
}

The command below will run the evaluator for all the item based recommenders using 10% of the MovieLens 1M rating file and report the precision and recall at 2 for each recommender. Note that some recommenders may not give you results because there is not sufficient data. This evaluator is a work in progress so I may change it to use one of the other evaluation metrics.

1
2
3
4
5
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
  com.mycompany.mia.cf.RecommenderEvaluator \
  filename=data/ml-ratings.dat \
  type=item bool=false \
  precision_point=2 sample_pct=10 eval_fract=1'

Running the Recommender

As you can see, Mahout provides nice building blocks to build Recommenders with. Recommenders built using this framework can be run on a single machine (local mode) or on Hadoop via the so-called pseudo-distributed RecommenderJob that splits the input file across multiple Recommender reducers. To run in the latter mode, the recommender must have a constructor that takes a DataModel. Presumably if I am building a recommender with this framework, I would want it to scale up in this manner, so it makes sense to build it as the framework requires. Here is a custom Item-based recommender that uses the PearsonCorrelation similarity. The file also contains a runner that can be used for calling/testing in local mode.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.mycompany.mia.cf

import java.io.File
import java.util.List

import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.mutable.HashSet
import scala.io.Source

import org.apache.mahout.cf.taste.common.Refreshable
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel
import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity
import org.apache.mahout.cf.taste.model.DataModel
import org.apache.mahout.cf.taste.recommender.{Recommender, RecommendedItem, IDRescorer}

object MovieLensRecommenderRunner extends App {
  // grab the input file name
  val filename = if (args.length == 1) args(0) else "unknown"
  if ("unknown".equals(filename)) {
    println("Please specify input file")
    System.exit(-1)
  }
  // train recommender
  val recommender = new MovieLensRecommender(
    new FileDataModel(new File(filename)))
  // test recommender
  val alreadySeen = new HashSet[Long]()
  val lines = Source.fromFile(filename).getLines
  for (line <- lines) {
    val user = line.split(",")(0).toLong
    if (! alreadySeen.contains(user)) {
      val items = recommender.recommend(user, 100)
      println(user + " =>" + items.map(x => x.getItemID).
        foldLeft("")(_ + " " + _))
    }
    alreadySeen += user
  }
}

class MovieLensRecommender(model : DataModel) extends Recommender {

  val similarity = new PearsonCorrelationSimilarity(model)
  val delegate = new GenericItemBasedRecommender(model, similarity)

  // everything below this is boilerplate. We could use the 
  // RecommenderWrapper if it was part of Mahout-Core but its part 
  // of the Mahout-Integration for the webapp
  
  def recommend(userID: Long, howMany: Int): List[RecommendedItem] = {
    delegate.recommend(userID, howMany)
  }

  def recommend(userID: Long, howMany: Int, rescorer: IDRescorer): List[RecommendedItem] = {
    delegate.recommend(userID, howMany, rescorer)
  }

  def estimatePreference(userID: Long, itemID: Long): Float = {
    delegate.estimatePreference(userID, itemID)
  }

  def setPreference(userID: Long, itemID: Long, value: Float): Unit = {
    delegate.setPreference(userID, itemID, value)
  }

  def removePreference(userID: Long, itemID: Long): Unit = {
    delegate.removePreference(userID, itemID)
  }

  def getDataModel(): DataModel = {
    delegate.getDataModel()
  }

  def refresh(alreadyRefreshed: java.util.Collection[Refreshable]): Unit = {
    delegate.refresh(alreadyRefreshed)
  }
}

We can run this in local mode on the command line using sbt as shown below:

1
2
3
4
5
6
7
sujit@cyclone:mia-scala-examples$ sbt 'run-main \
  com.mycompany.mia.cf.MovieLensRecommenderRunner data/intro.csv'
1 => 104
2 =>
3 => 103 102
4 => 102
5 =>

For running within the Hadoop pseudo-distributed RecommenderJob, it may be nice to have a little more data. We can use the MovieLens 1M rating file, with a few Unix transformations to make it look like a larger version of the intro.csv file. (You can also use a custom DataModel to read the format, apparently one is available in Mahout according to the MIA book).

1
2
sujit@cyclone:data$ cat ~/Downloads/ml-ratings.dat | \
  awk 'BEGIN{FS="::"; OFS=",";} {print $1, $2, $3}' > ml-ratings.dat 

Next, you should have Hadoop set up (I have it set up to work in pseudo-distributed mode, ie a cluster of 1 (not to be confused with the Mahout pseudo-distributed RecommenderJob)).

Finally make sure that the $MAHOUT_HOME/bin/mahout script works. It runs in the context of the Mahout distribution, so it can barf if it does not find the JARs in the places it expects them to be in. To remedy this, run mvn -DskipTest install to create all the required JAR files. Then make sure that HADOOP_HOME and HADOOP_CONF_DIR enviroment variables are set to point to your Hadoop installation. Granted, the script is not very useful (see below for why not) right now, but it will come in handy when trying to use standard Mahout functionality without having to worry about pesky classpath issues. So making it work is time well-spent.

Once I got the bin/mahout script working, I discovered that it wouldn't find my custom Recommender class, even though the JAR appeared in the script's classpath. Ultimately, based on advice on this Stack Overflow page and error messages from Hadoop, I ended up un-jarring up all of mahout-core-0.7-job.jar, scala-library.jar and the classes under my project, and jarring them back together into one big fat JAR. Apparently sbt has a assembly plugin you can customize to do this, but my sbt-fu is weak to non-existent, so I used a shell script instead[1]. Once the fat jar is built, you can invoke the pseudo-distributed Recommender job like so:

1
2
3
4
5
6
7
hduser@cyclone:mahout-distribution-0.7$ hadoop jar \
  my-mahout-fatjar.jar \
  org.apache.mahout.cf.taste.hadoop.pseudo.RecommenderJob \
  --recommenderClassName com.mycompany.mia.cf.MovieLensRecommender \
  --numRecommendations 100 \
  --input input/ml-ratings.dat --output output \
  --usersFile input/users.txt

Mahout also provides a Item RecommenderJob that is not based on the framework described above. Rather, this is a sequence of Hadoop M/R jobs that use a item-item similarity matrix to calculate the recommendations. For customization, it only allows you to pass in a Similarity measure (which implements VectorSimilarityMeasure). Mahout already provides quite a few to choose from (City Block, Co-occurrence, Cosine, Euclidean, Log Likelihood, Pearson, and Tanimoto). I did not investigate writing my own custom implementation. Here is the command to run this job using bin/mahout (since no customization was done).

1
2
3
4
5
hduser@cyclone:mahout-distribution-0.7$ bin/mahout \
  org.apache.mahout.cf.taste.hadoop.item.RecommenderJob \
  --input input/ml-ratings.dat --output output \
  --numRecommendations 10 --usersFile input/users.txt \
  --similarityClassname SIMILARITY_PEARSON_CORRELATION

Alternatively, in case you did need to do customizations, you could also just invoke the fat JAR using the bin/hadoop script instead.

1
2
3
4
5
6
hduser@cyclone:mahout-distribution-0.7$ hadoop jar 
  my-mahout-fatjar.jar \
  org.apache.mahout.cf.taste.hadoop.item.RecommenderJob \
  --input input/ml-ratings.dat --output output \
  --numRecommendations 10 --usersFile input/users.txt \
  --similarityClassname SIMILARITY_PEARSON_CORRELATION

Finally, Mahout provides the ALS (Alternating Least Squares) implementation of RecommenderJob. Like the previous RecommenderJob, this also does not use the Recommender framework. It uses an algorithm that was used in one of the entries for the Netflix Prize. I did not try it because it needs feature vectors built first, which I don't yet know how to do. You can find more information in this Mahout JIRA.

So anyway, thats all I have for today. Next up on my list is Clustering with Mahout.

[1] Update 2012-08-31 - Copying scala-libs.jar into $HADOOP_HOME/lib eliminates the need to repackage scala-libs.jar in the fat jar, but you will have to remember to do so (once) whenever your Hadoop version changes. I have decided to follow this strategy, and I have made changes to the fatjar.sh script in GitHub.

4 comments (moderated to prevent spam):

Anonymous said...

hai ...

ur post's all are so good... all this information are very useful for me.. but i have one doubt bcoz i m new to project environment.. how i import the apache packages.. i tried so many ways but i couldnt find.. can u help me by give the way... thank u in advance

Sujit Pal said...

Hi, if you mean importing the apache JARs for calls on the command line, then there are several options available.

* If using bin/mahout the script already does this for you.
* If using the bin/hadoop script to call provided Mahout drivers or functionality, then you will need the mahout-core-*-job.jar in the classpath (export CLASSPATH=$CLASSPATH:/path/to/mahout-core-job.jar).
* If using bin/hadoop to invoke custom code, then you need to package your classes along with the classes from mahout-core-*-job.jar (see bin/fatjar.sh in my project).

In the past, I have also had success with a slightly different fat jar packaging I have described here. Haven't tried this with scala, let me know if it works for you.

Unknown said...

Hi

Can you please tell me how to decide which similarity measure should be used before applying any measures to generate recommendations based on data we have ?

Thanks
Gaurhari

Sujit Pal said...

Hard to say, I think the answer depends on the data. One way could be to split your data into two parts, then build models using different similarity measures on one part and see how accurate it is at predicting against the other part - evaluation criteria could be accuracy, precision/recall/F1, mean square error, etc, depending on your use case. Then choose the model that gives the best results.