## Thursday, August 22, 2013

### Motivation

This post came about as a result of two events. First, I finished reading Paco Nathan's "Enterprise Data Workflows with Cascading" book (see my review on Amazon), and second, I started learning about the Enterprise Control Language (ECL) on the Lexis-Nexis High Performance Computing Cluster (HPCC). ECL is a bit like Pig which is a bit like Cascading, and one of the examples in the ECL tutorial was the Kevin Bacon Six Degrees of Separation problem. So I decided to try to build the example with Cascading, both as a way to get some experience with the Cascading API and as a comparison with the ECL solution.

The input to the problem is a set of (actor, movie) tuples from the IMDB database. This document (PDF) on the HPCC site links to the FTP sites where you can download the actors and actresses files. The (actor, movie) tuples can then be derived from those files using the following Java code.

```java
// Source: src/main/java/com/mycompany/kevinbacon/load/Imdb2Csv.java
package com.mycompany.kevinbacon.load;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.util.StringUtils;

public class Imdb2Csv {

  public static void main(String[] args) {
    try {
      Pattern actorPattern = Pattern.compile(
        "(.*?)\\s\\(.*?\\)");
      Pattern moviePattern = Pattern.compile(
        "(.*?)\\s\\(\\d{4}\\).*$");
      String[] inputs = new String[] {
        "data/landing/actors.list",
        "data/landing/actresses.list"
      };
      PrintWriter output = new PrintWriter(
        new FileWriter("data/input/actor-movie.csv"), true);
      for (String input : inputs) {
        boolean header = true;
        boolean data = false;
        boolean footer = false;
        String actor = null;
        String movie = null;
        BufferedReader reader = new BufferedReader(
          new FileReader(new File(input)));
        String line = null;
        while ((line = reader.readLine()) != null) {
          // loop through lines until we hit this pattern
          // Name\tTitles
          // ----\t-------
          if (line.startsWith("----\t")) header = false;
          // skip the footer, it occurs after a long 40 dash
          // or so standalone line (pattern below works if
          // you are already in the data area).
          if (data && line.startsWith("--------------------"))
            footer = true;
          if (! header && ! footer) {
            data = true;
            if (line.trim().length() > 0 && ! line.startsWith("----\t")) {
              String[] cols = line.replaceAll("\t+", "\t").split("\t");
              if (! line.startsWith("\t")) {
                Matcher ma = actorPattern.matcher(cols[0]);
                if (ma.matches()) actor = ma.group(1);
                Matcher mm = moviePattern.matcher(cols[1]);
                if (mm.matches()) movie = mm.group(1);
              } else {
                Matcher mm = moviePattern.matcher(cols[1]);
                if (mm.matches()) movie = mm.group(1);
              }
              // if line contains non-ascii chars, skip this line
              // the reasoning is that this is perhaps non-English
              // movie which we don't care about.
              if (isNonAscii(actor) || isNonAscii(movie)) continue;
              if (actor != null && movie != null)
                output.println(dequote(actor) + "\t" + dequote(movie));
            }
          }
        }
        reader.close();
      }
      output.flush();
      output.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private static boolean isNonAscii(String s) {
    if (s == null) return true;
    char[] cs = s.toCharArray();
    for (int i = 0; i < cs.length; i++) {
      if (cs[i] > 127) return true;
    }
    return false;
  }

  private static String dequote(String s) {
    String st = s.trim();
    if (st.startsWith("\"") && st.endsWith("\""))
      return st.substring(1, st.length()-1);
    else return st;
  }
}
```
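To make the two regular expressions in Imdb2Csv a little more concrete, here is a small standalone sketch that applies them to a single line. The sample line is made up for illustration (its layout is only what the parsing code above implies, not a verified excerpt from the IMDB files), and the class and method names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper class, not part of the project.
final class ImdbLineDemo {

  // The same two patterns that Imdb2Csv compiles.
  static final Pattern ACTOR = Pattern.compile("(.*?)\\s\\(.*?\\)");
  static final Pattern MOVIE = Pattern.compile("(.*?)\\s\\(\\d{4}\\).*$");

  /** Returns group 1 if the whole string matches the pattern, else null. */
  static String extract(Pattern pattern, String s) {
    Matcher m = pattern.matcher(s);
    return m.matches() ? m.group(1) : null;
  }

  public static void main(String[] args) {
    // Made-up sample line: name, one or more tabs, title with year and extras.
    String line = "Bacon, Kevin (I)\t\tApollo 13 (1995)  [Jack Swigert]  <5>";
    // Same column splitting as Imdb2Csv: collapse tab runs, then split.
    String[] cols = line.replaceAll("\t+", "\t").split("\t");
    String actor = extract(ACTOR, cols[0]);
    String movie = extract(MOVIE, cols[1]);
    System.out.println(actor + "\t" + movie); // prints "Bacon, Kevin<TAB>Apollo 13"
  }
}
```

One thing this makes visible: the actor pattern only matches names followed by a parenthesised suffix. For any other name, `extract` returns null here, and in Imdb2Csv the `actor` variable would keep its previous value, so it may be worth checking how names appear in your copy of the files.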

For this project, I decided to use Gradle, a relatively new (at least to me) build system that uses Groovy as the underlying language (as opposed to XML for Maven and Ant, or Scala for SBT). The decision was driven in part by the fact that the Cascading devs use Gradle, so their solutions to build-level problems are usually Gradle based as well, and I figured this would make my learning curve easier. In reality, I ended up spending quite some time wrestling with Gradle, but I think I now know enough about it to get by. In any case, to run the above code using Gradle, you need to put your IMDB files under data/landing and run the following command to get the (actor, movie) tuples file in data/input/actor-movie.csv.

```
sujit@cyclone:cascading-kevinbacon$ gradle run \
    -DmainClass=com.mycompany.kevinbacon.load.Imdb2Csv
```

Here is my Gradle build file, adapted from examples in the Cascading book, the Cascading for the Impatient series, and a lot of Googling. I can use this to build my Eclipse .classpath file, compile, run JUnit tests, and build a fat JAR that can be used both locally as well as on Amazon's Elastic Map Reduce (EMR) platform. It is included here as an example of a fully functioning build file (at least for my purposes).

```groovy
// Source: build.gradle
apply plugin: "java"
apply plugin: "idea"
apply plugin: "eclipse"
apply plugin: "application"

mainClassName = System.getProperty("mainClass")
archivesBaseName = "kevinbacon"

repositories {
  mavenLocal()
  mavenCentral()
  mavenRepo name: "conjars", url: "http://conjars.org/repo/"
}

configurations {
  provided
  compile.extendsFrom provided
}

ext.cascadingVer = "2.1.6"

dependencies {
  compile(group: "cascading", name: "cascading-core",
    version: cascadingVer)
  compile(group: "cascading", name: "cascading-local",
    version: cascadingVer)
  compile(group: "cascading", name: "cascading-hadoop",
    version: cascadingVer)
  provided(group: "org.apache.hadoop", name: "hadoop-core",
    version: "1.0.3")
  testCompile(group: "cascading", name: "cascading-platform",
    version: cascadingVer)
  testCompile("org.apache.hadoop:hadoop-test:1.0.3")
  testCompile("cascading:cascading-test:2.0.8")
  testCompile("junit:junit:4.8.+")
  testCompile("org.slf4j:slf4j-api:1.7.2")
  testCompile("commons-io:commons-io:2.1")
  testRuntime("org.slf4j:slf4j-log4j12:1.7.2")
  testRuntime("log4j:log4j:1.2.16")
}

test {
  testLogging.showStandardStreams = true
  beforeTest { descriptor ->
    logger.lifecycle("Running test: " + descriptor)
  }
  onOutput { descriptor, event ->
    logger.lifecycle("Test " + descriptor +
      " produced error: " + event.message)
  }
}

jar {
  description = "Assembles a JAR file"
  from {
    (configurations.runtime - configurations.provided).collect {
      it.isDirectory() ? it : zipTree(it)
    }
  } {
    exclude "META-INF/*.SF"
    exclude "META-INF/*.DSA"
    exclude "META-INF/*.RSA"
  }
  manifest {
    attributes("Main-Class": "com.mycompany.kevinbacon.flow.Main")
  }
}
```

The DOT file generated by the Cascading flow planner is shown on the left. Essentially, the job consists of 7 iterations. At each iteration, the inputs are the (actor, movie) tuple collection and the set of Kevin Bacon costars from the previous degree of separation. For example, at the first iteration we are looking for direct costars of Kevin Bacon (0 degrees of separation), so the incoming actor set is the single tuple containing Kevin Bacon. We join it against the (actor, movie) tuples to find all movies Kevin Bacon worked in, then join the (actor, movie) tuple set against this (movie) tuple set to find the list of Kevin Bacon costars. We also annotate each costar with the current degree of separation. In the next iteration, this set of costar tuples is used to find the costars of the costars, and so on. Finally, the costar tuples from all 7 iterations are merged and grouped by actor so that the minimum degree of separation is stored against each costar. We then group these tuples on the "Bacon number" to find a count of costars at each degree of separation.
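The iteration described above can also be sketched with plain Java collections, which I find helpful for checking the logic on tiny inputs. This is an in-memory illustration only, not the Cascading flow: the class name, method names and the toy actor and movie names are all made up, and the approach would not scale to the full IMDB input.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical in-memory sketch of the flow logic, not part of the project.
final class BaconSketch {

  /** One iteration: actors -> their movies -> everyone in those movies. */
  static Set<String> findCostars(List<String[]> pairs, Set<String> actors) {
    Set<String> movies = new HashSet<>();
    for (String[] p : pairs) if (actors.contains(p[0])) movies.add(p[1]);
    Set<String> costars = new HashSet<>();
    for (String[] p : pairs) if (movies.contains(p[1])) costars.add(p[0]);
    return costars;
  }

  /** Minimum degree per actor across all iterations, excluding the seed. */
  static Map<String, Integer> degrees(List<String[]> pairs, String seed,
      int iterations) {
    Map<String, Integer> minDegree = new TreeMap<>();
    Set<String> current = Collections.singleton(seed);
    for (int degree = 0; degree < iterations; degree++) {
      current = findCostars(pairs, current);
      for (String actor : current) {
        // merging the per-iteration results and keeping the minimum
        if (!actor.equals(seed)) minDegree.merge(actor, degree, Integer::min);
      }
    }
    return minDegree;
  }

  /** Number of actors at each degree of separation. */
  static Map<Integer, Integer> summary(Map<String, Integer> minDegree) {
    Map<Integer, Integer> counts = new TreeMap<>();
    for (int d : minDegree.values()) counts.merge(d, 1, Integer::sum);
    return counts;
  }

  public static void main(String[] args) {
    // Toy (actor, movie) pairs forming a chain of shared movies.
    List<String[]> pairs = Arrays.asList(
      new String[] {"Bacon, Kevin", "Movie 1"},
      new String[] {"Actor A", "Movie 1"},
      new String[] {"Actor A", "Movie 2"},
      new String[] {"Actor B", "Movie 2"},
      new String[] {"Actor B", "Movie 3"},
      new String[] {"Actor C", "Movie 3"});
    Map<String, Integer> detail = degrees(pairs, "Bacon, Kevin", 7);
    System.out.println(detail);          // {Actor A=0, Actor B=1, Actor C=2}
    System.out.println(summary(detail)); // {0=1, 1=1, 2=1}
  }
}
```

Note that each iteration's costar set contains the previous one (every actor is trivially a "costar" in their own movies), which is why the minimum has to be taken when the per-iteration results are merged.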

The code below represents the Cascading flow depicted in the diagrams above. It translates to 18 Hadoop MapReduce jobs.

```java
// Source: src/main/java/com/mycompany/kevinbacon/flow/Main.java
package com.mycompany.kevinbacon.flow;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.Filter;
import cascading.operation.Identity;
import cascading.operation.aggregator.Min;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.filter.Not;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.CountBy;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main {

  public static void main(String[] args) {

    String input = args[0];
    String detailOutput = args[1];
    String summaryOutput = args[2];

    Tap tin = new Hfs(new TextDelimited(
      Constants.inputFields, false, false, "\t"), input);
    Tap toutDetail = new Hfs(new TextDelimited(
      Constants.detailFields, false, false, "\t"), detailOutput);
    Tap toutSummary = new Hfs(new TextDelimited(
      Constants.summaryFields, false, false, "\t"), summaryOutput);

    Pipe allPairs = new Pipe("allPairs");

    // create a pipe with only Kevin Bacon
    Pipe kevinBacon = new Pipe("kevinBacon", allPairs);
    Filter kevinBaconFilter = new ExpressionFilter(
      "! actor.equals(\"Bacon, Kevin\")", String.class);
    kevinBacon = new Each(kevinBacon, kevinBaconFilter);
    kevinBacon = new Retain(kevinBacon, Constants.actorField);
    kevinBacon = new Unique(kevinBacon, Constants.actorField);

    // At each degree of separation, find the costars of
    // actors in the actor pipe (second arg to FindCostars)
    // by joining on actor to find movies, then joining on
    // movie to find costars.
    Pipe kevinBaconCostars0 = new FindCostars(
      allPairs, kevinBacon, 0);
    Pipe kevinBaconCostars1 = new FindCostars(
      allPairs, kevinBaconCostars0, 1);
    Pipe kevinBaconCostars2 = new FindCostars(
      allPairs, kevinBaconCostars1, 2);
    Pipe kevinBaconCostars3 = new FindCostars(
      allPairs, kevinBaconCostars2, 3);
    Pipe kevinBaconCostars4 = new FindCostars(
      allPairs, kevinBaconCostars3, 4);
    Pipe kevinBaconCostars5 = new FindCostars(
      allPairs, kevinBaconCostars4, 5);
    Pipe kevinBaconCostars6 = new FindCostars(
      allPairs, kevinBaconCostars5, 6);

    // merge the pipes from all 7 iterations together, then
    // filter out Kevin Bacon, group by actors and choose the
    // minimum Bacon number for each actor, and finally rename
    // the min column back to the Bacon number field.
    Pipe merged = new Merge("merged", Pipe.pipes(
      kevinBaconCostars0, kevinBaconCostars1,
      kevinBaconCostars2, kevinBaconCostars3,
      kevinBaconCostars4, kevinBaconCostars5,
      kevinBaconCostars6));
    merged = new Each(merged, new Not(kevinBaconFilter));
    merged = new GroupBy(merged, Constants.actorField);
    merged = new Every(merged, Constants.kbnumField, new Min());
    merged = new Rename(merged, new Fields("min"),
      Constants.kbnumField);

    // split the merged pipe into detail and summary pipes.
    // This is needed to avoid "duplicate pipe" errors from
    // Cascading when trying to set two tail sinks. The
    // merged pipe already contains the information needed
    // for the detail pipe, and the summary pipe needs a bit
    // more processing, detailed in the comment block below.
    Pipe details = new Pipe("details", merged);
    details = new Each(details, new Identity());

    // generate summary stats - retain only the bacon number
    // column and group by it and count the number of costars
    // in each bacon number group.
    Pipe summary = new Pipe("summary", merged);
    summary = new Retain(summary, Constants.kbnumField);
    summary = new CountBy(summary,
      Constants.kbnumField, Constants.countField);

    FlowDef fd = FlowDef.flowDef().
      addSource(allPairs, tin).
      addTailSink(details, toutDetail).
      addTailSink(summary, toutSummary);

    Properties props = new Properties();
    AppProps.setApplicationJarClass(props, Main.class);
    FlowConnector fc = new HadoopFlowConnector(props);

    Flow flow = fc.connect(fd);
    flow.writeDOT("data/kevinbacon.dot");
    flow.writeStepsDOT("data/kevinbacon-steps.dot");
    flow.complete();
  }
}
```
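One detail in the flow code is easy to misread: a Cascading Filter discards a tuple when it returns true. So the expression `! actor.equals("Bacon, Kevin")` keeps only the Kevin Bacon rows, and wrapping the same filter in Not keeps everyone except Kevin Bacon. The sketch below mimics that behaviour with a plain `java.util.function.Predicate`; it does not use the Cascading API, and the class, method and actor names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of "true means remove" filter semantics.
final class FilterSemantics {

  // Mirrors the ExpressionFilter expression: true means "remove this tuple".
  static final Predicate<String> NOT_KEVIN_BACON =
    actor -> !actor.equals("Bacon, Kevin");

  /** Returns the values for which the remove predicate is false. */
  static List<String> applyFilter(List<String> actors,
      Predicate<String> remove) {
    List<String> kept = new ArrayList<>(actors);
    kept.removeIf(remove);
    return kept;
  }

  public static void main(String[] args) {
    List<String> actors = Arrays.asList("Bacon, Kevin", "Actor A", "Actor B");
    // Like new Each(pipe, kevinBaconFilter): only Kevin Bacon survives.
    System.out.println(applyFilter(actors, NOT_KEVIN_BACON));
    // prints [Bacon, Kevin]
    // Like new Each(pipe, new Not(kevinBaconFilter)): Kevin Bacon is dropped.
    System.out.println(applyFilter(actors, NOT_KEVIN_BACON.negate()));
    // prints [Actor A, Actor B]
  }
}
```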

We have broken off the work to find costars at each degree of separation into a sub-assembly FindCostars, which is also shown below:

```java
// Source: src/main/java/com/mycompany/kevinbacon/flow/FindCostars.java
package com.mycompany.kevinbacon.flow;

import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;

/**
 * This subassembly returns a pipe containing costars at
 * the next degree of separation. Following functionality
 * is implemented.
 *
 * (1) Join with original pipe of (actor, movie) tuples and
 *     the pipe containing actors found in previous step
 *     to find all movies acted in by the actors.
 * (2) Dedup the movies pipe.
 * (3) Join with original pipe of (actor, movie) tuples and
 *     the movies pipe to find all costars of the actors.
 * (4) Dedup the actors pipe.
 * (5) Add a new column with the current Kevin Bacon number
 *     (degree of separation).
 */
public class FindCostars extends SubAssembly {

  private static final long serialVersionUID = 3450219986636439710L;

  private Fields movieResultFields =
    new Fields("actor", "movie", "actor1");
  private Fields actorResultFields =
    new Fields("actor", "movie", "movie1");

  public FindCostars(Pipe allPairs, Pipe actors, int kbNumber) {
    // join with original pipe on actor to produce pipe of
    // all movies acted on by the actors in pipe actor
    actors = new Retain(actors, Constants.actorField);
    Pipe movies = new HashJoin(
      allPairs, Constants.actorField,
      actors, Constants.actorField,
      movieResultFields, new InnerJoin());
    movies = new Retain(movies, Constants.movieField);
    movies = new Unique(movies, Constants.movieField);
    // now find all the actors for these movies, these
    // will be the costars for the incoming actors in
    // actorPipe. Finally insert the Bacon number for
    // costars at this degree of separation.
    Pipe costars = new HashJoin(
      allPairs, Constants.movieField,
      movies, Constants.movieField,
      actorResultFields, new InnerJoin());
    costars = new Retain(costars, Constants.actorField);
    costars = new Unique(costars, Constants.actorField);
    Insert insfun = new Insert(Constants.kbnumField, kbNumber);
    costars = new Each(costars, insfun, Constants.detailFields);
    setTails(costars);
  }
}
```

### Unit Testing

The Cascading user guide recommends that all sub-assemblies, flows, operations, etc. be unit tested. Unit tests should extend the PlatformTestCase class, and the tuples used in test cases are populated from test files. Here is the JUnit test for the FindCostars subassembly. In addition, since I was unfamiliar with some operations in the Cascading API, I built another test case to help me experiment with various strategies (not included here, but it's on the GitHub site).

```java
// Source: src/test/java/com/mycompany/kevinbacon/flow/FindCostarsTest.java
package com.mycompany.kevinbacon.flow;

import java.io.File;

import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.Test;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.test.LocalPlatform;
import cascading.test.PlatformRunner.Platform;

@Platform(LocalPlatform.class)
public class FindCostarsTest extends PlatformTestCase {

  private static final long serialVersionUID = 8950872097793074273L;

  @Before
  public void setup() throws Exception {
    File output = new File("src/test/data/output");
    FileUtils.deleteDirectory(output);
  }

  @Test
  public void testFindCostars() throws Exception {
    String allPairsFilename = "src/test/data/find_costars_allpairs.csv";
    String actorsFilename = "src/test/data/find_costars_actors.csv";
    String costarFilename = "src/test/data/output/costars.csv";

    getPlatform().copyFromLocal(allPairsFilename);
    getPlatform().copyFromLocal(actorsFilename);

    Tap tapAllPairs = getPlatform().getDelimitedFile(
      Constants.inputFields, "\t", allPairsFilename, SinkMode.KEEP);
    Tap tapActors = getPlatform().getDelimitedFile(
      Constants.detailFields, "\t", actorsFilename, SinkMode.KEEP);
    Tap tapCostars = getPlatform().getDelimitedFile(
      Constants.detailFields, "\t", costarFilename, SinkMode.REPLACE);

    Pipe allPairs = new Pipe("allPairs");
    Pipe actors = new Pipe("actors");
    Pipe costars = new FindCostars(allPairs, actors, 2);

    FlowDef flowDef = FlowDef.flowDef().
      addSource(allPairs, tapAllPairs).
      addSource(actors, tapActors).
      addTailSink(costars, tapCostars);

    Flow flow = getPlatform().getFlowConnector().connect(flowDef);
    flow.complete();
    validateLength(flow, 7);
  }
}
```

To run it, we can use the Gradle test task, as shown below:

```
sujit@cyclone:cascading-kevinbacon$ gradle test \
    -Dtest.single=path.to.test.class
```

### Running on Amazon EMR

Before running on Amazon EMR, I made sure that the job ran correctly (with a tiny input file I hand-built) against my local Hadoop instance.

```
sujit@cyclone:cascading-kevinbacon$ gradle clean jar
sujit@cyclone:cascading-kevinbacon$ rm -rf data/output
sujit@cyclone:cascading-kevinbacon$ /opt/hadoop-1.2.1/bin/hadoop \
    jar build/libs/kevinbacon.jar \
    src/test/data/test.csv \
    data/output/detail \
    data/output/summary
```

I then used the EMR browser interface to start the job. The input file and fat JAR first needed to be uploaded into S3, then a new job flow had to be created in EMR - the prompts are pretty easy to understand. My input file had 9438301 (actor, movie) tuples. It took 58 minutes on a 1+4 node m1.medium cluster to produce the following summary:

```
0	43405
1	208829
3	635
```

### Conclusion

Building this project was a lot of fun. I ended up learning enough about the Cascading API to be reasonably confident about being able to implement real-world applications in future. Also, the Cascading code clearly came out ahead in terms of readability compared to the ECL example code - although that may be at least partly because I am more proficient in Java than ECL.

In any case, I hope you found this interesting as well. If you would like to replicate this or improve upon it, you can find the code on my GitHub page for this project.