Saturday, October 13, 2018

Trip Report: RecSys 2018


I attended the ACM Conference on Recommender Systems (RecSys 2018) in Vancouver last week. People who know me personally would probably be a bit surprised, since my claim to being interested in recommenders is based almost solely on having read Satnam Alag's Collective Intelligence in Action, and on taking the Coursera course on Recommender Systems taught by Professors Joseph Konstan and Michael Ekstrand a few years ago. Of late, however, I have been working on recommender systems with our Health Education group, and I figured that attending this conference, while akin to drinking from a firehose, would quickly give me a sense of the latest techniques in the field, as well as introduce me to ideas that I could adapt and reuse in my own domain. As you will see from my trip report, I was not disappointed.

One notable thing about RecSys is that attendees seemed friendlier in general than at other (NLP oriented) ACM conferences I have been to. Or maybe it's just me finally overcoming my imposter syndrome. In any case, I found my fellow attendees at RecSys much more willing to compare notes and share their expertise. I was also fortunate to meet several of my colleagues from Elsevier, some for the very first time, as well as recommender systems experts and past RecSys attendees whom I had met earlier at EMNLP 2017 in Copenhagen, and many others. Somehow, our friend graph must have achieved some sort of critical mass, since it kept expanding over the duration of the conference. I also got a chance to say hello to Professors Ekstrand and Konstan and thank them for their Coursera course, which has introduced so many people, including me, to recommender systems. And downtown Vancouver, where the conference was held, is something of a foodie paradise. So along with being a very educational experience, RecSys 2018 was also a lot of fun.


Tutorials


The conference started with 1 day of tutorials, followed by 3 days of conference presentations, followed by 2 days of workshops. I was curious about how Deep Learning (DL) was being used in Recommenders, and apparently many other attendees felt the same way, because the 3 tutorials I signed up for, all DL related, were all sold out. Shades of NIPS perhaps, where Soumith Chintala joked that AI researchers will have to discover time travel in order to get a ticket for 2019 before they sell out. In any case, I found all my tutorials uniformly interesting (discounting the almost universal urge among presenters to explain skip-gram or LSTM internals one more time).

The first tutorial I attended talked about Distributed Representations in Recommender Systems and covered architectures like prod2vec (aka item2vec), which learns item embeddings from item-item co-occurrences in transaction sequences of co-purchased items; meta-prod2vec, which adds product features as well; and content2vec, which combines multiple embedding types, including image embeddings.
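To make the prod2vec idea a bit more concrete, here is a minimal sketch, assuming each session is simply a list of product ids. It generates the (target, context) pairs that a skip-gram model would train on, treating a session the way word2vec treats a sentence. The object name `Prod2VecPairs`, the window size and the sample products are hypothetical, and the embedding training itself is not shown.

```scala
// Hypothetical sketch: turn purchase sessions into skip-gram (target, context)
// pairs, as prod2vec does with word2vec. Training the embeddings is not shown.
object Prod2VecPairs {
  // every item paired with each neighbor within `window` positions of it
  def skipGramPairs(session: Seq[String], window: Int): Seq[(String, String)] =
    for {
      i <- session.indices
      j <- math.max(0, i - window) to math.min(session.size - 1, i + window)
      if j != i
    } yield (session(i), session(j))
}

val sessions = Seq(
  Seq("tent", "stove", "lantern"),
  Seq("stove", "fuel")
)
val pairs = sessions.flatMap(s => Prod2VecPairs.skipGramPairs(s, window = 1))
pairs.foreach(println)
```

A larger window captures looser "bought in the same basket" relationships, while a window of 1 only links items bought one after the other.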

My second tutorial was titled Modularizing Deep Neural Network Inspired Recommendation Engines and was a walkthrough of OpenRec by its creator. OpenRec is a modular library used to construct DL based Recommender algorithms. The API is quite elegant and reminded me a bit of Keras, except at a higher abstraction level.

My third tutorial was about Sequence-Aware Recommendation, which uses sequences of user-item interactions to build richer user models than standard latent factor (matrix factorization based) models, which rely on single user-item interaction pairs.
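As a point of reference (and not one of the models from the tutorial), probably the simplest sequence-aware baseline is a first-order Markov model that recommends the items most often seen right after the user's last item. Here is a minimal sketch on plain Scala collections; `NextItem` and the toy sequences are hypothetical.

```scala
// Hypothetical first-order Markov baseline: count item -> next-item transitions
// across user sequences, then recommend the most frequent successors.
object NextItem {
  def transitions(sequences: Seq[Seq[String]]): Map[String, Map[String, Int]] =
    sequences
      .flatMap(seq => seq.zip(seq.drop(1)))   // consecutive (from, to) pairs
      .groupBy(_._1)
      .map { case (from, ps) =>
        from -> ps.groupBy(_._2).map { case (to, tos) => to -> tos.size }
      }

  def recommend(counts: Map[String, Map[String, Int]], lastItem: String, k: Int): Seq[String] =
    counts.getOrElse(lastItem, Map.empty[String, Int])
      .toSeq
      .sortBy { case (item, n) => (-n, item) } // most frequent first, ties by name
      .take(k)
      .map(_._1)
}

val history = Seq(
  Seq("a", "b", "c"),
  Seq("a", "b", "d"),
  Seq("a", "c")
)
val counts = NextItem.transitions(history)
println(NextItem.recommend(counts, "a", 2))
```

Models like the ones in the tutorial go further by looking at longer histories than just the last item.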

My overall impression at the end of the tutorials was that the field of Recommender Systems (RS) has borrowed and adapted a lot of ideas from Natural Language Processing (NLP), at least around distributional representations (word2vec, transfer learning for images using pretrained CNNs, etc.) and DL architectures such as Autoencoders (AE) and Recurrent Neural Networks (RNN). Together, the tutorials served as a good background for many of the conference presentations and workshops.


Main Conference


The conference this year was single-track, which meant that I no longer had to start each day with a highlighter and the program to figure out which of two or three interesting presentations I would have to drop. On the flip side, it did mean that some of the presentations may not be interesting to everyone. For the latter case, there were always plenty of posters to look at and discuss with authors. There were also quite a few booths set up by sponsor companies. The full conference program can be found here.

Conference Day 1

The first day opened up with a keynote from Elizabeth Churchill of Google, who spoke about recommendation design principles as a set of five E's -- Explainable, Equitable, Ethical, Expedience and Exigence. This was followed by paper sessions on Explanations, Algorithms and Products.

The Explanation set of papers explored novel solutions for providing explanations from Recommender Systems. Interesting ideas included using a Generative Adversarial Network (GAN) to generate personalized reviews for users to explain why they should purchase the item; an attempt to quantify how much explanation is "enough"; the effect of providing explanations for reciprocal recommendations (in the context of matchmaking); and interpreting user inaction as a possible signal to the RS.

The Algorithms session focused on algorithms papers from industry. Interesting papers included Variational Learning to Rank (VLTR), which attempts to balance explore/exploit decisions by shuffling product listings according to the model's relevance uncertainty for each product; the use of the (currently) state-of-the-art AWD-LSTM model along with session features to recommend real estate listings at Realtor.com; using CNN and RNN networks and leveraging contextual bandits to balance exploration/exploitation decisions at Hulu.com to keep users watching videos via autoplay; overlaying an ML model over a standard MF one to make related pins context dependent at Pinterest; and combating clickbait at Flipboard by combining content and usage signals for articles so they act as regularizers for each other.
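The explore/exploit idea behind shuffling by uncertainty can be illustrated with Thompson-sampling style ranking: draw a score from each product's predicted relevance distribution and sort by the draws. This is a generic sketch, not the VLTR model from the paper; the Gaussian assumption, `Scored`, `UncertaintyShuffle` and the numbers are all hypothetical.

```scala
import scala.util.Random

// Hypothetical sketch (not the VLTR paper's model): rank by a score sampled
// from N(mean, stdDev). Uncertain items sometimes move up (explore), while
// confident, high-relevance items usually stay on top (exploit).
case class Scored(id: String, mean: Double, stdDev: Double)

object UncertaintyShuffle {
  def rank(items: Seq[Scored], rng: Random): Seq[String] =
    items
      .map(it => (it.id, it.mean + it.stdDev * rng.nextGaussian()))
      .sortBy { case (_, sampled) => -sampled }
      .map(_._1)
}

val listings = Seq(
  Scored("p1", mean = 0.9, stdDev = 0.01),
  Scored("p2", mean = 0.5, stdDev = 0.40),
  Scored("p3", mean = 0.7, stdDev = 0.05)
)
println(UncertaintyShuffle.rank(listings, new Random(42)))
```

With all standard deviations at zero this degenerates to plain sorting by predicted relevance, i.e. pure exploitation.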

This was followed by the Products set of papers, some of which were quite mathy and theoretical. Interesting ideas included an investigation of recommendations that are constrained to benefit not only the primary user but other stakeholders as well; modeling sequential user behavior using translation-based Factorization Machines (FM); an investigation into how much data a user can retract while still allowing a recommender to make good predictions, something of renewed interest in the current age of GDPR; a DL model to predict complementary items; an extension of Matrix Factorization (MF) to work with sequences of behavior signals to make item recommendations; a Reinforcement Learning (RL) architecture to predict page-wise product recommendations; and a method to de-bias logging data so it can be reused in other models.

Conference Day 2

The second day opened with a keynote by Lise Getoor from UC Santa Cruz, who contended that we often flatten the inherent structure in our data because we use matrix algebra. She then proposed PSL (Probabilistic Soft Logic), a language that allows you to make use of logical structure and handles uncertainty. The keynote was followed by paper sessions on Learning and Optimization, System Considerations, and Travel and Entertainment.

The Learning and Optimization session consisted of some pretty novel DL architectures. The first one proposed a Neural Gaussian Mixture Model (NGMM) that combines Gaussian Mixture Models with a pair of neural networks to produce rating predictions from reviews. The second one used Deep Neural Memory to record a sequence of user interactions, which is then used by contextual bandits for explore/exploit decisions. The next one was a non-DL model that calculates the optimal mix between both parties in a reciprocal RS to predict the best reward. The next one was a best paper candidate and tried to combine preferences from MF and graph based models by filtering the prediction based on graph distance (aka higher order proximity). The next one was also a best paper candidate and used a variational AE (VAE) that combines user and item features to learn a latent space, which can then be used to generate personalized item recommendations. The final paper in this set was about calibrated recommendations, which attempt to balance a user's recommendations so that they reflect the proportions of his various interests.
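One way to measure how well a list is calibrated, loosely following the calibrated recommendations idea, is the KL divergence between the genre distribution of a user's history (p) and that of the recommended list (q), with q smoothed by a small amount of p so the divergence stays finite. This is a sketch; the smoothing weight `alpha = 0.01`, the genre-counting helper and all the names are assumptions.

```scala
// Hypothetical sketch of a KL-based miscalibration score.
object Calibration {
  // share of each genre in a list of genre labels
  def genreDistribution(genres: Seq[String]): Map[String, Double] =
    genres.groupBy(identity).map { case (g, gs) => g -> gs.size.toDouble / genres.size }

  // KL(p || q~), where q~ = (1 - alpha) * q + alpha * p keeps every term finite
  def klMiscalibration(p: Map[String, Double], q: Map[String, Double], alpha: Double = 0.01): Double =
    p.toSeq.collect { case (g, pg) if pg > 0 =>
      val qg = (1 - alpha) * q.getOrElse(g, 0.0) + alpha * pg
      pg * math.log(pg / qg)
    }.sum
}

val history = Calibration.genreDistribution(Seq("drama", "drama", "comedy", "comedy"))
val balanced = Calibration.genreDistribution(Seq("drama", "comedy"))
val skewed = Calibration.genreDistribution(Seq("drama", "drama"))
println(Calibration.klMiscalibration(history, balanced))  // close to zero
println(Calibration.klMiscalibration(history, skewed))    // noticeably larger
```

A re-ranker could then trade off this score against plain relevance when picking the final list.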

The next group of papers, from the System Considerations session, dealt with novel situations that dictate the design of the RS. Examples included a discussion of problems that can impact the performance of an RS in production; respecting privacy boundaries when recommending on Slack; deciding which video image to show on Netflix; incorporating intent for voice recommendations on Comcast X1; and techniques to standardize listings to facilitate communication between buyer and seller on eBay.

The next set of talks were about recommendations in the Travel and Entertainment industry. There were talks about bundling telecom services (for example, the channel lineup) personalized to the user; using a questionnaire to quickly gauge the user's preference to solve the cold start problem; a graph based public transport route planner that allows users to specify various parameters, including comfort (the city in the test case is Kolkata, and I guess I can relate, having used the public transport there); a very interesting study comparing user interactions with voice and visual recommendations; mapping out-of-stock items to similar in-stock items and combining their interaction history to create more accurate item recommendations; and a system to recommend the hero to choose for Multiplayer Online Battle Arena games.

In the evening there was a RecSys sponsored banquet by the Vancouver marina.

Conference Day 3

I missed the keynote and first session of the third day, because a few of us decided to take advantage of face time to meet over breakfast and do some brainstorming, and we lost track of time. I regret missing the keynote by Christopher Berry of the Canadian Broadcasting Corporation; I heard later that he covered some aspects of social responsibility that I care deeply about. Since we also missed the paper session on RecSys that Care, I will only cover the next two sessions, on Metrics and Evaluation, and Beyond Users and Items.

The Metrics and Evaluation session, as you can imagine, was mostly about measuring things. The talks covered predicting the best answer in a community Q+A site based not only on content features but also on user features; an investigation into how various IR metrics vary with N for top-N recommendations; a framework for benchmarking stream based news recommenders; a comparative study of newest, most similar and most relevant strategies for related video recommendations; removing bias from offline recommender evaluation for missing-not-at-random implicit feedback; and an attempt to understand human perceptions of image similarity in the context of related item recommendations.
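For reference, here is a sketch of the two simplest top-N metrics, precision@N and recall@N, for a single user. These are the standard textbook definitions, not necessarily the exact metrics studied in the talk, and `TopN` and the data are hypothetical.

```scala
// Hypothetical sketch: precision@N and recall@N for one user, given a ranked
// recommendation list and the set of items the user actually liked.
object TopN {
  def precisionAt(n: Int, ranked: Seq[String], relevant: Set[String]): Double =
    if (n <= 0) 0.0 else ranked.take(n).count(relevant.contains).toDouble / n

  def recallAt(n: Int, ranked: Seq[String], relevant: Set[String]): Double =
    if (relevant.isEmpty) 0.0 else ranked.take(n).count(relevant.contains).toDouble / relevant.size
}

val ranked = Seq("m1", "m2", "m3", "m4", "m5")
val liked = Set("m2", "m5", "m9")
(1 to 5).foreach { n =>
  println(f"N=$n precision=${TopN.precisionAt(n, ranked, liked)}%.2f recall=${TopN.recallAt(n, ranked, liked)}%.2f")
}
```

Printing both for increasing N shows the usual pattern: recall can only grow with N, while precision tends to drop as less relevant items are admitted.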

The Beyond Users and Items session had talks that dealt with extensions to the basic Collaborative Filtering (CF) model. Talks were about the use of RNNs to map Knowledge Graph (KG) paths to graph embeddings (RKGE); differences in how various types of preferences (CF, content, social, trust-based) should be applied for different types of products; SpectralCF, applying a spectral convolution on a user-item bipartite graph, that can alleviate the cold-start problem by benefiting from the rich connectivity information in the spectral domain; the benefits of using so-called side information (categorical item features) in recommender systems; the benefits of pairwise preference elicitations (asking questions to cold start users to figure out their preferences) over pointwise; and using topic modeling and streaming MF on text features in an online recommender environment.

Workshops


The next two days were workshops. I attended the Deep Learning (DLRS), Knowledge Transfer and Learning (KTL) and the Knowledge Aware and Conversational Recommender Systems (KARS) workshops. I also attended the HealthRecSys workshop for a while at the recommendation of someone I met at the banquet, but it wasn't what I was looking for.

This is going to be the last DLRS workshop, since the workshop was set up to encourage the use of DL techniques in RS, and it has reached a point where no more encouragement is needed. Information about papers at the workshop can be found at the DLRS website. There were two talks that used VAEs to do recommendations, a very thorough description of a DL based recommender for news called CHAMELEON used by Brazilian newspaper Globo.com, and an interesting idea of creating explainable recommender systems by replacing the middle layer of an autoencoder with properties from a KG.

The KTL workshop was about using transfer learning techniques. Talks in the workshop were about investigating if better ImageNet models are also better for transfer learning for image recommendations; using a hybrid VAE for CF instead of more traditional techniques like MF; a new embedding technique BB2vec that learns product embeddings for complementary item recommendations from baskets and browsing sessions; using information about venues from other cities to recommend the next venue to visit for a given city; and detecting change points in user preferences using HMM for sequential recommendation tasks.

The KARS workshop discussed ways to incorporate knowledge (from KGs) into RS. Talks in the workshop were about deriving item features from domain knowledge; merging user and content features to create personalized recommendations for scholarly papers; computing recommendations using a KG aware AE; KG aware RS for software development that leverages old APIs in the system to recommend new ones; and narrative driven book recommendations, where one's book reviews are used as the basis for recommending books.

Conclusion


While I have listed all the talks I attended above, I wanted to reflect a little on what I personally took away from the conference. In the past, especially with NLP conferences, I would find most of the topics super-interesting and applicable in some way or other to my work. In this case, perhaps because the RS I am working on don't really use cutting edge methods and probably never will, I found the actual techniques to be of limited utility, except perhaps as ideas to chase in specific situations. However, I did pick up on a few things that I plan to try to apply in my own domain (not necessarily just the RS I am working on). Here is a tentative list.

  • many custom ways of building embeddings to reflect various domain specific scenarios
  • imaginative use of DL structures such as VAE, GAN, Neural Memory, etc.
  • incorporation of graph embeddings from Knowledge Graphs using sequence modeling techniques such as HMM and RNN
  • mixing of traditional and DL techniques such as Neural GMM, SpectralCF, etc.

In addition, I finally understood what a Contextual Bandit is, thanks to the efforts of my co-attendees during a coffee break. There are also a few RS specific tools and ideas that I want to delve into in more detail, such as OpenRec, reco-gym and stream-based recommenders. I also plan to look at recommender specific DL architectures such as prod2vec, GRU4Rec, and Wide and Deep, to name a few, as well as the AWD-LSTM LM and the practical tips from Fast.ai mentioned by Even Oldridge, the presenter from Realtor.com. I also want to take a look at techniques for change point detection as described by Prof. Bamshad Mobasher (for a different application), and KG weighting strategies for scholarly paper recommendation (slides).



Monday, October 01, 2018

Trip Report (sort of): RELX Search Summit 2018


Last week, I was at our London office attending the RELX Search Summit. The RELX Group is the parent company that includes my employer (Elsevier) as well as LexisNexis, LexisNexis Risk Solutions and Reed Exhibitions, among others. The event was organized by our Search Guild, an unofficial special interest group of search professionals from all these companies. As you can imagine, search is something of a big deal at both LexisNexis and Elsevier, given several large, well known search platforms such as Lexis Advance, ScienceDirect, Scopus and Clinical Key.

There were quite a few interesting presentations at the Search Summit, some of which I thought were quite groundbreaking from an applied research point of view. I was going to write up a trip report for the whole event when I realized that at least some of the talks probably represented competitive information that would not be appropriate for a public forum. Besides, it would very likely be of limited interest anyway. So I decided to write this post only about the two presentations that I gave at the Summit. Neither of them is groundbreaking, but I think they might be of interest to most people. Hopefully you think so too.

The first presentation was a 3 hour tutorial session around Content Engineering. The theme I wanted to explore was how to identify keywords in text using various unsupervised and supervised techniques, and how this can improve search. As you know, my last job revolved around search driven by a medical ontology, where the ontology was painstakingly hand-curated by a team of doctors, nurses and pharmacists over a span of several years.

Having an ontology makes quite a few things much easier. (It also makes several things much harder, but we won't dwell on that here). However, the use case I was trying to replicate was where you have the content to search, but no ontology to help you with it. Could we, using a variety of rule-based, statistical and machine learning techniques, identify phrases that represent key ideas in the text, similar to concepts in our ontology? And how does this help with search?

The dataset I used was the NIPS (Neural Information Processing Systems) conference papers from 1987 to 2017. The hope was that I would learn something about the cool techniques and algorithms that show up at NIPS just by having to look at the text to debug problems, although it didn't quite work out the way I had hoped. I demonstrate a variety of techniques such as LLR (statistical), RAKE (rule based) and MAUI (machine learning based), as well as using Stanford and spaCy NER models, and duplicate keyword detection and removal using SimHash and Dedupe. I also demonstrate how to do dimensionality reduction using various techniques (PCA, Topic Modeling, NMF, word vectors) and how they can be used to enhance the search experience. Another point I was trying to make was that there are plenty of third-party open source tools available to do this job without significant investment in coding.
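Of the techniques listed, LLR is the easiest to show in a few lines. Below is a sketch of Dunning's log-likelihood ratio over a 2x2 contingency table of bigram counts, using the common entropy-based formulation; a higher score suggests the two words co-occur more often than chance, i.e. they may form a phrase. It is an illustration rather than the tutorial's code, and the `LLR` object and the counts are hypothetical.

```scala
// Sketch of Dunning's log-likelihood ratio for a 2x2 contingency table:
//   k11 = count(A followed by B)      k12 = count(A followed by not-B)
//   k21 = count(not-A followed by B)  k22 = count(neither)
object LLR {
  private def xLogX(x: Long): Double = if (x == 0) 0.0 else x * math.log(x.toDouble)

  // "unnormalized entropy" of a set of counts
  private def entropy(counts: Long*): Double =
    xLogX(counts.sum) - counts.map(xLogX).sum

  def score(k11: Long, k12: Long, k21: Long, k22: Long): Double = {
    val rowEntropy = entropy(k11 + k12, k21 + k22)
    val colEntropy = entropy(k11 + k21, k12 + k22)
    val matEntropy = entropy(k11, k12, k21, k22)
    math.max(0.0, 2.0 * (rowEntropy + colEntropy - matEntropy))
  }
}

// hypothetical counts for a candidate bigram such as "neural network"
println(LLR.score(k11 = 110, k12 = 2442, k21 = 111, k22 = 29114))
```

Candidate phrases can then be ranked by score and the top ones kept as keywords.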

All the techniques listed above are demonstrated using Jupyter notebooks. In addition, I built a little Flask based web application that showed these techniques in action against a Solr 7.3 index containing the NIPS papers. The web application demonstrates techniques both on the query parsing side where we rewrite queries in various ways to utilize the information available, as well as on the content side, where the additional information is used to suggest documents like the one being viewed, or make personalized reading recommendations based on the collection of documents read already.

The presentation slides, the notebooks and the web application can all be found in my sujitpal/content-engineering-tutorial project on Github. Several new ideas were suggested by participants during the tutorial, since many of them had been looking at similar ideas already, so it morphed into a nice interactive workshop style discussion. I hope to add them in as I find time.

My second presentation was around Learning to Rank (LTR) basics. I had recently become interested in LTR following my visit to the Haystack Search Relevancy conference earlier this year, coupled with a chance discovery that a content-based recommender system I was working to help improve had around 40,000 labeled query document pairs, which could be used to improve the quality of recommendations.

The dataset I chose for this presentation was The Movie Database (TMDB), a collection of 45,000 movies, 20 genres and 31,000 unique keywords. The idea was to see if I could teach a RankLib LambdaMART model the ordering given by the rating field, which is on a 10 point continuous scale. In a sense, the approach is similar to this LTR article using scikit-learn by Alfredo Motta. Most LTR datasets just give you the features in LETOR format to train your ML models, so you can't actually do the full end-to-end pipeline.

In any case, the presentation starts off with a little bit of historical context, talks about different kinds of LTR models (pointwise, pairwise and listwise), some common algorithms that people tend to use, some advice to keep in mind when considering building an LTR model, ideas for features, the LETOR data format, etc. Most of the meat of the presentation is the creation of an LTR model using the Solr 7.4 and Elasticsearch 6.3.1 plugins, as well as for a hypothetical indexer with no LTR support (I used Solr, but did the feature generation outside the indexer). I was hoping to cover at least one of the case studies but ran into technical difficulties (my fault, I should have listened to the organizers when they said to put everything in the slides).

Essentially, the methodology is similar for each of the 3 case studies; the main differences are in syntax (for Solr vs Elasticsearch). First we need a set of queries with a ranked list of documents for each. I used the ratings to create categorical query-document labels on a 5 point scale as explained earlier.
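The exact cut points for turning the 10 point rating into 5 labels are not spelled out here, so the following is only one plausible bucketing, assuming ratings lie in [0, 10] and labels are graded 0 to 4 (higher meaning more relevant). `Labels.toGrade` is a hypothetical helper.

```scala
// Hypothetical bucketing of a 0-10 rating into graded labels 0-4
// (the cut points are an assumption, not taken from the talk).
object Labels {
  def toGrade(rating: Double): Int = {
    require(rating >= 0.0 && rating <= 10.0, s"rating out of range: $rating")
    math.min(4, (rating / 2.0).toInt)  // [0,2) -> 0, [2,4) -> 1, ..., [8,10] -> 4
  }
}

Seq(2.5, 6.1, 9.4).map(Labels.toGrade).foreach(println)
```

Equal-width buckets are the simplest choice; quantile-based cut points would give a more even label distribution if ratings cluster in the middle of the scale.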

Once the data is loaded, the first step is to define the features to Solr and Elasticsearch; features are specified as function queries. We then generate the feature values by running the queries against Solr or Elasticsearch, extracting the values of these function queries for each returned document, and writing them into a file in LETOR format. The main reason for using the index is to generate the query-document similarity features. For a system without LTR support, this can also be done outside the index, albeit less efficiently.
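For reference, each line of a LETOR file holds a relevance label, a query id, and 1-based `featureId:value` pairs, optionally followed by a `#` comment that trainers ignore. Here is a minimal sketch of writing one such line; the `Letor` object and the sample values are hypothetical.

```scala
// Sketch: format one query-document pair as a LETOR line
//   <label> qid:<queryId> 1:<f1> 2:<f2> ... # <comment>
object Letor {
  def line(label: Int, qid: Int, features: Seq[Double], docId: String): String = {
    val feats = features.zipWithIndex
      .map { case (v, i) => s"${i + 1}:$v" }  // feature ids are 1-based
      .mkString(" ")
    s"$label qid:$qid $feats # $docId"
  }
}

println(Letor.line(3, 1, Seq(12.5, 0.0, 1.0), "doc42"))
// 3 qid:1 1:12.5 2:0.0 3:1.0 # doc42
```

Keeping the document id in the trailing comment makes it easy to trace a training row back to the indexed document.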

The LETOR format is used by the LTR model suite RankLib (which provides 8 different LTR models), and has been adopted by most other third-party LTR libraries. I trained a RankLib LambdaMART model for all 3 cases. Model training has to happen outside the indexer using third-party libraries, of which RankLib is one. The output of RankLib is an XML file whose format varies depending on what kind of model it represents. For a linear model, it is just a set of coefficients, one for each feature defined. For RankNet, a neural network, it is a weight matrix that transforms the incoming features into a set of rank probabilities. For LambdaMART, which is a forest of decision trees, it is a set of trees, each with splits defined at various levels.
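To make the LambdaMART description concrete: scoring a document means routing its feature vector down each tree to a leaf and summing the weighted leaf outputs. The sketch below uses a hypothetical in-memory tree structure (`Split`, `Leaf`, `TreeEnsemble`) rather than RankLib's XML schema, and it assumes 0-based feature indices and that values less than or equal to a split threshold go left.

```scala
// Hypothetical sketch of scoring with a tree ensemble such as LambdaMART.
sealed trait Node
case class Split(feature: Int, threshold: Double, left: Node, right: Node) extends Node
case class Leaf(output: Double) extends Node

object TreeEnsemble {
  // follow splits until a leaf is reached (<= threshold goes left here)
  @annotation.tailrec
  def evalTree(node: Node, features: Vector[Double]): Double = node match {
    case Leaf(out) => out
    case Split(f, t, l, r) => evalTree(if (features(f) <= t) l else r, features)
  }

  // document score = sum over trees of (tree weight * leaf output)
  def score(trees: Seq[(Double, Node)], features: Vector[Double]): Double =
    trees.map { case (weight, tree) => weight * evalTree(tree, features) }.sum
}

val tree1 = Split(0, 0.5, Leaf(1.0), Leaf(-1.0))
val tree2 = Split(1, 2.0, Leaf(0.5), Split(0, 0.2, Leaf(0.0), Leaf(2.0)))
val model = Seq(0.1 -> tree1, 0.1 -> tree2)   // (tree weight, tree)
println(TreeEnsemble.score(model, Vector(0.3, 5.0)))
```

Because each tree only adds a small correction, the ensemble's ranking emerges from many shallow trees rather than any single one.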

Once the model is trained, it has to be uploaded to Solr or Elasticsearch. Solr needs the model to be specified in JSON format, so you need to write some code to convert the XML to JSON, while Elasticsearch will accept the XML definition of the trained model without any conversion. You can now use the rerank functionality in Solr or Elasticsearch to rerank the top slice of a base query. For indexers that don't have LTR support, you will have to generate the search results, extract and rerank the top slice using your trained model, and add it back into the search results yourself.
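For the case without LTR support, the rerank step might look like the following sketch, which rescores only the top `k` hits and leaves the tail in the base query's order. To keep it short, a linear model (one coefficient per feature) stands in for LambdaMART; `Hit`, `ClientRerank` and the weights are hypothetical.

```scala
// Hypothetical client-side rerank for an indexer without LTR support.
case class Hit(docId: String, features: Vector[Double])

object ClientRerank {
  // linear model: dot product of coefficients and feature values
  def linearScore(weights: Vector[Double], features: Vector[Double]): Double =
    weights.zip(features).map { case (w, f) => w * f }.sum

  // rerank only the top `k` hits; the tail keeps the base query's order
  def rerank(hits: Seq[Hit], weights: Vector[Double], k: Int): Seq[Hit] = {
    val (top, rest) = hits.splitAt(k)
    top.sortBy(h => -linearScore(weights, h.features)) ++ rest
  }
}

val baseResults = Seq(
  Hit("d1", Vector(1.0, 0.0)),
  Hit("d2", Vector(0.2, 1.0)),
  Hit("d3", Vector(0.5, 0.5)),
  Hit("d4", Vector(0.9, 0.9))
)
val reranked = ClientRerank.rerank(baseResults, weights = Vector(0.3, 0.7), k = 3)
println(reranked.map(_.docId))
```

Limiting the rerank to the top slice keeps the cost of feature extraction and model scoring bounded, which is the same trade-off the Solr and Elasticsearch rerank queries make.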

Notebooks and scripts describing each of the case studies, as well as the presentation slides, can be found in my sujitpal/ltr-examples repository on Github. My main objective here was to understand how to "do" LTR using Solr and Elasticsearch, so I didn't spend much time trying to improve results. Perhaps if I can find a bigger labeled dataset to play with, I might revisit this project and try to evaluate each platform in more detail, so I would appreciate suggestions if you know of any such datasets. Note that standard LTR datasets such as MSLR-WEB10K just provide the features in LETOR format, so they only help with the part where you train and evaluate the LTR model, not with showing the results on the search index. What I am looking for is a dataset with labeled query-document pairs.

That's all I have for today. This week I am at RecSys 2018 in Vancouver, hoping to learn more about recommendation systems from the very best in the field, and meet others working in the recommendation space. Do ping me if you are here as well; it would be nice to meet face to face.


Friday, August 31, 2018

Collaborative Filtering Recommender with Spark GraphX


Earlier this week, I attended a webinar titled Building the Next Generation Recommendation Engine with a Graph Database hosted by TigerGraph as part of their Graph Gurus series. I attended because I am doing some work with recommenders nowadays, and in a past life, I used to do a lot with graphs (not recommenders), and I was curious how they were using graphs for recommenders. I was thinking in terms of graph adjacency heuristics as features for content based recommenders, so when they demoed a simple collaborative filtering recommender during the webinar, I had a bit of a duh moment.

In any case, it got me thinking about trying to implement this using Spark GraphX. I have been meaning to take a look at GraphX for a while, but the stars never quite lined up enough for me to take the plunge. While the use case is probably more suitable for use on a graph database such as TigerGraph, it seemed like a good way to experiment with the GraphX API. So I did, and I describe the experience in today's post.

The recommendations are based on the intuition that people who liked the items you liked also liked these other items. I used the MovieLens 100k dataset, which is made available thanks to the GroupLens project. So in our case, we will recommend movies to a user based on the movies liked by other people who liked the same movies as that user. The algorithm goes something like this:

  1. Find all movies M rated by a specific user u.
  2. Find all users U who rated at least one of the movies in M.
  3. Based on ratings, find the p users whose tastes are most similar to those of our original user u.
  4. Find all movies M' these p people rated that u has not rated yet.
  5. Recommend the top q movies with the highest average rating by the p users.
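The five steps above can be sketched on plain Scala collections, with step 4 excluding the movies that u has already rated. This is not the GraphX implementation; using cosine similarity over each user's full rating vector for step 3 is an assumption, and `UserBasedCF` and the toy ratings are hypothetical.

```scala
// Hypothetical in-memory sketch of the five steps (plain Scala, no Spark).
// Ratings are (userId, movieId, rating) triples.
object UserBasedCF {
  type Rating = (Long, Long, Double)

  // cosine similarity between two sparse rating vectors keyed by movieId
  def cosine(a: Map[Long, Double], b: Map[Long, Double]): Double = {
    val dot = a.keySet.intersect(b.keySet).toSeq.map(k => a(k) * b(k)).sum
    val norms = math.sqrt(a.values.map(v => v * v).sum) * math.sqrt(b.values.map(v => v * v).sum)
    if (norms == 0.0) 0.0 else dot / norms
  }

  def recommend(ratings: Seq[Rating], u: Long, p: Int, q: Int): Seq[(Long, Double)] = {
    val byUser: Map[Long, Map[Long, Double]] =
      ratings.groupBy(_._1).map { case (user, rs) => user -> rs.map(r => r._2 -> r._3).toMap }
    // step 1: movies M rated by u (movieId -> u's rating)
    val m = byUser.getOrElse(u, Map.empty[Long, Double])
    // step 2: other users U who rated at least one movie in M
    val others = byUser.filter { case (user, rs) => user != u && rs.keySet.exists(m.contains) }
    // step 3: the p users most similar to u (ties broken by userId)
    val similar = others.toSeq
      .map { case (user, rs) => user -> cosine(m, rs) }
      .sortBy { case (user, sim) => (-sim, user) }
      .take(p)
      .map(_._1)
    // step 4: movies M' rated by those users that u has NOT rated
    val candidates = for {
      user <- similar
      (movie, rating) <- byUser(user)
      if !m.contains(movie)
    } yield movie -> rating
    // step 5: top q movies by average rating among the p users
    candidates.groupBy(_._1)
      .map { case (movie, rs) => movie -> rs.map(_._2).sum / rs.size }
      .toSeq
      .sortBy { case (movie, avg) => (-avg, movie) }
      .take(q)
  }
}

val ratings: Seq[UserBasedCF.Rating] = Seq(
  (1L, 10L, 5.0), (1L, 20L, 4.0),                  // user u = 1
  (2L, 10L, 5.0), (2L, 20L, 4.0), (2L, 30L, 5.0),  // close match for u
  (3L, 10L, 1.0), (3L, 40L, 2.0),                  // weaker match
  (4L, 50L, 5.0)                                   // no overlap with u
)
println(UserBasedCF.recommend(ratings, u = 1L, p = 2, q = 5))
```

This only scales to small data, which is exactly why the distributed version below is worth the extra ceremony.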

My first attempt used the GraphX API to build a graph, but then I just fell back to using the triplets as an RDD, and using standard RDD operations to implement the steps. Here is the code I came up with originally. Note that I use the Databricks notebook environment, so the code does not have the standard Spark boilerplate wrapper code.

import breeze.linalg._
import breeze.numerics._

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val MOVIELENS_DIR = "/path/to/MovieLens-100k"

// load movielens data
val NUM_USERS = 943
val NUM_MOVIES = 1683
val NUM_RATINGS = 100000

val ratingsRDD = sc.textFile(MOVIELENS_DIR + "/u.data")
  .map(line => {
    val cols = line.split('\t')
    val userId = cols(0).toLong
    val movieId = cols(1).toLong + 50000 // to avoid mixups with userId
    val rating = cols(2).toDouble
    (userId, movieId, rating)
  })
ratingsRDD.take(10)

val movieId2Title = sc.textFile(MOVIELENS_DIR + "/u.item")
  .map(line => {
    val cols = line.split('|')
    val movieId = cols(0).toLong + 50000
    val title = cols(1)
    movieId -> title
  })
  .collectAsMap
val movieId2Title_b = sc.broadcast(movieId2Title)

// construct graph object
val userVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._1, "NA"))
val movieVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._2, movieId2Title_b.value(triple._2)))
val verticesRDD = userVerticesRDD.union(movieVerticesRDD)

val relationshipsRDD: RDD[Edge[Double]] = ratingsRDD.map(triple => Edge(triple._1, triple._2, triple._3))

val graph = Graph(verticesRDD, relationshipsRDD)

print("%d vertices, %d edges\n".format(graph.vertices.count, graph.edges.count))
assert(graph.edges.count == NUM_RATINGS)

// inputs
val sourceUserId = 21L
val p = 100 // number of users to look at
val q = 10  // number of movies to recommend

// step 1: find all movies M rated by the given user u
val moviesRatedByUserRDD = graph.edges.filter(e => e.srcId == sourceUserId)
  .map(e => (e.dstId, e.attr))
println("# movies rated by user u: %d".format(moviesRatedByUserRDD.count))

// step 2: find all users U who rated set of movies M
val usersWhoRatedMoviesRDD = graph.edges
  .map(e => (e.dstId, (e.srcId, e.attr)))      // (movieId, (userId, rating_l))
  .join(moviesRatedByUserRDD)                  // (movieId, rating_r)
  .map(j => {                                  // (movieId, ((userId, rating_l), rating_r))
    val movieId = j._1
    val userId = j._2._1._1
    val srcRating = j._2._1._2
    val targetRating = j._2._2
    (userId, movieId, srcRating, targetRating) // (userId, movieId, rating_l, rating_r)
  })
// usersWhoRatedMoviesRDD.take(10)

// step 3: find p users with most similar taste as u
def buildVector(elements: List[(Long, Double)]): DenseVector[Double] = {
  val vec = DenseVector.zeros[Double](NUM_MOVIES)
  elements.foreach(e => {
    val vecIdx = (e._1 - 50001).toInt
    val vecVal = e._2
    vec(vecIdx) = vecVal
  })
  vec
}

def cosineSimilarity(vec1: DenseVector[Double], vec2: DenseVector[Double]): Double = {
  (vec1 dot vec2) / (norm(vec1) * norm(vec2))
}

val sourceVectorElements = usersWhoRatedMoviesRDD.filter(rec => rec._1 == sourceUserId)
  .map(rec => {
    val movieId = rec._2
    val rating = rec._3
    (movieId, rating)
  })
  .collect
  .toList
val sourceVec = buildVector(sourceVectorElements)
val sourceVec_b = sc.broadcast(sourceVec)

val similarUsersRDD = usersWhoRatedMoviesRDD
  .filter(rec => rec._1 != sourceUserId)
  .map(rec => {
    val userId = rec._1
    val movieId = rec._2
    val rating = rec._4                          // rec._4 is u's rating (rating_r); rec._3 is this user's (rating_l)
    (userId, List((movieId, rating)))            // (userId, [(movieId, rating)])
  })
  .reduceByKey((a, b) => a ++ b)                 // (userId, List[(movieId, rating)])
  .mapValues(elements => buildVector(elements))  // (userId, DenseVector)
  .map(rec => {
    val targetUserId = rec._1
    val targetVec = rec._2
    val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
    (targetUserId, cosim)                        // (userId, cosineSimilarity)
  })
  .sortBy(_._2, false)
  .map(rec => rec._1)

val similarUsers = similarUsersRDD.take(p)
println("number of similar users selected: %d\n".format(similarUsers.size))

val similarUserSet = similarUsers.toSet
val similarUserSet_b = sc.broadcast(similarUserSet)

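// step 4: find all movies M' rated by the p users
// NOTE: this only drops u's own ratings (u is already excluded in step 3); it does
// not yet exclude movies that u has already rated, as step 4 requires
val candidateMoviesRDD = graph.edges
  .filter(e => similarUserSet_b.value.contains(e.srcId))  // rated by p users
  .filter(e => e.srcId != sourceUserId)                   // ratings not by user u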

// step 5: recommend top q movies with highest average rating
val recommendedMoviesRDD = candidateMoviesRDD.map(e => {
    val userId = e.srcId
    val movieId = e.dstId
    val rating = e.attr
    (movieId, List(rating))
  })                                  // (movieId, [rating])
  .reduceByKey((a, b) => a ++ b)      // (movieId, List(rating))
  .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)  // (movieId, averageRating)
  .sortBy(_._2, false)                // (movieId, averageRating*)

val recommendedMovies = recommendedMoviesRDD.take(q)

print("---- recommended movies ----\n")
recommendedMovies.foreach(rec => {
  val movieId = rec._1
  val score = rec._2
  val title = movieId2Title(movieId)
  print("(%.3f) [%d] %s\n".format(score, movieId - 50000, title))
})

// clean up
ratingsRDD.unpersist()
userVerticesRDD.unpersist()
movieVerticesRDD.unpersist()
verticesRDD.unpersist()
relationshipsRDD.unpersist()
moviesRatedByUserRDD.unpersist()
usersWhoRatedMoviesRDD.unpersist()
similarUsersRDD.unpersist()
candidateMoviesRDD.unpersist()
recommendedMoviesRDD.unpersist()
graph.unpersistVertices()

This returns the following recommendations, which in hindsight I realized were incorrect, since I don't properly implement the requirements of step 4 (the part where we have to exclude the movies already rated by u). But here they are, just to give you an idea of what the output looks like.

---- recommended movies ----
(5.000) [1598] City of Industry (1997)
(5.000) [1536] Aiqing wansui (1994)
(5.000) [814] Great Day in Harlem, A (1994)
(5.000) [1594] Everest (1998)
(5.000) [1500] Santa with Muscles (1996)
(5.000) [1463] Boys, Les (1997)
(5.000) [1491] Tough and Deadly (1995)
(5.000) [1189] Prefontaine (1997)
(4.813) [114] Wallace & Gromit: The Best of Aardman Animation (1996)
(4.500) [868] Hearts and Minds (1996)

After I did this, though, I realized that all I was doing was force-fitting the problem into my existing RDD knowledge, rather than using the power of the GraphX API. So the following code, which I built off the old one, is more GraphX-y, for lack of a better word. The new code uses the GraphX function aggregateMessages() pretty heavily. I believe this technique is central to graph-based computing, since I remember at least a couple of presentations about GraphX (and Pregel, the Google system that inspired GraphX's message-passing API) where the presenters mentioned this particular style of computation.

The aggregateMessages function is very powerful, a bit of a Swiss army knife. You can think of it in terms of map-reduce. The Map step usually applies some kind of filter criterion to limit the set of triplets from the graph, then sends a message with some computed or retrieved payload to one of the nodes of each remaining triplet (source or destination, via sendToSrc or sendToDst). The Reduce step applies a function to aggregate the messages arriving at each node. The output is a VertexRDD of (VertexId, aggregated message) pairs; nodes that received no messages do not appear in it. Here is a diagram showing this in action: the map step sends messages as shown by the red arrows, and the reduce step aggregates these messages as shown by the blue dotted arcs.


The other major difference I noticed was the absence of joins. The new code relies more on filtering the graph down to a set of nodes, and then passing messages to their nearest neighbors. There is also much more back and forth between the driver and the executors, using broadcast and collect. This is not necessarily as bad as it sounds, though: graphs are typically used to represent sparse structures, so each graph operation tends to produce a small set of constraints that you pass on to the next operation. Thus the amount of data returned from each operation tends to be small, and collecting or broadcasting it is not a very heavyweight operation.
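Here is a small plain-Scala sketch of that trade-off, using made-up toy ratings rather than MovieLens data. An ordinary `Set` stands in for the value you would collect to the driver and wrap with `sc.broadcast`, and a `Map` lookup stands in for a hash join; `Rating` is a hypothetical case class for illustration. Both approaches find the same users, but the filter version only needs the small set of movie IDs.

```scala
// Plain-Scala sketch: "which users rated any of u's movies?" answered two ways.
// Toy data only; a plain Set stands in for a broadcast variable and a Map
// lookup stands in for a hash join.
case class Rating(userId: Long, movieId: Long, rating: Double)

val ratings = Seq(
  Rating(21L, 50001L, 5.0), Rating(21L, 50002L, 3.0),
  Rating(7L, 50001L, 4.0), Rating(7L, 50003L, 1.0),
  Rating(9L, 50004L, 2.0))

val sourceUserId = 21L

// join-style: attach u's rating of the same movie to every matching rating
val sourceRatings: Map[Long, Double] =
  ratings.filter(_.userId == sourceUserId).map(r => r.movieId -> r.rating).toMap
val joined = for {
  r  <- ratings
  ur <- sourceRatings.get(r.movieId)
} yield (r.userId, r.movieId, r.rating, ur)   // (userId, movieId, rating_l, rating_r)
val viaJoin = joined.map(_._1).toSet

// filter-style: collect the small set of u's movies once, then test membership
val moviesRatedByUser: Set[Long] = sourceRatings.keySet
val viaFilter = ratings.filter(r => moviesRatedByUser.contains(r.movieId)).map(_.userId).toSet
```

The join carries both ratings along, which you only need if a later step uses them; when all you need is membership, the filter against a small set is simpler.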

Also, as a practical matter, it helps to draw out little diagrams like the picture above to figure out how to set up the aggregateMessages calls in the code.
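To make the map-reduce analogy concrete without needing a cluster, here is a minimal plain-Scala sketch that mimics what aggregateMessages does on a handful of toy ratings. `Triplet` and `aggregateMessagesLocal` are hypothetical stand-ins, not part of the GraphX API: the `sendMsg` function plays the role of the map step (returning `(vertexId, message)` pairs instead of calling sendToSrc or sendToDst), and `mergeMsg` plays the role of the reduce step.

```scala
// Plain-Scala sketch of the aggregateMessages pattern; no Spark required.
// Triplet and aggregateMessagesLocal are hypothetical stand-ins for GraphX's
// triplets and Graph.aggregateMessages, for illustration only.
case class Triplet(srcId: Long, dstId: Long, attr: Double)

def aggregateMessagesLocal[A](
    triplets: Seq[Triplet],
    sendMsg: Triplet => Seq[(Long, A)], // "map": emit (targetVertexId, message) pairs
    mergeMsg: (A, A) => A               // "reduce": combine messages arriving at a vertex
): Map[Long, A] =
  triplets
    .flatMap(sendMsg)                   // all messages from all triplets
    .groupBy(_._1)                      // group by receiving vertex
    .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(mergeMsg) }

// toy data: users 1..3 rating movies 50001..50003 (not MovieLens)
val triplets = Seq(
  Triplet(1L, 50001L, 4.0), Triplet(1L, 50002L, 5.0),
  Triplet(2L, 50001L, 3.0), Triplet(3L, 50003L, 2.0))

val moviesOfInterest = Set(50001L, 50002L)

// like sendToDst: each movie of interest collects the ids of users who rated it
val usersPerMovie = aggregateMessagesLocal[List[Long]](
  triplets,
  t => if (moviesOfInterest.contains(t.dstId)) Seq(t.dstId -> List(t.srcId)) else Seq.empty,
  _ ++ _)

// like sendToSrc: each user collects its (movieId, rating) pairs
val ratingsPerUser = aggregateMessagesLocal[List[(Long, Double)]](
  triplets,
  t => Seq(t.srcId -> List((t.dstId, t.attr))),
  _ ++ _)
```

As with the real thing, movie 50003 receives no message in the first call and so does not show up in `usersPerMovie` at all.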

So, in any case, here is the code. I have left the old code in as commented-out blocks, to make it easier to see what the new GraphX code is replacing.

import breeze.linalg._
import breeze.numerics._

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// load movielens data
val MOVIELENS_DIR = "/path/to/MovieLens-100k"

val NUM_USERS = 943
val NUM_MOVIES = 1683
val NUM_RATINGS = 100000

val ratingsRDD = sc.textFile(MOVIELENS_DIR + "/u.data")
  .map(line => {
    val cols = line.split('\t')
    val userId = cols(0).toLong
    val movieId = cols(1).toLong + 50000 // to avoid mixups with userId
    val rating = cols(2).toDouble
    (userId, movieId, rating)
  })

val movieId2Title = sc.textFile(MOVIELENS_DIR + "/u.item")
  .map(line => {
    val cols = line.split('|')
    val movieId = cols(0).toLong + 50000
    val title = cols(1)
    movieId -> title
  })
  .collectAsMap
val movieId2Title_b = sc.broadcast(movieId2Title)

// construct graph object
val userVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._1, "NA"))
val movieVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._2, movieId2Title_b.value(triple._2)))
val verticesRDD = userVerticesRDD.union(movieVerticesRDD)

val relationshipsRDD: RDD[Edge[Double]] = ratingsRDD.map(triple => Edge(triple._1, triple._2, triple._3))

val graph = Graph(verticesRDD, relationshipsRDD)

print("%d vertices, %d edges\n".format(graph.vertices.count, graph.edges.count))
assert(graph.edges.count == NUM_RATINGS)

// inputs
val sourceUserId = 21L
val p = 100 // number of users to look at
val q = 10  // number of movies to recommend

// step 1: find all movies M rated by given user u
// val moviesRatedByUserRDD = graph.edges.filter(e => e.srcId == sourceUserId)
//   .map(e => (e.dstId, e.attr))
// moviesRatedByUserRDD.take(10)

val moviesRatedByUser = graph.edges.filter(e => e.srcId == sourceUserId)
  .map(e => (e.dstId, e.attr))
  .collect
  .toMap
val moviesRatedByUser_b = sc.broadcast(moviesRatedByUser)
println("# movies rated by user: %d".format(moviesRatedByUser.size))

// step 2: find all users U who rated set of movies M
// val usersWhoRatedMoviesRDD = graph.edges.map(e => (e.dstId, (e.srcId, e.attr)))  // (movieId, (userId, rating_l))
//   .join(moviesRatedByUserRDD)                    // (movieId, rating_r)
//   .map(j => {                                    // (movieId, ((userId, rating_l), rating_r))
//     val movieId = j._1
//     val userId = j._2._1._1
//     val srcRating = j._2._1._2
//     val targetRating = j._2._2
//     (userId, movieId, srcRating, targetRating)   // (userId, movieId, rating_l, rating_r)
//   })
// usersWhoRatedMoviesRDD.take(10)

val usersWhoRatedMovies = graph.aggregateMessages[List[Long]](
    triplet => { // map function
      // consider only movies that user u has rated
      if (moviesRatedByUser_b.value.contains(triplet.dstId)) {
        // take user and send to the movie to be aggregated
        triplet.sendToDst(List(triplet.srcId))
      }
    },
    // reduce userIds into single list
    (a, b) => (a ++ b)
  )                                                 // (movieId, List[userId])
  .flatMap(rec => {
    val movieId = rec._1
    val userIds = rec._2
    userIds.map(userId => (userId, 1))              // (userId, 1)
  })
  .reduceByKey((a, b) => a + b)                     // (userId, n)
  .map(rec => rec._1)                               // unique List(userId)
  .collect
  .toSet

val usersWhoRatedMovies_b = sc.broadcast(usersWhoRatedMovies)
println("# unique users: %d".format(usersWhoRatedMovies.size))

// step 3: find p users with most similar taste as u
def buildVector(elements: List[(Long, Double)]): DenseVector[Double] = {
  val vec = DenseVector.zeros[Double](NUM_MOVIES)
  elements.foreach(e => {
    val vecIdx = (e._1 - 50001).toInt
    val vecVal = e._2
    vec(vecIdx) = vecVal
  })
  vec
}

def cosineSimilarity(vec1: DenseVector[Double], vec2: DenseVector[Double]): Double = {
  (vec1 dot vec2) / (norm(vec1) * norm(vec2))
}

// val similarUsersRDD = usersWhoRatedMoviesRDD.filter(rec => rec._1 != sourceUserId)
//   .map(rec => {
//     val userId = rec._1
//     val movieId = rec._2
//     val rating = rec._4
//     (userId, List((movieId, rating)))       // (userId, [(movieId, rating)])
//   })
//   .reduceByKey((a, b) => a ++ b)            // (userId, List[(movieId, rating)])
//   .mapValues(elements => buildVector(elements)) // (userId, DenseVector)
//   .map(rec => {
//     val targetUserId = rec._1
//     val targetVec = rec._2
//     val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
//     (targetUserId, cosim)                   // (userId, cosineSimilarity)
//   })
//   .sortBy(_._2, false)
//   .map(rec => rec._1)
//
// val similarUsers = similarUsersRDD.take(p)
// print("number of similar users selected: %d\n".format(similarUsers.size))

val userVectorsRDD: RDD[(VertexId, DenseVector[Double])] = graph
  .aggregateMessages[List[(Long, Double)]](
    triplet => { // map function
      // consider only users that rated movies M
      if (usersWhoRatedMovies_b.value.contains(triplet.srcId)) {
        // send to each user the target movieId and rating
        triplet.sendToSrc(List((triplet.dstId, triplet.attr)))
      }
    },
    // reduce to a single list
    (a, b) => (a ++ b)
  )                                        // (userId, List[(movieId, rating)])
  .mapValues(elements => buildVector(elements)) // (userId, ratingVector)

// val sourceVectorElements = usersWhoRatedMoviesRDD
//   .filter(rec => rec._1 == sourceUserId)
//   .map(rec => {
//     val movieId = rec._2
//     val rating = rec._3
//     (movieId, rating)
//   })
//   .collect
//   .toList
// val sourceVec = buildVector(sourceVectorElements)
// val sourceVec_b = sc.broadcast(sourceVec)
//
// val similarUsersRDD = usersWhoRatedMoviesRDD
//   .filter(rec => rec._1 != sourceUserId)
//   .map(rec => {
//     val userId = rec._1
//     val movieId = rec._2
//     val rating = rec._4
//     (userId, List((movieId, rating)))           // (userId, [(movieId, rating)])
//   })
//   .reduceByKey((a, b) => a ++ b)                // (userId, List[(movieId, rating)])
//   .mapValues(elements => buildVector(elements)) // (userId, DenseVector)
//   .map(rec => {
//     val targetUserId = rec._1
//     val targetVec = rec._2
//     val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
//     (targetUserId, cosim)                       // (userId, cosineSimilarity)
//   })
//   .sortBy(_._2, false)
//   .map(rec => rec._1)
//
// val similarUsers = similarUsersRDD.take(p)
// print("number of similar users selected: %d\n".format(similarUsers.size))
//
// val similarUserSet = similarUsers.toSet
// val similarUserSet_b = sc.broadcast(similarUserSet)

val sourceVec = userVectorsRDD.filter(rec => rec._1 == sourceUserId)
  .map(_._2)
  .collect
  .toList(0)
val sourceVec_b = sc.broadcast(sourceVec)

val similarUsersRDD = userVectorsRDD.filter(rec => rec._1 != sourceUserId)
  .map(rec => {
    val targetUserId = rec._1
    val targetVec = rec._2
    val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
    (targetUserId, cosim)
  })

val similarUserSet = similarUsersRDD.takeOrdered(p)(Ordering[Double].reverse.on(rec => rec._2))
  .map(rec => rec._1)
  .toSet
val similarUserSet_b = sc.broadcast(similarUserSet)
println("# of similar users: %d".format(similarUserSet.size))

// step 4: find all movies M' these p persons rated that u hasn't rated yet
// :WARNING: this block does not implement the requirement correctly; the following block does.
// val candidateMoviesRDD = graph.edges
//   .filter(e => similarUserSet_b.value.contains(e.srcId))           // rated by p users
//   .filter(e => e.srcId != sourceUserId)                            // not rated by user u
// candidateMoviesRDD.count

val candidateMovies = graph.aggregateMessages[List[Long]](
    triplet => { // map function
      // only consider users in the set p of similar users,
      // exclude movies rated by user u
      if (similarUserSet_b.value.contains(triplet.srcId) &&
         !moviesRatedByUser_b.value.contains(triplet.dstId)) {
        // send message [movieId] back to user
        triplet.sendToSrc(List(triplet.dstId))
      }
    },
    // reduce function
    (a, b) => a ++ b
  )                                             // (userId, List(movieId))
  .flatMap(rec => {
    val userId = rec._1
    val movieIds = rec._2
    movieIds.map(movieId => (movieId, 1))       // (movieId, 1)
  })
  .reduceByKey((a, b) => a + b)                 // (movieId, count)
  .map(_._1)                                    // (movieId)
  .collect
  .toSet

val candidateMovies_b = sc.broadcast(candidateMovies)
println("# of candidate movies for recommendation: %d".format(candidateMovies.size))

// step 5: recommend top q movies with highest average rating
// val recommendedMoviesRDD = candidateMoviesRDD.map(e => {
//     val userId = e.srcId
//     val movieId = e.dstId
//     val rating = e.attr
//     (movieId, List(rating))
//   })                                          // (movieId, [rating])
//   .reduceByKey((a, b) => a ++ b)              // (movieId, List(rating))
//   .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)  // (movieId, averageRating)
//   .sortBy(_._2, false)                        // (movieId, averageRating*)

val recommendedMoviesRDD: RDD[(VertexId, Double)] = graph
  .aggregateMessages[List[Double]](
    triplet => { // map function
      // limit search to movies rated by top p similar users; note that the
      // ratings of *all* users of these movies are sent, not just the p users
      if (candidateMovies_b.value.contains(triplet.dstId)) {
        // send ratings to movie nodes
        triplet.sendToDst(List(triplet.attr))
      }
    },
    // reduce ratings to single list per movie
    (a, b) => (a ++ b)
  )
  .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)

val recommendedMovies = recommendedMoviesRDD.takeOrdered(q)(Ordering[Double].reverse.on(rec => rec._2))
println("#-recommended: %d".format(recommendedMovies.size))

print("---- recommended movies ----\n")
recommendedMovies.foreach(rec => {
  val movieId = rec._1.toLong
  val score = rec._2
  val title = movieId2Title(movieId)
  print("(%.3f) [%d] %s\n".format(score, movieId - 50000, title))
})

And the output from this code is shown below. Comparing it with the earlier output, you can see the problem: in the old RDD-based code I did not correctly implement step 4, specifically the removal of all the movies already rated by the given user. (The scores also differ because the new step 5 averages each candidate movie's ratings across all users who rated it, whereas the old code averaged only the ratings from the p similar users.) One thing to note is that the GraphX approach of filtering nodes by condition is easier to work with than the join-based approach I had used earlier, perhaps because the API leverages the structure of the data it is working on.

---- recommended movies ----
(5.000) [1189] Prefontaine (1997)
(4.500) [1594] Everest (1998)
(4.500) [1398] Anna (1996)
(4.466) [318] Schindler's List (1993)
(4.466) [169] Wrong Trousers, The (1993)
(4.457) [483] Casablanca (1942)
(4.448) [114] Wallace & Gromit: The Best of Aardman Animation (1996)
(4.445) [64] Shawshank Redemption, The (1994)
(4.388) [603] Rear Window (1954)
(4.386) [12] Usual Suspects, The (1995)
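To see why the old version let already-rated movies through, here is a minimal plain-Scala sketch that applies both step 4 predicates to a few made-up rating edges. `RatingEdge` and the data are hypothetical; the two filter conditions mirror the old `graph.edges` filter and the condition inside the new aggregateMessages call.

```scala
// Plain-Scala sketch contrasting the two step-4 predicates on toy data.
// RatingEdge, similarUsers and the edges are made up for illustration.
case class RatingEdge(srcId: Long, dstId: Long, attr: Double)

val sourceUserId = 21L
val edges = Seq(
  RatingEdge(21L, 50001L, 5.0), RatingEdge(21L, 50002L, 4.0),  // u's own ratings
  RatingEdge(7L, 50001L, 5.0), RatingEdge(7L, 50003L, 4.0),    // similar user 7
  RatingEdge(9L, 50002L, 3.0), RatingEdge(9L, 50004L, 5.0))    // similar user 9

val similarUsers = Set(7L, 9L)  // u itself is already excluded from this set
val moviesRatedByUser = edges.filter(_.srcId == sourceUserId).map(_.dstId).toSet

// old predicate: drops edges *from* u, which are already gone,
// so movies u has rated (50001, 50002) slip through
val oldCandidates = edges
  .filter(e => similarUsers.contains(e.srcId) && e.srcId != sourceUserId)
  .map(_.dstId).toSet

// new predicate: also drops movies that u has already rated
val newCandidates = edges
  .filter(e => similarUsers.contains(e.srcId) && !moviesRatedByUser.contains(e.dstId))
  .map(_.dstId).toSet
```

Since the similar-user set already excludes u, the old `e.srcId != sourceUserId` check never removes anything, which is why movies u had already rated could end up being recommended.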

So there you have it. I had been meaning to take a look at the GraphX API for a while, but somehow it never got very high on my list of priorities until now. I suspect this is true for many people as well: they are comfortable working with RDDs and DataFrames, don't really see a need for something like GraphX, and even when they do, figure they can get by with RDDs and DataFrames anyway. If you have been meaning to try it out, I hope this gives you a sneak peek into how you can do so. For me, it was a fun exercise, so I thought I would write about it. Hopefully you found it interesting as well.