I've been using the Enron Dataset for a couple of projects now, and I figured that it would be interesting to see if I could glean some information out of the data. One can of course simply read the Wikipedia article, but that would be too easy and not as much fun :-).
My focus in this analysis is on the "what" and the "who", i.e., what are the important ideas in this corpus and who are the principal players. For that I did the following:
- Extract the words from Lucene's inverted index as (term, docID, freq) triples. From these, I construct a frequency distribution of words in the corpus. Looking at the most frequent words gives us an idea of what is being discussed.
- Extract the email (from, {to, cc, bcc}) pairs from MongoDB. Using these, I piggyback on Scalding's PageRank implementation to produce a list of email addresses by page rank. This gives us an idea of the "important" players.
- Using the triples extracted from Lucene, construct tuples of (docID, termvector), then cluster the documents using KMeans. This gives us an idea of the spread of ideas in the corpus. Originally, the idea was to use Mahout for the clustering, but I ended up using Weka instead.
I also wanted to get more familiar with Scalding beyond the basic stuff I did before, so I used that where I would have used Hadoop previously. The rest of the code is in Scala as usual.
Unfortunately, my Scalding-fu was not strong enough, because I couldn't figure out how to run Scalding jobs in non-local mode, and I was running out of memory for some of the jobs in local mode. Also, Mahout expects its document vectors to be in SequenceFile format, but Scalding does not allow you to write SequenceFiles in local mode. As a result, I converted the document vector file to ARFF format and used Weka for the clustering instead. I have asked about this on the Cascading-Users mailing list.
Frequency Distribution
Here is the code to extract the (term, docID, freq) triples from the Lucene index. As you can see, the generate method takes three cutoff parameters: the minimum document frequency, the minimum total term frequency, and the minimum term frequency. I added these cutoffs because the unfiltered output contained about 700 million triples and was causing the next job (FreqDist) to fail with an OutOfMemoryError.
// Source: src/main/scala/com/mycompany/solr4extras/corpus/Lucene4TermFreq.scala
package com.mycompany.solr4extras.corpus
import java.io.{PrintWriter, FileWriter, File}
import org.apache.lucene.index.{MultiFields, IndexReader, DocsEnum}
import org.apache.lucene.search.DocIdSetIterator
import org.apache.lucene.store.NIOFSDirectory
import org.apache.lucene.util.BytesRef
/**
* Reads a Lucene4 index (new API) and writes out a
* text file as (term, docID, frequency_of_term_in_doc).
* @param indexDir the location of the Lucene index.
* @param outputFile the output file name.
* @param minDocs terms which are present in minDocs or
* fewer documents will be ignored.
* @param minTTF the minimum Total Term Frequency a
* term must have to be considered for inclusion.
* @param minTermFreq the minimum term frequency within
* a document so the term is included.
*/
class Lucene4TermFreq(indexDir: String) {
def generate(outputFile: String, minDocs: Int,
minTTF: Int, minTermFreq: Int): Unit = {
val reader = IndexReader.open(
new NIOFSDirectory(new File(indexDir), null))
val writer = new PrintWriter(new FileWriter(outputFile), true)
val terms = MultiFields.getTerms(reader, "body").iterator(null)
var term: BytesRef = null
var docs: DocsEnum = null
do {
term = terms.next
if (term != null) {
val docFreq = terms.docFreq
val ttf = terms.totalTermFreq
if (docFreq > minDocs && ttf > minTTF) {
docs = terms.docs(null, docs)
var docID: Int = -1
do {
docID = docs.nextDoc
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
val termFreq = docs.freq
if (termFreq > minTermFreq)
writer.println("%s\t%d\t%d".format(
term.utf8ToString, docID, docs.freq))
}
} while (docID != DocIdSetIterator.NO_MORE_DOCS)
}
}
} while (term != null)
writer.flush
writer.close
reader.close
}
}
To decide the cutoffs, I first plotted the sorted total term frequencies of all words in the corpus, which results in the Zipf distribution shown below, with the last 20,000 terms contributing most of the occurrence count. Cutting the TTFs off at about 1,000 occurrences, and setting the minimum document frequency to 5 (a term must exist in at least 5 documents) and the minimum term frequency to 10 (a term must occur at least 10 times in a document to be counted), resulted in a more manageable number of about 1.27 million triples.
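As a rough illustration of how the three cutoffs interact, here is a minimal sketch that applies them to a handful of in-memory postings instead of a Lucene index. The `Posting` case class and the `applyCutoffs` helper are made up for this example. Note that it keeps the strict `>` comparisons used by Lucene4TermFreq.generate, so a cutoff of 5 really means "more than 5", which is slightly stricter than the "at least" wording above.

```scala
// Sketch only: an in-memory stand-in for the postings, not the Lucene API.
object CutoffSketch {
  // One posting: `term` occurs `freq` times in document `docID`.
  case class Posting(term: String, docID: Int, freq: Int)

  def applyCutoffs(postings: Seq[Posting], minDocs: Int,
      minTTF: Int, minTermFreq: Int): Seq[Posting] = {
    // Corpus-level statistics per term: (document frequency, total term frequency).
    val stats: Map[String, (Int, Int)] =
      postings.groupBy(_.term).map { case (term, ps) =>
        term -> ((ps.map(_.docID).distinct.size, ps.map(_.freq).sum))
      }
    // Keep a posting only if its term passes both corpus-level cutoffs
    // and the posting itself passes the per-document cutoff.
    postings.filter { p =>
      val (docFreq, ttf) = stats(p.term)
      docFreq > minDocs && ttf > minTTF && p.freq > minTermFreq
    }
  }
}
```

With `minDocs = 1`, `minTTF = 5` and `minTermFreq = 2`, a term that appears in only one document is dropped entirely, and for the surviving terms only the postings with a frequency above 2 are written out.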
Here is the code to compute the Frequency Distribution of the terms in the corpus.
// Source: src/main/scala/com/mycompany/solr4extras/corpus/FreqDist.scala
package com.mycompany.solr4extras.corpus
import com.twitter.scalding.{Tsv, Job, Args}
import cascading.pipe.joiner.LeftJoin
/**
* Reads input of the form (term docID freq), removes stopword
* terms based on a stop word list, sums up the term frequency
* across docs and outputs the term frequency counts sorted by
* count descending as (term count).
* NOTE: this can also be done directly from Lucene using
* totalTermFreq.
*/
class FreqDist(args: Args) extends Job(args) {
val stopwords = Tsv(args("stopwords"), ('stopword)).read
val input = Tsv(args("input"), ('term, 'docID, 'freq))
val output = Tsv(args("output"))
input.read.
joinWithSmaller('term -> 'stopword, stopwords, joiner = new LeftJoin).
filter('stopword) { stopword: String =>
(stopword == null || stopword.isEmpty)
}.
groupBy('term) { _.sum('freq) }.
groupAll { _.sortBy('freq).reverse }.
write(output)
}
The top 100 words (and their raw frequencies) from the resulting frequency distribution are shown below:
enron (1349349), ect (1133513), hou (578328), subject (446732), pm (388238), http (325908), power (309063), cc (303380), enron.com (290452), energy (286475), corp (262331), message (245042), mail (234409), time (217656), gas (216405), company (189878), market (181420), information (180539), ees (176279), original (170590), call (153876), california (152559), business (148100), forwarded (145700), day (138167), na (132233), td (132135), font (132130), price (131493), week (131246), state (130245), year (127376), email (124309), attached (121637), houston (119962), image (118982), john (113327), meeting (111627), agreement (111621), mark (111518), deal (108930), make (106030), group (105643), trading (105130), questions (103417), enron_development (102359), contact (97935), date (96189), back (95138), million (93875), services (93825), work (92014), jeff (89695), today (89682), report (89632), electricity (88541), service (88489), monday (87089), prices (85148), free (83838), friday (83750), credit (83487), contract (82981), system (80955), financial (80927), good (80489), review (78720), fax (78219), management (76054), companies (75371), david (74984), news (74666), number (73917), file (73378), jones (72288), thursday (72182), order (71953), list (71845), send (71824), forward (71651), tuesday (71515), office (70850), october (70826), based (70518), enronxgate (69905), wednesday (69339), risk (69091), change (68911), received (68688), mike (68609), issues (68449), team (67302), bill (67289), click (66993), plan (66953), customers (66047), communications (65620), november (65478), phone (65434), provide (65347)
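For readers who do not have Scalding set up, the same computation (drop stopwords, sum the frequency per term, sort descending) can be sketched with plain Scala collections. This is an illustration on made-up data, not the job that produced the list above; `freqDist` is a hypothetical helper name.

```scala
// Sketch only: mirrors the FreqDist pipeline on in-memory (term, docID, freq) triples.
object FreqDistSketch {
  def freqDist(triples: Seq[(String, Int, Int)],
      stopwords: Set[String]): Seq[(String, Int)] =
    triples
      .filterNot { case (term, _, _) => stopwords.contains(term) } // remove stopwords
      .groupBy(_._1)                                               // group by term
      .map { case (term, rows) => term -> rows.map(_._3).sum }     // sum freq across docs
      .toSeq
      .sortBy { case (term, total) => (-total, term) }             // count descending, ties by term
}
```

Unlike the Scalding job, which removes stopwords with a left join followed by a null filter, the sketch uses a set lookup; on data that fits in memory the result is the same.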
Page Rank
Next, I wanted to rank the players (the email authors) by importance. In any organization, people tend to email at their own level, but include their immediate bosses on the CC or BCC as a form of CYA. So top management would get relatively few (but highly ranked) emails from middle management, and middle management would get many (low ranked) emails from their underlings. Sorting the email authors in descending order of page rank should tell us about the major players.
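To make the ranking idea concrete, here is a textbook PageRank iteration over (sender, recipient) id pairs. It is a sketch under stated assumptions, not Scalding's implementation: the damping factor of 0.85 is an assumed default, every rank starts at 1.0, and a node with no outgoing mail simply passes nothing on.

```scala
// Sketch only: textbook PageRank on an edge list, not the Scalding PageRank job.
object PageRankSketch {
  // edges: one (fromId, toId) pair per sender/recipient occurrence.
  def pageRank(edges: Seq[(Int, Int)], iterations: Int,
      damping: Double = 0.85): Map[Int, Double] = {
    val nodes: Seq[Int] = edges.flatMap { case (from, to) => Seq(from, to) }.distinct
    val outLinks: Map[Int, Seq[Int]] =
      edges.groupBy(_._1).map { case (from, es) => from -> es.map(_._2) }
    var ranks: Map[Int, Double] = nodes.map(n => n -> 1.0).toMap
    for (_ <- 1 to iterations) {
      // Each sender splits its current rank evenly across its outgoing edges.
      val incoming: Map[Int, Double] = outLinks.toSeq
        .flatMap { case (from, tos) => tos.map(to => (to, ranks(from) / tos.size)) }
        .groupBy(_._1)
        .map { case (to, shares) => to -> shares.map(_._2).sum }
      ranks = nodes.map { n =>
        n -> ((1.0 - damping) + damping * incoming.getOrElse(n, 0.0))
      }.toMap
    }
    ranks
  }
}
```

In a toy graph where users 1 and 2 both mail user 3 and user 3 only mails user 1, user 3 ends up with the highest rank and user 2, who receives nothing, with the lowest.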
The input data comes from the MongoDB database I populated for my previous project. I had to go through a few hoops because the data is encrypted, but the net result is that I end up with two files, the first containing the (from_email, from_id) tuples and the second containing (from_id, {to_id, cc_id, bcc_id}) tuples. I generated two files because the PageRank implementation that comes with the Scalding distribution expects its input data to be numeric. Here is the extraction code:
// Source: src/main/scala/com/mycompany/solr4extras/corpus/MongoEmailPairs.scala
package com.mycompany.solr4extras.corpus
import java.io.{PrintWriter, FileWriter, File}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.commons.codec.binary.Hex
import org.apache.lucene.index.IndexReader
import org.apache.lucene.store.NIOFSDirectory
import com.mongodb.casbah.Imports.{wrapDBObj, wrapDBList, MongoDBObject, MongoConnection, BasicDBList}
import com.mycompany.solr4extras.secure.CryptUtils
class MongoEmailPairs(host: String, port: Int, db: String,
indexDir: String) {
val conn = MongoConnection(host, port)
val emails = conn(db)("emails")
val users = conn(db)("users")
val reader = IndexReader.open(
new NIOFSDirectory(new File(indexDir), null))
def generate(refFile: String, outputFile: String): Unit = {
val counter = new AtomicInteger(0)
val userKeys = users.find().map(user =>
user.as[String]("email") ->
(Hex.decodeHex(user.as[String]("key").toCharArray),
Hex.decodeHex(user.as[String]("initvector").toCharArray),
counter.incrementAndGet)).toMap
// write out dictionary file for reference
val refWriter = new PrintWriter(new FileWriter(new File(refFile)), true)
userKeys.foreach(user =>
refWriter.println("%s\t%d".format(user._1, user._2._3))
)
refWriter.flush
refWriter.close
// write out main file as required by PageRank
val dataWriter = new PrintWriter(new FileWriter(new File(outputFile)), true)
val numdocs = reader.numDocs
var i = 0
while (i < numdocs) {
val doc = reader.document(i)
val messageID = doc.get("message_id").asInstanceOf[String]
val author = doc.get("from").asInstanceOf[String]
val mongoQuery = MongoDBObject("message_id" -> messageID)
emails.findOne(mongoQuery) match {
case Some(email) => {
try {
val from = CryptUtils.decrypt(
Hex.decodeHex(email.as[String]("from").toCharArray),
userKeys(author)._1, userKeys(author)._2)
val fromId = userKeys(from)._3
val targets =
(try {
email.as[BasicDBList]("to").toList
} catch {
case e: NoSuchElementException => List()
}) ++
(try {
email.as[BasicDBList]("cc").toList
} catch {
case e: NoSuchElementException => List()
}) ++
(try {
email.as[BasicDBList]("bcc").toList
} catch {
case e: NoSuchElementException => List()
})
targets.map(target => {
val targetEmail = CryptUtils.decrypt(Hex.decodeHex(
target.asInstanceOf[String].toCharArray),
userKeys(author)._1, userKeys(author)._2).trim
val targetEmailId = userKeys(targetEmail)._3
dataWriter.println("%d\t%d".format(fromId, targetEmailId))
})
} catch {
// TODO: BadPaddingException, likely caused by,
// problems during population. Fix, but skip for now
case e: Exception => println("error, skipping")
}
}
case None => // skip
}
i = i + 1
}
dataWriter.flush
dataWriter.close
reader.close
conn.close
}
}
We then create our own subclass of PageRank and override the initialize method that produces a Source tap for PageRank from the email pairs we just generated from MongoDB. The work that we do here is to group by the from_id and aggregate the to_ids into a comma-separated list, then add a column with the initial value of the page rank (1.0), and rename the columns so it is usable by the parent class. Once the job has finished, we post-process the output to produce a list of from email addresses and their associated page rank in descending rank order. Here is the code for these two classes:
// Source: src/main/scala/com/mycompany/solr4extras/corpus/MailRank.scala
package com.mycompany.solr4extras.corpus
import com.twitter.scalding.examples.PageRank
import com.twitter.scalding.{Tsv, Job, Args}
import cascading.pipe.joiner.LeftJoin
import cascading.pipe.Pipe
/**
* Converts data generated by MongoEmailPairs of the
* form: (from_id, {to_id|cc_id|bcc_id}) to the format
* required by PageRank, ie (from_id, List(to_ids), pagerank)
*/
class MailRank(args: Args) extends PageRank(args) {
override def initialize(nodeCol: Symbol, neighCol: Symbol,
pageRank: Symbol): Pipe = {
val input = Tsv(args("input"), ('from, 'to))
input.read.
groupBy('from) { _.toList[String]('to -> 'tos) }.
map('tos -> 'tosf) { tos: List[String] =>
tos.mkString(",") }.
map('from -> ('from, 'prob)) { from: String =>
(from, 1.0)
}.project('from, 'tosf, 'prob).
mapTo((0, 1, 2) -> (nodeCol, neighCol, pageRank)) {
input : (Long, String, Double) => input
}
}
}
/**
* Converts the format returned by PageRank, ie:
* (from_id, List(to_id), final_pagerank) to
* (from_email, final_pagerank) sorted by pagerank
* descending.
*/
class MailRankPostProcessor(args: Args) extends Job(args) {
val input = Tsv(args("input"), ('from, 'tos, 'rank))
val output = Tsv(args("output"))
val reference = Tsv(args("reference"), ('ref_email, 'ref_from)).read
input.read.
project('from, 'rank).
joinWithSmaller('from -> 'ref_from, reference, joiner = new LeftJoin).
project('ref_email, 'rank).
groupAll { _.sortBy('rank).reverse }.
write(output)
}
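The two reshaping steps above (pairs to adjacency rows before PageRank, ids back to email addresses afterwards) can be sketched with plain Scala collections. The helper names `toAdjacencyRows` and `withEmails` are made up for this illustration. One deliberate difference: the sketch drops ids that have no entry in the reference file, whereas the left join in MailRankPostProcessor keeps them with a null email.

```scala
// Sketch only: in-memory equivalents of MailRank.initialize and MailRankPostProcessor.
object MailRankSketch {
  // (fromId, toId) pairs -> (fromId, "to1,to2,...", initial rank 1.0), ordered by fromId.
  def toAdjacencyRows(pairs: Seq[(Long, Long)]): Seq[(Long, String, Double)] =
    pairs.groupBy(_._1).toSeq.sortBy(_._1).map { case (from, ps) =>
      (from, ps.map(_._2).mkString(","), 1.0)
    }

  // (id, rank) rows plus an (email, id) reference -> (email, rank), rank descending.
  def withEmails(ranks: Seq[(Long, Double)],
      reference: Seq[(String, Long)]): Seq[(String, Double)] = {
    val emailById: Map[Long, String] =
      reference.map { case (email, id) => id -> email }.toMap
    ranks
      .collect { case (id, rank) if emailById.contains(id) => (emailById(id), rank) }
      .sortWith(_._2 > _._2)
  }
}
```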
Here is the list of the top 15 email addresses which had the highest page rank. If you followed the Enron trial, then you may recognize a few names here:
vince.kaminski@enron.com 129.6456344199462
sara.shackleton@enron.com 123.65630393847148
louise.kitchen@enron.com 117.4938083777274
jeff.dasovich@enron.com 116.83443565190146
tana.jones@enron.com 113.12379367608482
mark.taylor@enron.com 109.67593395984632
sally.beck@enron.com 108.4905670421611
ebass@enron.com 103.54919812912469
jeff.skilling@enron.com 100.57545577565519
steven.kean@enron.com 99.69270847011283
john.lavorato@enron.com 90.16447940199485
gerald.nemec@enron.com 88.4001157213643
kenneth.lay@enron.com 88.1467699737448
richard.shapiro@enron.com 82.10524578705625
kay.mann@enron.com 69.19222780384432
Document Clustering
Finally, I decided to cluster the documents to find out whether multiple topics were being discussed in the corpus. The input to this process is the (term, docID, freq) triples that we generated from Lucene, and the output is a set of (docID, {term frequency vector}) tuples. The code sets up partial vectors and aggregates them to form document vectors using Mahout's SequentialAccessSparseVector class. I found a post on Software Anatomy very useful when writing this code; much of the DocVector class below is based on the code shown there.
// Source: src/main/scala/com/mycompany/solr4extras/corpus/DocVector.scala
package com.mycompany.solr4extras.corpus
import java.io.{PrintWriter, FileWriter, File}
import scala.collection.mutable.ListBuffer
import scala.io.Source
import org.apache.mahout.math.{VectorWritable, SequentialAccessSparseVector}
import com.twitter.scalding.{Tsv, TextLine, Job, Args}
import cascading.pipe.joiner.LeftJoin
/**
* Reads input of the form (term freq) of most frequent terms,
* and builds a dictionary file. Using this file, creates a
* collection of docID to sparse doc vector mappings of the
* form (docID, {termID:freq,...}).
*/
class DocVector(args: Args) extends Job(args) {
val input = Tsv(args("input"), ('term, 'docID, 'freq))
val termcounts = TextLine(args("termcounts"))
val dictOutput = Tsv(args("dictionary"))
val output = Tsv(args("docvector"))
// (term freq) => (term num)
val dictionary = termcounts.read.
project('num, 'line).
map('line -> 'word) { line: String => line.split('\t')(0) }.
project('word, 'num)
// input: (term, docID, freq)
// join with dictionary and write document as (docId, docvector)
input.read.
joinWithSmaller('term -> 'word, dictionary, joiner = new LeftJoin).
filter('word) { word: String => (!(word == null || word.isEmpty)) }.
project('docID, 'num, 'freq).
map(('docID, 'num, 'freq) -> ('docId, 'pvec)) {
doc: (String, Int, Int) =>
val pvec = new SequentialAccessSparseVector(
args("vocabsize").toInt)
pvec.set(doc._2, doc._3)
(doc._1, new VectorWritable(pvec))
}.
groupBy('docId) {
group => group.reduce('pvec -> 'vec) {
(left: VectorWritable, right: VectorWritable) =>
// NOTE: normalize runs after every pairwise merge, not once per document
new VectorWritable(left.get.plus(right.get).normalize)
}}.
write(output)
// save the dictionary as (term, idx)
dictionary.write(dictOutput)
}
/**
* Converts the Document Vector file to an ARFF file for
* consumption by Weka.
*/
class DocVectorToArff {
def generate(input: String, output: String,
numDimensions: Int): Unit = {
val writer = new PrintWriter(new FileWriter(new File(output)), true)
// header
writer.println("@relation docvector\n")
(1 to numDimensions).map(n =>
writer.println("@attribute vec" + n + " numeric"))
writer.println("\n@data\n")
// body
Source.fromFile(new File(input)).getLines.foreach(line => {
writer.println(line.split('\t')(1).replaceAll(":", " "))
})
writer.flush
writer.close
}
}
/**
* Reads output from Weka Explorer SimpleKMeans run (slightly
* modified to remove header information) to produce a list
* of top N words from each cluster.
*/
class WekaClusterDumper {
def dump(input: String, dictionary: String,
output: String, topN: Int): Unit = {
// build up map of terms from dictionary
val dict = Source.fromFile(new File(dictionary)).getLines.
map(line => {
val cols = line.split("\t")
cols(1).toInt -> cols(0)
}).toMap
// build up elements list from weka output
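// one (initially empty) score list per cluster, so that a cluster
// without any positive scores does not cause a NullPointerException below
val clusterScores = Array.fill(5)(new ListBuffer[(Int,Double)])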
Source.fromFile(new File(input)).getLines.
foreach(line => {
val cols = line.split("\\s+")
val idx = cols(0).toInt - 1
val scores = cols.slice(2, 7)
(0 to 4).foreach(i =>
if (scores(i).toDouble > 0.0D) {
if (clusterScores(i) == null)
clusterScores(i) = new ListBuffer[(Int,Double)]
clusterScores(i) += ((idx, scores(i).toDouble))
})
})
// sort each clusterScore by score descending and get the
// corresponding words from the dictionary by idx
val writer = new PrintWriter(new FileWriter(new File(output)), true)
var i = 0
clusterScores.foreach(clusterScore => {
writer.println("Cluster #" + i)
clusterScore.toList.sortWith(_._2 > _._2).
slice(0, topN).map(tuple => {
val word = dict(tuple._1)
writer.println(" " + word + " (" + tuple._2 + ")")
})
i = i + 1
})
writer.flush
writer.close
}
}
As I mentioned above, the output of DocVector was supposed to be passed into Mahout's KMeans driver (and Canopy driver for the initial centroids) but I could not generate Sequence files which Mahout expects, so I converted the DocVector output to an ARFF file so I could pass it into Weka.
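For reference, the target of that conversion is Weka's sparse ARFF layout, where each row lists only the non-zero attributes as `{index value,...}`. The sketch below builds such rows from an in-memory sparse vector; it assumes, as I understand the ARFF format, that indices are 0-based and must appear in ascending order. `header` and `sparseRow` are hypothetical helper names, not part of DocVectorToArff.

```scala
// Sketch only: renders a sparse vector as ARFF text, independent of Mahout and Weka.
object ArffSketch {
  // Header with one numeric attribute per dictionary slot (named vec1..vecN as above).
  def header(relation: String, numDimensions: Int): Seq[String] =
    Seq(s"@relation $relation", "") ++
      (1 to numDimensions).map(n => s"@attribute vec$n numeric") ++
      Seq("", "@data")

  // Sparse data row: "{index value,index value,...}", indices ascending.
  def sparseRow(vec: Map[Int, Double]): String =
    vec.toSeq.sortBy(_._1)
      .map { case (idx, weight) => s"$idx $weight" }
      .mkString("{", ",", "}")
}
```

For example, a vector with weight 0.25 at index 1 and 0.5 at index 3 becomes the row `{1 0.25,3 0.5}`.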
I then ran SimpleKMeans requesting 5 clusters with 10 iterations, and the first attempt resulted in an OutOfMemoryError. Since I have a pretty bad-ass MacBook Pro with 8GB RAM (okay, it was bad-ass when I bought it 3 years ago), I upped the default -Xmx for Weka from 256MB to 2GB (this is in the Info.plist file under the /Applications/weka directory for those of you who use Macs), and life was good again.
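If you have not looked inside k-means before, the core loop is short. The following is a bare-bones sketch of the standard assign-then-average iteration on dense vectors, with the initial centroids supplied by the caller. It is not Weka's SimpleKMeans, which has its own initialization and options; the names here are made up for illustration.

```scala
// Sketch only: plain Lloyd-style k-means on dense vectors, not Weka's SimpleKMeans.
object KMeansSketch {
  type Vec = Vector[Double]

  // Squared Euclidean distance between two vectors of equal length.
  private def sqDist(a: Vec, b: Vec): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Component-wise mean of a non-empty set of points.
  private def mean(points: Seq[Vec]): Vec =
    points.transpose.map(col => col.sum / points.size).toVector

  // Index of the centroid closest to p.
  def nearest(p: Vec, centroids: Seq[Vec]): Int =
    centroids.indices.minBy(i => sqDist(p, centroids(i)))

  def kMeans(points: Seq[Vec], initial: Seq[Vec], iterations: Int): Seq[Vec] =
    (1 to iterations).foldLeft(initial) { (centroids, _) =>
      // Assignment step: group points by their nearest centroid.
      val assigned = points.groupBy(p => nearest(p, centroids))
      // Update step: move each centroid to the mean of its points;
      // a centroid that attracted no points stays where it is.
      centroids.indices.map(i => assigned.get(i).map(mean).getOrElse(centroids(i)))
    }
}
```

The per-cluster numbers in the Weka output are exactly these centroid components, one per dictionary term, which is why they can be read as "how strongly does this term characterize the cluster".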
In any case, after a while, Weka dumped out the results of the computation to the Explorer console, from which I copy-pasted it and passed it through my WekaClusterDumper class. This class parses the Weka clustering output (minus the headers which I removed manually) to print the top N words in each cluster. Since Weka does not know the actual terms being clustered (only their position in the dictionary file generated by DocVector), the Cluster dumper uses this file to look up the actual terms in each cluster. Here is the output of the dump.
Cluster #0
ect (13.3978)
image (4.639)
enron.com (3.7695)
Cluster #1
enron (20.7614)
Cluster #2
http (0.3507)
enron.com (0.309)
enron_development (0.2871)
mail (0.1964)
ees (0.1615)
image (0.1558)
hou (0.1097)
message (0.1039)
ect (0.0909)
energy (0.0892)
Cluster #3
mail (56.5979)
Cluster #4
study (12.2941)
As you can see, according to the clustering, there does not seem to be too much variety in the discussion going on in the Enron dataset.
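The lookup behind that dump is simple once the Weka output has been parsed: for each cluster, keep the terms with a positive centroid weight, sort by weight, and translate the term index back to a word through the dictionary. Here is a minimal sketch on already-parsed data; `topTerms` and the sample weights are illustrative and not taken from WekaClusterDumper.

```scala
// Sketch only: top-N terms per cluster from parsed centroid weights.
object ClusterDumpSketch {
  // centroids(c)(termIdx) is the weight of term termIdx in cluster c's centroid;
  // dict maps a term index back to the term itself.
  def topTerms(centroids: Seq[Map[Int, Double]], dict: Map[Int, String],
      topN: Int): Seq[Seq[(String, Double)]] =
    centroids.map { centroid =>
      centroid.toSeq
        .filter { case (_, weight) => weight > 0.0 }
        // weight descending, ties broken by term index
        .sortWith((a, b) => a._2 > b._2 || (a._2 == b._2 && a._1 < b._1))
        .take(topN)
        .map { case (idx, weight) => (dict(idx), weight) }
    }
}
```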
Finally, I used a single object to call the classes described above from sbt (using "sbt run"). The whole process was very interactive, so don't run the code as is; it will fail. I ran each step individually, some multiple times, and commented out the previous blocks as I went forward.
// Source: src/main/scala/com/mycompany/solr4extras/corpus/CorpusAnalyzer.scala
package com.mycompany.solr4extras.corpus
import com.twitter.scalding.Args
object CorpusAnalyzer extends App {
//////////// frequency distribution ///////////
(new Lucene4TermFreq("/Users/sujit/Downloads/apache-solr-4.0.0/example/solr/collection1/data/index")).
generate("data/input/corpus_freqs.txt", 50, 1000, 10)
(new FreqDist(Args(List(
"--local", "",
"--input", "data/input/corpus_freqs.txt",
"--output", "data/output/freq_dist.txt",
"--stopwords", "/Users/sujit/Downloads/apache-solr-4.0.0/example/solr/collection1/conf/stopwords.txt")))).
run
//////// mail rank ////////
(new MongoEmailPairs("localhost", 27017, "solr4secure",
"/Users/sujit/Downloads/apache-solr-4.0.0/example/solr/collection1/data/index")).
generate("data/input/email_refs.txt",
"data/input/email_pairs.txt")
(new MailRank(Args(List(
"--local", "",
"--input", "data/input/email_pairs.txt",
"--output", "data/output/mailrank.txt",
"--iterations", "10")))).
run
(new MailRankPostProcessor(Args(List(
"--local", "",
"--input", "data/output/mailrank.txt",
"--reference", "data/input/email_refs.txt",
"--output", "data/output/mailrank_final.txt")))).run
////////////// clustering terms ///////////////
(new DocVector(Args(List(
"--local", "",
"--input", "data/input/corpus_freqs.txt",
"--termcounts", "data/input/freq_words.txt",
"--vocabsize", "1326", // cat freq_words | cut -f1 | sort | uniq | wc
"--dictionary", "data/output/dictionary.txt",
"--docvector", "data/output/docvector.txt")))).
run
(new DocVectorToArff()).generate(
"/Users/sujit/Projects/solr4-extras/data/output/docvector.txt",
"data/output/docvector.arff", 1326)
(new WekaClusterDumper()).dump(
"/Users/sujit/Projects/solr4-extras/data/output/weka_cluster_output.txt",
"/Users/sujit/Projects/solr4-extras/data/output/dictionary.txt",
"data/output/cluster.dump",10)
}
Well, this is all I have for today. The source code for this stuff is also available on my GitHub project page if you want to play around with it on your own. Hope you enjoyed it. If I don't get a chance to post again before next year (unlikely given that there are just 3 more days left and my mean time between posts is now about 14 days), be safe and have a very Happy New Year. Here's hoping for a lot of fun in 2013 (the International Year of Statistics).