Saturday, February 20, 2016

Trip Report - ElasticON 2016 @ San Francisco


I attended the Elasticsearch User Conference (ElasticON 2016) in San Francisco last week. As you know, I am more familiar with Solr, so while I have worked with Elasticsearch (ES) a few times in the past, it hasn't stuck, for two reasons. One, it's just easier to go with what I know, and two, I didn't feel that ES brought anything significantly different to the table to justify choosing it over Solr. Maybe it was because of a vague notion that people prefer ES for analytics and Solr for search, and at the time, I was doing mostly search. My current job is more analytics focused, and we use ES as a substrate for our Spark jobs when we need to be able to query the results. So the conference was quite useful for me.

My main takeaway from the conference is that the power of ES comes from the synergy between its components, the so-called ELKB stack (ES, Logstash, Kibana and Beats), now rebranded as the Elastic Stack. Using these, it is remarkably easy to set up ES as a document store, populate it with whatever data you want (many options are available via the appropriate Beat), and set up beautiful and powerful visualizations against it with Kibana, all without writing a line of code and without really having to understand anything about search (except that you want it). This is really powerful, and is probably one of the reasons for its popularity in the analytics space. In terms of search alone, ES and Solr are busy playing catch-up with each other, so it's probably a tie in that space.

The rest of the post is my take on the events and talks I attended (and liked). I hope you find it interesting. The organizers have promised to make the videos public after the conference, so I will update the post with links to these talks once that happens. Also, I found that Elastic has a dedicated page on SpeakerDeck, which might be another good source of information.

EDIT (03-Mar-2016) - The full set of presentation videos from ElasticON 2016 is now available! Looking forward to watching some of the ones I missed because I had to decide between two equally interesting talks.

Day One (Wednesday)


I had signed up for a half-day training titled "From Zero to Log Hero". The training itself was quite basic, but it was designed to give students a quick hands-on look at the Elastic Stack. Elastic provides a cloud platform via its Elastic Cloud product, and we used that to spin up an instance of the Elastic Stack (ES + Kibana), generate data off our laptops using its Topbeat product (which collects data from the Unix top command), and send it off to ES via Logstash. Before the training, the only component in the Elastic Stack I knew about was ES, so I found the training really useful. Thanks to the training, I now understand what my colleague meant when he mentioned saving Spark results to ES and using Kibana to visualize the results.

After the training, the conference opened with the keynote from various members of Elastic (the company). The talks had a certain rock concert quality to them, which is not as strange as it sounds, since the speakers were rock stars within the ES community (Shay Banon, Uri Boness, Rashid Khan and Mark Harwood, to name just a few). The keynote centered on the future of the company, announcing the rebranding of ELKB as the Elastic Stack, the unification of release versions across the product line, several new products such as the Graph API and Timelion for time series analysis and visualization, and several successful user stories such as the Mayo Clinic point-of-care app.

The Ghost in the Search Machine - presented by Doug Turnbull, search relevance expert and one of the authors of Relevant Search. He presented a way to detect similar images using ES by representing pixels as words and treating the image as a text document made up of these image words. He also briefly referred to using More-Like-This to detect at-risk students by representing them as documents of student attributes. I have been looking at image search myself recently and had come across this Bag of Visual Words approach during that, so it was good to see someone present this approach.
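To make the idea concrete, here is a toy Scala sketch of the Bag of Visual Words representation (the descriptors and centroids are placeholders for the output of a real feature extractor, and this is not Doug's code): each local image descriptor is quantized to its nearest cluster centroid, and the image becomes a "document" of visual word tokens that can be indexed into a text field like any other document.

// toy sketch: quantize image descriptors into visual words, then build a
// text "document" of those words; descriptors and centroids are placeholders
object VisualWords {

  def euclidean(a: Array[Double], b: Array[Double]): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // map a descriptor to the ID of its nearest centroid (its visual word)
  def toVisualWord(desc: Array[Double], centroids: Array[Array[Double]]): Int =
    centroids.zipWithIndex.minBy { case (c, _) => euclidean(desc, c) }._2

  // an image becomes a whitespace-separated string of visual word tokens,
  // which can be indexed and queried as text
  def toDocument(descriptors: Seq[Array[Double]],
                 centroids: Array[Array[Double]]): String =
    descriptors.map(d => "vw" + toVisualWord(d, centroids)).mkString(" ")
}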

Web Content Analytics at Scale with Parse.ly - presented by Andrew Montalenti, CTO of Parse.ly. Parse.ly provides digital publishers with audience insights based on tracking their readers' behavior. He spoke about the challenges of dealing with such a high volume of data coming in at such high speed (volume, velocity). One of the solutions was to partition the incoming data by various features (for example, by date, or by customer for large customers). Another solution that I liked was rolling up data by different intervals (raw, 5 minutes, 1 hour) to provide the input for time series and other analytics requested by customers at different levels of granularity.
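As a rough illustration of the roll-up idea (the event shape and interval choices below are made up, not Parse.ly's), raw (timestamp, count) events can be bucketed into progressively coarser intervals, and dashboards at different zoom levels then query the cheapest pre-aggregated view:

// bucket raw (epochMillis, count) events into fixed-size time intervals
def rollup(events: Seq[(Long, Long)], intervalMillis: Long): Map[Long, Long] =
  events
    .groupBy { case (ts, _) => ts - (ts % intervalMillis) }    // bucket start time
    .map { case (bucket, evs) => (bucket, evs.map(_._2).sum) }

// e.g. raw -> 5 minute -> 1 hour roll-ups
// val fiveMin = rollup(rawEvents, 5 * 60 * 1000L)
// val hourly  = rollup(rawEvents, 60 * 60 * 1000L)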

Contributing to ES - how to get started - I confess that my motivation in attending this talk was more self-serving (need to understand how to navigate the code if needed) than altruistic (desire to contribute code back to the project). Nik Everett recounted his experience as a contributor turned committer, with a few tips for potential contributors. One of the top ones is to look for TransportXXXAction (where XXX is the action you want, such as Index, Query, etc). There were some other tips, but this one seemed the most useful.

Day Two (Thursday)


Eventbrite's Search based approach to Recommendations - talk by John Berryman, co-author of the Relevant Search book. He started by explaining how Recommendations can be thought of as an extension to Search, and described 4 different ways to include Recommendation information into your ES queries. He then went on to show how Search is ultimately subsumed by Recommendations, and is just a special case of a larger Recommendation objective. My takeaway is that they are both intimately related, and pure content based search is going to depend more and more on recommendation approaches as it moves towards being personalized.

Data Hero: Legends of Analytics @ Activision - presented by Josh Hermann and Will Kirwin, engineers at Activision. Activision provides a platform for multiplayer video games on the Internet (this is my understanding based on the talks, and it works well enough to describe the applications that follow; apologies if it is incorrect, in my defense I am completely clueless about video games). In any case, they described two systems based on ES, Pathos and Name Findernator. The Activision corpus consists of tweets made by customers and contains entity names that are meaningful only in the context of video gaming or a particular game. Pathos analyzes these tweets, filters out Quality of Service tweets (site is down, slow response, etc) for handling by customer service, and does sentiment analysis on the remaining tweets to detect sentiment around these entities. Name Findernator is used to search for user tagnames - users can get quite imaginative with their tag names (such as c0wnt_z3r0 vs count_zero), so the application uses a custom analysis chain to normalize tagnames so they can be searched.
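The kind of normalization such an analysis chain might perform can be sketched in a few lines of Scala (the actual mappings Activision uses were not shown, so this is just a guess at the flavor): fold leetspeak characters to letters and strip separators, then index and query against the normalized form.

// hypothetical leetspeak folding; the real analysis chain is surely more sophisticated
val leetMap = Map('0' -> 'o', '1' -> 'l', '3' -> 'e', '4' -> 'a',
                  '5' -> 's', '7' -> 't', '@' -> 'a', '$' -> 's')

def normalizeTag(tag: String): String =
  tag.toLowerCase
    .map(c => leetMap.getOrElse(c, c))   // fold digits/symbols to letters
    .filter(_.isLetterOrDigit)           // drop underscores and other separators

// normalizeTag("c0wnt_z3r0") == "cowntzero", normalizeTag("count_zero") == "countzero"
// a fuzzy or phonetic match would still be needed to close the remaining gap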

Graph Capabilities in the Elastic Stack - talk by Mark Harwood about upcoming graph features in ES and what they can be used for. He discussed the concept of "uncommonly common" and how it can be used to remove features that don't provide much signal. My understanding is that it is a bit like Inverse Document Frequency: just as a term loses its discriminative power if it is found in many documents in a corpus, so does an attribute that is true for many members, such as liking the Beatles in a population of rock music listeners. The ES Graph search API will use a metric around this idea to build a better More-Like-This. Also, ES graphs are not graphs in the traditional sense; an edge in an ES graph represents a common attribute or attribute value shared by the vertices it connects. Finally, a Kibana visualization comes standard with the new Graph API.
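A toy way to think about the "uncommonly common" score (the actual metric the Graph API uses was not spelled out in my notes, so this is only meant to convey the intuition): compare how frequent a term is in the foreground set of interest against its frequency in the background corpus, and prune terms whose ratio is close to 1.

// score ~1 means the term is no more common in the foreground than in the
// background (little signal, like "Beatles" among rock listeners); large
// scores mean the term is uncommonly common in the foreground
def uncommonlyCommon(fgCount: Long, fgTotal: Long,
                     bgCount: Long, bgTotal: Long): Double = {
  val fgFreq = fgCount.toDouble / fgTotal
  val bgFreq = (bgCount + 1).toDouble / bgTotal   // +1 avoids division by zero
  fgFreq / bgFreq
}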

What's happening in Hadoop and Spark - talk by Costin Leau, author of the elasticsearch-hadoop project. The project provides an API to ES from a wide variety of big-data environments so you can integrate ES into your big-data workflow. The environments include Apache Spark (which is why I attended). I did not get any new insights from the talk, but I would argue that that is a good thing, since it indicates a simple and easy to use API where the complexity is effectively hidden away. The main concern revolves around the partitioning granularity mismatch between ES and Spark RDDs - if your ES partitions are too large for your RDD partitions, you will get an Out-of-Memory error. There was some more discussion around this at Costin's presentation at the Advanced Spark meetup on Monday last week.
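For reference, the Spark side of the elasticsearch-hadoop API is about as small as it gets. A minimal sketch, assuming a spark-shell style SparkContext sc, es.nodes set in the Spark configuration, and a throwaway index name:

import org.elasticsearch.spark._   // adds saveToEs / esRDD to RDDs and the SparkContext

// each Map becomes one document in the target index/type
val docs = sc.parallelize(Seq(
  Map("id" -> 1, "text" -> "hello"),
  Map("id" -> 2, "text" -> "world")))
docs.saveToEs("sketch/docs")

// read the index back as an RDD of (documentId, fieldMap) pairs
val fromEs = sc.esRDD("sketch/docs")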

Day Three (Friday)


All about Elasticsearch Algorithms and Data Structures - talk by Zachary Tong and Colin Goodheart-Smithe in which they covered four algorithms/data structures used inside ES. Roaring bitmaps use a two-level tree data structure to chunk filters so they are more space-efficient. They do this by chunking a filter into fixed-size blocks of the document ID space, then using a heuristic to determine whether each block is sparse or dense. Sparse blocks are represented using document ID lists, and dense blocks are represented by bitmaps. This approach allows ES to use the most space-efficient representation without sacrificing performance. The second topic was the use of Simulated Annealing to automatically find approximately optimal parameters for ES's Moving Average Aggregation. The last two were the T-Digest and HDRHistogram algorithms for finding approximate percentiles in streaming data.
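Here is a toy version of the sparse/dense decision (the real Roaring bitmap implementation lives in Lucene and is considerably more involved): within a fixed-size block of the document ID space, keep an explicit ID list if that costs fewer bits than a bitmap, otherwise keep a bitmap.

sealed trait Block
case class SparseBlock(docIds: Array[Int]) extends Block      // explicit ID list
case class DenseBlock(bits: java.util.BitSet) extends Block   // one bit per possible ID

def buildBlock(docIds: Array[Int], blockSize: Int): Block = {
  // an ID list costs roughly 32 bits per doc, a bitmap costs blockSize bits
  if (docIds.length.toLong * 32 < blockSize) {
    SparseBlock(docIds.sorted)
  } else {
    val bits = new java.util.BitSet(blockSize)
    docIds.foreach(i => bits.set(i))
    DenseBlock(bits)
  }
}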

Geospatial Data Structures in Elasticsearch and Apache Lucene - talk by Nick Knize about how to use geospatial data structures, how they actually work behind the scenes (hierarchically breaking shapes up into smaller structures to capture edges), and the upcoming K-D tree in Lucene 6 and how it will be used for geo search. I wonder if the K-D tree implementation in Lucene will allow for more than 2 or 3 dimensions; if it does, it could be used to partition points into clusters, so you can avoid having to do O(n^2) pairwise comparisons across the whole corpus. Maybe something to look forward to in Lucene 6.

Bringing Healthcare to the Point-of-Care @ Mayo Clinic - talk by Dr Peter Li, project lead for this project at the Mayo Clinic, which provides doctors with the results of collaborative filtering over patient data (lab results). The application plots the current patient's data (for each lab result) on a percentile chart computed from patients similar to the current patient (after aligning the records around the event of interest). This allows the doctor to see how "normal" (or otherwise) his patient is relative to others in the cohort. The doctor can use the iPad interface to select smaller cohorts that he believes are more representative of his patient, and the system recomputes the percentile charts with the smaller dataset. I thought the application was interesting because rather than attempt to provide an answer or a recommendation, it goes halfway and presents the information in a manner that the doctor can consume and make decisions from.
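The percentile computation itself is simple; a minimal sketch (the cohort values in the comment are hypothetical) of where a patient's lab value falls within the aligned cohort:

// percentile rank of the current patient's lab value within the cohort
def percentileRank(cohortValues: Seq[Double], patientValue: Double): Double = {
  val below = cohortValues.count(_ <= patientValue)
  100.0 * below / cohortValues.size
}

// e.g. percentileRank(Seq(3.1, 3.8, 4.0, 4.4, 5.2), 4.1) == 60.0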

Dude, Where are my messages? Message analytics @ Netflix - talk by Netflix engineers Devika Chawla and George Abraham, describing their use of Elasticsearch in their messaging infrastructure to find the status of a message at any given time. To appreciate how cool this is, you have to consider the size of their cluster: it is a global system spread across 3 AWS availability zones with multiple machine clusters for each processor in their pipeline. Events from different producers flow through the pipeline and get converted into different kinds of messages (email, text, etc) destined for different consumers. They implemented message tracking by building a Log4j appender which writes out a record to an ES cluster at each stage of processing, so they can then query the ES cluster to determine the current state of a message event.

Conclusion


Overall, I thought the talks were of very high quality and aimed squarely at their developer community. Elastic is obviously a company that is very heavily invested in and justifiably proud of its product. I have mentioned previously my impression that they seem to be more popular with the analytics community, and from what I observed, Elastic is focusing more heavily on this area as well. Their stated decision to turn on DocValues by default in Elastic Stack version 5 speaks to their primary use case of treating ES more as a column/document store than as a search engine.

Shorter term, the conference gave me a quick introduction to the Elastic Stack which I hope to leverage at work. Looking forward to learning more about and doing more with the Elastic Stack!

EDIT (11-Mar-2016): I viewed two more presentations that I missed during the actual conference and that were of interest to me, so I wanted to spotlight them.

New/Upcoming features of Lucene 6 - talk by Adrien Grand. Covers multiple new features coming in Lucene 6, such as K-D trees (8 dimensions allowed through the API, though the code can support up to 256). K-D trees provide an additional data structure alongside the inverted index to speed up search on structured data that can be represented as a feature vector. He also covers the move to BM25 similarity (less mathy than Britta Weber's presentation below), some changes to the query optimizer that use ratios of document frequency (DF) and total term frequency (TTF) to order the sequence in which subqueries are run, and some smaller but important performance enhancements that will make Lucene faster.

Improved Text Scoring with BM25 - by Britta Weber, discusses the scoring formula for the Lucene BM25 algorithm, which is going to become the new default similarity implementation starting with Lucene 6. She also breaks down the components of both the existing TF-IDF similarity scoring formula and the one for BM25, discusses in detail how they differ, and explains how this is likely to result in better (or at least different) results for text queries.
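For reference, the textbook form of the BM25 score she breaks down is (Lucene's implementation differs in details such as how document length norms are encoded, but the shape is the same):

score(D, Q) = \sum_{q_i \in Q} IDF(q_i) \cdot \frac{f(q_i, D) \, (k_1 + 1)}{f(q_i, D) + k_1 \left(1 - b + b \, \frac{|D|}{avgdl}\right)}

where f(q_i, D) is the frequency of term q_i in document D, |D| is the document length, avgdl is the average document length in the collection, and k_1 and b are tunable parameters (Lucene's defaults are k_1 = 1.2 and b = 0.75). Unlike classic TF-IDF, the term frequency component saturates, and the length normalization is explicit and tunable via b.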

Sunday, February 07, 2016

Counting Triangles in a Movie Actor Network


I recently came across an algorithm to list all triangles in a graph in the Data Algorithms book. I didn't have a real application to use it against, but given that counting triangles has quite a few uses, I figured it would be useful to try to implement it in Spark. So that's what this post is all about.

The algorithm is somewhat non-trivial, so I will use a very simple graph, whose edge list is shown below as the input to the algorithm, to explain it step by step. You can instantly see that the two triangles in this graph are (2, 3, 4) and (2, 4, 5). Here is the Spark code that implements the algorithm.


def addReverseEdge(pair: (Long, Long)): List[(Long, Long)] = {
  // for every (u, v) edge, also emit the reverse edge (v, u)
  val reverse = (pair._2, pair._1)
  List(pair, reverse)
}

def buildEdges(u: Long, vs: List[Long]): List[((Long, Long), Long)] = {
  // anchor records: ((u, v), -1) marks (u, v) as a real edge in the graph
  val uvpairs = vs.map(v => ((u, v), -1L))
  // candidate records: ((v1, v2), u) says u closes a triangle if (v1, v2) is a real edge
  val vpairs = for (v1 <- vs; v2 <- vs) yield ((v1, v2), u)
  uvpairs ++ vpairs.filter(vv => vv._1._1 < vv._1._2)
}

def buildTriangles(uv: (Long, Long), ws: List[Long]): List[((Long, Long, Long), Int)] = {
  // a triangle needs (u, v) to be a real edge (anchor present) plus at least
  // one candidate third vertex w
  val hasAnchor = ws.filter(_ < 0).size > 0
  val onlyAnchor = ws.size == 1 && hasAnchor
  if (hasAnchor && !onlyAnchor) {
    ws.filter(w => w > 0)
      .map(w => {
        // sort the vertex IDs so the same triangle always produces the same key
        val nodes = List(uv._1, uv._2, w).sorted
        ((nodes(0), nodes(1), nodes(2)), 1)
      })
  } else List.empty
}

// val actorPairs = sc.parallelize(List(
//   (1L, 2L), (2L, 3L), (2L, 4L), (2L, 5L), (3L, 4L), (4L, 5L)))

val actorPairs = sc.textFile("/path/to/actor_pairs.tsv")
  .map(line => line.split('\t'))
  .map(cols => (cols(0).toLong, cols(1).toLong))
  .cache

val triangles = actorPairs.flatMap(kv => addReverseEdge(kv))      // (u, v) += (v, u)
  .map(kv => (kv._1, List(kv._2)))                                // (u, [v])
  .reduceByKey((a, b) => a ++ b)                                  // (u, [v1, v2, ...])
  .flatMap(kvs => buildEdges(kvs._1, kvs._2))                     // ((u, v), w)
  .mapValues(w => List(w))                                        // ((u, v), [w])
  .reduceByKey((a, b) => a ++ b)                                  // ((u, v), [w1, w2, ...])
  .flatMap(uvws => buildTriangles(uvws._1, uvws._2))              // ((u, v, w), 1)
  .reduceByKey((a, b) => a + b)                                   // ((u, v, w), count) - dedup triangles
  .cache

Input to the algorithm is a list of edges, each edge being represented as a pair of vertex IDs. We ensure that each edge is represented exactly once by ensuring that the source vertex ID is less than the target vertex ID. For our graph, the input looks like this:

(1, 2), (2, 3), (2, 4), (2, 5), (3, 4), (4, 5)

The first flatMap call on actorPairs adds a reverse edge, i.e., for every (u, v) edge we add an additional (v, u) edge. We do this because we will group on the left-hand vertex, considering each vertex as the start vertex for our triangles. Once we are done, our data looks like this:

(1, [2]), (2, [3, 1, 4, 5]), (3, [2, 4]), (4, [3, 2, 5]), (5, [2, 4])

The next three lines build, for each vertex u, an anchor record ((u, v), -1) for each real edge (u, v) and a candidate record ((v1, v2), u) for each pair of u's neighbors, and then group the third-vertex values by the edge key. The result of this operation looks like this:

((4, 2), [-1]), ((3, 4), [-1, 2]), ((1, 4), [2]), ((2, 3), [-1, 4]), 
((5, 4), [-1]), ((1, 2), [-1]), ((5, 2), [-1]), ((3, 5), [2, 4]), 
((2, 5), [-1, 4]), ((1, 3), [2])

The next line (the flatMap over buildTriangles) keeps only edge keys that have both an anchor (a real edge) and candidate third vertices, removing any invalid vertex triples. Since the same triangle can be found in 3 different ways from its three vertices, we dedup by sorting the vertex IDs. The step returns each sorted vertex triple with a 1, and the next line sums the counts for each vertex set representing a triangle.

((2, 3, 4), 3), ((2, 4, 5), 3)

For my data, I used the list of movies, actors and actresses, available as plain text files from the IMDB Dataset. The dataset contains 3.6 million movies and TV shows, 2.2 million actors and 1.2 million actresses. The formats are as follows:

# movies.list
# movie-name \t release-year
Star Wars VII: Return of the Empire (2013)  2013
...

# actors.list
# actor-name \t movie-name [role]
# \t \t movie-name [role]
Carl, Shay              Bro-Friend (2012)  [Brother]
                        Star Wars VII: Return of the Empire (2013)  [Obi Wan]
...

# actresses.list
# actress-name \t movie-name [role]
# \t \t movie-name [role]
Katz, Liz               2 Big 2 Be True 8 (2007) (V)  [(as Risi Simms)]
                        Star Wars VII: Return of the Empire (2013)  [Princess Leia]
...

In order to get this data into the vertex-pair format we need, we used the following Python code to do the conversion. Since we were only interested in movies, we skipped all TV shows listed (names beginning with quotes). We first created actor-movie pairs (out of both the actor and actress files), then grouped on movies to find all actors that worked together in a movie, and then built edges between these actors.

# preprocess.py
# -*- coding: utf-8 -*-
import re

movie_pattern = re.compile("(.*?\(\d{4}.*\))\s.*$")

def after_year(cutoff, year):
    if year == "????":
        return False
    elif int(year) >= cutoff:
        return True
    else:
        return False

def get_movie_name(full_movie_name):
    split_at = full_movie_name.find(')')
    movie_name = ""
    if split_at > -1:
        movie_name = full_movie_name[0:split_at + 1]
    return movie_name

def build_movie_dict():
    movies = open("../../../data/movies.list", 'rb')
    movie_dict = {}
    line_num = 0
    for line in movies:
        line = line.strip()
        if line.startswith("\""):
            continue
        line = unicode(line, "iso-8859-1").encode("utf-8", "ignore")
        movie_name_col, year = re.split("\t+", line)
        if after_year(1900, year):
            movie_name = get_movie_name(movie_name_col)
            line_num += 1
            movie_dict[movie_name] = line_num
    movies.close()
    return movie_dict

def build_actor_dict():
    top_1000 = open("../../../data/actor_names.tsv", 'rb')
    top_actors = {}
    for line in top_1000:
        actor_name, actor_id = line.strip().split('\t')
        top_actors[actor_name] = int(actor_id)
    top_1000.close()
    return top_actors
    
def write_actor_movie_pair(actor_fin, pair_fout, movie_dict):
    actor_dict = build_actor_dict()
    line_num = 0
    for line in actor_fin:
        line = line.rstrip()
        if len(line) == 0:
            continue
        line = unicode(line, "iso-8859-1").encode("utf-8", "ignore")
        if line[0] == '\t':
            movie_name = get_movie_name(line.strip())
        else:
            # extract the actor name
            actor_name, actor_rest = re.split("\t+", line)
            movie_name = get_movie_name(actor_rest)
        line_num += 1
        if movie_dict.has_key(movie_name) and actor_dict.has_key(actor_name):
            pair_fout.write("%d\t%d\n" % 
                (actor_dict[actor_name], movie_dict[movie_name]))

def group_by_movie(fin, fout):
    movie_actor = {}
    for line in fin:
        actor_id, movie_id = line.strip().split('\t')
        if movie_actor.has_key(movie_id):
            movie_actor[movie_id].append(actor_id)
        else:
            movie_actor[movie_id] = [actor_id]
    actor_pairs = set()
    for movie_id in movie_actor.keys():
        actors = movie_actor[movie_id]
        for a in actors:
            for b in actors:
                if int(a) < int(b):
                    abkey = ":".join([a, b])
                    if abkey in actor_pairs:
                        continue
                    # remember the pair so it is written only once across movies
                    actor_pairs.add(abkey)
                    fout.write("%s\t%s\n" % (a, b))
    fout.close()
    fin.close()
                    
    
###################### main #######################

movie_dict = build_movie_dict()
fout = open("../../../data/actor_movie_pairs.tsv", 'wb')
actors = open("../../../data/actors.list", 'rb')
write_actor_movie_pair(actors, fout, movie_dict)
actors.close()
actresses = open("../../../data/actresses.list", 'rb')
write_actor_movie_pair(actresses, fout, movie_dict)
actresses.close()
fout.close()

actorid_movieid = open("../../../data/actor_movie_pairs.tsv", 'rb')
actorid_actorid = open("../../../data/actor_pairs.tsv", 'wb')
group_by_movie(actorid_movieid, actorid_actorid)
actorid_movieid.close()
actorid_actorid.close()

The resulting data had almost 136 million edges. I tried running the algorithm several times on a cluster of 3 m3.xlarge machines on AWS with slight changes, but wasn't successful. The changes included reducing the scope of the problem in different ways, first by only considering actors with a large number of edges, then by only considering movies made after a certain date. Finally, I realized that IMDB lists a lot of actors for each movie, some of whom don't even make the credits in the actual movie. So I used the list of Top 1,000 Actors and Actresses by Hagen Nelson, and only considered movies that one of these 1,000 people acted in. This gave me a dataset of 197k edges, which is what I used for my analysis.

I then broadcast-joined the vertices of the triangles against the list of actor names to form triples of actor names.

val actorNames = sc.textFile("/path/to/actor_names.tsv")
  .map(line => {
    val Array(aname, aid) = line.split('\t')
    (aid.toLong, aname)
  })
  .collectAsMap
val bActorNames = sc.broadcast(actorNames)

val actorNamesInTriangles = triangles.map(uvw => {
  val uname = bActorNames.value.getOrElse(uvw._1._1, "UNK")
  val vname = bActorNames.value.getOrElse(uvw._1._2, "UNK")
  val wname = bActorNames.value.getOrElse(uvw._1._3, "UNK")
  (uname, vname, wname)  
})

actorNamesInTriangles.take(50)
  .foreach(uvw => println("%s\t%s\t%s".format(uvw._1, uvw._2, uvw._3)))

Here are some of the triangles in the graph that the algorithm returns. As you can see, there are quite a few recognizable names (if you are familiar with Hollywood actors).

Nolte, Nick Carell, Steve Krasinski, John
Hoffman, Dustin Page, Geraldine Gish, Lillian
Hathaway, Anne Fraser, Brendan Culkin, Macaulay
Linney, Laura Tomei, Marisa Sutherland, Kiefer
Watts, Naomi Lohan, Lindsay Sewell, Rufus
Fonda, Jane Liotta, Ray Hewitt, Jennifer Love
Cruise, Tom Clooney, George Broadbent, Jim
McConaughey, Matthew Baruchel, Jay Fallon, Jimmy
Reeves, Keanu Bassett, Angela Turturro, John
Bale, Christian Baldwin, Alec Dujardin, Jean
...

It turns out that Spark's GraphX library also has routines for triangle counting, although it does not list the triangles; it just returns a count of triangles rooted at each vertex. However, it is faster by an order of magnitude, so it's worth knowing about as well. Here is a code snippet to compute the Clustering Coefficient of the vertices and of the full graph.

import org.apache.spark.graphx._

val g = GraphLoader.edgeListFile(sc, "/path/to/actor_pairs.tsv", true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
val triangleCounts = g.triangleCount.vertices
val degrees = g.degrees
val clusterCoeffs = triangleCounts.join(degrees)             // join on vertex id
  .map(vcd => (vcd._1, 2.0 * vcd._2._1 / vcd._2._2))         // 2 * triangles / degree
  .map(vc => (bActorNames.value.getOrElse(vc._1, "UNK"), vc._2))
  .sortBy(_._2, false)
clusterCoeffs.take(10)
  .foreach(nc => println("%5.3f\t%s".format(nc._2, nc._1)))

Gives you a list of actors with their clustering coefficients.

104.487 Perabo, Piper
103.545 Liotta, Ray
100.418 Page, Ellen
99.110 Russo, Rene
99.073 Steenburgen, Mary
98.211 McTeer, Janet
97.639 Arkin, Alan
97.552 Strathairn, David
96.876 Paquin, Anna
96.051 Crudup, Billy
...

To compute the graph clustering coefficient, we run the following code.

val numTriangles = triangleCounts.map(vc => ("G", vc._2))
  .reduceByKey((a, b) => a + b)
  .map(v => v._2)
  .take(1)(0) / 3.0
val numTriads = degrees.map(vd => ("G", vd._2))
  .reduceByKey((a, b) => a + b)
  .map(v => v._2)
  .take(1)(0) / 2.0
val graphClusterCoeff = numTriangles / numTriads

Returns a graph clustering coefficient of 22.02.