Sunday, February 07, 2016

Counting Triangles in a Movie Actor Network

I recently came across an algorithm to list all the triangles in a graph in the Data Algorithms book. I didn't have a real application to use it against, but since counting triangles has quite a few uses, I figured it would be useful to try to implement it in Spark. So that's what this post is all about.

The algorithm is somewhat non-trivial, so I will use a very simple five-vertex graph to explain it step by step. From its edge list, shown a little further down, you can quickly verify that the two triangles in this graph are (2, 3, 4) and (2, 4, 5). Here is the Spark code that implements the algorithm.

```scala
def addReverseEdge(pair: (Long, Long)): List[(Long, Long)] = {
  val reverse = (pair._2, pair._1)
  List(pair, reverse)
}

def buildEdges(u: Long, vs: List[Long]): List[((Long, Long), Long)] = {
  val uvpairs = vs.map(v => ((u, v), -1L))
  val vpairs = for (v1 <- vs; v2 <- vs) yield ((v1, v2), u)
  uvpairs ++ vpairs.filter(vv => vv._1._1 < vv._1._2)
}

def buildTriangles(uv: (Long, Long), ws: List[Long]): List[((Long, Long, Long), Int)] = {
  val hasAnchor = ws.filter(_ < 0).size > 0
  val onlyAnchor = ws.size == 1 && hasAnchor
  if (hasAnchor && !onlyAnchor) {
    ws.filter(w => w > 0)
      .map(w => {
        val nodes = List(uv._1, uv._2, w).sorted
        ((nodes(0), nodes(1), nodes(2)), 1)
      })
  } else List.empty
}

// val actorPairs = sc.parallelize(List(
//   (1L, 2L), (2L, 3L), (2L, 4L), (2L, 5L), (3L, 4L), (4L, 5L)))
val actorPairs = sc.textFile("/path/to/actor_pairs.tsv")
  .map(line => line.split('\t'))
  .map(cols => (cols(0).toLong, cols(1).toLong))
  .cache

val triangles = actorPairs.flatMap(kv => addReverseEdge(kv)) // (u, v) += (v, u)
  .map(kv => (kv._1, List(kv._2)))                   // (u, [v])
  .reduceByKey((a, b) => a ++ b)                     // (u, [v1, v2, ...])
  .flatMap(kvs => buildEdges(kvs._1, kvs._2))        // ((u, v), w)
  .mapValues(w => List(w))                           // ((u, v), [w])
  .reduceByKey((a, b) => a ++ b)                     // ((u, v), [w1, w2, ...])
  .flatMap(uvws => buildTriangles(uvws._1, uvws._2)) // ((u, v, w), 1)
  .reduceByKey((a, b) => a + b)                      // ((u, v, w), count) - dedup triangles
  .cache
```

Input to the algorithm is a list of edges, each represented as a pair of vertex IDs. Each edge appears exactly once because we require the source vertex ID to be less than the target vertex ID. For our graph, the input looks like this:

```
(1, 2), (2, 3), (2, 4), (2, 5), (3, 4), (4, 5)
```

The first flatMap call on actorPairs adds a reverse edge, i.e., for every edge (u, v) we also emit (v, u). We do this because we group on the left-hand vertex, treating each vertex as a potential starting point for a triangle. After the grouping, our data looks like this:

```
(1, [2]), (2, [3, 1, 4, 5]), (3, [2, 4]), (4, [3, 2, 5]), (5, [2, 4])
```

The next three lines emit, for each vertex u, its real edges (u, v) tagged with the anchor value -1, plus candidate edges (v1, v2) between every pair of u's neighbors, tagged with u itself. These records are then grouped by edge, so each edge (real or candidate) accumulates the list of third vertices that could close a triangle through it. The result of this operation looks like this:

```
((4, 2), [-1]), ((3, 4), [-1, 2]), ((1, 4), [2]), ((2, 3), [-1, 4]),
((5, 4), [-1]), ((1, 2), [-1]), ((5, 2), [-1]), ((3, 5), [2, 4]),
((2, 5), [-1, 4]), ((1, 3), [2])
```

The next line keeps only the groups that contain both the -1 anchor (meaning the edge actually exists in the graph) and at least one third vertex w, removing all invalid vertex triples. Since the same triangle is discovered once from each of its three edges, we dedup by sorting the vertex IDs into a canonical order. This step returns each sorted vertex triple with a 1, and the next line sums these up, so each triangle appears once with a count of 3:

```
((2, 3, 4), 3), ((2, 4, 5), 3)
```
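To make the three stages concrete, here is a plain-Python sketch of the same algorithm run on the toy graph, with dicts of lists standing in for Spark's grouped RDDs (this is my illustration, not code from the book):

```python
from collections import defaultdict

edges = [(1, 2), (2, 3), (2, 4), (2, 5), (3, 4), (4, 5)]

# Step 1: add reverse edges and group neighbors by vertex -> (u, [v1, v2, ...])
adj = defaultdict(list)
for u, v in edges:
    adj[u].append(v)
    adj[v].append(u)

# Step 2: emit real edges tagged with the -1 anchor, plus candidate edges
# between each vertex's neighbors tagged with that vertex -> ((u, v), [w, ...])
grouped = defaultdict(list)
for u, vs in adj.items():
    for v in vs:
        grouped[(u, v)].append(-1)           # real edge, anchor marker
    for v1 in vs:
        for v2 in vs:
            if v1 < v2:
                grouped[(v1, v2)].append(u)  # u would close a triangle on v1-v2

# Step 3: a group holding both the anchor and a third vertex is a triangle;
# sorting the IDs gives each triangle a single canonical form (dedup)
triangles = set()
for (u, v), ws in grouped.items():
    if -1 in ws:
        for w in ws:
            if w > 0:
                triangles.add(tuple(sorted((u, v, w))))

print(sorted(triangles))  # [(2, 3, 4), (2, 4, 5)]
```

Running this reproduces the intermediate groupings shown above, e.g. (3, 5) collects candidates [2, 4] but no anchor, so it never becomes a triangle.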

For my data, I used the list of movies, actors and actresses, available as plain text files from the IMDB Dataset. The dataset contains 3.6 million movies and TV shows, 2.2 million actors and 1.2 million actresses. The formats are as follows:

```
# movies.list
# movie-name \t release-year
Star Wars VII: Return of the Empire (2013)    2013
...

# actors.list
# actor-name \t movie-name [role]
# \t \t movie-name [role]
Carl, Shay    Bro-Friend (2012) [Brother]
              Star Wars VII: Return of the Empire (2013) [Obi Wan]
...

# actresses.list
# actress-name \t movie-name [role]
# \t \t movie-name [role]
Katz, Liz     2 Big 2 Be True 8 (2007) (V) [(as Risi Simms)]
              Star Wars VII: Return of the Empire (2013) [Princess Leia]
...
```

To get this data into the vertex-pair format we need, I used the following Python code. Since I was only interested in movies, I skipped all the TV shows listed (names beginning with quotes). The script first creates actor-movie pairs (out of both the actor and actress files), then groups on movies to find all the actors who worked together in each movie, and finally builds edges between those actors.

```python
# preprocess.py
# -*- coding: utf-8 -*-
import re

movie_pattern = re.compile("(.*?\(\d{4}.*\))\s.*$")

def after_year(cutoff, year):
    if year == "????":
        return False
    elif int(year) >= cutoff:
        return True
    else:
        return False

def get_movie_name(full_movie_name):
    split_at = full_movie_name.find(')')
    movie_name = ""
    if split_at > -1:
        movie_name = full_movie_name[0:split_at + 1]
    return movie_name

def build_movie_dict():
    movies = open("../../../data/movies.list", 'rb')
    movie_dict = {}
    line_num = 0
    for line in movies:
        line = line.strip()
        if line.startswith("\""):
            continue
        line = unicode(line, "iso-8859-1").encode("utf-8", "ignore")
        movie_name_col, year = re.split("\t+", line)
        if after_year(1900, year):
            movie_name = get_movie_name(movie_name_col)
            line_num += 1
            movie_dict[movie_name] = line_num
    movies.close()
    return movie_dict

def build_actor_dict():
    top_1000 = open("../../../data/actor_names.tsv", 'rb')
    top_actors = {}
    for line in top_1000:
        actor_name, actor_id = line.strip().split('\t')
        top_actors[actor_name] = int(actor_id)
    top_1000.close()
    return top_actors

def write_actor_movie_pair(actor_fin, pair_fout, movie_dict):
    actor_dict = build_actor_dict()
    line_num = 0
    for line in actor_fin:
        line = line.rstrip()
        if len(line) == 0:
            continue
        line = unicode(line, "iso-8859-1").encode("utf-8", "ignore")
        if line[0] == '\t':
            movie_name = get_movie_name(line.strip())
        else:
            # extract the actor name
            actor_name, actor_rest = re.split("\t+", line)
            movie_name = get_movie_name(actor_rest)
            line_num += 1
        if movie_dict.has_key(movie_name) and actor_dict.has_key(actor_name):
            pair_fout.write("%d\t%d\n" % (actor_dict[actor_name],
                                          movie_dict[movie_name]))

def group_by_movie(fin, fout):
    movie_actor = {}
    for line in fin:
        actor_id, movie_id = line.strip().split('\t')
        if movie_actor.has_key(movie_id):
            movie_actor[movie_id].append(actor_id)
        else:
            movie_actor[movie_id] = [actor_id]
    actor_pairs = set()
    for movie_id in movie_actor.keys():
        actors = movie_actor[movie_id]
        for a in actors:
            for b in actors:
                if int(a) < int(b):
                    abkey = ":".join([a, b])
                    if abkey in actor_pairs:
                        continue
                    actor_pairs.add(abkey)  # remember pair so each edge is written once
                    fout.write("%s\t%s\n" % (a, b))
    fout.close()
    fin.close()

###################### main #######################

movie_dict = build_movie_dict()
fout = open("../../../data/actor_movie_pairs.tsv", 'wb')
actors = open("../../../data/actors.list", 'rb')
write_actor_movie_pair(actors, fout, movie_dict)
actors.close()
actresses = open("../../../data/actresses.list", 'rb')
write_actor_movie_pair(actresses, fout, movie_dict)
actresses.close()
fout.close()

actorid_movieid = open("../../../data/actor_movie_pairs.tsv", 'rb')
actorid_actorid = open("../../../data/actor_pairs.tsv", 'wb')
group_by_movie(actorid_movieid, actorid_actorid)
actorid_movieid.close()
actorid_actorid.close()
```

The resulting data had almost 136 million edges. I tried running the algorithm several times, with slight variations, on a cluster of 3 m3.xlarge machines on AWS, but wasn't successful. The variations included reducing the scope of the problem in different ways: first considering only actors with a large number of edges, then only movies made after a certain date. Finally, I realized that IMDB lists a lot of actors for each movie, some of whom don't even make the credits in the actual movie. So I used the list of Top 1,000 Actors and Actresses by Hagen Nelson, and only considered movies that at least one of these 1,000 people acted in. This gave me a dataset of 197k edges, which is what I used for my analysis.
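A quick back-of-the-envelope calculation (mine, not from the book) shows why the full cast lists blow up so fast: a movie with n credited actors contributes up to n(n - 1)/2 edges, so edge counts grow quadratically with cast size, and IMDB's generous cast lists multiply this across millions of titles.

```python
# Edges contributed by one movie with n credited actors: n choose 2.
def edges_for_cast(n):
    return n * (n - 1) // 2

for n in (10, 50, 200):
    print("%3d actors -> %6d edges" % (n, edges_for_cast(n)))
# 10 actors -> 45 edges, 50 -> 1225, 200 -> 19900
```

Trimming each cast to members of a fixed top-1000 list caps n per movie at a small number, which is why the edge count fell from 136M to 197k.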

I then broadcast-joined the vertex IDs of the triangles against the list of actor names to form triples of actor names.

```scala
val actorNames = sc.textFile("/path/to/actor_names.tsv")
  .map(line => {
    val Array(aname, aid) = line.split('\t')
    (aid.toLong, aname)
  })
  .collectAsMap
val bActorNames = sc.broadcast(actorNames)

val actorNamesInTriangles = triangles.map(uvw => {
  val uname = bActorNames.value.getOrElse(uvw._1._1, "UNK")
  val vname = bActorNames.value.getOrElse(uvw._1._2, "UNK")
  val wname = bActorNames.value.getOrElse(uvw._1._3, "UNK")
  (uname, vname, wname)
})
actorNamesInTriangles.take(50)
  .foreach(uvw => println("%s\t%s\t%s".format(uvw._1, uvw._2, uvw._3)))
```

Here are some of the triangles the algorithm returns. As you can see, there are quite a few familiar Hollywood names.

```
Nolte, Nick         Carell, Steve     Krasinski, John
Hoffman, Dustin     Page, Geraldine   Gish, Lillian
Hathaway, Anne      Fraser, Brendan   Culkin, Macaulay
Linney, Laura       Tomei, Marisa     Sutherland, Kiefer
Watts, Naomi        Lohan, Lindsay    Sewell, Rufus
Fonda, Jane         Liotta, Ray       Hewitt, Jennifer Love
Cruise, Tom         Clooney, George   Broadbent, Jim
McConaughey, Matthew  Baruchel, Jay   Fallon, Jimmy
Reeves, Keanu       Bassett, Angela   Turturro, John
Bale, Christian     Baldwin, Alec     Dujardin, Jean
...
```

It turns out that Spark's GraphX library also has a routine for triangle counting, although it does not list the triangles; it just returns a count of triangles rooted at each vertex. However, it is faster by an order of magnitude, so it's worth knowing as well. Here is a code snippet that uses it to compute the clustering coefficient of each vertex.

```scala
import org.apache.spark.graphx._

val g = GraphLoader.edgeListFile(sc, "/path/to/actor_pairs.tsv", true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
val triangleCounts = g.triangleCount.vertices
val degrees = g.degrees
val clusterCoeffs = triangleCounts.join(degrees) // join on vertex id
  .map(vcd => (vcd._1, 2.0 * vcd._2._1 / vcd._2._2))
  .map(vc => (bActorNames.value.getOrElse(vc._1, "UNK"), vc._2))
  .sortBy(_._2, false)
  .take(10)
  .foreach(nc => println("%5.3f\t%s".format(nc._2, nc._1)))
```

This gives a list of actors with their clustering coefficients. Note that the values can exceed 1 because the snippet divides 2T(v) by the degree d(v) rather than by d(v)(d(v) - 1), so it is really a per-vertex triangle density rather than the standard local clustering coefficient.

```
104.487  Perabo, Piper
103.545  Liotta, Ray
100.418  Page, Ellen
 99.110  Russo, Rene
 99.073  Steenburgen, Mary
 98.211  McTeer, Janet
 97.639  Arkin, Alan
 97.552  Strathairn, David
 96.876  Paquin, Anna
 96.051  Crudup, Billy
...
```

To compute the graph clustering coefficient, we run the following code.

```scala
val numTriangles = triangleCounts.map(vc => ("G", vc._2))
  .reduceByKey((a, b) => a + b)
  .map(v => v._2)
  .take(1)(0) / 3.0
val numTriads = degrees.map(vd => ("G", vd._2))
  .reduceByKey((a, b) => a + b)
  .map(v => v._2)
  .take(1)(0) / 2.0
val graphClusterCoeff = numTriangles / numTriads
```

This returns a graph clustering coefficient of 22.02. (As computed here, numTriads is actually the edge count, the sum of degrees over 2, which is why the ratio can exceed 1.)
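For comparison, the textbook definitions are bounded by 1: the local clustering coefficient divides 2T(v) by d(v)(d(v) - 1), and the global coefficient (transitivity) divides 3 × the triangle count by the number of connected triples, the sum of d(d - 1)/2 over all vertices. A small stdlib-only sketch on the toy graph from earlier (my illustration, separate from the Spark code):

```python
from itertools import combinations

edges = {(1, 2), (2, 3), (2, 4), (2, 5), (3, 4), (4, 5)}
adj = {}
for u, v in edges:
    adj.setdefault(u, set()).add(v)
    adj.setdefault(v, set()).add(u)

def local_cc(v):
    """2 * T(v) / (d(v) * (d(v) - 1)), where T(v) counts edges among v's neighbors."""
    d = len(adj[v])
    if d < 2:
        return 0.0
    links = sum(1 for a, b in combinations(adj[v], 2)
                if (min(a, b), max(a, b)) in edges)
    return 2.0 * links / (d * (d - 1))

# Global coefficient: 3 * triangles / connected triples (triads)
triangles = sum(1 for a, b, c in combinations(sorted(adj), 3)
                if {(a, b), (a, c), (b, c)} <= edges)
triads = sum(len(adj[v]) * (len(adj[v]) - 1) // 2 for v in adj)

print({v: round(local_cc(v), 3) for v in sorted(adj)})
print(3.0 * triangles / triads)  # 2 triangles, 11 triads -> 6/11 ~ 0.545
```

On the toy graph this gives, for example, local coefficients of 1.0 for vertices 3 and 5 (both of their neighbor pairs are connected) and 1/3 for the hub vertex 2.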

2 comments (moderated to prevent spam):

Xiangtao Wang said...

Today I am happy to see your new update. This is the longest you have gone without updating the blog in the last few years. It looks like a gift to me for Chinese New Year today. Thanks, haha :)

Sujit Pal said...

Thank you Xiangtao. Hope to post more frequently going forward, and a very Happy Chinese New Year to you!