Friday, July 20, 2012

Fun With Hadoop In Action Exercises (Java)

As some of you know, I recently took some online courses from Coursera. Having taken these courses, I have come to the realization that my knowledge has some rather large blind spots. So far, I have gotten most of my education from books and websites, and I have tended to cherry-pick subjects which I need at the moment for my work, as a result of which I tend to ignore stuff (techniques, algorithms, etc) that falls outside that realm. Obviously, this is Not A Good Thing™, so I have begun to seek ways to remedy that.

I first looked at Hadoop years ago, but never got much beyond creating proof-of-concept Map-Reduce programs (Java and Streaming/Python) for text mining applications. Lately, many subprojects (Pig, Hive, etc) that I know nothing about have come up to make it easier to deal with large amounts of data using Hadoop. So in an attempt to ramp up relatively quickly, I decided to take some courses at BigData University.

The course uses BigInsights (IBM's packaging of Hadoop), which runs only on Linux. VMWare images are available, but since I have a Macbook Pro, that wasn't much use to me without a VMWare player (not free for Mac OSX). I then installed VirtualBox and tried to run a Fedora 10 64-bit image on it and install BigInsights on Fedora, but it failed. I then tried to install Cloudera CDH4 (Cloudera's packaging of Hadoop) on it (it's a series of yum commands), but that did not work out either. Ultimately I decided to ditch VirtualBox altogether and do a pseudo-distributed installation of stock Apache Hadoop (1.0.3) directly on my Mac, following the instructions on Michael Noll's page.

The Hadoop Fundamentals I course I was taking covers quite a few things, but I decided to stop and actually read all of Hadoop in Action (HIA) in order to get more thorough coverage. I had purchased it some years before as part of Manning's MEAP (Early Access) program, so it's a bit dated (examples are mostly in the older 0.19 API), but it's the only Hadoop book I possess, the concepts are explained beautifully, and it's not a huge leap to mentally translate code from the old API to the new, so it was well worth the read.

I also decided to tackle the exercises (in Java for now) and post my solutions on GitHub. Three reasons. First, it exposes me to a more comprehensive set of scenarios than I have had previously, and forces me to use techniques and algorithms that I wouldn't otherwise. Second, hopefully some of my readers can run circles around me where Hadoop is concerned, and they would be kind enough to provide criticism and suggestions for improvement. And third, there may be some who would benefit from having the HIA examples worked out. So anyway, here they are, my solutions to selected exercises from Chapters 4 and 5 of the HIA book, for your reading pleasure.

Top K Records

Using the cite75_99.txt file from the Citation Dataset discussed in the HIA examples, the objective is to find the top N (N = 10 in this case) most frequently cited patents, in descending order.

The format of the input data is CITING_PATENT,CITED_PATENT. The solution (TopKRecords.java) consists of two Map-Reduce jobs described below.

In the first job, the mapper extracts the CITED_PATENT and sends the pair (CITED_PATENT, 1) to the reducer, which aggregates it to (CITED_PATENT, n), where n is the number of times CITED_PATENT was cited. Each record is then fed into a fixed-size PriorityQueue with a custom Comparator that sorts records by ascending count. Once the priority queue grows beyond N entries, records are discarded from the head of the queue (which holds the lowest count). The cleanup method of the reducer writes out the record (CITED_PATENT, n) for the top N patents in that reducer to HDFS.

In the second job, the mapper reads the files written out by the previous job and writes out key-value pairs as (CITED_PATENT, n) to a single reducer, which aggregates them into a fixed-size priority queue to produce the top N cited patents across all the splits. Note that both the first and second jobs use the same Reducer implementation.
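
To make this concrete, here is a rough sketch of what that shared reducer could look like with the newer (0.20+) API. This is illustrative rather than a copy of TopKRecords.java; the class name TopKReducer and the constant TOP_N are mine.

import java.io.IOException;
import java.util.AbstractMap;
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Shared by both jobs: sum the counts for each patent, keep only the top N
// in a bounded priority queue, and emit the survivors from cleanup().
public class TopKReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private static final int TOP_N = 10;

  // Smallest count sits at the head, so it is the first to be evicted.
  private final PriorityQueue<Map.Entry<String,Integer>> queue =
    new PriorityQueue<Map.Entry<String,Integer>>(TOP_N,
      new Comparator<Map.Entry<String,Integer>>() {
        public int compare(Map.Entry<String,Integer> a, Map.Entry<String,Integer> b) {
          return a.getValue().compareTo(b.getValue());
        }
      });

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    queue.offer(new AbstractMap.SimpleEntry<String,Integer>(key.toString(), sum));
    if (queue.size() > TOP_N) {
      queue.poll();                     // discard the current minimum
    }
  }

  @Override
  protected void cleanup(Context ctx) throws IOException, InterruptedException {
    for (Map.Entry<String,Integer> entry : queue) {
      ctx.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
    }
  }
}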

Web Traffic Measurement

Given a set of webserver logs for a site, the objective here is to find the hourly web traffic to that site. The best I could do for a relatively high-traffic site, though, was the Solr/Tomcat logs from our staging servers, which have a somewhat funky (multi-line) log format. Since Apache HTTPD logs are single-line, I decided to stick with the spirit of the problem and preprocess the Tomcat logs so they look like Apache access.log files. The script I used to do this is HourlyWebTraffic.py.

The solution is a single Map-Reduce job in HourlyWebTraffic.java. The mapper reads each log line, uses a regular expression to extract the timestamp, parses it into a Date object using SimpleDateFormat, and extracts the hour from it. The mapper emits (HOUR, 1) pairs to the reducer, which aggregates them into hourly counts. I got lazy here and set my number of reducers to 1 so the results come out sorted and suitable for graphing, but I could just as well have used a custom partitioner (described in the Time Series example below) to ensure that the reducer outputs are sorted (or used an external sort on the merged files).
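
For reference, a bare-bones version of that mapper might look like the sketch below. The regular expression and date pattern assume the standard Apache access log timestamp format ([dd/MMM/yyyy:HH:mm:ss Z]), and the class name is mine, not the one in HourlyWebTraffic.java.

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Pulls the bracketed timestamp out of each access log line and emits
// (hour-of-day, 1). Unparseable lines are silently skipped.
public class HourMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

  private static final Pattern TIMESTAMP = Pattern.compile("\\[([^\\]]+)\\]");
  private static final IntWritable ONE = new IntWritable(1);

  private final SimpleDateFormat dateFormat =
    new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

  @Override
  protected void map(LongWritable key, Text value, Context ctx)
      throws IOException, InterruptedException {
    Matcher m = TIMESTAMP.matcher(value.toString());
    if (! m.find()) {
      return;
    }
    try {
      Calendar cal = Calendar.getInstance();
      cal.setTime(dateFormat.parse(m.group(1)));
      ctx.write(new IntWritable(cal.get(Calendar.HOUR_OF_DAY)), ONE);
    } catch (ParseException e) {
      // malformed timestamp, skip the line
    }
  }
}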

Sparse Matrix Dot Product

Given two matrices X and Y, the objective is to compute their dot product. The matrices are reshaped into column vectors and specified in sparse format, ie (col, value) pairs only where the value is non-zero. Each matrix is specified in its own data file. I used the DotProduct.py script to generate the two matrix files.

The solution is modeled as two Map-Reduce jobs in DotProduct.java. The mapper in the first job just parses the data into (col, value) pairs, and the reducer multiplies the values for each column, thus calculating the products of the X and Y column pairs. If only one (col, value) pair is found in the reducer for a given column, the product is 0. The reducer writes out (constant, product) values to HDFS.

The mapper in the next job reads the (constant, product) values from HDFS and the reducer sums them up. The sum is written out to HDFS.
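
The heart of the solution is the first job's reducer. Here is a minimal sketch of it, not the actual DotProduct.java; the constant output key "dot" and the class name are placeholders of mine.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// For each column key, expect at most one value from X and one from Y;
// the column contributes 0 unless both are present.
public class ColumnProductReducer
    extends Reducer<IntWritable, DoubleWritable, Text, DoubleWritable> {

  private static final Text CONSTANT_KEY = new Text("dot");

  @Override
  protected void reduce(IntWritable col, Iterable<DoubleWritable> values, Context ctx)
      throws IOException, InterruptedException {
    double product = 1.0;
    int count = 0;
    for (DoubleWritable value : values) {
      product *= value.get();
      count++;
    }
    // A column present in only one of the two vectors contributes 0 to the sum.
    ctx.write(CONSTANT_KEY, new DoubleWritable(count == 2 ? product : 0.0));
  }
}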

Moving Average of a Time Series

Given time series data (a list of numbers), the objective here is to calculate a Simple Moving Average over the last N data points. In addition, the exercise asks us to use a Combiner to reduce data shuffling across the network, and a Partitioner to ensure that the averaged data remains sorted by sequence across multiple reducers.

For the data, I used TimeSeries.py to generate a list of numbers. The solution (TimeSeries.java) is modeled as a single Map-Reduce job, described below.

The mapper reads in the data from HDFS into a blocking queue. Whenever the size of the queue reaches N, all N readings are emitted to the reducer as (current_line, value). Thus, if N = 5, the mapper will emit (v1, v2, v3, v4, v5) as (5, v1), (5, v2), (5, v3), (5, v4) and (5, v5), and then remove the oldest item from the queue. From that point on, as the mapper reads each additional line, it emits the last N values with the current line number as the key. The reducer sums these values and divides by N to get the moving average, which it writes to HDFS.
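
A stripped-down version of that mapper could look like the sketch below. A plain Deque stands in for the blocking queue, N is hard-coded, and the line counter assumes a single input split, so treat it as an illustration of the windowing rather than the actual TimeSeries.java.

import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Once N readings have been seen, every new line causes the last N values
// to be emitted under the current line number, then the window slides.
public class MovingWindowMapper
    extends Mapper<LongWritable, Text, LongWritable, DoubleWritable> {

  private static final int N = 5;
  private final Deque<Double> window = new LinkedList<Double>();
  private long lineNo = 0;

  @Override
  protected void map(LongWritable key, Text value, Context ctx)
      throws IOException, InterruptedException {
    lineNo++;
    window.addLast(Double.parseDouble(value.toString().trim()));
    if (window.size() == N) {
      // emit all N values of the current window keyed by the current line number
      for (Double v : window) {
        ctx.write(new LongWritable(lineNo), new DoubleWritable(v));
      }
      window.removeFirst();     // slide the window forward by one reading
    }
  }
}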

The Combiner class here is the same as the Reducer class, and effectively computes the moving average before it is sent across the network to the reducer, so the reducer becomes just a pass-through.

The Partitioner class splits the key space into buckets, so each reducer deals with a contiguous subset of the key range and produces a sorted list for that subset as its output. When concatenated in order, these sorted lists form a fully sorted result. The number of records, needed to compute the size of each partition, is passed in through the command-line parameters and then through the Configuration (our Partitioner implements Configurable).
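
A minimal sketch of such a partitioner is shown below; the Configuration property name "timeseries.num.records" is a hypothetical placeholder for however the record count gets passed in, and keys are assumed to be 1-based line numbers.

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Range partitioner: each reducer gets one contiguous slice of line numbers,
// so concatenating reducer outputs in order keeps the series sorted.
public class RangePartitioner
    extends Partitioner<LongWritable, DoubleWritable> implements Configurable {

  private Configuration conf;
  private long totalRecords = 1;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    totalRecords = conf.getLong("timeseries.num.records", 1);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int getPartition(LongWritable key, DoubleWritable value, int numPartitions) {
    long bucketSize = (totalRecords + numPartitions - 1) / numPartitions;
    int partition = (int) ((key.get() - 1) / bucketSize);
    return Math.min(partition, numPartitions - 1);   // clamp stragglers to the last bucket
  }
}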

Disjoint Selection (Reduce-side join)

This is a slight variation on the problem posed in the HIA book, since I wasn't able to get the data it refers to. Instead I used the MovieLens 1M dataset, which has a similar parent-child structure.

With the MovieLens data, the objective is to find users who are not "power taggers", ie, those who have tagged fewer than MIN_RATINGS (25) movies. The solution (DisjointSelector.java) is a single Map-Reduce job using a technique called Repartitioned Join or Reduce-Side Join.

The Mapper reads both the user data and the ratings data. The number of columns differs between the two datasets, so it classifies each record as a User or Rating record and emits (userID, recordType) to the reducer. The Reducer groups the records by recordType and sums up the occurrences. If the number of rating records is less than MIN_RATINGS, it writes out (userID, numberOfRatings) to HDFS.
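
A minimal sketch of that reducer is shown below; the single-character record-type tags ("U" and "R") are my own convention, not necessarily what DisjointSelector.java uses.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Each value is a record-type tag: "U" for a user record, anything else
// (here "R") for a rating. Users with fewer than MIN_RATINGS ratings are kept.
public class NonPowerTaggerReducer extends Reducer<Text, Text, Text, IntWritable> {

  private static final int MIN_RATINGS = 25;

  @Override
  protected void reduce(Text userId, Iterable<Text> values, Context ctx)
      throws IOException, InterruptedException {
    boolean isUser = false;
    int numRatings = 0;
    for (Text value : values) {
      if ("U".equals(value.toString())) {
        isUser = true;
      } else {
        numRatings++;
      }
    }
    if (isUser && numRatings < MIN_RATINGS) {
      ctx.write(userId, new IntWritable(numRatings));
    }
  }
}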

Ratio Calculation (Reduce-side join)

Once again, I changed the problem a bit because I could not get the data. The problem asks for the ratio between the prices of the same stock today and yesterday. What I could find was daily closing prices for the last year for any stock ticker symbol at http://www.google.com/finance/historical?output=csv&q=[Symbol name]. So I changed my example to calculate the ratio between Google and IBM stock prices over the last year. I am not sure what the results mean, or if they even mean anything at all.

So this is just another example of reduce-side joining. The code can be found in StockPriceRatio.java.

The Mapper parses out the date, symbol and closing price from the CSV file and emits the key-value pair (date, symbol:closing_price) to the reducer. The reducer parses out the symbol and closing_price, and uses the closing price as the numerator if the symbol is GOOG or as the denominator if the symbol is IBM. It then calculates the ratio and writes out (date, ratio) values to HDFS.
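
Roughly, that reducer could be sketched as follows. The symbol:closing_price value layout comes from the description above; the guard against a missing denominator is an addition of mine and not necessarily in StockPriceRatio.java.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Each date key carries up to two "SYMBOL:close" values; GOOG goes into the
// numerator and IBM into the denominator of the ratio.
public class RatioReducer extends Reducer<Text, Text, Text, DoubleWritable> {

  @Override
  protected void reduce(Text date, Iterable<Text> values, Context ctx)
      throws IOException, InterruptedException {
    double numerator = 0.0;
    double denominator = 0.0;
    for (Text value : values) {
      String[] parts = value.toString().split(":");
      double close = Double.parseDouble(parts[1]);
      if ("GOOG".equals(parts[0])) {
        numerator = close;
      } else if ("IBM".equals(parts[0])) {
        denominator = close;
      }
    }
    if (denominator != 0.0) {              // skip dates missing the IBM price
      ctx.write(date, new DoubleWritable(numerator / denominator));
    }
  }
}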

Product of Vector and Matrix (Map-side join + Distributed Cache)

The objective here is to build a Map-Reduce job to compute the product of a vector and a matrix, both of which are sparse and represented similarly to the Sparse Matrix Dot Product example. The matrix is specified as (row, col, value) triples and the vector as (row, value) pairs, for non-zero values only. Additionally, the vector should be held in the Distributed Cache.

For this, we use another join technique called Map-side join shown in MatrixMultiplyVector.java. The data was generated using MatrixMultiplyVector.py.

Each Mapper reads the vector file from the HDFS Distributed Cache and populates an internal HashMap with it. It then reads each matrix entry and multiplies it (if a match exists) with the corresponding vector entry, where the vector row equals the matrix column. The Mapper emits (row, product) pairs to the Reducer, which sums up all the product values for a given row. The end result is a file on HDFS representing the result vector in sparse format.
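
A simplified version of that mapper is sketched below. The comma-separated file layouts, (row,value) for the vector and (row,col,value) for the matrix, follow the description above, and the DistributedCache call is the Hadoop 1.x API.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map-side join: the sparse vector is loaded from the DistributedCache into a
// HashMap in setup(), and each matrix entry (row, col, value) is multiplied by
// the vector entry at the matching column.
public class MatrixVectorMapper
    extends Mapper<LongWritable, Text, LongWritable, DoubleWritable> {

  private final Map<Long, Double> vector = new HashMap<Long, Double>();

  @Override
  protected void setup(Context ctx) throws IOException {
    Path[] cached = DistributedCache.getLocalCacheFiles(ctx.getConfiguration());
    BufferedReader reader = new BufferedReader(new FileReader(cached[0].toString()));
    String line;
    while ((line = reader.readLine()) != null) {
      String[] parts = line.split(",");            // "row,value"
      vector.put(Long.parseLong(parts[0]), Double.parseDouble(parts[1]));
    }
    reader.close();
  }

  @Override
  protected void map(LongWritable key, Text value, Context ctx)
      throws IOException, InterruptedException {
    String[] parts = value.toString().split(",");  // "row,col,value"
    Double v = vector.get(Long.parseLong(parts[1]));
    if (v != null) {
      double product = Double.parseDouble(parts[2]) * v;
      ctx.write(new LongWritable(Long.parseLong(parts[0])), new DoubleWritable(product));
    }
  }
}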

Spatial Join (Semi-Join)

In this problem, given a 2-dimensional space where the x and y coordinates range over [-1 billion, +1 billion], and given a file of FOOs and BARs representing points in this space, the objective is to find the FOOs that are less than 1 unit of distance from a BAR. Distance is measured as Euclidean distance. Also, the number of BARs is << the number of FOOs.

The idea here is to filter out join candidates in the Mapper and then join them in the Reducer, a technique called Semi-Join or Map-side filtering with Reduce-side Joining. The data for this was generated using SpatialJoin.py and the Java code for the Map-Reduce job can be seen in SpatialJoin.java.

This one had me racking my brains for a bit. I finally hit upon the idea of reducing a BAR to its cell (the 1x1 unit square in which the point lies) plus all its immediate neighbor cells (a total of 9 cells). A FOO is reduced to only its containing cell. Now candidate neighbors for a FOO can be found by comparing its containing cell to the BAR cells. You still need to compute the actual Euclidean distance for each candidate FOO-BAR pair, but this is much less work than computing it across all FOO-BAR pairs.

So the solution is basically a single Map-Reduce job. The Mapper reads data from either the FOO or the BAR file. If it finds a FOO, it just computes its 1x1 cell and emits (cell, original_value) to the Reducer. In the case of a BAR, it computes its 1x1 cell and its immediate neighbors and emits all 9 (cell, original_value) pairs to the Reducer. The Reducer aggregates the FOOs and BARs into two lists, then loops through the FOO and BAR points, calculating the squared distance between each pair. Pairs that are within 1 unit of each other are written out as (FOO-X, FOO-Y, BAR-X, BAR-Y) to HDFS.
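
The cell bucketing is the interesting part, and a small helper like the one below captures it; the "x:y" string encoding of a cell key is my choice here, and SpatialJoin.java may well encode it differently.

import java.util.ArrayList;
import java.util.List;

// Cell bucketing: a point's 1x1 cell is just its floored coordinates, and a
// BAR is additionally replicated into the 8 neighboring cells.
public class Cells {

  /** Cell key for the 1x1 unit square containing (x, y). */
  public static String cellOf(double x, double y) {
    return (long) Math.floor(x) + ":" + (long) Math.floor(y);
  }

  /** The containing cell plus its 8 immediate neighbors (9 keys in total). */
  public static List<String> cellAndNeighbors(double x, double y) {
    long cx = (long) Math.floor(x);
    long cy = (long) Math.floor(y);
    List<String> cells = new ArrayList<String>(9);
    for (long dx = -1; dx <= 1; dx++) {
      for (long dy = -1; dy <= 1; dy++) {
        cells.add((cx + dx) + ":" + (cy + dy));
      }
    }
    return cells;
  }
}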

Spatial Join with Bloom Filter (Semi-Join, Bloom Filter)

This is the same problem as the previous one, except that this time we have to use a Bloom Filter to hold the BARs. I used the Bloom Filter implementation that comes with Hadoop. This dynamic BloomFilter tutorial provided me with good information about how to size the filter given the number of records I had.

The solution (SpatialJoinWithBloomFilter.java) consists of two Map-Reduce jobs. The first creates the BloomFilter in a distributed manner and persists it to HDFS. The Mapper reads the BAR file and writes the containing cell and its neighbors for each BAR point into the Bloom Filter. The Reducer aggregates (ORs) the BloomFilters produced by the Mappers into a single one and writes it to HDFS in its cleanup() method.

The Mapper in the second job loads the BloomFilter from HDFS in its setup() method, then reads the FOO file. For each FOO, it computes the containing cell and checks whether there is a BAR with the same cell in the BloomFilter. If there is, it passes the FOO across. For the BARs, the Mapper passes all of them to the Reducer. In both cases, the data is passed as (cell, original_value). The Reducer gets the groups of FOOs and BARs that were found to be close together, so it groups them as in the previous solution and computes the Euclidean distance between all candidate FOO-BAR pairs. Pairs within 1 unit of each other are written out to HDFS.
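
For reference, the Hadoop BloomFilter usage boils down to something like the sketch below. The sizing constants are placeholders; they should really be derived from the expected number of BAR cells and the acceptable false positive rate, as the dynamic BloomFilter tutorial mentioned above describes.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

// Building, probing and persisting the filter of BAR cells.
public class BarCellFilter {

  // vector size (in bits) and number of hash functions are placeholders
  public static BloomFilter newFilter() {
    return new BloomFilter(1000000, 7, Hash.MURMUR_HASH);
  }

  public static void addCell(BloomFilter filter, String cellKey) {
    filter.add(new Key(cellKey.getBytes()));
  }

  public static boolean mightContain(BloomFilter filter, String cellKey) {
    return filter.membershipTest(new Key(cellKey.getBytes()));
  }

  // Written from the first job's reducer cleanup()...
  public static void save(BloomFilter filter, FileSystem fs, Path path) throws IOException {
    FSDataOutputStream out = fs.create(path);
    filter.write(out);
    out.close();
  }

  // ...and read back in the second job's mapper setup().
  public static BloomFilter load(FileSystem fs, Path path) throws IOException {
    BloomFilter filter = new BloomFilter();   // parameters are read back from the stream
    FSDataInputStream in = fs.open(path);
    filter.readFields(in);
    in.close();
    return filter;
  }
}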

So there you have it. None of these is hugely complicated or needs a whole lot of domain knowledge, but working through them helped me explore and understand techniques that are standard fare for Hadoop coders. Hopefully it helps you too.

18 comments:

  1. What a nice effort and result Sujit.

    It's definitely helpful for others like me who are learning Hadoop to lessen the learning curve and see an avenue to practice.

    Hope to follow them one by one.

    For TopKRecords.java, why do you need to have 2 jobs again? Could it be done with only single job?

    The first one just utilizes the parallel work of N mappers -> N reducers? What are the default mapper and reducer counts again?

    The second one is just for parallel reads... and the reducer here must be only one, right?

    Nice use of min priority queue.

  2. Thanks Sim. The second job is for combining the outputs of the previous job's reducers, so you don't have to have it; you could just do a merge sort on the reducer outputs later. The default number of mappers and reducers on the first job is calculated by Hadoop based on the size of the input data.

  3. Have read all your solutions Sujit. Now, it's time to work on Michael Noll's installation guide and reverse read HIA reference after seeing helpful and solid actual implementations. :-) I have encountered errors too before with IBM's and Cloudera's vms. Perhaps to return back playing more with Hadoop and give more time.

    By the way, you may find this summary of some patterns useful too http://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/. Halfway now trying to absorb them.

    I recalled this one from before - a collection of Pig UDFs. http://engineering.linkedin.com/open-source/introducing-datafu-open-source-collection-useful-apache-pig-udfs

    Looking forward to more of your fun adventures and solid implementations sir!

  4. Hi Sim, thanks for the links, they look very interesting.

  5. Hi Christi, Hadoop provides a Java API so if you are going to use it as described in the post, then you will need to use Java. I believe you can use Hadoop from C with Hadoop streaming or Pipes project.

  6. Sujit ... Java Code file link is not working... can you please upload java code again...

  7. Thank you for pointing it out, it's fixed now. The problem was that I needed to make room for another project using Java/cascading-newsclip, so now the old Java stuff is in java/hadoop.

  8. Thanks Rohini, it was worth it for me though...

  9. Spatial Join with Bloom Filter (Semi-Join, Bloom Filter): how do I run this program on a single-node Hadoop cluster?
    Could you please help me?

  10. Hi Girish, assuming you already have a single-node Hadoop cluster set up, you could just build the JAR file and copy both your data and JAR file to HDFS (using hadoop fs ...). You can then execute the JAR file using hadoop jar, specifying the path to the JAR file, the input directory and the output directory (all on HDFS, so you need to specify the path as hdfs://path/on/hdfs/to/jar, etc).

  11. Thank you sir for your quick reply.
    I have a single-node Hadoop cluster.
    Is it necessary to store the python script, which is in SpatialJoin.py?

    Also, what type of data can be used for testing?
    Just simple docs or images?

  12. The spatial_join.py just generates some dummy data for the job. You can run it locally to generate the data then upload the data into HDFS.

  13. I got the random data.

    My doubt now is that I have created the jar file using the command jar cf java file name.

    It created a folder named javafile and I saved it in HDFS,
    but while running the program as
    ./bin/hadoop jar SpatialJoinWithBloomFilter.jar SpatialJoinWithBloomFilter test10/input test10/output

    where test10/input is my input file
    and test10/output is my output folder

    I got an error:
    Exception in thread "main" java.lang.ClassNotFoundException: SpatialJoinWithBloomFilter
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:153)

    ...

    I think the jar file creation makes an error.
    My jar file doesn't contain a class file... I think some class file should be visible.
    Could you please help me with how to execute it?


  14. Your "bin/hadoop jar" call is pointing to local (ie non HDFS) locations. You can do that too if you want, make sure you point to the full path to the jar file. If you run on local HDFS, prefix the HDFS path for the jar, input and output with hdfs://.

  15. I guess there was a misunderstanding in the solution of the Top K Records exercise from the HIA book, section 4.7, which says:
    "Top K records—Change AttributeMax.py (or AttributeMax.php) to output the entire record rather than only the maximum value. Rewrite it such that the MapReduce job outputs the records with the top K values rather than only the maximum."
    The input data set to be used is actually the apat63_99.txt file, and the exercise asks for the records with the top K values (CLAIMS) rather than only the maximum, as AttributeMax.py described in listing 4.6 was giving the records for the maximum claims.
    Besides this, thanks for sharing.
    It would also be great to share the answers (the output records) for comparison.
    Thanks again.

  16. Thanks Elkhan. I didn't know about the problem in the HIA book, thanks for sharing, but I just re-eyeballed my TopKRecords.java and it looks like it returns the top 10, not just the maximum. Regarding sharing the data, I agree it makes the code easier to understand; I am trying to do this with posts going forward.

  17. Thanks Niazi, and I was describing the algorithms as I was learning and implementing them, which is probably why you noticed the progression :-).


Comments are moderated to prevent spam.