I recently came across an algorithm to list all triangles in a graph in the Data Algorithms book. I didn't have a real application to use it against, but given that counting triangles has quite a few uses, I figured it would be useful to try to implement it in Spark. So that's what this post is all about.
The algorithm is somewhat non-trivial, so I will use the very simple graph shown below to explain the algorithm step-by-step. You can instantly see that the two triangles in this graph are (2, 3, 4) and (2, 4, 5). Here is the Spark code that implements the algorithm.
// Given an edge (u, v), emit both (u, v) and (v, u) so that every vertex
// appears on the left-hand side and can be grouped on.
def addReverseEdge(pair: (Long, Long)): List[(Long, Long)] = {
  val reverse = (pair._2, pair._1)
  List(pair, reverse)
}

// For a vertex u and its neighbor list vs, emit each real edge (u, v)
// marked with -1, plus every candidate edge (v1, v2) between pairs of
// u's neighbors, marked with u as the common neighbor. Candidate edges
// are kept only in canonical (v1 < v2) order.
def buildEdges(u: Long, vs: List[Long]): List[((Long, Long), Long)] = {
  val uvpairs = vs.map(v => ((u, v), -1L))
  val vpairs = for (v1 <- vs; v2 <- vs) yield ((v1, v2), u)
  uvpairs ++ vpairs.filter(vv => vv._1._1 < vv._1._2)
}

// A key (u, v) closes triangles only if its value list contains the -1
// marker (the edge (u, v) actually exists) plus at least one common
// neighbor w > 0. Vertex IDs are sorted so every triangle has a single
// canonical representation.
def buildTriangles(uv: (Long, Long), ws: List[Long]): List[((Long, Long, Long), Int)] = {
  val hasAnchor = ws.filter(_ < 0).size > 0
  val onlyAnchor = ws.size == 1 && hasAnchor
  if (hasAnchor && !onlyAnchor) {
    ws.filter(w => w > 0)
      .map(w => {
        val nodes = List(uv._1, uv._2, w).sorted
        ((nodes(0), nodes(1), nodes(2)), 1)
      })
  } else List.empty
}

// val actorPairs = sc.parallelize(List(
//   (1L, 2L), (2L, 3L), (2L, 4L), (2L, 5L), (3L, 4L), (4L, 5L)))
val actorPairs = sc.textFile("/path/to/actor_pairs.tsv")
  .map(line => line.split('\t'))
  .map(cols => (cols(0).toLong, cols(1).toLong))
  .cache

val triangles = actorPairs.flatMap(kv => addReverseEdge(kv)) // (u, v) += (v, u)
  .map(kv => (kv._1, List(kv._2)))                   // (u, [v])
  .reduceByKey((a, b) => a ++ b)                     // (u, [v1, v2, ...])
  .flatMap(kvs => buildEdges(kvs._1, kvs._2))        // ((u, v), w)
  .mapValues(w => List(w))                           // ((u, v), [w])
  .reduceByKey((a, b) => a ++ b)                     // ((u, v), [w1, w2, ...])
  .flatMap(uvws => buildTriangles(uvws._1, uvws._2)) // ((u, v, w), 1)
  .reduceByKey((a, b) => a + b)                      // ((u, v, w), count) - dedup triangles
  .cache
Input to the algorithm is a list of edges, each edge represented as a pair of vertex IDs. Each edge appears exactly once, which we enforce by requiring that the source vertex ID be less than the target vertex ID. For our graph, the input looks like this:
(1, 2), (2, 3), (2, 4), (2, 5), (3, 4), (4, 5)
The first flatMap call on actorPairs adds a reverse edge, i.e., for every (u, v) edge we add an additional (v, u) edge. We do this because we group on the left-hand vertex, treating each vertex as a potential start vertex for our triangles. Once we are done, our data looks like this:
(1, [2]), (2, [3, 1, 4, 5]), (3, [2, 4]), (4, [3, 2, 5]), (5, [2, 4])
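To make this concrete, here is a small illustration (assuming the helper functions above are pasted into the Spark shell) of how a single edge is doubled before the grouping step:

addReverseEdge((2L, 3L))
// -> List((2,3), (3,2))

// After the map/reduceByKey pair, each vertex has collected all of its
// neighbors, e.g. vertex 2 becomes (2, [3, 1, 4, 5]) in the toy graph.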
The next three lines call buildEdges to pair each vertex with its neighbor list, emitting the real edges (marked with -1) and the candidate edges between pairs of neighbors (marked with the common neighbor), and then group these markers by edge. The result of this operation looks like this:
((4, 2), [-1]), ((3, 4), [-1, 2]), ((1, 4), [2]), ((2, 3), [-1, 4]),
((5, 4), [-1]), ((1, 2), [-1]), ((5, 2), [-1]), ((3, 5), [2, 4]),
((2, 5), [-1, 4]), ((1, 3), [2])
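For instance (again a small illustration, runnable in the Spark shell with the functions defined above), vertex 2 and its neighbor list [3, 1, 4, 5] expand as follows:

buildEdges(2L, List(3L, 1L, 4L, 5L))
// -> List(((2,3),-1), ((2,1),-1), ((2,4),-1), ((2,5),-1),  // real edges
//         ((3,4),2), ((3,5),2), ((1,3),2), ((1,4),2),      // candidate edges,
//         ((1,5),2), ((4,5),2))                            // marked with vertex 2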
The next line applies buildTriangles to each grouped edge and removes invalid vertex triples: a triple survives only if the value list contains the -1 marker (so the edge really exists) along with at least one third vertex adjacent to both endpoints. Since the same triangle is discovered once from each of its three vertices, we dedup by sorting the vertex IDs. The step returns the sorted vertex triple paired with a 1, and the next line counts each triple, so every triangle appears exactly once with a count of 3:
((2, 3, 4), 3), ((2, 4, 5), 3)
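Tracing two of the grouped edges from above makes the filtering concrete (an illustration, assuming the functions are in scope):

// (2, 3) exists (-1 present) and vertex 4 neighbors both 2 and 3.
buildTriangles((2L, 3L), List(-1L, 4L))  // -> List(((2,3,4), 1))

// 2 and 4 both neighbor the pair (3, 5), but the edge (3, 5) itself
// does not exist (no -1 marker), so no triangle is emitted.
buildTriangles((3L, 5L), List(2L, 4L))   // -> List()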
For my data, I used the lists of movies, actors, and actresses, available as plain text files from the IMDB Dataset. The dataset contains 3.6 million movies and TV shows, 2.2 million actors, and 1.2 million actresses. The formats are as follows:
# movies.list
# movie-name \t release-year
Star Wars VII: Return of the Empire (2013)	2013
...
# actors.list
# actor-name \t movie-name [role]
#            \t movie-name [role]
Carl, Shay	Bro-Friend (2012)  [Brother]
	Star Wars VII: Return of the Empire (2013)  [Obi Wan]
...
# actresses.list
# actress-name \t movie-name [role]
#              \t movie-name [role]
Katz, Liz	2 Big 2 Be True 8 (2007) (V)  [(as Risi Simms)]
	Star Wars VII: Return of the Empire (2013)  [Princess Leia]
...
In order to get this data into the vertex-pair format we need, I used the following Python code to do the conversion. Since I was only interested in movies, I skipped all TV shows (names beginning with quotes). The script first creates actor-movie pairs (out of both the actor and actress files), then groups on movies to find all actors that worked together in each movie, and finally builds edges out of these actors.
# preprocess.py
# -*- coding: utf-8 -*-
import re

def after_year(cutoff, year):
    """True if the movie was released on or after the cutoff year."""
    if year == "????":
        return False
    return int(year) >= cutoff

def get_movie_name(full_movie_name):
    """Strip everything after the (year) suffix, keeping 'Title (year)'."""
    split_at = full_movie_name.find(')')
    movie_name = ""
    if split_at > -1:
        movie_name = full_movie_name[0:split_at + 1]
    return movie_name

def build_movie_dict():
    """Map movie name to a numeric ID, skipping TV shows (quoted names)."""
    movies = open("../../../data/movies.list", 'rb')
    movie_dict = {}
    line_num = 0
    for line in movies:
        line = line.strip()
        if line.startswith("\""):
            continue
        line = unicode(line, "iso-8859-1").encode("utf-8", "ignore")
        movie_name_col, year = re.split("\t+", line)
        if after_year(1900, year):
            movie_name = get_movie_name(movie_name_col)
            line_num += 1
            movie_dict[movie_name] = line_num
    movies.close()
    return movie_dict

def build_actor_dict():
    """Map actor name to ID for the top 1,000 actors and actresses."""
    top_1000 = open("../../../data/actor_names.tsv", 'rb')
    top_actors = {}
    for line in top_1000:
        actor_name, actor_id = line.strip().split('\t')
        top_actors[actor_name] = int(actor_id)
    top_1000.close()
    return top_actors

def write_actor_movie_pair(actor_fin, pair_fout, movie_dict):
    """Write (actor-id, movie-id) pairs. Continuation lines start with a
    tab and reuse the actor name from the preceding header line."""
    actor_dict = build_actor_dict()
    actor_name = None  # guard against a continuation line appearing first
    for line in actor_fin:
        line = line.rstrip()
        if len(line) == 0:
            continue
        line = unicode(line, "iso-8859-1").encode("utf-8", "ignore")
        if line[0] == '\t':
            movie_name = get_movie_name(line.strip())
        else:
            # extract the actor name
            actor_name, actor_rest = re.split("\t+", line)
            movie_name = get_movie_name(actor_rest)
        if movie_name in movie_dict and actor_name in actor_dict:
            pair_fout.write("%d\t%d\n" %
                (actor_dict[actor_name], movie_dict[movie_name]))

def group_by_movie(fin, fout):
    """Expand each movie's actor list into unique canonical actor-actor edges."""
    movie_actor = {}
    for line in fin:
        actor_id, movie_id = line.strip().split('\t')
        if movie_id in movie_actor:
            movie_actor[movie_id].append(actor_id)
        else:
            movie_actor[movie_id] = [actor_id]
    actor_pairs = set()
    for movie_id in movie_actor.keys():
        actors = movie_actor[movie_id]
        for a in actors:
            for b in actors:
                if int(a) < int(b):
                    abkey = ":".join([a, b])
                    if abkey in actor_pairs:
                        continue
                    # remember the pair so each edge is written only once
                    actor_pairs.add(abkey)
                    fout.write("%s\t%s\n" % (a, b))
    fout.close()
    fin.close()

###################### main #######################
movie_dict = build_movie_dict()
fout = open("../../../data/actor_movie_pairs.tsv", 'wb')
actors = open("../../../data/actors.list", 'rb')
write_actor_movie_pair(actors, fout, movie_dict)
actors.close()
actresses = open("../../../data/actresses.list", 'rb')
write_actor_movie_pair(actresses, fout, movie_dict)
actresses.close()
fout.close()

actorid_movieid = open("../../../data/actor_movie_pairs.tsv", 'rb')
actorid_actorid = open("../../../data/actor_pairs.tsv", 'wb')
group_by_movie(actorid_movieid, actorid_actorid)
actorid_movieid.close()
actorid_actorid.close()
The resulting data had almost 136 million edges. I tried running the algorithm several times on a cluster of 3 m3.xlarge machines on AWS with slight variations, but wasn't successful. The variations involved reducing the scope of the problem in different ways: first considering only actors with a large number of edges, then considering only movies made after a certain date. Finally, I realized that IMDB lists a lot of actors for each movie, some of whom don't even make the credits in the actual movie. So I used the list of Top 1,000 Actors and Actresses by Hagen Nelson, and only considered movies that one of these 1,000 people acted in. This gave me a dataset of 197k edges, which is what I used for my analysis.
I then broadcast-join the triangle vertices with the list of actor names to form triples of actor names.
val actorNames = sc.textFile("/path/to/actor_names.tsv")
  .map(line => {
    val Array(aname, aid) = line.split('\t')
    (aid.toLong, aname)
  })
  .collectAsMap

val bActorNames = sc.broadcast(actorNames)

val actorNamesInTriangles = triangles.map(uvw => {
  val uname = bActorNames.value.getOrElse(uvw._1._1, "UNK")
  val vname = bActorNames.value.getOrElse(uvw._1._2, "UNK")
  val wname = bActorNames.value.getOrElse(uvw._1._3, "UNK")
  (uname, vname, wname)
})

actorNamesInTriangles.take(50)
  .foreach(uvw => println("%s\t%s\t%s".format(uvw._1, uvw._2, uvw._3)))
Here are some of the triangles that the algorithm returns. As you can see, there are quite a few familiar names (if you follow Hollywood movies).
Nolte, Nick             Carell, Steve       Krasinski, John
Hoffman, Dustin         Page, Geraldine     Gish, Lillian
Hathaway, Anne          Fraser, Brendan     Culkin, Macaulay
Linney, Laura           Tomei, Marisa       Sutherland, Kiefer
Watts, Naomi            Lohan, Lindsay      Sewell, Rufus
Fonda, Jane             Liotta, Ray         Hewitt, Jennifer Love
Cruise, Tom             Clooney, George     Broadbent, Jim
McConaughey, Matthew    Baruchel, Jay       Fallon, Jimmy
Reeves, Keanu           Bassett, Angela     Turturro, John
Bale, Christian         Baldwin, Alec       Dujardin, Jean
...
It turns out that Spark's GraphX library also has routines for triangle counting, although it does not list triangles; it just returns a count of triangles rooted at each vertex. However, it is faster by an order of magnitude, so it's worth knowing as well. Here is a code snippet that uses it to compute the clustering coefficient of each vertex and of the full graph.
import org.apache.spark.graphx._

val g = GraphLoader.edgeListFile(sc, "/path/to/actor_pairs.tsv", true)
  .partitionBy(PartitionStrategy.RandomVertexCut)

val triangleCounts = g.triangleCount.vertices
val degrees = g.degrees

val clusterCoeffs = triangleCounts.join(degrees) // join on vertex id
  .map(vcd => (vcd._1, 2.0 * vcd._2._1 / vcd._2._2))
  .map(vc => (bActorNames.value.getOrElse(vc._1, "UNK"), vc._2))
  .sortBy(_._2, false)

clusterCoeffs.take(10)
  .foreach(nc => println("%5.3f\t%s".format(nc._2, nc._1)))
This gives a list of actors with their clustering coefficients. Note that the formula above divides by d rather than by d * (d - 1), so these values are not the textbook local clustering coefficient and can exceed 1.
104.487	Perabo, Piper
103.545	Liotta, Ray
100.418	Page, Ellen
99.110	Russo, Rene
99.073	Steenburgen, Mary
98.211	McTeer, Janet
97.639	Arkin, Alan
97.552	Strathairn, David
96.876	Paquin, Anna
96.051	Crudup, Billy
...
To compute the graph clustering coefficient, we run the following code.
// Each triangle is counted once at each of its three vertices.
val numTriangles = triangleCounts.map(vc => ("G", vc._2))
  .reduceByKey((a, b) => a + b)
  .map(v => v._2)
  .take(1)(0) / 3.0

// The sum of degrees counts each edge twice, so this is the edge count.
val numTriads = degrees.map(vd => ("G", vd._2))
  .reduceByKey((a, b) => a + b)
  .map(v => v._2)
  .take(1)(0) / 2.0

val graphClusterCoeff = numTriangles / numTriads
This returns a graph clustering coefficient of 22.02. Since the denominator here is the number of edges rather than the number of connected triples, this measure, too, is not normalized to [0, 1].
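As a quick sanity check, the same two computations can be run on the toy graph from the start of the post (a sketch, assuming the same spark-shell session and GraphX imports): two triangles over six edges should give 2 / 6 ≈ 0.33.

// Build the toy graph from its edge tuples and partition it as before.
val toyGraph = Graph.fromEdgeTuples(
    sc.parallelize(List((1L, 2L), (2L, 3L), (2L, 4L), (2L, 5L), (3L, 4L), (4L, 5L))),
    defaultValue = 1)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// Per-vertex triangle counts sum to 6 and each triangle is counted three
// times, so numTriangles = 2. The degrees sum to 12, so there are 6 edges.
val toyTriangles = toyGraph.triangleCount.vertices.map(_._2).sum / 3.0
val toyEdges = toyGraph.degrees.map(_._2).sum / 2.0
println(toyTriangles / toyEdges) // 0.3333333333333333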