Saturday, October 26, 2013

Moving back to Linux on the Laptop


Over the last 4 years I have owned a Macbook Pro. A couple of weeks ago (a couple of hours after posting my previous blog post) the hard disk went click-click (I heard it, I had headphones on at the time) and died. After about 8-10 hours of googling and trying out various pieces of troubleshooting advice, I was able to resurrect the machine by reformatting the hard disk and reinstalling the operating system from the recovery partition created by OSX 10.8 (Mountain Lion), suffering complete data loss in the process.

I learned later that other users who recently upgraded to Mountain Lion had the same experience - disk crash, fixed by reformat and OSX reinstall. I had upgraded because I wanted to move to Java 1.7 - Snow Leopard, the previous OS, had support only for Java 1.6. I was able to recover all of my "production" Python scripts - stuff I wrote long ago but use day-to-day almost without thinking - thanks to a SugarSync account that only partially works (but it worked "well enough" in this particular case).

I was also able to get back most of my recent code (except the work in progress stuff I hadn't pushed) from various code repositories (GitHub, BitBucket, SourceForge, etc). Most of my EBooks are also available for re-download from my accounts at various publishers (Manning, Packt). I lost almost all my music except some songs which were scattered among various devices - EMusic was kind enough to give me a $50 credit to rebuild (a kernel of) my library.

Of course, the Macbook is about 4 years old, and it's not as if I expected to own it forever. I did hope that I could use it for about 6 years, given that the average life of my PC-based laptops is about 2-3 years and this one cost almost 2.5 times as much as one of those. At that point, I had planned on going back to using a Linux-based PC laptop. Having heard the sound of the disk drive, I was pretty sure it was an unrecoverable hardware failure, so I went ahead and put this plan into motion. A bit premature, as it turned out, but I guess I have to live with the consequences of my actions...

The one major thing I liked about the Macbook was its long battery life - 8 hours when new, 4-5 hours now, still more than adequate for my purposes - so I wanted a Linux laptop that could come close. I found one such system at LinuxCertified.com, a store that installs (and guarantees) Linux on popular laptop hardware. Since I have installed Linux before, I just wanted to know what hardware they use, so as to maximize my chances of getting all the features working under Linux.

I then took the specs for the Lenovo T530, added a few things - upgraded the CPU to a 4-core i5, increased the memory from 2GB to 8GB, added an NVIDIA graphics card to support a higher resolution (1600x900), and upgraded to the 9-cell (8 hour) battery instead of the 6-cell default - and placed an order at the Lenovo store. I ended up paying about the same amount (with tax) as the one advertised on LinuxCertified.com.

Once the Macbook came back to life, I had a change of heart and tried to cancel the order, but Lenovo's return/cancellation policy is pretty draconian (at least by US standards) - once you hit the submit button, you have effectively bought it. Anyway, I figured that since the disk on the Macbook has crashed once, it's only a matter of time before it dies again, so maybe it just makes more sense to put it out to pasture working on things like my kids' homework :-).

For the OS on the new Thinkpad, I had a 64-bit Ubuntu 13.04 disk from OSDisc.com, purchased earlier for my work desktop. The only issue I faced was that the installer could not recognize the wired network during installation - however, once the machine came up with Ubuntu, it recognized both the wired and wireless connections. I am able to run Google-Chrome beta (to replace the default Firefox browser) and watch and listen to lecture videos (for Coursera and other MOOC classes I am taking - I mention this because I can't get sound to work on my desktop). I haven't begun to use it fully (I am still typing this post on my Macbook), but all the software I need works fine on it (so far at least).

Looks-wise, of course, this is a huge downgrade - almost like swapping a sports car for an SUV (the real kind) - the Macbook Pro is slim, sleek and silver while the Thinkpad is thick, chunky and black. With respect to the OS though, I have worked with Unix long enough that Linux design choices just seem more logical to me, and I have always found the Mac OSX /Applications, /Library, /System, etc. trees somewhat strange and non-intuitive. Plus there is just more good free stuff on Linux than on OSX, so I am guessing my overall experience with this change will be positive.

So anyway, back to using Linux for everything again :-).

Tuesday, October 15, 2013

Entity Discovery using Mahout CollocDriver


I spent most of last week trying out various approaches to extract "interesting" phrases from a collection of articles. The objective was to identify candidate concepts that could be added to our taxonomy. There are various approaches, ranging from simple NGram frequencies, to algorithms such as RAKE (Rapid Automatic Keyword Extraction), to rescoring NGrams using Log Likelihood or Chi-squared measures. In this post, I describe how I used Mahout's CollocDriver (which uses the Log Likelihood measure) to find interesting phrases from a small corpus of about 200 articles.
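
To make the log-likelihood idea concrete, here is a minimal Python sketch (illustration only, not part of the pipeline) that scores a single candidate bigram using Dunning's log-likelihood ratio, the measure underlying Mahout's CollocDriver; the counts at the bottom are made up.

import math

def xlogx(x):
  # x * ln(x), with the convention that 0 * ln(0) = 0
  return 0.0 if x == 0 else x * math.log(x)

def entropy(*counts):
  # unnormalized Shannon entropy of a set of counts
  return xlogx(sum(counts)) - sum([xlogx(c) for c in counts])

def llr(k11, k12, k21, k22):
  # Dunning's log-likelihood ratio over the 2x2 contingency table:
  # k11 = count(A B), k12 = count(A without B),
  # k21 = count(B without A), k22 = count(neither A nor B)
  row_entropy = entropy(k11 + k12, k21 + k22)
  col_entropy = entropy(k11 + k21, k12 + k22)
  mat_entropy = entropy(k11, k12, k21, k22)
  return 2.0 * (row_entropy + col_entropy - mat_entropy)

# hypothetical counts: the bigram occurs 110 times, its first word 2700 times,
# its second word 150 times, in a corpus of 100,000 bigrams
n_ab, n_a, n_b, n = 110, 2700, 150, 100000
print llr(n_ab, n_a - n_ab, n_b - n_ab, n - n_a - n_b + n_ab)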

The articles were in various formats (PDF, DOC, HTML), and I used Apache Tika to parse them into text (yes, I finally found the opportunity to learn Tika :-)). Tika provides parsers for many common formats, so all I had to do was hook them up to produce text from the various file formats. Here is my code:

// Source: src/main/scala/com/mycompany/mia/preprocess/TextExtractor.scala
package com.mycompany.mia.preprocess

import java.io.{File, FileInputStream, FileWriter, InputStream, PrintWriter}

import scala.collection.JavaConversions.asScalaIterator

import org.apache.commons.io.{FileUtils, FilenameUtils, IOUtils}
import org.apache.commons.io.filefilter.{DirectoryFileFilter, WildcardFileFilter}
import org.apache.tika.exception.TikaException
import org.apache.tika.metadata.Metadata
import org.apache.tika.parser.{AutoDetectParser, ParseContext, Parser}
import org.apache.tika.parser.audio.AudioParser
import org.apache.tika.parser.html.HtmlParser
import org.apache.tika.parser.image.ImageParser
import org.apache.tika.parser.microsoft.OfficeParser
import org.apache.tika.parser.opendocument.OpenOfficeParser
import org.apache.tika.parser.pdf.PDFParser
import org.apache.tika.parser.rtf.RTFParser
import org.apache.tika.parser.txt.TXTParser
import org.apache.tika.parser.xml.XMLParser
import org.apache.tika.sax.WriteOutContentHandler

object TextExtractor extends App {
  val extractor = new TextExtractor()
  val idir = new File("/path/to/raw/files")
  val odir = new File("/path/to/text/files")
  extractor.extractDirToFiles(idir, odir, null)
}

class TextExtractor {

  object FileType extends Enumeration {
    type FileType = Value
    val Text, Html, Xml, Pdf, Rtf, OOText,
    MsExcel, MsWord, MsPowerpoint, MsOutlook, Visio,
    Png, Jpeg, Mp3, Undef = Value
  }
  object DocPart extends Enumeration {
    type DocPart = Value
    val Title, Author, Body, Error = Value
  }
  
  val parsers = Map[FileType.Value,Parser](
    (FileType.Text, new TXTParser()),
    (FileType.Html, new HtmlParser()),
    (FileType.Xml, new XMLParser()),
    (FileType.Pdf, new PDFParser()),
    (FileType.Rtf, new RTFParser()),
    (FileType.OOText, new OpenOfficeParser()),
    (FileType.MsExcel, new OfficeParser()),
    (FileType.MsWord, new OfficeParser()),
    (FileType.MsPowerpoint, new OfficeParser()),
    (FileType.MsOutlook, new OfficeParser()),
    (FileType.Visio, new OfficeParser()),
    (FileType.Png, new ImageParser()),
    (FileType.Jpeg, new ImageParser()),
    (FileType.Mp3, new AudioParser()),
    (FileType.Undef, new AutoDetectParser())
  )

  /** Extract single file into map of name value pairs */
  def extract(file: File): Map[DocPart.Value,String] = {
    var istream: InputStream = null
    try {
      istream = new FileInputStream(file)
      val handler = new WriteOutContentHandler(-1)
      val metadata = new Metadata()
      val parser = parsers(detectFileType(file))
      val ctx = new ParseContext()
      parser.parse(istream, handler, metadata, ctx)
      Map[DocPart.Value,String](
        (DocPart.Author, metadata.get(Metadata.CREATOR)),
        (DocPart.Title, metadata.get(Metadata.TITLE)),
        (DocPart.Body, handler.toString))
    } catch {
      case e: TikaException => Map[DocPart.Value,String](
        (DocPart.Error, e.getMessage()))      
    } finally {
      IOUtils.closeQuietly(istream)
    }
  }
  
  /** Detect FileType based on file name suffix */
  def detectFileType(file: File): FileType.Value = {
    val suffix = FilenameUtils.getExtension(file.getName()).
      toLowerCase()
    suffix match {
      case "text" | "txt" => FileType.Text
      case "html" | "htm" => FileType.Html
      case "xml"          => FileType.Xml
      case "pdf"          => FileType.Pdf
      case "rtf"          => FileType.Rtf
      case "odt"          => FileType.OOText
      case "xls" | "xlsx" => FileType.MsExcel
      case "doc" | "docx" => FileType.MsWord
      case "ppt" | "pptx" => FileType.MsPowerpoint
      case "pst"          => FileType.MsOutlook
      case "vsd"          => FileType.Visio
      case "png"          => FileType.Png
      case "jpg" | "jpeg" => FileType.Jpeg
      case "mp3"          => FileType.Mp3
      case _              => FileType.Undef
    }
  }
  
  /** Extract all files in directory with specified file name
      pattern. Accepts a renderer function to convert name-
      value pairs into an output file (or files).*/
  def extract(dir: File, pattern: String, odir: File,
      renderer: (File, File, Map[DocPart.Value,String]) => Unit): 
      Unit = {
    val filefilter = pattern match {
      case null => new WildcardFileFilter("*.*")
      case _ => new WildcardFileFilter(pattern)
    }
    FileUtils.iterateFiles(dir, filefilter, 
        DirectoryFileFilter.DIRECTORY).foreach(file => {
      Console.println("Parsing file: " + file.getName())
      val data = extract(file)
      renderer(file, odir, data)
    })
  }

  /** Convenience method to write out text extracted from a file
      into the specified directory as filename.txt */
  def extractDirToFiles(dir: File, odir: File, pattern: String): Unit = {
    def renderDirToFiles(file: File, odir: File, 
        data: Map[DocPart.Value,String]): Unit = {
      val ofname = file.getName() + ".txt"
      val writer = new PrintWriter(new FileWriter(new File(odir, ofname)), true)
      writer.println(data(DocPart.Title))
      writer.println(data(DocPart.Author))
      writer.println(data(DocPart.Body))
      writer.flush()
      writer.close()
    } 
    extract(dir, pattern, odir, renderDirToFiles)
  }
}

We then use the Mahout seqdirectory subcommand to convert the directory of text files into a set of sequence files. I found lots of useful information in this thread from the Mahout mailing list. Since I only had around 200 documents to work with, I just ran Mahout on my laptop with Hadoop in local mode.

sujit@localhost:mahout-distribution-0.8$ bin/mahout seqdirectory \
    -i /path/to/colloc/text \
    -o /path/to/colloc/text-seqdir \
    -c UTF8 \
    -chunk 5

We then run another Mahout subcommand seq2sparse, which vectorizes the sequence file created by seqdirectory.

sujit@localhost:mahout-distribution-0.8$ bin/mahout seq2sparse \
    -i /path/to/colloc/text-seqdir \
    -o /path/to/colloc/text-seqdir-sparse

This produces the directory structure shown below. Of the data produced, we use only the files in the tokenized-documents subdirectory for the following steps.

text-seqdir-sparse/
+-- df-count
|   +-- _SUCCESS
|   +-- part-r-00000
+-- dictionary.file-0
+-- frequency.file-0
+-- tf-vectors
|   +-- _SUCCESS
|   +-- part-r-00000
+-- tfidf-vectors
|   +-- _SUCCESS
|   +-- part-r-00000
+-- tokenized-documents
|   +-- _SUCCESS
|   +-- part-m-00000
+-- wordcount
    +-- _SUCCESS
    +-- part-r-00000

We are now ready to run the CollocDriver. The CollocDriver uses Lucene's ShingleAnalyzer to produce n-grams, and a specified (Lucene) analyzer to tokenize the text in tokenized-documents. We specify the DefaultAnalyzer below, which is a Mahout subclass of Lucene's StandardAnalyzer but with a no-arg constructor (hence suitable for specifying on the command line). We only compute bigrams (n=2) and trigrams (n=3).

sujit@localhost:mahout-distribution-0.8$ bin/mahout \
    org.apache.mahout.vectorizer.collocations.llr.CollocDriver \
    -i /path/to/colloc/text-seqdir-sparse/tokenized-documents \
    -o /path/to/colloc/text-bigrams \
    -ng 2 \
    -a org.apache.mahout.vectorizer.DefaultAnalyzer \
    -u
sujit@localhost:mahout-distribution-0.8$ bin/mahout \
    org.apache.mahout.vectorizer.collocations.llr.CollocDriver \
    -i /path/to/colloc/text-seqdir-sparse/tokenized-documents \
    -o /path/to/colloc/text-trigrams \
    -ng 3 \
    -a org.apache.mahout.vectorizer.DefaultAnalyzer \
    -u

This creates two subdirectories, ngrams and subgrams, under each output directory. We will only use the ngrams subdirectory for our next steps.

text-bigrams
+-- ngrams
|   +-- _SUCCESS
|   +-- part-r-00000
+-- subgrams
    +-- _SUCCESS
    +-- part-r-00000

To get the data in human-readable form, we dump the sequence files in the ngrams subdirectory using Mahout's seqdumper subcommand. The key for these files is the bigram or trigram (space-separated) and the value is the log-likelihood ratio. There are good results in here, but there is also a lot of noise. The process yields about 90K significant bigrams and 213K significant trigrams.

sujit@localhost:mahout-distribution-0.8$ bin/mahout seqdumper \
  -i /path/to/colloc/text-bigrams/ngrams
Running on hadoop, using /opt/hadoop/bin/hadoop and HADOOP_CONF_DIR=
MAHOUT-JOB: /opt/mahout-distribution-0.8/mahout-examples-0.8-job.jar
Input Path: file:/path/to/colloc/text-bigrams/ngrams/part-r-00000
Key class: class org.apache.hadoop.io.Text \
    Value Class: class org.apache.hadoop.io.DoubleWritable
Key: zelboraf combo: Value: 48.62653920799494
...

sujit@localhost:mahout-distribution-0.8$ bin/mahout seqdumper \
  -i /path/to/colloc/text-trigrams/ngrams
Running on hadoop, using /opt/hadoop/bin/hadoop and HADOOP_CONF_DIR=
MAHOUT-JOB: /opt/mahout-distribution-0.8/mahout-examples-0.8-job.jar
Input Path: file:/path/to/colloc/text-trigrams/ngrams/part-r-00000
Key class: class org.apache.hadoop.io.Text \
    Value Class: class org.apache.hadoop.io.DoubleWritable
Key: zelboraf lebrikizumab dc: Value: 33.451520167291164
...
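
One simple way to cut through some of this noise is to keep only the n-grams with the highest LLR scores. Here is a small Python sketch (my own post-processing helper, not part of Mahout) that assumes the seqdumper output above has been saved to a file named bigrams.txt (for example via its -o option); the threshold of 40 is arbitrary.

# filter seqdumper output, keeping only n-grams whose LLR exceeds a threshold
# input lines look like: "Key: zelboraf combo: Value: 48.62653920799494"
THRESHOLD = 40.0

ngrams = []
f = open("bigrams.txt", 'rb')
for line in f:
  line = line.strip()
  if not line.startswith("Key:"): continue
  key, value = line.split(": Value: ")
  ngrams.append((key[len("Key: "):], float(value)))
f.close()

# print the surviving n-grams, highest scoring first, for manual review
for ngram, score in sorted(ngrams, key=lambda x: x[1], reverse=True):
  if score < THRESHOLD: break
  print "%s\t%f" % (ngram, score)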

In an attempt to get better phrases, I used OpenNLP to parse out only the noun phrase chunks from the input sentences. The code below parses the directory of text files and writes out the noun phrases, one per line and terminated by a period (to signal "sentence boundaries" to the Mahout tokenizer).

// Source: src/main/scala/com/mycompany/mia/preprocess/Tokenizer.scala
package com.mycompany.mia.preprocess

import java.io.{File, FileInputStream, FileWriter, InputStream, PrintWriter}

import scala.Array.canBuildFrom
import scala.io.Source

import org.apache.commons.io.IOUtils

import opennlp.tools.chunker.{ChunkerME, ChunkerModel}
import opennlp.tools.postag.{POSModel, POSTaggerME}
import opennlp.tools.sentdetect.{SentenceDetectorME, SentenceModel}
import opennlp.tools.tokenize.{TokenizerME, TokenizerModel}

object Tokenizer extends App {
  val tokenizer = new Tokenizer()
  val idir = new File("/path/to/colloc/text")
  val writer = new PrintWriter(new FileWriter(
    new File("/path/to/colloc/text-nphrases/phrases.txt")), true)
  idir.listFiles().foreach(file => {
    Source.fromFile(file).getLines().foreach(line => {
      val sentences = tokenizer.sentTokenize(line)
      sentences.foreach(sentence => {
        tokenizer.phraseChunk(sentence)
          .filter(phrase => "NP".equals(phrase._2))
          .foreach(phrase => writer.println(phrase._1 + "."))
      })
    })
  })
  writer.flush()
  writer.close()
}

class Tokenizer {

  val ModelDir = "/path/to/opennlp/models"

  val sentenceDetectorFn = (model: SentenceModel) =>
    new SentenceDetectorME(model)    
  val sentenceDetector = sentenceDetectorFn({
    var smis: InputStream = null
    try {
      smis = new FileInputStream(new File(ModelDir, "en_sent.bin"))
      val model = new SentenceModel(smis)
      model
    } finally {
      IOUtils.closeQuietly(smis)
    }   
  })
  val tokenizerFn = (model: TokenizerModel) => 
    new TokenizerME(model)
  val tokenizer = tokenizerFn({
    var tmis: InputStream = null
    try {
      tmis = new FileInputStream(new File(ModelDir, "en_token.bin"))
      val model = new TokenizerModel(tmis)
      model
    } finally {
      IOUtils.closeQuietly(tmis)
    }
  })
  val posTaggerFn = (model: POSModel) => 
    new POSTaggerME(model)
  val posTagger = posTaggerFn({
    var pmis: InputStream = null
    try {
      pmis = new FileInputStream(new File(ModelDir, "en_pos_maxent.bin"))
      val model = new POSModel(pmis)
      model
    } finally {
      IOUtils.closeQuietly(pmis)
    }
  })
  val chunkerFn = (model: ChunkerModel) => 
    new ChunkerME(model)
  val chunker = chunkerFn({
    var cmis: InputStream = null
    try {
      cmis = new FileInputStream(new File(ModelDir, "en_chunker.bin"))
      val model = new ChunkerModel(cmis)
      model
    } finally {
      IOUtils.closeQuietly(cmis)
    }
  })

  def sentTokenize(para: String): List[String] = {
    sentenceDetector.sentDetect(para).toList
  }
  
  def wordTokenize(sentence: String): List[String] = {
    return tokenizer.tokenize(sentence).toList
  }
  
  def posTag(sentence: String): List[(String,String)] = {
    val tokenSpans = tokenizer.tokenizePos(sentence)
    val tokens = tokenSpans.map(span => 
      span.getCoveredText(sentence).toString())
    val tags = posTagger.tag(tokens)
    tokens.zip(tags).toList
  }
  
  def phraseChunk(sentence: String): List[(String,String)] = {
    val tokenSpans = tokenizer.tokenizePos(sentence)
    val tokens = tokenSpans.map(span => 
      span.getCoveredText(sentence).toString())
    val tags = posTagger.tag(tokens)
    return chunker.chunkAsSpans(tokens, tags).map(chunk => {
      val start = tokenSpans(chunk.getStart()).getStart()
      val end = tokenSpans(chunk.getEnd() - 1).getEnd()
      (sentence.substring(start, end), chunk.getType())
    }).toList
  }
}

Running the pipeline against the /path/to/colloc/text-nphrases directory instead of the /path/to/colloc/text directory cuts down the number of significant bigrams and trigrams to about 76K and 184K respectively. The resulting quality is better, but still not good enough to be used without manual filtering. However, it is still useful, because going through the file of keywords and deleting the ones that don't make sense is easier than reading all the documents and trying to find phrases.

Wednesday, October 02, 2013

Topic Modeling with Mahout on Amazon EMR


Introduction


The motivation for this work was a desire to understand the structure of a corpus in a manner different from what I am used to. Central to all our applications is a knowledge graph derived from our medical taxonomy. So any document corpus can easily be defined as a small set (50-100) of high level concepts, merely by rolling up document concepts into their parents until an adequate degree of granularity is achieved. I wanted to see if standard topic modeling techniques would yield comparable results. If so, perhaps the output of such a process could be used as feedback for concept creation.

This post describes Topic Modeling a smallish corpus (2,285 documents) from our document collection, using Apache Mahout's Latent Dirichlet Allocation (LDA) algorithm, and running it on the Amazon Elastic MapReduce (EMR) platform. Mahout provides the LDA implementation, as well as utilities for IO. The code I wrote works at the two ends of the pipeline: first to download and parse data for Mahout to consume, and then to produce a report of the top terms in each topic category.

Even though Mahout (I used version 0.8) provided most of the functionality for this work, the experience was hardly straightforward. The official documentation is outdated, and I had to repeatedly refer to discussions on the Mahout mailing lists to find solutions to problems I faced along the way. I found only one blog post, based on Mahout version 0.5, that I could use as a starting point. Of course, all's well that ends well, and I was ultimately able to get the top terms for each topic and the topic composition of each document.

Theory


The math behind LDA is quite formidable as you can see from its Wikipedia page, but here is a somewhat high-level view, selectively gleaned from this paper by Steyvers and Griffiths.

Topic Models are based upon the idea that documents are mixtures of topics, where a topic is a probability distribution over words. To make a new document, one chooses a distribution over topics. Then for each word in the document, one chooses a topic at random and draws a word from the topic.

In order to answer the (IMO more interesting) question of what topics make up a collection of documents, you invert this process. Each Topic Modeling algorithm does it differently. LDA provides an approximate iterative method to sample values sequentially, proceeding until sample values converge to the target distribution.
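
The generative story above is easy to express in code. Here is a toy Python sketch (illustration only - the vocabulary and the p(word|topic) values are made up) that generates a short document the way LDA assumes documents are generated; inverting this process to recover the topics from real documents is what the Mahout job described below does.

import numpy as np

vocab = ["heart", "attack", "diet", "exercise", "insulin", "diabetes"]
# made-up p(word|topic) distributions for two topics
phi = np.array([[0.40, 0.30, 0.10, 0.10, 0.05, 0.05],
                [0.05, 0.05, 0.20, 0.10, 0.30, 0.30]])
alpha = [1.0, 1.0]  # symmetric Dirichlet prior over topics

# choose a distribution over topics for this document
theta = np.random.dirichlet(alpha)
doc = []
for i in range(20):
  z = np.random.choice(len(alpha), p=theta)   # choose a topic at random
  w = np.random.choice(len(vocab), p=phi[z])  # draw a word from that topic
  doc.append(vocab[w])
print " ".join(doc)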

Preparing the data


The documents for this work come from our Content Management System, and this section describes the extraction code. It's included for completeness - your setup is likely very different, so it may be of limited use to you. In any case, our CMS is loosely coupled to our web front end via a publisher, which serializes documents in JSON format onto a network filesystem. Content can be pulled from this filesystem via a REST API, most efficiently if you know the "file ID". I use Solr to get a list of these file IDs, and download the files to my local filesystem for further processing.

Processing consists of parsing out the text content of the files (each content type can define its own JSON format), then using NLTK to remove HTML tags, stopwords, numeric tokens and punctuation. The text versions of the JSON files are written out to another directory for feeding into the Mahout pipeline.

The code is in Python and is shown below. Hostnames and such have been changed to protect the innocent.

import json
import nltk
import os
import os.path
import string
import urllib
import urllib2

SOLR_SERVER = "http://solrserver.mycompany.com:8983/solr/select"
CONTENT_SERVER = "http://contentserver.mycompany.com/view"
JSON_OUTPUTDIR = "/path/to/data/hlcms_jsons"
TEXT_OUTPUTDIR = "/path/to/data/hlcms_text"
FILENAMES_FILE = "/tmp/hlcms_filenames.txt"

STOPWORDS = nltk.corpus.stopwords.words("english")
PUNCTUATIONS = {c:"" for c in string.punctuation}

def textify(s):
  text = nltk.clean_html(s)
  sentences = nltk.sent_tokenize(text)
  words = []
  for sentence in sentences:
    sent = sentence.encode("utf-8", 'ascii')
    sent = "".join([PUNCTUATIONS[c] if PUNCTUATIONS.has_key(c) else c 
                                    for c in sent])
    ws = nltk.word_tokenize(sent)
    for w in ws:
      if w in STOPWORDS: continue
      if w.replace(",", "").replace(".", "").isdigit(): continue
      words.append(w.lower())
  return " ".join(words)

# build list of all file parameter values from solr
params = urllib.urlencode({
  "q" : "sourcename:hlcms",
  "start" : "0",
  "rows" : "0",
  "fl" : "contenttype,cmsurl",
  "wt" : "json"
})
conn = urllib.urlopen(SOLR_SERVER, params)
rsp = json.load(conn)
numfound = rsp["response"]["numFound"]
print "# of CMS articles to download: ", numfound
filenames = open(FILENAMES_FILE, 'wb')
npages = int(numfound/10) + 1
for pg in range(0, npages):
  if pg % 100 == 0:
    print "Downloading HLCMS page #: %d" % (pg)
  params = urllib.urlencode({
    "q" : "sourcename:hlcms",
    "start" : str(pg * 10),
    "rows" : "10",
    "fl" : "contenttype,cmsurl",
    "wt" : "json"
  })
  conn = urllib.urlopen(SOLR_SERVER, params)
  rsp = json.load(conn)
  for doc in rsp["response"]["docs"]:
    try:
      contenttype = doc["contenttype"]
      cmsurl = doc["cmsurl"]
      filenames.write("%s-%s\n" % (contenttype, cmsurl))
    except KeyError:
      continue
filenames.close()

# for each file parameter, build URL and extract data into local dir
filenames2 = open(FILENAMES_FILE, 'rb')
for filename in filenames2:
  fn = filename.strip()
  ofn = os.path.join(JSON_OUTPUTDIR, fn + ".json")
  print "Downloading file: ", fn
  try:
    output = open(ofn, 'wb')
    response = urllib2.urlopen(CONTENT_SERVER + "?file=" + fn + "&raw=true")
    output.write(response.read())
    output.close()
  except IOError:
    continue
filenames2.close()
print "All files downloaded"

# build parser for each content type to extract title and body
for file in os.listdir(JSON_OUTPUTDIR):
  print "Parsing file: %s" % (file)
  fin = open(os.path.join(JSON_OUTPUTDIR, file), 'rb')
  ofn = os.path.join(TEXT_OUTPUTDIR, 
    os.path.basename(file[0:file.rindex(".json")]) + ".txt")
  fout = open(ofn, 'wb')
  try:
    doc_json = json.load(fin)
    # parsing out title and body based on content type
    # since different content types can have own format
    if file.startswith("ctype1-"):
      for fval in ["title", "bm_intro", "bm_seo_body"]:
        fout.write("%s\n" % (textify(doc_json[fval])))
    elif file.startswith("ctype2-"):
      for fval in ["body"]:
        fout.write("%s\n" % (textify(doc_json[fval])))
    elif file.startswith("ctype3-"):
      for fval in ["title", "body"]:
        fout.write("%s\n" % (textify(doc_json[fval])))
    elif file.startswith("ctype4-"):
      fout.write("%s\n" % (textify(doc_json["recipeDeck"])))
      fout.write("%s\n" % (textify(". ".join([x.values()[0] 
                           for x in doc_json["directions"]]))))
    elif file.startswith("ctype5-"):
      for fval in ["title", "body"]:
        fout.write("%s\n" % (textify(doc_json[fval])))
    else:
      continue
  except ValueError as e:
    print "ERROR!", e
    continue
  fout.close()
  fin.close()

# filter out files with 0 bytes and remove them from text output directory
for file in os.listdir(TEXT_OUTPUTDIR):
  fname = os.path.join(TEXT_OUTPUTDIR, file)
  size = os.path.getsize(fname)
  if size == 0:
    print "Deleting zero byte file:", os.path.basename(fname)
    os.remove(fname)

Converting Text Files to Sequence File


The end product of the step above is a directory of text files. Punctuation, stopwords and number tokens have been stripped (because they are of limited value as topic terms) and all characters have been lowercased (not strictly necessary, because the vectorization step takes care of that). So each file is essentially now a bag of words.

Our pipeline is Hadoop-based, and Hadoop likes a small number of large files, so this step converts the directory of 2,258 text files into a single large sequence file, where each row represents a single file. I run the Mahout seqdirectory subcommand locally to do this, then copy the output to Amazon S3 using s3cmd (available on Ubuntu via apt-get and on Mac OS via macports).

sujit@localhost:data$ $MAHOUT_HOME/bin/mahout seqdirectory \
    --input /path/to/data/hlcms_text \
    --output /path/to/data/hlcms_seq \
    -c UTF-8
sujit@localhost:data$ s3cmd put /path/to/data/hlcms_seq \
    s3://mybucket/cmstopics/

Vectorizing the Input


The next step is to create a term-document matrix out of the sequence files. Once again, we could do this locally with the Mahout seq2sparse subcommand, but I chose to do this step on Amazon EMR - the only change is to specify the name of the class that corresponds to the seq2sparse subcommand (you can find this mapping in $MAHOUT_HOME/conf/driver.classes.default.props). You also need to copy the Mahout job JAR over to S3.

JAR location: s3n://mybucket/cmstopics/mahout-core-0.8-job.jar
JAR arguments:
org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles \
-i s3n://mybucket/cmstopics/hlcms_seq \
-o s3n://mybucket/cmstopics/hlcms_vec \
-wt tf

Using Amazon's Hadoop distribution (i.e. choosing Amazon Distribution at the Hadoop Version prompt in the AWS EMR console) results in this error.

Error running child : java.lang.NoSuchFieldError: LUCENE_43

This is very likely caused by the Amazon distribution gratuitously including old Lucene JARs (older than the Lucene 4.3 that the Mahout 0.8 job JAR includes). At runtime the Lucene classes from the Amazon JARs are loaded, and they don't know anything about LUCENE_43 because that version constant does not (yet) exist in them. My solution was to try the MapR M7 distribution (at least partly because Ted Dunning works for MapR and is a Mahout committer :-)). However, MapR (all distributions) requires m1.large instances at minimum, so it's a bit more expensive.

This step creates an output directory hlcms_vec that looks like this. Of its contents, the only ones of interest to this pipeline are the tf-vectors folder and the dictionary.file-0 file.

hlcms_vec/
+-- df-count
|   +-- _SUCCESS
|   +-- part-r-00000
+-- dictionary.file-0
+-- frequency.file-0
+-- tf-vectors
|   +-- _SUCCESS
|   +-- part-r-00000
+-- tokenized-documents
|   +-- _SUCCESS
|   +-- part-m-00000
+-- wordcount
    +-- _SUCCESS
    +-- part-r-00000

Converting Keys to IntWritables


This step is not covered in the official documentation. The blog post mentioned above does not mention it either, but that's probably because Mahout 0.5's lda subcommand was deprecated in favor of the cvb subcommand. The tf-vectors file contains (Text, VectorWritable) tuples, but cvb expects to read (IntWritable, VectorWritable) tuples. The rowid subcommand does this conversion. Interestingly, the rowid job is contained in mahout-examples-0.8-job.jar and not in the main job JAR. Attempting to run it on Amazon EMR on either the Amazon or MapR distributions produces errors to the effect that it can only be run locally.

# running under MapR distribution
java.io.IOException: \
Could not resolve any CLDB hostnames for cluster: mybucket:7222
# running under Amazon distribution
java.lang.IllegalArgumentException: \
This file system object (hdfs://10.255.35.8:9000) does not support \
access to the request path 's3n://mybucket/cmstopics/cvb-vectors/docIndex'\
You possibly called FileSystem.get(conf) when you should have called \
FileSystem.get(uri, conf) to obtain a file system supporting your path.

So I ended up pulling down tf-vectors locally, converting to tf-vectors-cvb and then uploading back to S3.

sujit@localhost:data$ s3cmd get --recursive \
  s3://mybucket/cmstopics/hlcms_vec/tf-vectors/ \
  hlcms_vec/tf-vectors
sujit@localhost:data$ $MAHOUT_HOME/bin/mahout rowid \
  -i /path/to/data/hlcms_vec/tf-vectors \
  -o /path/to/data/hlcms_vec/tf-vectors-cvb
sujit@localhost:data$ s3cmd put tf-vectors-cvb \
  s3://mybucket/cmstopics/hlcms_vec/

After this subcommand is run, there is an additional folder tf-vectors-cvb in the hlcms_vec folder. The tf-vectors-cvb folder contains 2 files, matrix and docindex. Our pipeline only cares about the data in the matrix file.

hlcms_vec
+-- df-count
|   +-- _SUCCESS
|   +-- part-r-00000
+-- dictionary.file-0
+-- frequency.file-0
+-- tf-vectors
|   +-- _SUCCESS
|   +-- part-r-00000
+-- tf-vectors-cvb
|   +-- docindex
|   +-- matrix
+-- tokenized-documents
|   +-- _SUCCESS
|   +-- part-m-00000
+-- wordcount
    +-- _SUCCESS
    +-- part-r-00000

Run LDA on Modified term-vector input


Finally, we are ready to run LDA on our corpus. The Mahout lda subcommand has been deprecated and replaced with the cvb subcommand, which uses the Collapsed Variational Bayes (CVB) algorithm to do LDA. We run LDA with 50 topics (-k) for 30 iterations (-x) on Amazon EMR using a MapR distribution, with the following parameters.

JAR location: s3n://mybucket/cmstopics/mahout-core-0.8-job.jar
JAR arguments:
org.apache.mahout.clustering.lda.cvb.CVB0Driver \
-i s3n://mybucket/cmstopics/hlcms_vec/tf-vectors-cvb/matrix \
-dict s3n://mybucket/cmstopics/hlcms_vec/dictionary.file-0 \
-o s3n://mybucket/cmstopics/hlcms_lda/topicterm \
-dt s3n://mybucket/cmstopics/hlcms_lda/doctopic \
-k 50 \
-ow \
-x 30 \
-a 1 \
-e 1

There are a number of things to keep in mind here. For one, -nt (number of terms) should not be specified if -dict is specified, since it can be inferred from -dict (or your job may fail). Also, don't specify -mt (model directory), since the job will fail if it can't find one.

The output of the job is two folders, doctopic and topicterm. Both contain sequence files with (IntWritable, VectorWritable) tuples. Each row of doctopic represents a document, and its VectorWritable is the list of p(topic|doc) values for each topic. Each row of topicterm represents a topic, and its VectorWritable is the list of p(term|topic) values for each term.

hlcms_lda
+-- doctopic
|   +-- _SUCCESS
|   +-- part-m-00000
+-- topicterm
    +-- _SUCCESS
    +-- part-m-00001
    +-- ...
    +-- part-m-00009

Dump results into CSV


The official documentation says to use Mahout's ldatopics subcommand, but according to this StackOverflow page, ldatopics is deprecated and you should use the vectordump subcommand instead.

The vectordump subcommand merges the information from the dictionary file with one of doctopic or topicterm, and writes out a CSV file representing a matrix of p(topic|doc) or p(term|topic) values respectively. I wasn't sure how to dump output to the local filesystem on Amazon EMR, so I just copied the files down using s3cmd and ran vectordump on them locally.

sujit@localhost:data$ s3cmd get --recursive \
  s3://mybucket/cmstopics/hlcms_lda hlcms_lda
sujit@localhost:data$ $MAHOUT_HOME/bin/mahout vectordump \
  -i /path/to/data/hlcms_lda/topicterm \
  -d /path/to/data/hlcms_vec/dictionary.file-0 \
  -dt sequencefile \
  -c csv \
  -p true \
  -o ./p_term_topic.txt \
  -sort /path/to/data/hlcms_lda/topicterm \
  -vs 10
sujit@localhost:data$ $MAHOUT_HOME/bin/mahout vectordump \
  -i /path/to/data/hlcms_lda/doctopic \
  -d /path/to/data/hlcms_vec/dictionary.file-0 \
  -dt sequencefile \
  -c csv \
  -p true \
  -o ./p_topic_doc.txt \
  -sort /path/to/data/hlcms_lda/doctopic \
  -vs 10 

The p_term_topic.txt file contains the p(term|topic) values for each of the 50 topics, one topic per row. The p_topic_doc.txt file contains the p(topic|doc) values for each document, one document per row.

Create Reports


We can create some interesting reports out of the data computed above. One such report is the list of the top 10 words for each topic cluster. Here is the code for this report:

import operator
import string

terms = {}

f = open("/path/to/data/p_term_topic.txt", 'rb')
ln = 0
for line in f:
  if len(line.strip()) == 0: continue
  if ln == 0:
    # make {id,term} dictionary for use later
    tn = 0
    for term in line.strip().split(","):
      terms[tn] = term
      tn += 1
  else:
    # parse out topic and probability, then build map of term to score
    # finally sort by score and print top 10 terms for each topic.
    topic, probs = line.strip().split("\t")
    termProbs = {}
    pn = 0
    for prob in probs.split(","):
      termProbs[terms[pn]] = float(prob)
      pn += 1
    toptermProbs = sorted(termProbs.iteritems(),
      key=operator.itemgetter(1), reverse=True)
    print "Topic: %s" % (topic)
    print "\n".join([(" "*3 + x[0]) for x in toptermProbs[0:10]])
  ln += 1
f.close()

And the results are shown (after some editing to make them easier to read) below:

Topic 0: droids applaud technique explosions sufferers born delight succeed compliant warming
Topic 1: responds stools technique explosions applaud proposal stern centers warming succeed
Topic 2: responds applaud droids explosions proposal born delight sexually upsidedown hemophilia
Topic 3: elisa responds sufferers born delight sexually fully hemophilia fury upsidedown
Topic 4: technique sufferers stools droids explosions knees amount stabilized centers stern
Topic 5: group's technique stools applaud born amount stern vascular vectors knees
Topic 6: technique droids stools authored interchangeably stern households vectors bleed muchneeded
Topic 7: sufferers technique responds explosions applaud born compliant stabilized recording punch
Topic 8: droids explosions responds technique born upsidedown hypogastric compliant flinn bleed
Topic 9: group's responds applaud explosions technique born vectors delight punch fully
Topic 10: group's responds sufferers explosions droids authored proposal centers thick flinn
Topic 11: applaud droids sufferers technique responds stools born vectors delight succeed
Topic 12: explosions applaud stools stern born upsidedown delight fury recording hypogastric
Topic 13: sufferers applaud interchangeably muchneeded households stabilized sexually ninety succeed flinn
Topic 14: technique stools responds droids interchangeably centers muchneeded thick upsidedown punch
Topic 15: group's responds sufferers technique stools explosions flinn hemophilia delight centers
Topic 16: responds applaud technique vectors knees stern stabilized vascular sexually recording
Topic 17: responds stools sufferers vectors centers ninety warming households muchneeded interchangeably
Topic 18: technique sufferers explosions proposal born hemophilia centers delight fury compliant
Topic 19: group's sufferers applaud droids stools born centers punch compliant delight
Topic 20: technique responds sufferers applaud droids stools interchangeably amount born ninety
Topic 21: responds applaud sufferers droids born delight sexually flinn vascular thick
Topic 22: applaud explosions droids born delight upsidedown interchangeably amount compliant punch
Topic 23: technique explosions vectors fury stern vascular households untreatable hemophilia stabilized
Topic 24: technique droids applaud sufferers stools stern amount interchangeably households centers
Topic 25: stools sufferers responds born knees amount vectors flinn untreatable upsidedown
Topic 26: stools explosions proposal authored droids vectors knees fury amount succeed
Topic 27: stools proposal responds applaud born knees amount vascular untreatable hypogastric
Topic 28: applaud technique explosions sufferers droids responds stabilized centers punch muchneeded
Topic 29: responds stools droids explosions interchangeably stern households ninety upsidedown amount
Topic 30: responds explosions applaud sufferers stools droids centers compliant vectors thick
Topic 31: stools explosions droids technique vectors centers muchneeded thick flinn stabilized
Topic 32: responds technique droids stools explosions born interchangeably households fury hypogastric
Topic 33: applaud explosions droids technique compliant punch centers warming hemophilia fully
Topic 34: droids technique vectors stern interchangeably fury households muchneeded amount knees
Topic 35: sufferers technique responds authored centers vectors interchangeably punch fully warming
Topic 36: technique stools responds droids authored stern fury ninety bleed compliant
Topic 37: elisa sufferers group's technique droids interchangeably centers vectors punch thick
Topic 38: stools proposal technique sexually upsidedown stabilized thick punch muchneeded compliant
Topic 39: interchangeably stabilized vectors centers punch compliant ninety delight hemophilia droids
Topic 40: stools applaud responds sufferers authored born flinn interchangeably hypogastric fury
Topic 41: group's responds sufferers applaud authored centers fury bleed hypogastric stern
Topic 42: responds stools technique sufferers applaud vectors amount knees untreatable upsidedown
Topic 43: elisa technique explosions responds stools proposal stern succeed born warming
Topic 44: stools applaud authored interchangeably stern born ninety muchneeded households warming
Topic 45: responds droids sufferers interchangeably fury vectors households ninety muchneeded stern
Topic 46: group's droids stools explosions applaud authored proposal sufferers interchangeably stabilized
Topic 47: sufferers group's explosions applaud responds droids technique stools interchangeably fury
Topic 48: amount stern knees flinn compliant sexually thick bleed upsidedown punch
Topic 49: technique droids applaud sufferers explosions born amount knees centers succeed

Another interesting report would be to see the composition of topics within the corpus. We calculate the "topic" of a document as the topic with the highest p(topic|doc) value for that document. We then display the number of documents across various topics as a histogram. Here is the code:

import numpy as np
import pylab as pl

f = open("/path/to/data/p_topic_doc.txt", 'rb')
xvals = range(0, 50)
tcounts = np.zeros((50))
for line in f:
  line = line.strip()
  if len(line) == 0 or line.startswith("#"): continue
  docid, probs = line.split("\t")
  plist = [float(p) for p in probs.split(",")]
  topic = plist.index(max(plist))
  tcounts[topic] += 1
f.close()
yvals = list(tcounts)
print xvals
print yvals
fig = pl.figure()
ax = pl.subplot(111)
ax.bar(xvals, yvals)
pl.ylabel("#-Documents")
pl.xlabel("Topics")
pl.show()

And here is the resulting histogram. As you can see, the distribution appears fairly uniform, with a few popular topics. We could try to correlate these topics with the top words in each topic to figure out what our corpus is all about.

Yet another application could be to think of LDA as a feature reduction strategy, converting the problem down to only 50 features (the number of topics) represented by the p(topic|doc) values.
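
For example, here is a small Python sketch (my own illustration, not part of the pipeline above) that turns p_topic_doc.txt into a dense document-by-topic matrix; each row is a 50-dimensional feature vector that could be fed to any standard classifier or clustering algorithm. The file format is the same one parsed by the histogram code above.

import numpy as np

docids = []
rows = []
f = open("/path/to/data/p_topic_doc.txt", 'rb')
for line in f:
  line = line.strip()
  if len(line) == 0 or line.startswith("#"): continue
  docid, probs = line.split("\t")
  docids.append(docid)
  rows.append([float(p) for p in probs.split(",")])
f.close()

# each document is now represented by its 50 p(topic|doc) values
X = np.array(rows)
print X.shape  # (number of documents, 50)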

Conclusion


Topic Modeling can be a powerful tool that provides interesting insights into your data. Mahout is one of the few packages that can do Topic Modeling at scale. However, using it was daunting because of poor and outdated documentation. Mahout hasn't yet reached its 1.0 release milestone, and there is already some work being done within the Mahout community to improve the documentation, so hopefully it will all be ironed out by then.