Friday, August 31, 2018

Collaborative Filtering Recommender with Spark GraphX


Earlier this week, I attended a webinar titled Building the Next Generation Recommendation Engine with a Graph Database, hosted by TigerGraph as part of their Graph Gurus series. I attended because I am doing some work with recommenders these days, and in a past life I did a lot of work with graphs (though not recommenders), so I was curious how they were using graphs for recommenders. I had been thinking in terms of graph adjacency heuristics as features for content-based recommenders, so when they demoed a simple collaborative filtering recommender during the webinar, I had a bit of a duh moment.

In any case, it got me thinking about implementing this using Spark GraphX. I have been meaning to take a look at GraphX for a while, but the stars never quite lined up enough for me to take the plunge. While the use case is probably better suited to a graph database such as TigerGraph, it seemed like a good way to experiment with the GraphX API. So I did, and I describe the experience in today's post.

The recommendations are based on the intuition that people who liked the items you liked also liked certain other items, and those other items are likely to be good recommendations for you. I used the MovieLens 100k dataset, made available thanks to the GroupLens project. So in our case, we will recommend movies to a user based on the movies liked by other people who liked the same movies as that user. The algorithm goes something like this:

  1. Find all movies M rated by a specific user u.
  2. Find all users U who rated at least one of the movies in M.
  3. Based on their ratings, find the p users in U whose tastes are most similar to those of our original user u.
  4. Find all movies M' that these p users rated but that u has not rated yet.
  5. Recommend the top q movies in M' with the highest average rating from these p users.

My first attempt used the GraphX API to build a graph, but then I just fell back to using the triplets as an RDD and standard RDD operations to implement the steps. Here is the code I came up with originally. Note that I use the Databricks notebook environment, so the code does not have the standard Spark boilerplate wrapper code.

import breeze.linalg._
import breeze.numerics._

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val MOVIELENS_DIR = "/path/to/MovieLens-100k"

// load movielens data
val NUM_USERS = 943
val NUM_MOVIES = 1683
val NUM_RATINGS = 100000

val ratingsRDD = sc.textFile(MOVIELENS_DIR + "/u.data")
  .map(line => {
    val cols = line.split('\t')
    val userId = cols(0).toLong
    val movieId = cols(1).toLong + 50000 // to avoid mixups with userId
    val rating = cols(2).toDouble
    (userId, movieId, rating)
  })
ratingsRDD.take(10)

val movieId2Title = sc.textFile(MOVIELENS_DIR + "/u.item")
  .map(line => {
    val cols = line.split('|')
    val movieId = cols(0).toLong + 50000
    val title = cols(1)
    movieId -> title
  })
  .collectAsMap
val movieId2Title_b = sc.broadcast(movieId2Title)

// construct graph object
val userVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._1, "NA"))
val movieVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._2, movieId2Title_b.value(triple._2)))
val verticesRDD = userVerticesRDD.union(movieVerticesRDD)

val relationshipsRDD: RDD[Edge[Double]] = ratingsRDD.map(triple => Edge(triple._1, triple._2, triple._3))

val graph = Graph(verticesRDD, relationshipsRDD)

print("%d vertices, %d edges\n".format(graph.vertices.count, graph.edges.count))
assert(graph.edges.count == NUM_RATINGS)

// inputs
val sourceUserId = 21L
val p = 100 // number of users to look at
val q = 10  // number of movies to recommend

// step 1: find all movies M rated by the given user u
val moviesRatedByUserRDD = graph.edges.filter(e => e.srcId == sourceUserId)
  .map(e => (e.dstId, e.attr))
println("# movies rated by user u: %d".format(moviesRatedByUserRDD.count))

// step 2: find all users U who rated set of movies M
val usersWhoRatedMoviesRDD = graph.edges
  .map(e => (e.dstId, (e.srcId, e.attr)))      // (movieId, (userId, rating_l))
  .join(moviesRatedByUserRDD)                  // (movieId, rating_r)
  .map(j => {                                  // (movieId, ((userId, rating_l), rating_r))
    val movieId = j._1
    val userId = j._2._1._1
    val srcRating = j._2._1._2
    val targetRating = j._2._2
    (userId, movieId, srcRating, targetRating) // (userId, movieId, rating_l, rating_r)
  })
// usersWhoRatedMoviesRDD.take(10)

// step 3: find p users with most similar taste as u
def buildVector(elements: List[(Long, Double)]): DenseVector[Double] = {
  val vec = DenseVector.zeros[Double](NUM_MOVIES)
  elements.foreach(e => {
    val vecIdx = (e._1 - 50001).toInt
    val vecVal = e._2
    vec(vecIdx) = vecVal
  })
  vec
}

def cosineSimilarity(vec1: DenseVector[Double], vec2: DenseVector[Double]): Double = {
  (vec1 dot vec2) / (norm(vec1) * norm(vec2))
}

val sourceVectorElements = usersWhoRatedMoviesRDD.filter(rec => rec._1 == sourceUserId)
  .map(rec => {
    val movieId = rec._2
    val rating = rec._3
    (movieId, rating)
  })
  .collect
  .toList
val sourceVec = buildVector(sourceVectorElements)
val sourceVec_b = sc.broadcast(sourceVec)

val similarUsersRDD = usersWhoRatedMoviesRDD
  .filter(rec => rec._1 != sourceUserId)
  .map(rec => {
    val userId = rec._1
    val movieId = rec._2
    val rating = rec._4
    (userId, List((movieId, rating)))            // (userId, [(movieId, rating)])
  })
  .reduceByKey((a, b) => a ++ b)                 // (userId, List[(movieId, rating)])
  .mapValues(elements => buildVector(elements))  // (userId, DenseVector)
  .map(rec => {
    val targetUserId = rec._1
    val targetVec = rec._2
    val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
    (targetUserId, cosim)                        // (userId, cosineSimilarity)
  })
  .sortBy(_._2, false)
  .map(rec => rec._1)

val similarUsers = similarUsersRDD.take(p)
println("number of similar users selected: %d\n".format(similarUsers.size))

val similarUserSet = similarUsers.toSet
val similarUserSet_b = sc.broadcast(similarUserSet)

// step 4: find all movies M' rated by the p similar users
val candidateMoviesRDD = graph.edges
  .filter(e => similarUserSet_b.value.contains(e.srcId))  // rated by the p similar users
  .filter(e => e.srcId != sourceUserId)                   // note: does NOT exclude movies u already rated (see below)

// step 5: recommend top q movies with highest average rating
val recommendedMoviesRDD = candidateMoviesRDD.map(e => {
    val userId = e.srcId
    val movieId = e.dstId
    val rating = e.attr
    (movieId, List(rating))
  })                                  // (movieId, [rating])
  .reduceByKey((a, b) => a ++ b)      // (movieId, List(rating))
  .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)  // (movieId, averageRating)
  .sortBy(_._2, false)                // (movieId, averageRating*)

val recommendedMovies = recommendedMoviesRDD.take(q)

print("---- recommended movies ----\n")
recommendedMovies.foreach(rec => {
  val movieId = rec._1
  val score = rec._2
  val title = movieId2Title(movieId)
  print("(%.3f) [%d] %s\n".format(score, movieId - 50000, title))
})

// clean up
ratingsRDD.unpersist()
userVerticesRDD.unpersist()
movieVerticesRDD.unpersist()
verticesRDD.unpersist()
relationshipsRDD.unpersist()
moviesRatedByUserRDD.unpersist()
usersWhoRatedMoviesRDD.unpersist()
similarUsersRDD.unpersist()
candidateMoviesRDD.unpersist()
recommendedMoviesRDD.unpersist()
graph.unpersistVertices()

This returns the recommendations shown below, which in hindsight I realized were incorrect, since I did not properly implement step 4 (the part where we have to exclude the movies already rated by u) here. But I show them anyway, just to give you an idea of what the output looks like.

---- recommended movies ----
(5.000) [1598] City of Industry (1997)
(5.000) [1536] Aiqing wansui (1994)
(5.000) [814] Great Day in Harlem, A (1994)
(5.000) [1594] Everest (1998)
(5.000) [1500] Santa with Muscles (1996)
(5.000) [1463] Boys, Les (1997)
(5.000) [1491] Tough and Deadly (1995)
(5.000) [1189] Prefontaine (1997)
(4.813) [114] Wallace & Gromit: The Best of Aardman Animation (1996)
(4.500) [868] Hearts and Minds (1996)

After I did this, though, I realized that all I was doing was force-fitting the problem into my existing RDD knowledge, and not using the power of the GraphX API. So the following code, which I built off the old one, is more GraphX-y, for lack of a better word. The new code uses the GraphX function aggregateMessages() pretty heavily. I believe this technique is pretty central to graph-based computing: I remember at least a couple of presentations where the presenters talked about GraphX (and Pregel, the message passing model it builds on), and both of them mentioned this particular style of computation.

The aggregateMessages function is very powerful, a bit of a Swiss army knife. You can think of this in terms of map-reduce. The Map step usually applies some kind of filter criterion to limit the set of triplets from the graph, then sends messages with some computed or retrieved payload to one of the nodes (source or destination). The Reduce step applies a function to aggregate the messages at each of the nodes. The output is an RDD of (VertexID, aggregated message). Here is a diagram showing this in action - the map step sends messages as shown by the red arrows, and the reduce step aggregates these messages as shown by the blue dotted arcs.


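To make this concrete before diving into the full listing, here is a minimal sketch of an aggregateMessages call against the same user-movie rating graph built earlier: it counts how many ratings each movie received. This is just an illustration of the map/reduce pattern described above, not part of the recommender itself.

// illustrative sketch: count the number of ratings each movie vertex received
val ratingCountsRDD: VertexRDD[Int] = graph.aggregateMessages[Int](
  triplet => triplet.sendToDst(1),  // map: every rating edge sends the message 1 to its movie vertex
  (a, b) => a + b                   // reduce: sum the messages arriving at each movie vertex
)
ratingCountsRDD.take(5)             // Array of (movieId, ratingCount) pairs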
The other major difference I noticed was the absence of joins. The new code relies more on filtering the graph down to a set of nodes and then passing messages to their nearest neighbors. There is much more back and forth between the driver and the workers using broadcast and collect. This is not as bad as it sounds, though: graphs typically represent sparse structures, so each graph operation tends to produce a small set of constraints to pass on to the next operation, and collecting or broadcasting that small amount of data is not a very heavyweight operation.
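As a small illustration of that pattern (using the graph and sourceUserId defined in the listing above; the names ratedByU and edgesToThoseMovies are just for this sketch), you collect a small set of vertex IDs to the driver, broadcast it, and use it to filter the edges before the next operation:

// sketch of the collect / broadcast / filter idiom
val ratedByU = graph.edges.filter(e => e.srcId == sourceUserId).map(_.dstId).collect.toSet  // small, cheap to collect
val ratedByU_b = sc.broadcast(ratedByU)                                                     // ship the set to the workers once
val edgesToThoseMovies = graph.edges.filter(e => ratedByU_b.value.contains(e.dstId))        // restrict the graph for the next step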

Also, as a practical matter, it helps to draw out little diagrams like the picture above to figure out how to set up the aggregateMessages calls in the code.

So, in any case, here is the code. I have left the old code in as commented out blocks, to make it easier to figure out what the new GraphX code is replacing.

import breeze.linalg._
import breeze.numerics._

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// load movielens data
val MOVIELENS_DIR = "/path/to/MovieLens-100k"

val NUM_USERS = 943
val NUM_MOVIES = 1683
val NUM_RATINGS = 100000

val ratingsRDD = sc.textFile(MOVIELENS_DIR + "/u.data")
  .map(line => {
    val cols = line.split('\t')
    val userId = cols(0).toLong
    val movieId = cols(1).toLong + 50000 // to avoid mixups with userId
    val rating = cols(2).toDouble
    (userId, movieId, rating)
  })

val movieId2Title = sc.textFile(MOVIELENS_DIR + "/u.item")
  .map(line => {
    val cols = line.split('|')
    val movieId = cols(0).toLong + 50000
    val title = cols(1)
    movieId -> title
  })
  .collectAsMap
val movieId2Title_b = sc.broadcast(movieId2Title)

// construct graph object
val userVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._1, "NA"))
val movieVerticesRDD: RDD[(VertexId, String)] = ratingsRDD.map(triple => (triple._2, movieId2Title_b.value(triple._2)))
val verticesRDD = userVerticesRDD.union(movieVerticesRDD)

val relationshipsRDD: RDD[Edge[Double]] = ratingsRDD.map(triple => Edge(triple._1, triple._2, triple._3))

val graph = Graph(verticesRDD, relationshipsRDD)

print("%d vertices, %d edges\n".format(graph.vertices.count, graph.edges.count))
assert(graph.edges.count == NUM_RATINGS)

// inputs
val sourceUserId = 21L
val p = 100 // number of users to look at
val q = 10  // number of movies to recommend

// step 1: find all movies M rated by given user u
// val moviesRatedByUserRDD = graph.edges.filter(e => e.srcId == sourceUserId)
//   .map(e => (e.dstId, e.attr))
// moviesRatedByUserRDD.take(10)

val moviesRatedByUser = graph.edges.filter(e => e.srcId == sourceUserId)
  .map(e => (e.dstId, e.attr))
  .collect
  .toMap
val moviesRatedByUser_b = sc.broadcast(moviesRatedByUser)
println("# movies rated by user: %d".format(moviesRatedByUser.size))

// step 2: find all users U who rated set of movies M
// val usersWhoRatedMoviesRDD = graph.edges.map(e => (e.dstId, (e.srcId, e.attr)))  // (movieId, (userId, rating_l))
//   .join(moviesRatedByUserRDD)                    // (movieId, rating_r)
//   .map(j => {                                    // (movieId, ((userId, rating_l), rating_r))
//     val movieId = j._1
//     val userId = j._2._1._1
//     val srcRating = j._2._1._2
//     val targetRating = j._2._2
//     (userId, movieId, srcRating, targetRating)   // (userId, movieId, rating_l, rating_r)
//   })
// usersWhoRatedMoviesRDD.take(10)

val usersWhoRatedMovies = graph.aggregateMessages[List[Long]](
    triplet => { // map function
      // consider only movies that user u has rated
      if (moviesRatedByUser_b.value.contains(triplet.dstId)) {
        // take user and send to the movie to be aggregated
        triplet.sendToDst(List(triplet.srcId))
      }
    },
    // reduce userIds into single list
    (a, b) => (a ++ b)
  )                                                 // (movieId, List[userId])
  .flatMap(rec => {
    val movieId = rec._1
    val userIds = rec._2
    userIds.map(userId => (userId, 1))              // (userId, 1)
  })
  .reduceByKey((a, b) => a + b)                     // (userId, n)
  .map(rec => rec._1)                               // unique List(userId)
  .collect
  .toSet

val usersWhoRatedMovies_b = sc.broadcast(usersWhoRatedMovies)
println("# unique users: %d".format(usersWhoRatedMovies.size))

// step 3: find p users with most similar taste as u
def buildVector(elements: List[(Long, Double)]): DenseVector[Double] = {
  val vec = DenseVector.zeros[Double](NUM_MOVIES)
  elements.foreach(e => {
    val vecIdx = (e._1 - 50001).toInt
    val vecVal = e._2
    vec(vecIdx) = vecVal
  })
  vec
}

def cosineSimilarity(vec1: DenseVector[Double], vec2: DenseVector[Double]): Double = {
  (vec1 dot vec2) / (norm(vec1) * norm(vec2))
}

// val similarUsersRDD = usersWhoRatedMoviesRDD.filter(rec => rec._1 != sourceUserId)
//   .map(rec => {
//     val userId = rec._1
//     val movieId = rec._2
//     val rating = rec._4
//     (userId, List((movieId, rating)))       // (userId, [(movieId, rating)])
//   })
//   .reduceByKey((a, b) => a ++ b)            // (userId, List[(movieId, rating)])
//   .mapValues(elements => buildVector(elements)) // (userId, DenseVector)
//   .map(rec => {
//     val targetUserId = rec._1
//     val targetVec = rec._2
//     val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
//     (targetUserId, cosim)                   // (userId, cosineSimilarity)
//   })
//   .sortBy(_._2, false)
//   .map(rec => rec._1)
//
// val similarUsers = similarUsersRDD.take(p)
// print("number of similar users selected: %d\n".format(similarUsers.size))

val userVectorsRDD: RDD[(VertexId, DenseVector[Double])] = graph
  .aggregateMessages[List[(Long, Double)]](
    triplet => { // map function
      // consider only users that rated movies M
      if (usersWhoRatedMovies_b.value.contains(triplet.srcId)) {
        // send to each user the target movieId and rating
        triplet.sendToSrc(List((triplet.dstId, triplet.attr)))
      }
    },
    // reduce to a single list
    (a, b) => (a ++ b)
  )                                        // (userId, List[(movieId, rating)])
  .mapValues(elements => buildVector(elements)) // (userId, ratingVector)

// val sourceVectorElements = usersWhoRatedMoviesRDD
//   .filter(rec => rec._1 == sourceUserId)
//   .map(rec => {
//     val movieId = rec._2
//     val rating = rec._3
//     (movieId, rating)
//   })
//   .collect
//   .toList
// val sourceVec = buildVector(sourceVectorElements)
// val sourceVec_b = sc.broadcast(sourceVec)
//
// val similarUsersRDD = usersWhoRatedMoviesRDD
//   .filter(rec => rec._1 != sourceUserId)
//   .map(rec => {
//     val userId = rec._1
//     val movieId = rec._2
//     val rating = rec._4
//     (userId, List((movieId, rating)))           // (userId, [(movieId, rating)])
//   })
//   .reduceByKey((a, b) => a ++ b)                // (userId, List[(movieId, rating)])
//   .mapValues(elements => buildVector(elements)) // (userId, DenseVector)
//   .map(rec => {
//     val targetUserId = rec._1
//     val targetVec = rec._2
//     val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
//     (targetUserId, cosim)                       // (userId, cosineSimilarity)
//   })
//   .sortBy(_._2, false)
//   .map(rec => rec._1)
//
// val similarUsers = similarUsersRDD.take(p)
// print("number of similar users selected: %d\n".format(similarUsers.size))
//
// val similarUserSet = similarUsers.toSet
// val similarUserSet_b = sc.broadcast(similarUserSet)

val sourceVec = userVectorsRDD.filter(rec => rec._1 == sourceUserId)
  .map(_._2)
  .collect
  .toList(0)
val sourceVec_b = sc.broadcast(sourceVec)

val similarUsersRDD = userVectorsRDD.filter(rec => rec._1 != sourceUserId)
  .map(rec => {
    val targetUserId = rec._1
    val targetVec = rec._2
    val cosim = cosineSimilarity(targetVec, sourceVec_b.value)
    (targetUserId, cosim)
  })

val similarUserSet = similarUsersRDD.takeOrdered(p)(Ordering[Double].reverse.on(rec => rec._2))
  .map(rec => rec._1)
  .toSet
val similarUserSet_b = sc.broadcast(similarUserSet)
println("# of similar users: %d".format(similarUserSet.size))

// step 4: find all movies M' these p persons rated that u hasn't rated yet
// :WARNING: this block does not implement requirement correctly, following block does.
// val candidateMoviesRDD = graph.edges
//   .filter(e => similarUserSet_b.value.contains(e.srcId))           // rated by p users
//   .filter(e => e.srcId != sourceUserId)                            // not rated by user u
// candidateMoviesRDD.count

val candidateMovies = graph.aggregateMessages[List[Long]](
    triplet => { // map function
      // only consider users in the set p of similar users,
      // exclude movies rated by user u
      if (similarUserSet_b.value.contains(triplet.srcId) &&
         !moviesRatedByUser_b.value.contains(triplet.dstId)) {
        // send message [movieId] back to user
        triplet.sendToSrc(List(triplet.dstId))
      }
    },
    // reduce function
    (a, b) => a ++ b
  )                                             // (userId, List(movieId))
  .flatMap(rec => {
    val userId = rec._1
    val movieIds = rec._2
    movieIds.map(movieId => (movieId, 1))       // (movieId, 1)
  })
  .reduceByKey((a, b) => a + b)                 // (movieId, count)
  .map(_._1)                                    // (movieId)
  .collect
  .toSet

val candidateMovies_b = sc.broadcast(candidateMovies)
println("# of candidate movies for recommendation: %d".format(candidateMovies.size))

// step 5: recommend top q movies with highest average rating
// val recommendedMoviesRDD = candidateMoviesRDD.map(e => {
//     val userId = e.srcId
//     val movieId = e.dstId
//     val rating = e.attr
//     (movieId, List(rating))
//   })                                          // (movieId, [rating])
//   .reduceByKey((a, b) => a ++ b)              // (movieId, List(rating))
//   .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)  // (movieId, averageRating)
//   .sortBy(_._2, false)                        // (movieId, averageRating*)

val recommendedMoviesRDD: RDD[(VertexId, Double)] = graph
  .aggregateMessages[List[Double]](
    triplet => { // map function
      // limit search to movies rated by top p similar users
      if (candidateMovies_b.value.contains(triplet.dstId)) {
        // send ratings to movie nodes
        triplet.sendToDst(List(triplet.attr))
      }
    },
    // reduce ratings to single list per movie
    (a, b) => (a ++ b)
  )
  .mapValues(ratings => ratings.foldLeft(0D)(_ + _) / ratings.size)

val recommendedMovies = recommendedMoviesRDD.takeOrdered(q)(Ordering[Double].reverse.on(rec => rec._2))
println("#-recommended: %d".format(recommendedMovies.size))

print("---- recommended movies ----\n")
recommendedMovies.foreach(rec => {
  val movieId = rec._1.toLong
  val score = rec._2
  val title = movieId2Title(movieId)
  print("(%.3f) [%d] %s\n".format(score, movieId - 50000, title))
})

And the output from this code is shown below. As you can see, the difference from the earlier output comes from correctly implementing step 4, specifically removing all the movies already rated by the given user, which I had not done in the old RDD-based code. One thing to note is that the GraphX approach of filtering nodes by condition is easier to work with than the join-based approach I used earlier, perhaps because the API leverages the structure of the data it is working on.

---- recommended movies ----
(5.000) [1189] Prefontaine (1997)
(4.500) [1594] Everest (1998)
(4.500) [1398] Anna (1996)
(4.466) [318] Schindler's List (1993)
(4.466) [169] Wrong Trousers, The (1993)
(4.457) [483] Casablanca (1942)
(4.448) [114] Wallace & Gromit: The Best of Aardman Animation (1996)
(4.445) [64] Shawshank Redemption, The (1994)
(4.388) [603] Rear Window (1954)
(4.386) [12] Usual Suspects, The (1995)

So there you have it. I had been meaning to take a look at the GraphX API for a while, but somehow it never got very high on my list of priorities until now. I figure this may be true for many people as well: they are comfortable working with RDDs and DataFrames and don't really see a need for something like GraphX, and when they do need it, they figure they can get by with RDDs and DataFrames anyway. Hopefully, if you have been meaning to try it out, this gives you a sneak peek into how you can do so. For me, it was a fun exercise, so I thought I would write about it. Hopefully you found it interesting as well.


Saturday, August 11, 2018

Keyword deduplication using the Python dedupe library


I have been experimenting with keyword extraction techniques against the NIPS Papers dataset, consisting of the titles, abstracts and full text of all papers from the Neural Information Processing Systems (NIPS) conference from 1987-2017, contributed by Ben Hamner. The collection has 7239 papers written by 9785 authors. The reason I preferred this dataset to others such as Reuters or Medline is that it is smaller, that I can be both programmer and domain expert, and that I might learn interesting things while combing through the text of the papers looking for patterns to exploit.

In order to generate keywords, I used Ted Dunning's Log Likelihood Ratio (LLR) method (blog post, paper (PDF)) and the RAKE algorithm (described in the paper Automatic Keyword Extraction from Individual Documents by Rose et al., and adapted from code at aneesha/RAKE). I then selected the top scoring keywords from both methods, merged them to remove duplicate suggestions, and went through them manually to remove what I considered spurious keywords (things like "et al", "better proposals", "usually set conservatively", etc). Using these keywords and the text, I trained a MAUI model (following this blog post by Alyona Medelyan, creator of MAUI), then turned around and used the model to predict keywords against the same text, hoping to extract some keywords that were missed in the training set. Finally, I merged the predicted keywords with the training keywords.

The next step was to remove obviously duplicate keywords, such as singular and plural versions of the same thing ("absolute value", "absolute values"), words that were spelled differently but meant the same ("datasets", "data sets"), words that were close enough in meaning to appear as duplicates ("local linear", "locally linear"), etc. At this point I had only about 2281 keywords, so I could have just done brute force matching (i.e., run 2281x2281 comparisons across all keywords). However, I had recently heard about the Python Dedupe Library from Anne Moshyedi and Taylor Kramer, University of Virginia (UVA) Computer Science students interning at SWIFT Innovation Labs, with whom I was collaborating around my open source project Solr Dictionary Annotator (SoDA), and I wanted to try it out. They were using it as a step in their pipeline to resolve entities in structured data, which looks like the intended use case for dedupe, going by the examples at the dedupeio/dedupe-examples repository. This post is about using dedupe against this set of keywords.

Since keywords are just text data, the first step was to convert them into some kind of structured data. For that, I chose to generate 3-character shingles from each keyword, then run them through a feature hasher to hash these shingles down to a fixed length vector. Here is the code to do that.

import nltk
import numpy as np
import os

from sklearn.feature_extraction import FeatureHasher
from sklearn.metrics import jaccard_similarity_score

hasher = FeatureHasher(input_type="string", n_features=25, dtype=np.int32)
keywords = ["absolute value", "absolute values"]
hashes = []
for keyword in keywords:
    shingles = ["".join(trigram) for trigram 
        in nltk.trigrams([c for c in keyword])]
    keyword_hash = hasher.transform([shingles]).toarray()
    hashes.append(keyword_hash)
    print(keyword, keyword_hash)

print("jaccard:", jaccard_similarity_score(hashes[0][0], hashes[1][0]))

The output shows that we don't lose too much information in the feature hashing: the Jaccard similarity between the two fixed-length (size 25) hashes for the strings "absolute value" and "absolute values" is 0.96.

absolute value : [0, 0, 1, 0, 0, 0, 0, -1, -1, 0, 0, -1, 0, 0, 0, 0, 0, 0, 0, -2, -1, 0, 1, 0, 0]
absolute values: [0, 0, 1, 0, 0, 0, 0, -2, -1, 0, 0, -1, 0, 0, 0, 0, 0, 0, 0, -2, -1, 0, 1, 0, 0]
jaccard similarity: 0.96

We then apply this feature hashing procedure to all our keywords and write the hashes out to a CSV file along with the original keyword. Originally I added the keyword because there is an active learning component in the dedupe pipeline where the user is asked to identify whether a pair of records are duplicates, and having the original keyword show up is the easiest way for the human trainer to do so. Later I realized that the keyword string is a useful feature by itself, because dedupe contains functionality to match against strings inside structured input as well.

DATA_DIR = "../data"

CURATED_KEYWORDS = os.path.join(DATA_DIR, "raw_keywords.txt")
CURATED_KEYWORD_HASHES = os.path.join(DATA_DIR, "curated_keywords_hash.csv")

fcurated = open(CURATED_KEYWORDS, "r")
fhashed = open(CURATED_KEYWORD_HASHES, "w")

# header
cols = ["id"]
cols.extend(["col_{:d}".format(i+1) for i in range(25)])
cols.append("keyword")
fhashed.write("{:s}\n".format(",".join(cols)))

# shingle each word into 3-char trigrams, then hash to 25 features
hasher = FeatureHasher(input_type="string", n_features=25, dtype=np.int32)
for rowid, keyword in enumerate(fcurated):
    keyword = keyword.strip()
    shingles = ["".join(trigram) for trigram 
        in nltk.trigrams([c for c in keyword])]
    keyword_hash = hasher.transform([shingles]).todense().tolist()[0]
    cols = [str(rowid)]
    cols.append(",".join([str(h) for h in keyword_hash]))
    cols.append(keyword)
    fhashed.write("{:s}\n".format(",".join(cols)))

fhashed.close()
fcurated.close()
print("num keywords: {:d}".format(rowid))

Next we take this set of 2281 curated keywords and run it through the dedupe pipeline. The code below is adapted from, and closely mimics, the CSV example on the dedupe-examples site. Dedupe is a machine learning pipeline that uses a combination of blocking (grouping by fields), hierarchical clustering and logistic regression, along with active learning, to come up with its results.

The first step in the training is for the user to tell dedupe what fields to pay attention to during the deduping process. In our case, these are all the elements of the feature hasher output (represented by col_1 through col_25) plus the keyword string. Using this information, dedupe will identify potential duplicate pair candidates and ask the user for confirmation, a process it calls active labeling. During this phase, dedupe writes out a settings file (dedupe_keywords_learned_settings) and a training file (dedupe_keywords_training.json). The code below checks for the presence of these files, and if found, it will skip the active labeling step (so for retraining, you should remove these files).

Once the active labeling, blocking and clustering are done, the predicted duplicate pairs can be retrieved from the dedupe classifier by supplying a threshold, essentially telling it how much more you value recall over precision. The last block of code below retrieves these candidate pairs and writes them out in tab separated format to the file keyword_dedupe_mappings.tsv. I sort the keywords so that the longer one comes first in the pair, to accommodate the next step in this pipeline, which I will not go into in this post.

import csv
import os
import dedupe

DATA_DIR = "../data"
MODEL_DIR = "../models"

RAW_INPUT = os.path.join(DATA_DIR, "curated_keywords_hash.csv")
SETTINGS_FILE = os.path.join(MODEL_DIR, "dedupe_keywords_learned_settings")
TRAINING_FILE = os.path.join(MODEL_DIR, "dedupe_keywords_training.json")
OUTPUT_FILE = os.path.join(DATA_DIR, "keyword_dedupe_mappings.tsv")

# read training file (written using 09-keyword-dedupe) and transform
# into format suitable for use by dedupe
data = {}
with open(RAW_INPUT, "rb") as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        row_id = int(row["id"])
        data[row_id] = dict(row.items())
        
# training
if os.path.exists(SETTINGS_FILE):
    print("reading from settings file {:s}".format(SETTINGS_FILE))
    with open(SETTINGS_FILE, "rb") as f:
        deduper = dedupe.StaticDedupe(f)
else:
    # define fields for deduper to pay attention to, here we
    # will ask for all fields except id
    field_names = ["col_{:d}".format(i+1) for i in range(25)]
    fields = [{"field": field_name, "type": "Exact"} 
              for field_name in field_names]
    fields.append({"field": "keyword", "type": "String"})
    deduper = dedupe.Dedupe(fields)
    # feed the deduper a sample of records for training
    deduper.sample(data, 15000)
    # use training data for previous run if available
    if os.path.exists(TRAINING_FILE):
        print("reading labeled examples from training file: {:s}"
              .format(TRAINING_FILE))
        with open(TRAINING_FILE, "rb") as f:
            deduper.readTraining(f)

    # active learning
    print("starting active labeling...")
    dedupe.consoleLabel(deduper)

    deduper.train()            
    
    # save training data to disk
    with open(TRAINING_FILE, "w") as f:
        deduper.writeTraining(f)
    with open(SETTINGS_FILE, "w") as f:
        deduper.writeSettings(f)
        
# blocking
print("blocking...")
threshold = deduper.threshold(data, recall_weight=1.5)

# clustering
print("clustering...")
clustered_dupes = deduper.match(data, threshold)

# write results
with open(OUTPUT_FILE, "w") as f:
    for cluster_id, cluster in enumerate(clustered_dupes):
        id_set, scores = cluster
        keywords = sorted([data[id]["keyword"] for id in id_set],
                          key=lambda x: len(x), reverse=True)
        f.write("{:s}\t{:s}\t{:.3f}\n".format(
            keywords[0], keywords[1], scores[0]))

The active learning part shows a pair of records on the console and asks the user to say yes or no to whether these records should be considered the same. The user can exit the training loop at any point by choosing finished, or skip a pair by choosing unsure. This is done by the consoleLabel convenience method. The recommendation is to label about 10 records, but I ended up labeling close to 40 because I wanted to give it an even split of positive and negative examples. Here is what a single query looks like.

col_1 : 0
col_2 : 1
col_3 : 0
col_4 : 0
col_5 : 1
col_6 : 0
col_7 : -1
col_8 : 1
col_9 : 0
col_10 : 1
col_11 : 0
col_12 : 0
col_13 : 0
col_14 : 0
col_15 : 0
col_16 : 0
col_17 : 0
col_18 : 0
col_19 : 1
col_20 : -1
col_21 : 0
col_22 : 0
col_23 : 1
col_24 : 2
col_25 : 0
keyword : learning model

col_1 : 0
col_2 : 1
col_3 : 0
col_4 : 0
col_5 : 1
col_6 : 0
col_7 : -1
col_8 : 1
col_9 : 0
col_10 : 1
col_11 : 0
col_12 : 0
col_13 : 0
col_14 : 0
col_15 : 1
col_16 : 0
col_17 : 0
col_18 : 0
col_19 : 1
col_20 : -1
col_21 : 0
col_22 : 0
col_23 : 1
col_24 : 2
col_25 : 0
keyword : learning models

11/10 positive, 19/10 negative
Do these records refer to the same thing?
(y)es / (n)o / (u)nsure / (f)inished / (p)revious

Our final step is to evaluate the quality of the predictions dedupe gave us. Since I am looking for keywords that are almost, but not quite, duplicates, edit distance is a reasonable way to measure this. Dedupe reports a confidence score for each predicted duplicate pair, so we will treat a pair as a predicted duplicate when the confidence is over 0.75, and treat a pair as a true duplicate when the edit distance is up to 2 characters. We codify this below and use the metrics provided by Scikit-Learn to generate evaluation scores.

import nltk
import os

from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# points to the keyword_dedupe_mappings.tsv file written by the previous block
DATA_DIR = "../data"
KEYWORD_DEDUPE_MAPPINGS = os.path.join(DATA_DIR, "keyword_dedupe_mappings.tsv")

i = 0
labels, preds = [], []
f = open(KEYWORD_DEDUPE_MAPPINGS, "r")
for line in f:
    keyword_left, keyword_right, score = line.strip().split("\t")
    score = float(score)
    preds.append(1 if score > 0.75 else 0)
    edit_dist = nltk.edit_distance(keyword_left, keyword_right)
    labels.append(1 if edit_dist <= 2 else 0)
    if i <= 10:
        print("{:25s}\t{:25s}\t{:.3f}\t{:.3f}".format(keyword_left, keyword_right, 
                                                      score, edit_dist))
    i += 1
f.close()

acc = accuracy_score(labels, preds)
cm = confusion_matrix(labels, preds)
cr = classification_report(labels, preds)

print("---")
print("accuracy: {:.3f}".format(acc))
print("---")
print("confusion matrix")
print(cm)
print("---")
print("classification report")
print(cr)

The output of this code block is shown below. The first block shows the first few candidate pairs predicted by dedupe, along with the reported confidence and the computed edit distance. The accuracy of the deduper, given these thresholds, is 0.889 (almost 89%), and the weighted average precision, recall and f1-score are 0.92, 0.89 and 0.90 respectively, all very good, especially given the small amount of effort spent training dedupe.

learning approaches       learning approach         0.776 2.000
absolute values           absolute value            0.796 1.000
dual variables            dual variable             0.878 1.000
synaptic weights          synaptic weight           0.816 1.000
performance measures      performance measure       0.818 1.000
synthetic dataset         synthetic data            0.684 3.000
dynamical systems         dynamical system          0.836 1.000
action pairs              action pair               0.877 1.000
action potentials         action potential          0.853 1.000
learning models           learning model            0.816 1.000
action spaces             action space              0.816 1.000

---
accuracy: 0.889

---
confusion matrix
[[ 52   5]
 [ 31 235]]

---
classification report
             precision    recall  f1-score   support

          0       0.63      0.91      0.74        57
          1       0.98      0.88      0.93       266

avg / total       0.92      0.89      0.90       323

One small caveat for those of you using Anaconda Python 3.x, as I am: I initially tried installing dedupe using pip, but it failed because of some incompatibility between the Anaconda libraries and dedupe-hcluster (a dependency of dedupe). I ended up installing it with conda from the derickl channel on my Mac, but that downgraded my Python version from 3 to 2 and broke lots of other code, so I had to reinstall Anaconda (fortunately or unfortunately, I have had to do this before and have the process down, so I was back up and running, including all additional libraries except dedupe, within an afternoon). In the end, I installed dedupe on my spare Linux box on top of Anaconda Python 2.x, using conda with the riipl-org channel's linux-64 build of dedupe. According to this issue, dedupe itself and its dependencies are Python 3 compatible now, so hopefully the dedupe developers will soon push the latest version to conda-forge for both OSX and Linux.

That's all I have for today. As far as I can see, the dedupe library has so far been applied to structured data, and I haven't seen it applied to unstructured data before, so I hope the idea of shingling keywords and feature hashing the shingles to convert unstructured text into structured tabular data is a useful one for those of you with this use case. I cannot share the code for this at the moment since it is part of a bigger effort that I haven't put on a public repository yet. Once I publish the project, I will update this post with links.