Friday, August 31, 2018

Collaborative Filtering Recommender with Spark GraphX


Earlier this week, I attended a webinar titled "Building the Next Generation Recommendation Engine with a Graph Database", hosted by TigerGraph as part of their Graph Gurus series. I attended because I am doing some work with recommenders nowadays and, in a past life, I used to do a lot with graphs (though not recommenders), so I was curious how they were using graphs for recommenders. I was thinking in terms of graph adjacency heuristics as features for content based recommenders, so when they demo-ed a simple collaborative filtering recommender during the webinar, I had a bit of a duh moment.

In any case, it got me thinking about trying to implement this using Spark GraphX. I have been meaning to take a look at GraphX for a while, but the stars never quite lined up enough for me to take the plunge. While the use case is probably more suitable for use on a graph database such as TigerGraph, it seemed like a good way to experiment with the GraphX API. So I did, and I describe the experience in today's post.

The recommendations are based on the intuition that people who liked the items that you liked also liked these other items. I used the MovieLens 100k dataset that is made available thanks to the GroupLens project. So in our case, we will recommend movies to a user based on the movies liked by other people who liked the same movies as that user. The algorithm goes something like this:

  1. Find all movies M rated by a specific user u.
  2. Find all users U who rated at least one of the movies in M.
  3. Based on ratings, find the p users in U whose tastes are most similar to those of our original user u.
  4. Find all movies M' these p people rated that u has not rated yet.
  5. Recommend the top q movies in M' with the highest average rating by the p persons.
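To make the five steps concrete before bringing Spark into the picture, here is a minimal sketch in plain Scala over a handful of made-up ratings. The object name `CfSketch`, the toy data, and the use of sparse maps in place of dense rating vectors are all assumptions for illustration; this is not the code used later in the post.

```scala
object CfSketch {
  // (userId, movieId, rating) triples; toy data invented for illustration
  val ratings: Seq[(Int, String, Double)] = Seq(
    (1, "A", 5.0), (1, "B", 4.0),
    (2, "A", 5.0), (2, "B", 4.0), (2, "C", 5.0), (2, "D", 2.0),
    (3, "A", 1.0), (3, "B", 5.0), (3, "C", 1.0), (3, "E", 4.0),
    (4, "F", 3.0)
  )

  // cosine similarity over sparse rating maps; a missing rating counts as 0
  def cosine(a: Map[String, Double], b: Map[String, Double]): Double = {
    val dot = a.keySet.intersect(b.keySet).toSeq.map(m => a(m) * b(m)).sum
    val normA = math.sqrt(a.values.map(v => v * v).sum)
    val normB = math.sqrt(b.values.map(v => v * v).sum)
    if (normA == 0.0 || normB == 0.0) 0.0 else dot / (normA * normB)
  }

  def recommend(u: Int, p: Int, q: Int): Seq[(String, Double)] = {
    val byUser: Map[Int, Map[String, Double]] =
      ratings.groupBy(_._1).map { case (user, rs) => user -> rs.map(r => r._2 -> r._3).toMap }

    // step 1: movies M rated by user u
    val rated = byUser.getOrElse(u, Map.empty[String, Double])
    val m = rated.keySet

    // step 2: users U (other than u) who rated at least one movie in M
    val others = byUser.keys.filter(v => v != u && byUser(v).keySet.exists(m.contains)).toSeq

    // step 3: the p users in U most similar to u
    val similar = others
      .map(v => v -> cosine(rated, byUser(v)))
      .sortWith(_._2 > _._2)
      .take(p)
      .map(_._1)
      .toSet

    // steps 4 and 5: movies rated by the p users but not by u,
    // ranked by their average rating among those p users
    ratings
      .filter { case (user, movie, _) => similar.contains(user) && !m.contains(movie) }
      .groupBy(_._2)
      .map { case (movie, rs) => movie -> rs.map(_._3).sum / rs.size }
      .toSeq
      .sortWith((x, y) => x._2 > y._2 || (x._2 == y._2 && x._1 < y._1))
      .take(q)
  }
}
```

With this toy data, `CfSketch.recommend(1, 1, 2)` picks user 2 as the single most similar user and returns movies C and D, neither of which user 1 has rated.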

My first attempt used the GraphX API to build a graph, but then I just fell back to using the triplets as an RDD, and using standard RDD operations to implement the steps. Here is the code I came up with originally. Note that I use the Databricks notebook environment, so the code does not have the standard Spark boilerplate wrapper code.

import breeze.linalg._
import breeze.numerics._

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val MOVIELENS_DIR = "/path/to/MovieLens-100k"

// load movielens data
val NUM_USERS = 943
val NUM_MOVIES = 1682 // MovieLens 100k has 1682 movies, with ids 1..1682
val NUM_RATINGS = 100000

val ratingsRDD = sc.textFile(MOVIELENS_DIR + "/u.data")
  .map(line => {
    val cols = line.split('\t')
    val userId = cols(0).toLong
    val movieId = cols(1).toLong + 50000 // to avoid mixups with userId
    val rating = cols(2).toDouble
    (userId, movieId, rating)
  })
ratingsRDD.take(10)

val movieId2Title = sc.textFile(MOVIELENS_DIR + "/u.item")
  .map(line => {
    val cols = line.split('|')
    val movieId = cols(0).toLong + 50000
    val title = cols(1)
    movieId -> title
  })
  .collectAsMap
val movieId2Title_b = sc.broadcast(movieId2Title)

// construct graph object
val userVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._1, "NA"))
val movieVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._2, movieId2Title_b.value(triple._2)))
val verticesRDD = userVerticesRDD.union(movieVerticesRDD)

val relationshipsRDD: RDD[Edge[Double]] = ratingsRDD.map(triple => Edge(triple._1, triple._2, triple._3))

val graph = Graph(verticesRDD, relationshipsRDD)

print("%d vertices, %d edges\n".format(graph.vertices.count, graph.edges.count))
assert(graph.edges.count == NUM_RATINGS)

// inputs
val sourceUserId = 21L
val p = 100 // number of users to look at
val q = 10  // number of movies to recommend

// step 1: find all movies M rated by the given user u
val moviesRatedByUserRDD = graph.edges.filter(e => e.srcId == sourceUserId)
  .map(e => (e.dstId, e.attr))
println("# movies rated by user u: %d".format(moviesRatedByUserRDD.count))

// step 2: find all users U who rated set of movies M
val usersWhoRatedMoviesRDD = graph.edges
  .map(e => (e.dstId, (e.srcId, e.attr)))      // (movieId, (userId, rating_l))
  .join(moviesRatedByUserRDD)                  // join with (movieId, rating_r), rating_r = u's rating
  .map(j => {                                  // (movieId, ((userId, rating_l), rating_r))
    val movieId = j._1
    val userId = j._2._1._1
    val srcRating = j._2._1._2                 // rating given by userId (the edge source)
    val targetRating = j._2._2                 // rating given by user u to the same movie
    (userId, movieId, srcRating, targetRating) // (userId, movieId, rating_l, rating_r)
  })
// usersWhoRatedMoviesRDD.take(10)

// step 3: find p users with most similar taste as u
def buildVector(elements: List[(Long, Double)]): DenseVector[Double] = {
  val vec = DenseVector.zeros[Double](NUM_MOVIES)
  elements.foreach(e => {
    val vecIdx = (e._1 - 50001).toInt
    val vecVal = e._2
    vec(vecIdx) = vecVal
  })
  vec
}

def cosineSimilarity(vec1: DenseVector[Double], vec2: DenseVector[Double]): Double = {
  (vec1 dot vec2) / (norm(vec1) * norm(vec2))
}

val sourceVectorElements = usersWhoRatedMoviesRDD.filter(rec => rec._1 == sourceUserId)
  .map(rec => {
    val movieId = rec._2
    val rating = rec._3
    (movieId, rating)
  })
  .collect
  .toList
val sourceVec = buildVector(sourceVectorElements)
val sourceVec_b = sc.broadcast(sourceVec)

val similarUsersRDD = usersWhoRatedMoviesRDD
  .filter(rec => rec._1 != sourceUserId)
  .map(rec => {
    val userId = rec._1
    val movieId = rec._2
    val rating = rec._3                          // this user's own rating (rec._4 is u's rating)
    (userId, List((movieId, rating)))            // (userId, [(movieId, rating)])
  })
  .reduceByKey((a, b) => a ++ b)                 // (userId, List[(movieId, rating)])
  .mapValues(elements => buildVector(elements))  // (userId, DenseVector)
  .map(rec => {
    val targetUserId = rec._1
    val targetVec = rec._2
    val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
    (targetUserId, cosim)                        // (userId, cosineSimilarity)
  })
  .sortBy(_._2, false)
  .map(rec => rec._1)

val similarUsers = similarUsersRDD.take(p)
println("number of similar users selected: %d\n".format(similarUsers.size))

val similarUserSet = similarUsers.toSet
val similarUserSet_b = sc.broadcast(similarUserSet)

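// step 4: find all movies M' these p persons rated that u hasn't rated yet
// NOTE: the second filter below never excludes anything, because u is not in the
// similar user set; movies already rated by u are therefore not removed here
val candidateMoviesRDD = graph.edges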
  .filter(e => similarUserSet_b.value.contains(e.srcId))  // rated by p users
  .filter(e => e.srcId != sourceUserId)                   // not rated by user u

// step 5: recommend top q movies with highest average rating
val recommendedMoviesRDD = candidateMoviesRDD.map(e => {
    val userId = e.srcId
    val movieId = e.dstId
    val rating = e.attr
    (movieId, List(rating))
  })                                  // (movieId, [rating])
  .reduceByKey((a, b) => a ++ b)      // (movieId, List(rating))
  .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)  // (movieId, averageRating)
  .sortBy(_._2, false)                // (movieId, averageRating*)

val recommendedMovies = recommendedMoviesRDD.take(q)

print("---- recommended movies ----\n")
recommendedMovies.foreach(rec => {
  val movieId = rec._1
  val score = rec._2
  val title = movieId2Title(movieId)
  print("(%.3f) [%d] %s\n".format(score, movieId - 50000, title))
})

// clean up
ratingsRDD.unpersist()
userVerticesRDD.unpersist()
movieVerticesRDD.unpersist()
verticesRDD.unpersist()
relationshipsRDD.unpersist()
moviesRatedByUserRDD.unpersist()
usersWhoRatedMoviesRDD.unpersist()
similarUsersRDD.unpersist()
candidateMoviesRDD.unpersist()
recommendedMoviesRDD.unpersist()
graph.unpersistVertices()

This code returns the following recommendations, which with hindsight I realized were incorrect, since I don't properly implement the requirement in step 4 here (the part where we have to exclude the movies already rated by u). But here they are, just to give you an idea of what the output looks like.

---- recommended movies ----
(5.000) [1598] City of Industry (1997)
(5.000) [1536] Aiqing wansui (1994)
(5.000) [814] Great Day in Harlem, A (1994)
(5.000) [1594] Everest (1998)
(5.000) [1500] Santa with Muscles (1996)
(5.000) [1463] Boys, Les (1997)
(5.000) [1491] Tough and Deadly (1995)
(5.000) [1189] Prefontaine (1997)
(4.813) [114] Wallace & Gromit: The Best of Aardman Animation (1996)
(4.500) [868] Hearts and Minds (1996)

After I did this, though, I realized that I was just force-fitting the problem into my existing RDD knowledge, and not using the power of the GraphX API. So the following code, which I built off the old one, is more GraphX-y, for lack of a better word. The new code uses the GraphX function aggregateMessages() pretty heavily. I believe this technique is pretty central to graph based computing, since I remember at least a couple of presentations about GraphX (and its predecessor Pregel) where the presenters mentioned this particular style of computation.

The aggregateMessages function is very powerful, a bit of a Swiss army knife. You can think of it in terms of map-reduce. The Map step usually applies some kind of filter criterion to limit the set of triplets from the graph, then sends messages with some computed or retrieved payload to one of the two nodes of each triplet (source or destination). The Reduce step applies a function to aggregate the messages arriving at each node. The output is a VertexRDD, that is, an RDD of (VertexId, aggregated message) pairs. Here is a diagram showing this in action - the map step sends messages as shown by the red arrows, and the reduce step aggregates these messages as shown by the blue dotted arcs.


The other major difference that I noticed was the absence of joins. The new code relies more on filtering the graph down to a set of nodes, and then passing messages to their nearest neighbors. There is much more back and forth between the driver (master) and the workers using broadcast and collect. This is not necessarily as bad as it sounds though, since you would typically use graphs to represent sparse structures, and thus each graph operation would result in a small set of constraints that you would pass on to the next operation. The amount of data returned from each operation therefore tends to be small, and collecting or broadcasting it is not a very heavyweight operation.

Also, as a practical matter, it helps to draw out little diagrams like the picture above to figure out how to set up the aggregateMessages calls in the code.
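As a small illustration of the map and reduce halves of aggregateMessages, here is a sketch that computes the average rating per movie on a three-edge toy graph. The object name, the vertex ids, and the ratings are made up for illustration, and the local SparkContext setup is only an assumption so the snippet can run outside a notebook. It also sends (sum, count) pairs rather than lists, which is just one possible choice of message type.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object AggregateMessagesSketch {
  // average rating received by each movie vertex in a toy user -> movie graph
  def averageRatings(sc: SparkContext): Map[VertexId, Double] = {
    // hypothetical toy data: user ids 1 and 2, movie ids 101 and 102
    val edges = sc.parallelize(Seq(
      Edge(1L, 101L, 5.0),
      Edge(2L, 101L, 4.0),
      Edge(2L, 102L, 3.0)
    ))
    val graph = Graph.fromEdges(edges, defaultValue = "NA")

    graph.aggregateMessages[(Double, Int)](
      // map: every rating edge sends (rating, 1) to its movie (the destination)
      ctx => ctx.sendToDst((ctx.attr, 1)),
      // reduce: add up the rating sums and counts arriving at the same movie
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    .mapValues(t => t._1 / t._2) // (movieId, averageRating)
    .collect()
    .toMap
  }

  def main(args: Array[String]): Unit = {
    // assumption: a local context; in a notebook the existing sc would be reused
    val conf = new SparkConf().setMaster("local[2]").setAppName("aggregateMessages-sketch")
    val sc = SparkContext.getOrCreate(conf)
    averageRatings(sc).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Only vertices that receive at least one message appear in the result, so the two user vertices are absent and the map contains just movies 101 and 102.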

So, in any case, here is the code. I have left the old code in as commented out blocks, to make it easier to figure out what the new GraphX code is replacing.

import breeze.linalg._
import breeze.numerics._

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// load movielens data
val MOVIELENS_DIR = "/path/to/MovieLens-100k"

val NUM_USERS = 943
val NUM_MOVIES = 1682 // MovieLens 100k has 1682 movies, with ids 1..1682
val NUM_RATINGS = 100000

val ratingsRDD = sc.textFile(MOVIELENS_DIR + "/u.data")
  .map(line => {
    val cols = line.split('\t')
    val userId = cols(0).toLong
    val movieId = cols(1).toLong + 50000 // to avoid mixups with userId
    val rating = cols(2).toDouble
    (userId, movieId, rating)
  })

val movieId2Title = sc.textFile(MOVIELENS_DIR + "/u.item")
  .map(line => {
    val cols = line.split('|')
    val movieId = cols(0).toLong + 50000
    val title = cols(1)
    movieId -> title
  })
  .collectAsMap
val movieId2Title_b = sc.broadcast(movieId2Title)

// construct graph object
val userVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._1, "NA"))
val movieVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._2, movieId2Title_b.value(triple._2)))
val verticesRDD = userVerticesRDD.union(movieVerticesRDD)

val relationshipsRDD: RDD[Edge[Double]] = ratingsRDD.map(triple => Edge(triple._1, triple._2, triple._3))

val graph = Graph(verticesRDD, relationshipsRDD)

print("%d vertices, %d edges\n".format(graph.vertices.count, graph.edges.count))
assert(graph.edges.count == NUM_RATINGS)

// inputs
val sourceUserId = 21L
val p = 100 // number of users to look at
val q = 10  // number of movies to recommend

// step 1: find all movies M rated by given user u
// val moviesRatedByUserRDD = graph.edges.filter(e => e.srcId == sourceUserId)
//   .map(e => (e.dstId, e.attr))
// moviesRatedByUserRDD.take(10)

val moviesRatedByUser = graph.edges.filter(e => e.srcId == sourceUserId)
  .map(e => (e.dstId, e.attr))
  .collect
  .toMap
val moviesRatedByUser_b = sc.broadcast(moviesRatedByUser)
println("# movies rated by user: %d".format(moviesRatedByUser.size))

// step 2: find all users U who rated set of movies M
// val usersWhoRatedMoviesRDD = graph.edges.map(e => (e.dstId, (e.srcId, e.attr)))  // (movieId, (userId, rating_l))
//   .join(moviesRatedByUserRDD)                    // (movieId, rating_r)
//   .map(j => {                                    // (movieId, ((userId, rating_l), rating_r))
//     val movieId = j._1
//     val userId = j._2._1._1
//     val srcRating = j._2._1._2
//     val targetRating = j._2._2
//     (userId, movieId, srcRating, targetRating)   // (userId, movieId, rating_l, rating_r)
//   })
// usersWhoRatedMoviesRDD.take(10)

val usersWhoRatedMovies = graph.aggregateMessages[List[Long]](
    triplet => { // map function
      // consider only movies that user u has rated
      if (moviesRatedByUser_b.value.contains(triplet.dstId)) {
        // take user and send to the movie to be aggregated
        triplet.sendToDst(List(triplet.srcId))
      }
    },
    // reduce userIds into single list
    (a, b) => (a ++ b)
  )                                                 // (movieId, List[userId])
  .flatMap(rec => {
    val movieId = rec._1
    val userIds = rec._2
    userIds.map(userId => (userId, 1))              // (userId, 1)
  })
  .reduceByKey((a, b) => a + b)                     // (userId, n)
  .map(rec => rec._1)                               // unique List(userId)
  .collect
  .toSet

val usersWhoRatedMovies_b = sc.broadcast(usersWhoRatedMovies)
println("# unique users: %d".format(usersWhoRatedMovies.size))

// step 3: find p users with most similar taste as u
def buildVector(elements: List[(Long, Double)]): DenseVector[Double] = {
  val vec = DenseVector.zeros[Double](NUM_MOVIES)
  elements.foreach(e => {
    val vecIdx = (e._1 - 50001).toInt
    val vecVal = e._2
    vec(vecIdx) = vecVal
  })
  vec
}

def cosineSimilarity(vec1: DenseVector[Double], vec2: DenseVector[Double]): Double = {
  (vec1 dot vec2) / (norm(vec1) * norm(vec2))
}

val userVectorsRDD: RDD[(VertexId, DenseVector[Double])] = graph
  .aggregateMessages[List[(Long, Double)]](
    triplet => { // map function
      // consider only users that rated movies M
      if (usersWhoRatedMovies_b.value.contains(triplet.srcId)) {
        // send to each user the target movieId and rating
        triplet.sendToSrc(List((triplet.dstId, triplet.attr)))
      }
    },
    // reduce to a single list
    (a, b) => (a ++ b)
  )                                        // (userId, List[(movieId, rating)])
  .mapValues(elements => buildVector(elements)) // (userId, ratingVector)

// val sourceVectorElements = usersWhoRatedMoviesRDD
//   .filter(rec => rec._1 == sourceUserId)
//   .map(rec => {
//     val movieId = rec._2
//     val rating = rec._3
//     (movieId, rating)
//   })
//   .collect
//   .toList
// val sourceVec = buildVector(sourceVectorElements)
// val sourceVec_b = sc.broadcast(sourceVec)
//
// val similarUsersRDD = usersWhoRatedMoviesRDD
//   .filter(rec => rec._1 != sourceUserId)
//   .map(rec => {
//     val userId = rec._1
//     val movieId = rec._2
//     val rating = rec._4
//     (userId, List((movieId, rating)))           // (userId, [(movieId, rating)])
//   })
//   .reduceByKey((a, b) => a ++ b)                // (userId, List[(movieId, rating)])
//   .mapValues(elements => buildVector(elements)) // (userId, DenseVector)
//   .map(rec => {
//     val targetUserId = rec._1
//     val targetVec = rec._2
//     val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
//     (targetUserId, cosim)                       // (userId, cosineSimilarity)
//   })
//   .sortBy(_._2, false)
//   .map(rec => rec._1)
//
// val similarUsers = similarUsersRDD.take(p)
// print("number of similar users selected: %d\n".format(similarUsers.size))
//
// val similarUserSet = similarUsers.toSet
// val similarUserSet_b = sc.broadcast(similarUserSet)

val sourceVec = userVectorsRDD.filter(rec => rec._1 == sourceUserId)
  .map(_._2)
  .collect
  .toList(0)
val sourceVec_b = sc.broadcast(sourceVec)

val similarUsersRDD = userVectorsRDD.filter(rec => rec._1 != sourceUserId)
  .map(rec => {
    val targetUserId = rec._1
    val targetVec = rec._2
    val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
    (targetUserId, cosim)
  })

val similarUserSet = similarUsersRDD.takeOrdered(p)(Ordering[Double].reverse.on(rec => rec._2))
  .map(rec => rec._1)
  .toSet
val similarUserSet_b = sc.broadcast(similarUserSet)
println("# of similar users: %d".format(similarUserSet.size))

// step 4: find all movies M' these p persons rated that u hasn't rated yet
// :WARNING: this block does not implement requirement correctly, following block does.
// val candidateMoviesRDD = graph.edges
//   .filter(e => similarUserSet_b.value.contains(e.srcId))           // rated by p users
//   .filter(e => e.srcId != sourceUserId)                            // not rated by user u
// candidateMoviesRDD.count

val candidateMovies = graph.aggregateMessages[List[Long]](
    triplet => { // map function
      // only consider users in the set p of similar users,
      // exclude movies rated by user u
      if (similarUserSet_b.value.contains(triplet.srcId) &&
         !moviesRatedByUser_b.value.contains(triplet.dstId)) {
        // send message [movieId] back to user
        triplet.sendToSrc(List(triplet.dstId))
      }
    },
    // reduce function
    (a, b) => a ++ b
  )                                             // (userId, List(movieId))
  .flatMap(rec => {
    val userId = rec._1
    val movieIds = rec._2
    movieIds.map(movieId => (movieId, 1))       // (movieId, 1)
  })
  .reduceByKey((a, b) => a + b)                 // (movieId, count)
  .map(_._1)                                    // (movieId)
  .collect
  .toSet

val candidateMovies_b = sc.broadcast(candidateMovies)
println("# of candidate movies for recommendation: %d".format(candidateMovies.size))

// step 5: recommend top q movies with highest average rating
// val recommendedMoviesRDD = candidateMoviesRDD.map(e => {
//     val userId = e.srcId
//     val movieId = e.dstId
//     val rating = e.attr
//     (movieId, List(rating))
//   })                                          // (movieId, [rating])
//   .reduceByKey((a, b) => a ++ b)              // (movieId, List(rating))
//   .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)  // (movieId, averageRating)
//   .sortBy(_._2, false)                        // (movieId, averageRating*)

val recommendedMoviesRDD: RDD[(VertexId, Double)] = graph
  .aggregateMessages[List[Double]](
    triplet => { // map function
      // limit search to the candidate movies found in step 4; note that ratings
      // from every user who rated the movie are sent, not only the p similar users
      if (candidateMovies_b.value.contains(triplet.dstId)) {
        // send ratings to movie nodes
        triplet.sendToDst(List(triplet.attr))
      }
    },
    // reduce ratings to single list per movie
    (a, b) => (a ++ b)
  )
  .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)

val recommendedMovies = recommendedMoviesRDD.takeOrdered(q)(Ordering[Double].reverse.on(rec => rec._2))
println("#-recommended: %d".format(recommendedMovies.size))

print("---- recommended movies ----\n")
recommendedMovies.foreach(rec => {
  val movieId = rec._1.toLong
  val score = rec._2
  val title = movieId2Title(movieId)
  print("(%.3f) [%d] %s\n".format(score, movieId - 50000, title))
})

And the output from this code is as shown below. As you can see, the results differ from the earlier ones. The problem with the old RDD based code was that I did not correctly implement step 4, specifically removing all the movies that were already rated by the given user. Note also that the average in step 5 of this code is taken over all users who rated each candidate movie. One thing to note is that the GraphX approach of filtering out nodes by condition is easier to work with than the join based approach I had used earlier, perhaps because the API leverages the structure of the data it is working on.
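The step 4 issue described above is easy to see on a tiny example. The sketch below uses plain Scala collections and made-up (userId, movieId) pairs, with hypothetical names, to contrast the old filter, which tests the user on each edge, with the corrected one, which tests the movie.

```scala
object Step4FilterSketch {
  // (userId, movieId) rating edges; toy values invented for illustration
  val edges: Seq[(Long, Long)] = Seq(
    (21L, 1L), (21L, 2L), // user u = 21 rated movies 1 and 2
    (7L, 1L), (7L, 3L),   // similar user 7
    (9L, 2L), (9L, 4L)    // similar user 9
  )
  val sourceUserId = 21L
  val similarUsers: Set[Long] = Set(7L, 9L)
  val ratedByU: Set[Long] = edges.filter(_._1 == sourceUserId).map(_._2).toSet

  // old filter: u is never in the similar user set, so the second test removes nothing
  val oldCandidates: Set[Long] = edges
    .filter { case (user, _) => similarUsers.contains(user) }
    .filter { case (user, _) => user != sourceUserId }
    .map(_._2)
    .toSet

  // corrected filter: test the movie (edge destination) against the movies u rated
  val newCandidates: Set[Long] = edges
    .filter { case (user, movie) => similarUsers.contains(user) && !ratedByU.contains(movie) }
    .map(_._2)
    .toSet
}
```

Here `oldCandidates` still contains movies 1 and 2, which user 21 has already rated, while `newCandidates` contains only movies 3 and 4.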

---- recommended movies ----
(5.000) [1189] Prefontaine (1997)
(4.500) [1594] Everest (1998)
(4.500) [1398] Anna (1996)
(4.466) [318] Schindler's List (1993)
(4.466) [169] Wrong Trousers, The (1993)
(4.457) [483] Casablanca (1942)
(4.448) [114] Wallace & Gromit: The Best of Aardman Animation (1996)
(4.445) [64] Shawshank Redemption, The (1994)
(4.388) [603] Rear Window (1954)
(4.386) [12] Usual Suspects, The (1995)

So there you have it. I had been meaning to take a look at the GraphX API for a while, but somehow it never got very high on my list of priorities until now. I figure that this may be true for many people as well: they are comfortable working with RDDs and DataFrames and don't really see a need for something like GraphX until they need it, and then they figure they can get by with RDDs and DataFrames anyway. Hopefully, if you have been meaning to try it out, this gives you a sneak peek into how you can do so. For me, it was a fun exercise, so I thought I would write about it. Hopefully you found it interesting as well.