Wednesday, September 11, 2013

Search Rules using Mahout's Association Rule Mining


This work grew out of a conversation with one of our domain experts, who was relaying a conversation he had with one of our clients. The client was looking for ways to expand a query based on terms already in it - for example, if a query contained "cattle" and "neurological disorder", then we should also serve results for "bovine spongiform encephalopathy", also known as "mad cow disease".

We do semantic search, which involves annotating words and phrases in documents with concepts from our taxonomy. One view of an annotated document is the bag of concepts view, where a document is modeled as a sparsely populated array of scores, each position corresponding to a concept. One way to address the client's requirement would be to do Association Rule Mining on the concepts, looking for significant co-occurrences of a set of concepts per document across the corpus.

The data I used to build this proof-of-concept came from one of my medium-sized indexes, and contains 12,635,756 rows and 342,753 unique concepts. While Weka offers the Apriori algorithm, I suspect that it won't be able to handle this data volume. Mahout is probably a better fit, and it offers the FPGrowth algorithm running on Hadoop, so that's what I used. This post describes what I had to do to prepare my data for Mahout, run the job with Mahout on the Amazon Elastic MapReduce (EMR) platform, and then post-process the output to get useful information out of it.

The first step is to preprocess my data so that Mahout's FP-Growth algorithm can consume it. Documents in our Lucene/Solr index encode their "concept map" in the sparse format shown below:

7996649$2.71 8002896$6.93 8256842$2.71 ...

Here each concept is represented by a numeric ID (the integer before the dollar sign), with the score the annotator gave that concept for the document attached as a payload (the floating point number after the dollar sign). The format expected by Mahout is shown below. The first number is a row ID, followed by a tab, followed by a sequence of concept IDs separated by spaces.

1  7996649 8002896 8256842 ...

The code below reads the Lucene index and writes out a text file in the input format of Mahout's FP-Growth algorithm.

// Source: src/main/scala/com/mycompany/mia/fpg/PreProcess.scala
package com.mycompany.mia.fpg

import java.io.{File, FileWriter, PrintWriter}
import java.util.concurrent.atomic.AtomicInteger

import scala.Array.canBuildFrom

import org.apache.lucene.index.DirectoryReader
import org.apache.lucene.store.NIOFSDirectory

/**
 * Reads data from a Lucene index and writes out in format
 * expected by Mahout's FP Growth driver. Concept Map for
 * a document is stored in sparse representation as below:
 *   7996649$2.71 8002896$6.93 8256842$2.71 ...
 * The code below will convert it to the format expected by
 * Mahout's FP Growth algorithm as below:
 *   1  7996649 8002896 8256842 ...
 */
object Preprocess extends App {

  val IndexPath = "/path/to/lucene/index"
  val OutputFile = "data/imuids_p.csv"

  val reader = DirectoryReader.open(
    new NIOFSDirectory(new File(IndexPath)))
  val writer = new PrintWriter(new FileWriter(new File(OutputFile)))

  val ndocs = reader.numDocs()
  val counter = new AtomicInteger(1)
  (0 until ndocs).foreach(docId => {
    // the record ID advances for every document, including
    // those that have no concept map and are not written out
    val recordId = counter.getAndIncrement()
    if (recordId % 1000 == 0)
      Console.println("Processed %d docs".format(recordId))
    val doc = reader.document(docId)
    val field = doc.getField("imuids_p")
    if (field != null) {
      // keep the concept ID, drop the "$score" payload
      val imuids = field.stringValue().split(" ").
        map(pair => pair.split("\\$")(0)).
        mkString(" ")
      writer.println("%d\t%s".format(recordId, imuids))
    }
  })
  writer.flush()
  writer.close()
  reader.close()
}
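
The string transformation at the heart of the preprocessing step can be tried out without a Lucene index. The following is a minimal sketch of just that step; the object and method names (ConceptMapFormat, conceptIds, toMahoutLine) are made up for illustration and are not part of the project code.

```scala
// Hypothetical helper, for illustration only: isolates the string
// handling used when converting a concept map to FP-Growth input.
object ConceptMapFormat {

  /** Keeps the ID from each "id$score" pair and drops the score. */
  def conceptIds(conceptMap: String): Seq[String] =
    conceptMap.trim.split("\\s+").toSeq
      .filter(_.nonEmpty)
      .map(pair => pair.split("\\$")(0))

  /** One line of input: row ID, a tab, then space-separated IDs. */
  def toMahoutLine(recordId: Int, conceptMap: String): String =
    "%d\t%s".format(recordId, conceptIds(conceptMap).mkString(" "))
}

object ConceptMapFormatDemo {
  def main(args: Array[String]): Unit = {
    // sample concept map taken from the format description above
    val line = ConceptMapFormat.toMahoutLine(
      1, "7996649$2.71 8002896$6.93 8256842$2.71")
    println(line)
  }
}
```

For the sample concept map shown earlier, this produces the row ID 1, a tab, and the three concept IDs separated by spaces.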

I then created a bucket on Amazon S3, uploaded the generated file "data/imuids_p.csv" into an input directory within the bucket, and put the Mahout job JAR file in the bucket's top level directory. I then started an EMR job flow with a custom JAR and the following parameters. The maxHeapSize and minSupport parameters were copied from this blog post as reasonable starting points for my experiment.

JAR file location: s3n://conceptbasket/mahout-core-0.8-job.jar
JAR file parameters: org.apache.mahout.fpm.pfpgrowth.FPGrowthDriver \
    --input s3n://conceptbasket/input/imuids_p.csv \
    --output s3n://conceptbasket/output \
    --maxHeapSize 50 \
    --method mapreduce \
    --minSupport 2

The EMR job was started with 1+4 m1.medium instances, and took 14 mins to complete. It writes out a bunch of Hadoop sequence files into a directory structure as follows:

output
  |
  +-- fList
  |
  +-- fpgrowth
  |    |
  |    +-- part-r-*
  |
  +-- frequentpatterns
  |    |
  |    +-- part-r-*
  |
  +-- parallelcounting
  |    |
  |    +-- part-r-*

Looking at each of these files with the mahout seqdumper subcommand reveals that fList contains (concept ID, frequency) tuples for individual concepts, and the frequentpatterns/part-r-* files contain (concept IDs, TopKStringPatterns) tuples - the TopKStringPatterns value holds a pair consisting of a List of concept IDs representing a pattern and the number of occurrences of that pattern. The other two directories contain intermediate data.

sujit@cyclone:mahout-distribution-0.8$ bin/mahout seqdumper \
    -i /path/to/output/dir/fList
Input Path: /path/to/output/dir/fList
Key class: class org.apache.hadoop.io.Text \
Value Class: class org.apache.hadoop.io.LongWritable
Key: 9724770: Value: 23695
Key: 2791870: Value: 7017
Key: 9723331: Value: 6109
Key: 5344477: Value: 5066
Key: 9278078: Value: 4275
...
sujit@cyclone:mahout-distribution-0.8$ bin/mahout seqdumper \
    -i /path/to/output/dir/fpgrowth/part-r-00000
Input Path: /path/to/output/dir/fpgrowth/part-r-00000
Key class: class org.apache.hadoop.io.Text \
Value Class: class org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns
Key: 8106245: Value: ([8106245],323)
Key: 2795881: Value: ([2795881],323)
Key: 9723331 9361488: Value: ([9723331 9361488],324)
Key: 7984560: Value: ([7984560],324)
Key: 8814902: Value: ([8814902],325)
...
sujit@cyclone:mahout-distribution-0.8$ bin/mahout seqdumper \
    -i /path/to/output/dir/frequentpatterns/part-r-00000
Input Path: /path/to/output/dir/frequentpatterns/part-r-00000
Key class: class org.apache.hadoop.io.Text \
Value Class: class org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns
Key: 2790792 9361488 8856691: Value: ([2790792 9361488 8856691],10)
Key: 2790793 2791852 5356356 9723776 8131658: Value: ([2790793 2791852 5356356 9723776 8131658],2)
Key: 2790793 8206713: Value: ([2790793 8206713],2)
Key: 2790797 9361488: Value: ([2790797 9361488],2)
Key: 2790798: Value: ([2790798],20)
...
sujit@cyclone:mahout-distribution-0.8$ bin/mahout seqdumper \
    -i /path/to/output/dir/parallelcounting/part-r-00000
Input Path: /path/to/output/dir/parallelcounting/part-r-00000
Key class: class org.apache.hadoop.io.Text 
Value Class: class org.apache.hadoop.io.LongWritable
Key: 10000: Value: 1
Key: 1000000: Value: 1
Key: 10000003: Value: 1
Key: 10000010: Value: 1
Key: 10000017: Value: 1
...
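
For quick checks it can be handy to pull the patterns straight out of the seqdumper text. The following is a minimal sketch that assumes every pattern line has exactly the shape shown in the dumps above (one bracketed list and one count per line); the SeqDumpLine object is a made-up name, and lines of any other shape are simply skipped.

```scala
// Hypothetical parser for seqdumper text output, for illustration only.
// Assumes one "([ids],count)" pair per line, as in the sample dumps.
object SeqDumpLine {

  // Matches e.g.: Key: 9723331 9361488: Value: ([9723331 9361488],324)
  private val PatternLine =
    """^Key: (.+?): Value: \(\[(.*?)\],(\d+)\)\s*$""".r

  /** Returns (concept IDs, count) for a pattern line, else None. */
  def parse(line: String): Option[(Seq[String], Long)] = line match {
    case PatternLine(_, ids, count) =>
      Some((ids.trim.split("\\s+").toSeq, count.toLong))
    case _ => None
  }
}

object SeqDumpLineDemo {
  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "Key: 9723331 9361488: Value: ([9723331 9361488],324)",
      "Key: 9724770: Value: 23695", // fList line, not a pattern
      "...")
    lines.flatMap(l => SeqDumpLine.parse(l)).foreach {
      case (ids, n) => println(ids.mkString(" ") + " -> " + n)
    }
  }
}
```

Only the first of the three sample lines is a pattern line, so the fList line and the elision are skipped.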

Our objective in mining the association rules in the first place was to find concept co-occurrences that were pervasive across the corpus. These could be the basis for building candidate search expansion rules to bring in results containing subjects derived from the original query. So we need some way to identify "good" rules from the results above, and have a human expert go through the rules to select the ones that make sense.

We do this by ranking the rules by two parameters - support and confidence. Support for a set of concept IDs is the number of rows containing that set relative to the total number of rows. The confidence of a rule X => Y is the support for (X union Y) divided by the support for X. Sorting the rules by support and confidence (descending) gives us good candidate rules for review. The following code reads the sequence files output by Mahout and produces a CSV file listing each rule with its support and confidence metrics. The getName() method in the code (deliberately) just returns the concept ID it is given, since the name lookup is a bit hard to set up correctly, but if implemented, it can provide human readable rules.

I initially attempted to join the frequentpatterns/part-r-* files using Hadoop's "fs -getmerge" subcommand, but SBT (from where I was running the code) ran out of memory when reading the merged file - and so did Mahout's seqdumper subcommand. So I changed the code to read each part-r-* file separately and join the results.

// Source: src/main/scala/com/mycompany/mia/fpg/PostProcess.scala
package com.mycompany.mia.fpg

import java.io.{File, FileWriter, PrintWriter}
import scala.Array.canBuildFrom
import scala.collection.JavaConversions.asScalaBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, SequenceFile, Text}
import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns

object PostProcess extends App {

  val N = 12635756 // number of transactions (rows)
  val FreqFile = "data/fpg/fList"
  val FrequentPatternsDir = "data/fpg/frequentpatterns"
  val OutputFile = "data/imuid_rules.csv"

  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val frequency = computeFrequency(fs, conf, FreqFile)

  val writer = new PrintWriter(
    new FileWriter(new File(OutputFile)), true)

  // one reader per part-r-* file, read one after the other
  val readers = new File(FrequentPatternsDir).list().
    filter(f => f.startsWith("part-r-")).
    map(f => new SequenceFile.Reader(fs,
      new Path(FrequentPatternsDir, f), conf))

  readers.foreach(reader => {
    val key = new Text()
    val value = new TopKStringPatterns() // NOTE: deprecated in 0.8
    while (reader.next(key, value)) {
      val patterns = value.getPatterns()
      patterns.foreach(pattern => {
        // each pattern is a (concept_id_list,n) tuple that
        // states that the concepts in the list occurred n times.
        // - support for a pattern is given by n/N (as a percentage).
        // - each pattern translates to multiple rules: each item
        //   in turn is taken as the head and the remaining items
        //   as the tail, giving the rule tail => head.
        // - confidence for each rule is calculated as
        //   support(pattern) / support(head), since fList only has
        //   counts for single concepts. NOTE: the usual definition
        //   of confidence divides by support(tail) instead.
        val items = pattern.getFirst()(0).split(" ")
        if (items.size > 1) {
          val support = (100.0D * pattern.getSecond()) / N
          items.foreach(item => {
            if (frequency.contains(item)) {
              val rest = items.filter(it => ! it.equals(item))
              val supportFirst = (100.0D * frequency(item)) / N
              val confidence = (100.0D * support) / supportFirst
              writer.println("""%5.3f,%5.3f,"%s => %s","%s => %s"""".format(
                support, confidence, rest.mkString("; "), item,
                rest.map(getName(_)).mkString("; "), getName(item)))
            }
          })
        }
      })
    }
    reader.close()
  })
  writer.flush()
  writer.close()

  // reads the (concept ID, frequency) tuples from fList into a Map
  def computeFrequency(fs: FileSystem,
      conf: Configuration, fList: String): Map[String,Long] = {
    val reader = new SequenceFile.Reader(fs, new Path(fList), conf)
    val key = new Text()
    val value = new LongWritable()
    var freqs = Map[String,Long]()
    while (reader.next(key, value)) {
      freqs += ((key.toString(), value.get()))
    }
    reader.close()
    freqs
  }

  // placeholder: returns the concept ID instead of looking up its name
  def getName(item: String): String = item
}
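
To make the support and confidence arithmetic concrete, here is a minimal sketch that computes both from their definitions on a tiny made-up corpus. The RuleMetrics object and the five toy documents are hypothetical, and the values are fractions rather than the percentages written out by the code above.

```scala
// Hypothetical toy example, for illustration only.
object RuleMetrics {

  type Transaction = Set[String]

  /** Fraction of transactions that contain every item in `items`. */
  def support(items: Set[String],
      transactions: Seq[Transaction]): Double =
    transactions.count(t => items.subsetOf(t)).toDouble /
      transactions.size

  /** confidence(X => Y) = support(X union Y) / support(X). */
  def confidence(x: Set[String], y: Set[String],
      transactions: Seq[Transaction]): Double =
    support(x ++ y, transactions) / support(x, transactions)
}

object RuleMetricsDemo {
  def main(args: Array[String]): Unit = {
    // each set is the bag of concepts for one made-up document
    val docs = Seq(
      Set("cattle", "neuro", "bse"),
      Set("cattle", "neuro", "bse"),
      Set("cattle", "neuro"),
      Set("cattle"),
      Set("flu"))
    val x = Set("cattle", "neuro")
    val y = Set("bse")
    // X union Y appears in 2 of the 5 documents
    println("support    = " + RuleMetrics.support(x ++ y, docs))
    // X appears in 3 documents, 2 of which also contain Y
    println("confidence = " + RuleMetrics.confidence(x, y, docs))
  }
}
```

Counting subsets like this is only practical for toy data; for a corpus of this size the counts come from the FP-Growth output instead.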

The CSV file can be easily imported into a spreadsheet program and sorted by support and confidence as described above. The screenshot below shows what this looks like:


While I was not able to produce the "[cattle; neurological disorder] => bovine spongiform encephalopathy" rule from this data, it did end up showing some interesting ones, such as: Flu <=> Epidemiology, Breast Cancer <=> Breast Tumor, [Hermaphroditism; Sexual Development] => Sexual Dysfunction, etc.
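
The sorting step does not need a spreadsheet. The following is a minimal sketch that ranks rule lines by support and then confidence, both descending. It assumes each line starts with two numeric fields separated by commas, as in the CSV format written by the post-processing code; the RankRules object and the sample lines are made up for illustration.

```scala
// Hypothetical ranking helper, for illustration only.
object RankRules {

  final case class Rule(support: Double, confidence: Double, text: String)

  /** Parses "support,confidence,rule text"; bad lines give None. */
  def parse(line: String): Option[Rule] = line.split(",", 3) match {
    case Array(s, c, rest) =>
      scala.util.Try(Rule(s.trim.toDouble, c.trim.toDouble, rest)).toOption
    case _ => None
  }

  /** Sorts by support, then by confidence, both descending. */
  def rank(lines: Seq[String]): Seq[Rule] =
    lines.flatMap(l => parse(l)).sortWith((a, b) =>
      if (a.support != b.support) a.support > b.support
      else a.confidence > b.confidence)
}

object RankRulesDemo {
  def main(args: Array[String]): Unit = {
    // made-up rows in the same layout as the rules CSV
    val sample = Seq(
      """0.010,45.000,"111 => 222","111 => 222"""",
      """0.020,10.000,"333 => 444","333 => 444"""",
      """0.020,30.000,"555 => 666","555 => 666"""")
    RankRules.rank(sample).foreach(r =>
      println(r.support + "," + r.confidence + "," + r.text))
  }
}
```

The split is limited to three fields so that the rule text is kept verbatim.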

Running the actual algorithm was just a matter of calling a prebuilt Mahout class using Amazon EMR's web interface, but if you want the pre- and post-processing code, it can be found in my mia-scala-examples GitHub project.
