Earlier this week, I attended a webinar titled Building the Next Generation Recommendation Engine with a Graph Database hosted by TigerGraph as part of their Graph Gurus series. I attended because I am doing some work with recommenders nowadays, and in a past life, I used to do a lot with graphs (not recommenders), and I was curious how they were using graphs for recommenders. I was thinking in terms of graph adjacency heuristics as features for content based recommenders, so when they demo-ed a simple collaborative filtering recommender during the webinar, I had a bit of a duh moment.
In any case, it got me thinking about trying to implement this using Spark GraphX. I have been meaning to take a look at GraphX for a while, but the stars never quite lined up enough for me to take the plunge. While the use case is probably more suitable for use on a graph database such as TigerGraph, it seemed like a good way to experiment with the GraphX API. So I did, and I describe the experience in today's post.
The recommendations are based on the intuition that people who liked the items that you liked also liked these other items. I used the MovieLens 100k dataset that is made available thanks to the GroupLens project. So in our case, we will recommend movies to a user based on movies other people liked who liked the same movies as that user. The algorithm goes something like this:
- Find all movies M rated by a specific user u.
- Find all users U who rated at least one of the movies in M.
- Based on ratings, find p users who have the most similar tastes as our original user u.
- Find all movies M' these p people rated that u has not rated yet.
- Recommend top q movies with highest average rating by the p persons.
My first attempt used the GraphX APi to build a graph, but then I just fell back to using the triplets as an RDD, and using standard RDD operations to implement the steps. Here is the code I came up with originally. Note that I use the Databricks notebook environment, so the code does not have the standard Spark boilerplate wrapper code.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | import breeze.linalg._
import breeze.numerics._
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val MOVIELENS_DIR = "/path/to/MovieLens-100k"
// load movielens data
val NUM_USERS = 943
val NUM_MOVIES = 1683
val NUM_RATINGS = 100000
val ratingsRDD = sc.textFile(MOVIELENS_DIR + "/u.data")
.map(line => {
val cols = line.split('\t')
val userId = cols(0).toLong
val movieId = cols(1).toLong + 50000 // to avoid mixups with userId
val rating = cols(2).toDouble
(userId, movieId, rating)
})
ratingsRDD.take(10)
val movieId2Title = sc.textFile(MOVIELENS_DIR + "/u.item")
.map(line => {
val cols = line.split('|')
val movieId = cols(0).toLong + 50000
val title = cols(1)
movieId -> title
})
.collectAsMap
val movieId2Title_b = sc.broadcast(movieId2Title)
// construct graph object
val userVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._1, "NA"))
val movieVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._2, movieId2Title_b.value(triple._2)))
val verticesRDD = userVerticesRDD.union(movieVerticesRDD)
val relationshipsRDD: RDD[Edge[Double]] = ratingsRDD.map(triple => Edge(triple._1, triple._2, triple._3))
val graph = Graph(verticesRDD, relationshipsRDD)
print("%d vertices, %d edges\n".format(graph.vertices.count, graph.edges.count))
assert(graph.edges.count == NUM_RATINGS)
// inputs
val sourceUserId = 21L
val p = 100 // number of users to look at
val q = 10 // number of movies to recommend
// step 1: find all movies M rated by the given user u
val moviesRatedByUserRDD = graph.edges.filter(e => e.srcId == sourceUserId)
.map(e => (e.dstId, e.attr))
println("# movies rated by user u: %d".format(moviesRatedByUserRDD.count))
// step 2: find all users U who rated set of movies M
val usersWhoRatedMoviesRDD = graph.edges
.map(e => (e.dstId, (e.srcId, e.attr))) // (movieId, (userId, rating_l))
.join(moviesRatedByUserRDD) // (movieId, rating_r)
.map(j => { // (movieId, ((userId, rating_l), rating_r))
val movieId = j._1
val userId = j._2._1._1
val srcRating = j._2._1._2
val targetRating = j._2._2
(userId, movieId, srcRating, targetRating) // (userId, movieId, rating_l, rating_r)
})
// usersWhoRatedMoviesRDD.take(10)
// step 3: find p users with most similar taste as u
def buildVector(elements: List[(Long, Double)]): DenseVector[Double] = {
val vec = DenseVector.zeros[Double](NUM_MOVIES)
elements.foreach(e => {
val vecIdx = (e._1 - 50001).toInt
val vecVal = e._2
vec(vecIdx) = vecVal
})
vec
}
def cosineSimilarity(vec1: DenseVector[Double], vec2: DenseVector[Double]): Double = {
(vec1 dot vec2) / (norm(vec1) * norm(vec2))
}
val sourceVectorElements = usersWhoRatedMoviesRDD.filter(rec => rec._1 == sourceUserId)
.map(rec => {
val movieId = rec._2
val rating = rec._3
(movieId, rating)
})
.collect
.toList
val sourceVec = buildVector(sourceVectorElements)
val sourceVec_b = sc.broadcast(sourceVec)
val similarUsersRDD = usersWhoRatedMoviesRDD
.filter(rec => rec._1 != sourceUserId)
.map(rec => {
val userId = rec._1
val movieId = rec._2
val rating = rec._4
(userId, List((movieId, rating))) // (userId, [(movieId, rating)])
})
.reduceByKey((a, b) => a ++ b) // (userId, List[(movieId, rating)])
.mapValues(elements => buildVector(elements)) // (userId, DenseVector)
.map(rec => {
val targetUserId = rec._1
val targetVec = rec._2
val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
(targetUserId, cosim) // (userId, cosineSimilarity)
})
.sortBy(_._2, false)
.map(rec => rec._1)
val similarUsers = similarUsersRDD.take(p)
println("number of similar users selected: %d\n".format(similarUsers.size))
val similarUserSet = similarUsers.toSet
val similarUserSet_b = sc.broadcast(similarUserSet)
val candidateMoviesRDD = graph.edges
.filter(e => similarUserSet_b.value.contains(e.srcId)) // rated by p users
.filter(e => e.srcId != sourceUserId) // not rated by user u
// step 5: recommend top q movies with highest average rating
val recommendedMoviesRDD = candidateMoviesRDD.map(e => {
val userId = e.srcId
val movieId = e.dstId
val rating = e.attr
(movieId, List(rating))
}) // (movieId, [rating])
.reduceByKey((a, b) => a ++ b) // (movieId, List(rating))
.mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size) // (movieId, averageRating)
.sortBy(_._2, false) // (movieId, averageRating*)
val recommendedMovies = recommendedMoviesRDD.take(q)
print("---- recommended movies ----\n")
recommendedMovies.foreach(rec => {
val movieId = rec._1
val score = rec._2
val title = movieId2Title(movieId)
print("(%.3f) [%d] %s\n".format(score, movieId - 50000, title))
})
// clean up
ratingsRDD.unpersist()
userVerticesRDD.unpersist()
movieVerticesRDD.unpersist()
verticesRDD.unpersist()
relationshipsRDD.unpersist()
moviesRatedByUserRDD.unpersist()
usersWhoRatedMoviesRDD.unpersist()
similarUsersRDD.unpersist()
candidateMoviesRDD.unpersist()
recommendedMoviesRDD.unpersist()
graph.unpersistVertices()
|
Returns the following recommendations, which with hindsight I realized was incorrect, since I don't properly implement the requirements in step 4 (the part where we have to exclude the movies already rated by u) here. But here it is, just to give you an idea of what the output looks like.
1 2 3 4 5 6 7 8 9 10 11 | ---- recommended movies ----
(5.000) [1598] City of Industry (1997)
(5.000) [1536] Aiqing wansui (1994)
(5.000) [814] Great Day in Harlem, A (1994)
(5.000) [1594] Everest (1998)
(5.000) [1500] Santa with Muscles (1996)
(5.000) [1463] Boys, Les (1997)
(5.000) [1491] Tough and Deadly (1995)
(5.000) [1189] Prefontaine (1997)
(4.813) [114] Wallace & Gromit: The Best of Aardman Animation (1996)
(4.500) [868] Hearts and Minds (1996)
|
After I did this, though, I realized that all I was doing was just trying to force-fit the problem into my existing RDD knowledge, and not using the power of the GraphX API. So the following code, which I built off the old one, is more GraphX-y, for lack of a better word. The new code uses the GraphX function aggregateMessages() pretty heavily. I believe this technique is pretty central to graph based computing, since I remember at least a couple of presentations where the presenter talked about GraphX (and its predecessor Pregel), both of them mentioned this particular style of computation.
The aggregateMessages function is very powerful, a bit of a Swiss army knife. You can think of this in terms of map-reduce. The Map step usually applies some kind of filter criterion to limit the set of triplets from the graph, then sends messages with some computed or retrieved payload to one of the nodes (source or destination). The Reduce step applies a function to aggregate the messages at each of the nodes. The output is an RDD of (VertexID, aggregated message). Here is a diagram showing this in action - the map step sends messages as shown by the red arrows, and the reduce step aggregates these messages as shown by the blue dotted arcs.
The other major difference that I noticed was the absence of joins. The new code relies more on filtering the graph down to a set of nodes, and then passing messages to its nearest neighbors. There is much more back and forth between the master and workers using broadcast and collect. This is not necessarily as bad as it sounds though, since you would typically use graphs to represent sparse structures, and thus, each graph operation would result in a small set of constraints that you would pass on to the next operation. Thus the amount of data returned from each operation tends to be small and collecting or broadcasting it is not a very heavyweight operation.
Also, as a practical matter, it helps to draw out little diagrams like the picture above to figure out how to set up the aggregateMessages calls in the code.
So, in any case, here is the code. I have left the old code in as commented out blocks, to make it easier to figure out what the new GraphX code is replacing.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 | import breeze.linalg._
import breeze.numerics._
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// load movielens data
val MOVIELENS_DIR = "/path/to/MovieLens-100k"
val NUM_USERS = 943
val NUM_MOVIES = 1683
val NUM_RATINGS = 100000
val ratingsRDD = sc.textFile(MOVIELENS_DIR + "/u.data")
.map(line => {
val cols = line.split('\t')
val userId = cols(0).toLong
val movieId = cols(1).toLong + 50000 // to avoid mixups with userId
val rating = cols(2).toDouble
(userId, movieId, rating)
})
val movieId2Title = sc.textFile(MOVIELENS_DIR + "/u.item")
.map(line => {
val cols = line.split('|')
val movieId = cols(0).toLong + 50000
val title = cols(1)
movieId -> title
})
.collectAsMap
val movieId2Title_b = sc.broadcast(movieId2Title)
// construct graph object
val userVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._1, "NA"))
val movieVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._2, movieId2Title_b.value(triple._2)))
val verticesRDD = userVerticesRDD.union(movieVerticesRDD)
val relationshipsRDD: RDD[Edge[Double]] = ratingsRDD.map(triple => Edge(triple._1, triple._2, triple._3))
val graph = Graph(verticesRDD, relationshipsRDD)
print("%d vertices, %d edges\n".format(graph.vertices.count, graph.edges.count))
assert(graph.edges.count == NUM_RATINGS)
// inputs
val sourceUserId = 21L
val p = 100 // number of users to look at
val q = 10 // number of movies to recommend
// step 1: find all movies M rated by given user u
// val moviesRatedByUserRDD = graph.edges.filter(e => e.srcId == sourceUserId)
// .map(e => (e.dstId, e.attr))
// moviesRatedByUserRDD.take(10)
val moviesRatedByUser = graph.edges.filter(e => e.srcId == sourceUserId)
.map(e => (e.dstId, e.attr))
.collect
.toMap
val moviesRatedByUser_b = sc.broadcast(moviesRatedByUser)
println("# movies rated by user: %d".format(moviesRatedByUser.size))
// step 2: find all users U who rated set of movies M
// val usersWhoRatedMoviesRDD = graph.edges.map(e => (e.dstId, (e.srcId, e.attr))) // (movieId, (userId, rating_l))
// .join(moviesRatedByUserRDD) // (movieId, rating_r)
// .map(j => { // (movieId, ((userId, rating_l), rating_r))
// val movieId = j._1
// val userId = j._2._1._1
// val srcRating = j._2._1._2
// val targetRating = j._2._2
// (userId, movieId, srcRating, targetRating) // (userId, movieId, rating_l, rating_r)
// })
// usersWhoRatedMoviesRDD.take(10)
val usersWhoRatedMovies = graph.aggregateMessages[List[Long]](
triplet => { // map function
// consider only movies that user u has rated
if (moviesRatedByUser_b.value.contains(triplet.dstId)) {
// take user and send to the movie to be aggregated
triplet.sendToDst(List(triplet.srcId))
}
},
// reduce userIds into single list
(a, b) => (a ++ b)
) // (movieId, List[userId])
.flatMap(rec => {
val movieId = rec._1
val userIds = rec._2
userIds.map(userId => (userId, 1)) // (userId, 1)
})
.reduceByKey((a, b) => a + b) // (userId, n)
.map(rec => rec._1) // unique List(userId)
.collect
.toSet
val usersWhoRatedMovies_b = sc.broadcast(usersWhoRatedMovies)
println("# unique users: %d".format(usersWhoRatedMovies.size))
// step 3: find p users with most similar taste as u
def buildVector(elements: List[(Long, Double)]): DenseVector[Double] = {
val vec = DenseVector.zeros[Double](NUM_MOVIES)
elements.foreach(e => {
val vecIdx = (e._1 - 50001).toInt
val vecVal = e._2
vec(vecIdx) = vecVal
})
vec
}
def cosineSimilarity(vec1: DenseVector[Double], vec2: DenseVector[Double]): Double = {
(vec1 dot vec2) / (norm(vec1) * norm(vec2))
}
// val similarUsersRDD = usersWhoRatedMoviesRDD.filter(rec => rec._1 != sourceUserId)
// .map(rec => {
// val userId = rec._1
// val movieId = rec._2
// val rating = rec._4
// (userId, List((movieId, rating))) // (userId, [(movieId, rating)])
// })
// .reduceByKey((a, b) => a ++ b) // (userId, List[(movieId, rating)])
// .mapValues(elements => buildVector(elements)) // (userId, DenseVector)
// .map(rec => {
// val targetUserId = rec._1
// val targetVec = rec._2
// val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
// (targetUserId, cosim) // (userId, cosineSimilarity)
// })
// .sortBy(_._2, false)
// .map(rec => rec._1)
//
// val similarUsers = similarUsersRDD.take(p)
// print("number of similar users selected: %d\n".format(similarUsers.size))
val userVectorsRDD: RDD[(VertexId, DenseVector[Double])] = graph
.aggregateMessages[List[(Long, Double)]](
triplet => { // map function
// consider only users that rated movies M
if (usersWhoRatedMovies_b.value.contains(triplet.srcId)) {
// send to each user the target movieId and rating
triplet.sendToSrc(List((triplet.dstId, triplet.attr)))
}
},
// reduce to a single list
(a, b) => (a ++ b)
) // (userId, List[(movieId, rating)])
.mapValues(elements => buildVector(elements)) // (userId, ratingVector)
// val sourceVectorElements = usersWhoRatedMoviesRDD
// .filter(rec => rec._1 == sourceUserId)
// .map(rec => {
// val movieId = rec._2
// val rating = rec._3
// (movieId, rating)
// })
// .collect
// .toList
// val sourceVec = buildVector(sourceVectorElements)
// val sourceVec_b = sc.broadcast(sourceVec)
//
// val similarUsersRDD = usersWhoRatedMoviesRDD
// .filter(rec => rec._1 != sourceUserId)
// .map(rec => {
// val userId = rec._1
// val movieId = rec._2
// val rating = rec._4
// (userId, List((movieId, rating))) // (userId, [(movieId, rating)])
// })
// .reduceByKey((a, b) => a ++ b) // (userId, List[(movieId, rating)])
// .mapValues(elements => buildVector(elements)) // (userId, DenseVector)
// .map(rec => {
// val targetUserId = rec._1
// val targetVec = rec._2
// val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
// (targetUserId, cosim) // (userId, cosineSimilarity)
// })
// .sortBy(_._2, false)
// .map(rec => rec._1)
//
// val similarUsers = similarUsersRDD.take(p)
// print("number of similar users selected: %d\n".format(similarUsers.size))
//
// val similarUserSet = similarUsers.toSet
// val similarUserSet_b = sc.broadcast(similarUserSet)
val sourceVec = userVectorsRDD.filter(rec => rec._1 == sourceUserId)
.map(_._2)
.collect
.toList(0)
val sourceVec_b = sc.broadcast(sourceVec)
val similarUsersRDD = userVectorsRDD.filter(rec => rec._1 != sourceUserId)
.map(rec => {
val targetUserId = rec._1
val targetVec = rec._2
val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
(targetUserId, cosim)
})
val similarUserSet = similarUsersRDD.takeOrdered(p)(Ordering[Double].reverse.on(rec => rec._2))
.map(rec => rec._1)
.toSet
val similarUserSet_b = sc.broadcast(similarUserSet)
println("# of similar users: %d".format(similarUserSet.size))
// step 4: find all movies M' these p persons rated that u hasn't rated yet
// :WARNING: this block does not implement requirement correctly, following block does.
// val candidateMoviesRDD = graph.edges
// .filter(e => similarUserSet_b.value.contains(e.srcId)) // rated by p users
// .filter(e => e.srcId != sourceUserId) // not rated by user u
// candidateMoviesRDD.count
val candidateMovies = graph.aggregateMessages[List[Long]](
triplet => { // map function
// only consider users in the set p of similar users,
// exclude movies rated by user u
if (similarUserSet_b.value.contains(triplet.srcId) &&
!moviesRatedByUser_b.value.contains(triplet.dstId)) {
// send message [movieId] back to user
triplet.sendToSrc(List(triplet.dstId))
}
},
// reduce function
(a, b) => a ++ b
) // (userId, List(movieId))
.flatMap(rec => {
val userId = rec._1
val movieIds = rec._2
movieIds.map(movieId => (movieId, 1)) // (movieId, 1)
})
.reduceByKey((a, b) => a + b) // (movieId, count)
.map(_._1) // (movieId)
.collect
.toSet
val candidateMovies_b = sc.broadcast(candidateMovies)
println("# of candidate movies for recommendation: %d".format(candidateMovies.size))
// step 5: recommend top q movies with highest average rating
// val recommendedMoviesRDD = candidateMoviesRDD.map(e => {
// val userId = e.srcId
// val movieId = e.dstId
// val rating = e.attr
// (movieId, List(rating))
// }) // (movieId, [rating])
// .reduceByKey((a, b) => a ++ b) // (movieId, List(rating))
// .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size) // (movieId, averageRating)
// .sortBy(_._2, false) // (movieId, averageRating*)
val recommendedMoviesRDD: RDD[(VertexId, Double)] = graph
.aggregateMessages[List[Double]](
triplet => { // map function
// limit search to movies rated by top p similar users
if (candidateMovies_b.value.contains(triplet.dstId)) {
// send ratings to movie nodes
triplet.sendToDst(List(triplet.attr))
}
},
// reduce ratings to single list per movie
(a, b) => (a ++ b)
)
.mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)
val recommendedMovies = recommendedMoviesRDD.takeOrdered(q)(Ordering[Double].reverse.on(rec => rec._2))
println("#-recommended: %d".format(recommendedMovies.size))
print("---- recommended movies ----\n")
recommendedMovies.foreach(rec => {
val movieId = rec._1.toLong
val score = rec._2
val title = movieId2Title(movieId)
print("(%.3f) [%d] %s\n".format(score, movieId - 50000, title))
})
|
And the output from this code is as shown below. As you can see, the problem is that I did not correctly implement step 4, specifically removing all the movies that were already rated by the given user, in the old RDD based code. One thing to note is that the GraphX approach of filtering out nodes by condition makes it easier to work with than the join based approach I had used earlier, perhaps because the API leverages the structure of the data it is working on.
1 2 3 4 5 6 7 8 9 10 11 | ---- recommended movies ----
(5.000) [1189] Prefontaine (1997)
(4.500) [1594] Everest (1998)
(4.500) [1398] Anna (1996)
(4.466) [318] Schindler's List (1993)
(4.466) [169] Wrong Trousers, The (1993)
(4.457) [483] Casablanca (1942)
(4.448) [114] Wallace & Gromit: The Best of Aardman Animation (1996)
(4.445) [64] Shawshank Redemption, The (1994)
(4.388) [603] Rear Window (1954)
(4.386) [12] Usual Suspects, The (1995)
|
So there you have it. I had been meaning to take a look at the GraphX API for a while, but somehow it never got very high on my list of priorities until now. I figure that this may be true for many people as well, they are comfortable working with RDDs and DataFrames and don't really see a need for something like GraphX until they need it, and then they figure they can get by with RDDs and DataFrames anyway. Hopefully, if you have been meaning to try it out, this gives you a sneak peek into how you can do so. For me, it was a fun exercise, so I thought I would write about it. Hopefully you found it interesting as well.