Saturday, October 24, 2009

Finding Phrases with Binomial Distribution

Last week, I wrote about the Binomial Distribution, and worked through the "New York" example from Dr Manu Konchady's "Building Search Applications" book. This week, I enhance my old Phrase Extractor code to filter the word grams it generates, using the ratio of the actual probability to the probability estimated from the Binomial Distribution as a guide.

Overview of Approach

Since I am just exploring the approach, I limit myself to 1 and 2 word grams. However, the approach can easily be extended to 3, 4 and 5 word grams by considering the phrases found in the previous stage. Our input is a preprocessed sequence file of sentences, one per line, generated by combining the "Alice in Wonderland", "Moby Dick" and "The Adventures of Sherlock Holmes" ebooks from the Gutenberg project. The processing is done as a sequence of two map-reduce jobs. The first job writes the word 1-grams and 2-grams and their occurrence counts into a database table. The second job reads through the 2-grams, looks up the occurrence counts of the corresponding 1-grams, computes the estimated and actual probabilities, and does the filtering.
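
The preprocessing that produces this sentence file is not shown in this post. In case you want to reproduce it, here is a minimal sketch of one way to generate such a file, assuming you already have a plain text file with one sentence per line; the class name and the LongWritable/Text key-value choice are my own, picked to match what the mapper shown later expects.

// Hypothetical sketch - not part of the jtmt code base. Converts a plain
// text file of sentences (one per line) into a Hadoop SequenceFile with a
// LongWritable line number key and a Text sentence value.
import java.io.BufferedReader;
import java.io.FileReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SentenceSequenceFileWriter {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // args[0] = local text file of sentences, args[1] = output sequence file
    SequenceFile.Writer writer = SequenceFile.createWriter(
      fs, conf, new Path(args[1]), LongWritable.class, Text.class);
    BufferedReader reader = new BufferedReader(new FileReader(args[0]));
    try {
      String sentence;
      long lineNum = 0L;
      while ((sentence = reader.readLine()) != null) {
        writer.append(new LongWritable(lineNum++), new Text(sentence));
      }
    } finally {
      reader.close();
      writer.close();
    }
  }
}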

Using DBInput/Output

One thing that was new to me is using a relational database as the input and output of a Hadoop job. Since I am using the new Hadoop API introduced in Hadoop 0.20, and the corresponding DBInputFormat/DBOutputFormat classes were not available for it in this version, I had to create local copies of these artifacts from the (as yet unreleased) Hadoop 0.22 distribution. I have temporarily checked these into JTMT's svn repository so that everything compiles.

I needed a database because I wanted to be able to look up the occurrence count for an individual phrase. I suppose I could have done this with HBase, but I know next to nothing about it, and using an RDBMS with Hadoop seemed like a useful thing to learn. In any case, I am using a local MySQL instance as my database.

Apart from copying over all the DB* classes from org.apache.hadoop.mapreduce.lib.db, I had to create a custom DBWritable implementation representing a record in the MySQL table. It is shown below:

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/PhraseCounterDbWritable.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import net.sf.jtmt.concurrent.hadoop.phraseextractor.db.DBWritable;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.hadoop.io.Writable;

/**
 * A writable that can be used to read and write data from the table
 * tmdb:my_phrase_counts:
 * +-----------+--------------+------+-----+---------+-------+
 * | Field     | Type         | Null | Key | Default | Extra |
 * +-----------+--------------+------+-----+---------+-------+
 * | phrase    | varchar(255) | NO   | MUL | NULL    |       | 
 * | gram_size | int(11)      | NO   | MUL | NULL    |       | 
 * | occurs    | mediumtext   | NO   |     | NULL    |       | 
 * +-----------+--------------+------+-----+---------+-------+
 */
public class PhraseCounterDbWritable implements Writable, DBWritable {

  private String phrase;
  private int gramSize;
  private long occurs;
  
  public String getPhrase() {
    return phrase;
  }

  public void setPhrase(String phrase) {
    this.phrase = phrase;
  }

  public int getGramSize() {
    return gramSize;
  }

  public void setGramSize(int gramSize) {
    this.gramSize = gramSize;
  }

  public long getOccurs() {
    return occurs;
  }

  public void setOccurs(long occurs) {
    this.occurs = occurs;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    phrase = in.readUTF();
    gramSize = in.readInt();
    occurs = in.readLong();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(phrase);
    out.writeInt(gramSize);
    out.writeLong(occurs);
  }

  @Override
  public void readFields(ResultSet resultSet) throws SQLException {
    phrase = resultSet.getString(1);
    gramSize = resultSet.getInt(2);
    occurs = resultSet.getLong(3);
  }

  @Override
  public void write(PreparedStatement statement) throws SQLException {
    statement.setString(1, phrase);
    statement.setInt(2, gramSize);
    statement.setLong(3, occurs);
  }
  
  @Override
  public String toString() {
    return ReflectionToStringBuilder.reflectionToString(
      this, ToStringStyle.DEFAULT_STYLE);
  }
}

The code

Here is the main driver code. It is a simple pipeline of two map-reduce jobs, with some straightforward JDBC code thrown in. The first step is to drop and recreate the my_phrase_counts table. The second step reads the sequence file containing the sentences and writes the 1 and 2 word grams to the table. The third step adds indexes to the table: inserts are faster without indexes, but we definitely need them to look up records by phrase and gram size in the next step. The fourth step filters phrases based on the estimated and actual probabilities and writes them out via TextOutputFormat.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/PhraseExtractor.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;

// TODO: remove these when moving to Hadoop 0.22+
import net.sf.jtmt.concurrent.hadoop.phraseextractor.db.DBConfiguration;
import net.sf.jtmt.concurrent.hadoop.phraseextractor.db.DBInputFormat;
import net.sf.jtmt.concurrent.hadoop.phraseextractor.db.DBOutputFormat;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Driver for the phrase extraction pipeline: writes word n-grams and
 * their occurrence counts to a database table, then filters likely
 * phrases using the Binomial Distribution.
 */
public class PhraseExtractor {

  public static final int MIN_GRAM_SIZE = 1;
  public static final int MAX_GRAM_SIZE = 3;
  public static final int MIN_OCCURRENCES_THRESHOLD = 5;
  public static final int MIN_LOG_LIKELIHOOD_RATIO = 100;
  
  public static final String MIN_GRAM_SIZE_KEY =
    "phrase_extractor.min.gram.size";
  public static final String MAX_GRAM_SIZE_KEY =
    "phrase_extractor.max.gram.size";
  public static final String MIN_OCCURRENCES_KEY =
    "phrase_extractor.min.occurs";
  public static final String MIN_LOG_LIKELIHOOD_RATIO_KEY =
    "phrase_extractor.min.llr";
  public static final String NUM_WORDS_IN_CORPUS_KEY =
    "phrase_extractor.num.words";
  
  private static void recreateTable(Configuration conf) throws Exception {
    Connection conn = null;
    PreparedStatement ps = null;
    try {
      DBConfiguration dbconf = new DBConfiguration(conf);
      conn = dbconf.getConnection();
      ps = conn.prepareStatement("drop table my_phrase_counts");
      ps.executeUpdate();
      ps = conn.prepareStatement("create table my_phrase_counts (" +
        "phrase varchar(255) not null, " +
        "gram_size int not null," +
        "occurs long not null" +
        ") type=ISAM");
      ps.executeUpdate();
    } catch (SQLException e) {
      throw new Exception(e);
    } finally {
      if (ps != null) {
        try { ps.close(); } catch (SQLException e) {}
        try { conn.close(); } catch (SQLException e) {}
      }
    }
  }

  private static void writeNGrams(Configuration conf, Path inputPath) 
      throws Exception {
    DBConfiguration dbconf = new DBConfiguration(conf);
    dbconf.setOutputTableName("my_phrase_counts");
    dbconf.setOutputFieldNames("phrase", "gram_size", "occurs");
    Job job = new Job(conf, "write-ngrams");
    job.getConfiguration().setInt(MIN_GRAM_SIZE_KEY, MIN_GRAM_SIZE);
    job.getConfiguration().setInt(MAX_GRAM_SIZE_KEY, MAX_GRAM_SIZE);
    job.getConfiguration().setInt(MIN_OCCURRENCES_KEY, 
      MIN_OCCURRENCES_THRESHOLD);
    job.setJarByClass(PhraseExtractor.class);
    FileInputFormat.addInputPath(job, inputPath);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(WordNGramMapper.class);
    job.setReducerClass(WordNGramReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(PhraseCounterDbWritable.class);
    job.setOutputFormatClass(DBOutputFormat.class);
    job.setNumReduceTasks(2);
    boolean status = job.waitForCompletion(true);
    if (! status) {
      throw new Exception("Job " + job.getJobName() + " failed!");
    }
  }

  private static void createIndex(Configuration conf) throws Exception {
    Connection conn = null;
    PreparedStatement ps = null;
    try {
      DBConfiguration dbconf = new DBConfiguration(conf);
      conn = dbconf.getConnection();
      ps = conn.prepareStatement("create index my_phrase_counts_ix1 " +
        "on my_phrase_counts(gram_size)");
      ps.executeUpdate();
      ps = conn.prepareStatement("create index my_phrase_counts_ix2 " +
        "on my_phrase_counts(phrase)");
      ps.executeUpdate();
    } catch (SQLException e) {
      throw(e);
    } finally {
      if (ps != null) {
        try { ps.close(); } catch (SQLException e) {}
        try { conn.close(); } catch (SQLException e) {}
      }
    }
  }

  private static long getNumWordsInCorpus(DBConfiguration dbconf) 
      throws IOException {
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
      conn = dbconf.getConnection();
      ps = conn.prepareStatement("select sum(occurs) " +
        "from my_phrase_counts where gram_size=1");
      rs = ps.executeQuery();
      rs.next();
      return rs.getLong(1);
    } catch (SQLException e) {
      throw new IOException(e);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
      if (rs != null) {
        try { rs.close(); } catch (SQLException e) {}
        try { ps.close(); } catch (SQLException e) {}
        try { conn.close(); } catch (SQLException e) {}
      }
    }
  }

  private static void findLikelyPhrases(Configuration conf, int gramSize,
      Path outputPath) throws Exception {
    DBConfiguration dbconf = new DBConfiguration(conf);
    dbconf.setInputTableName("my_phrase_counts");
    dbconf.setInputFieldNames("phrase", "gram_size", "occurs");
    dbconf.setInputConditions("gram_size=" + gramSize);
    dbconf.setInputClass(PhraseCounterDbWritable.class);
    Job job = new Job(conf, "find-likely-phrases");
    job.getConfiguration().setLong(NUM_WORDS_IN_CORPUS_KEY, 
      getNumWordsInCorpus(dbconf));
    job.getConfiguration().setInt(MIN_LOG_LIKELIHOOD_RATIO_KEY, 
      MIN_LOG_LIKELIHOOD_RATIO);
    job.setJarByClass(PhraseExtractor.class);
    job.setInputFormatClass(DBInputFormat.class);
    job.setMapperClass(LikelyPhraseMapper.class);
    job.setReducerClass(LikelyPhraseReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, outputPath);
    job.setNumReduceTasks(2);
    boolean status = job.waitForCompletion(true);
    if (! status) {
      throw new Exception("Job " + job.getJobName() + " failed!");
    }
  }

  public static void main(String[] argv) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = 
      new GenericOptionsParser(conf, argv).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: calc input_path output_path");
      System.exit(-1);
    }
    // set up database properties
    DBConfiguration.configureDB(conf, 
      "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/tmdb", 
      "root", "5ec4e1");
    recreateTable(conf);
    writeNGrams(conf, new Path(otherArgs[0]));
    createIndex(conf);
    findLikelyPhrases(conf, 2, new Path(otherArgs[1]));
  }
}
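
Assuming the classes are packaged into a jar (I will call it jtmt.jar here, purely as an example name), the driver would be invoked along these lines, with the MySQL connector jar made available to the job, for example via the -libjars generic option (it also needs to be on the client classpath, since the driver opens JDBC connections of its own):

hadoop jar jtmt.jar net.sf.jtmt.concurrent.hadoop.phraseextractor.PhraseExtractor \
  -libjars mysql-connector-java.jar /path/to/sentences.seq /path/to/output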

The WordNGramMapper is not very different from the previous version. All it does is generate the n-grams for a given sentence and write them out to the context with a count of 1. Details about the WordNGramGenerator itself can be found in my previous post.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/WordNGramMapper.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Reads a sequence file and converts it to {ngram, count}.
 */
public class WordNGramMapper 
    extends Mapper<LongWritable,Text,Text,LongWritable> {

  private static final LongWritable ONE = new LongWritable(1L);

  private static int minGramSize;
  private static int maxGramSize;

  @Override
  public void setup(Context context) {
    minGramSize = context.getConfiguration().getInt(
      PhraseExtractor.MIN_GRAM_SIZE_KEY, PhraseExtractor.MIN_GRAM_SIZE);
    maxGramSize = context.getConfiguration().getInt(
      PhraseExtractor.MAX_GRAM_SIZE_KEY, PhraseExtractor.MAX_GRAM_SIZE);
  }

  @Override
  public void map(LongWritable key, Text value, Context context) 
  throws IOException, InterruptedException {
    String sentence = value.toString();
    WordNGramGenerator ngramGenerator = new WordNGramGenerator();
    List<String> grams = ngramGenerator.generate(
      sentence, minGramSize, maxGramSize);
    for (String gram : grams) {
      context.write(new Text(gram), ONE);
    }
  }
}

The WordNGramReducer aggregates the n-gram counts and writes them out to the database table. DBOutputFormat requires the data to be populated into the key, so that's what I have done here.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/WordNGramReducer.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Writes out n-gram counts, filtering out the n-grams whose occurrence
 * counts fall below a predefined threshold.
 */
public class WordNGramReducer extends  
    Reducer<Text,LongWritable,PhraseCounterDbWritable,NullWritable> {

  private static int minOccurrences;

  @Override
  public void setup(Context context) {
    minOccurrences = 
      context.getConfiguration().getInt(PhraseExtractor.MIN_OCCURRENCES_KEY, 
      PhraseExtractor.MIN_OCCURRENCES_THRESHOLD);
  }

  @Override
  public void reduce(Text key, Iterable<LongWritable> values, 
    Context context) throws IOException, InterruptedException {
    long sum = 0L;
    for (LongWritable value : values) {
      sum += value.get();
    }
    if (sum > minOccurrences) {
      int gramSize = StringUtils.split(key.toString(), " ").length;
      PhraseCounterDbWritable okey = new PhraseCounterDbWritable();
      okey.setPhrase(key.toString());
      okey.setGramSize(gramSize);
      okey.setOccurs(sum);
      context.write(okey, NullWritable.get());
    }
  }
}

The LikelyPhraseMapper reads in bigrams from the database table, then for each bigram phrase, looks up the occurrence value of the corresponding trailing word, computes the actual and estimated probabilities for the phrase, and for all qualifying phrases writes the information into the context.

There is a bug in the WordNGramGenerator where some trailing words are counted as part of a bigram but not on their own. I discovered this when trying to run the mapper: the binomial coefficient nCr was hitting a case where r > n, which is like saying that in a corpus "New York" occurred more times than "York", which obviously cannot happen. I haven't had time to fix this bug, but I will (it's marked by a TODO in the code below).

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/LikelyPhraseMapper.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import net.sf.jtmt.concurrent.hadoop.phraseextractor.db.DBConfiguration;

import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Reads in bigrams from the database and computes the likelihood of the
 * phrase using Binomial distribution. Writes out phrases which satisfy
 * the test.
 */
public class LikelyPhraseMapper extends
    Mapper<LongWritable,PhraseCounterDbWritable,Text,DoubleWritable> {

  private static DBConfiguration dbconf;
  private static long nWords;
  private static double logLikelihoodThreshold;

  @Override
  public void setup(Context context) 
      throws IOException, InterruptedException {
    dbconf = new DBConfiguration(context.getConfiguration());
    nWords = context.getConfiguration().getLong(
      PhraseExtractor.NUM_WORDS_IN_CORPUS_KEY, -1L);
    logLikelihoodThreshold = context.getConfiguration().getInt(
      PhraseExtractor.MIN_LOG_LIKELIHOOD_RATIO_KEY, 
      PhraseExtractor.MIN_LOG_LIKELIHOOD_RATIO);
  }

  @Override
  public void map(LongWritable key, PhraseCounterDbWritable value, 
    Context context) throws IOException, InterruptedException {
    String phrase = value.getPhrase();
    long phraseOccurs = value.getOccurs();
    String trailingWord = phrase.substring(phrase.lastIndexOf(' ') + 1);
    long trailingWordOccurs = getOccurrence(trailingWord);
    double pTrailingWord = (double) trailingWordOccurs / (double) nWords;
    if (phraseOccurs > trailingWordOccurs) {
      // TODO: fix this bug, this is impossible, and points to a bug in
      // the NGram generator code.
    } else {
      double estPhraseLogProbability = getEstimatedLogProbability(
        trailingWordOccurs, pTrailingWord, phraseOccurs);
      double actPhraseLogProbability = 
        Math.log(phraseOccurs) - Math.log(nWords);
      // if the actual log probability exceeds the log probability estimated
      // by the binomial distribution by more than the threshold, then we
      // consider this to be a likely phrase.
      double diff = actPhraseLogProbability - estPhraseLogProbability;
      if (diff > logLikelihoodThreshold) {
        context.write(new Text(phrase), new DoubleWritable(diff));
      }
    }
  }

  private long getOccurrence(String trailingWord) throws IOException {
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
      conn = dbconf.getConnection();
      ps = conn.prepareStatement("select occurs " +
      "from my_phrase_counts where phrase = ?");
      ps.setString(1, trailingWord);
      rs = ps.executeQuery();
      rs.next();
      return rs.getLong(1);
    } catch (SQLException e) {
      throw new IOException(e);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
      if (rs != null) {
        try { rs.close(); } catch (SQLException e) {}
        try { ps.close(); } catch (SQLException e) {}
        try { conn.close(); } catch (SQLException e) {}
      }
    }
  }

  private double getEstimatedLogProbability(long n, double p, long x) {
    return MathUtils.binomialCoefficientLog((int) n, (int) x) + 
    (x * Math.log(p)) + ((n - x) * Math.log(1 - p));
  }
}

The LikelyPhraseReducer is just a simple identity reducer. The mapper writes out zero or one row for each word bigram, and the reducer just passes the data through to be written via TextOutputFormat.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/LikelyPhraseReducer.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Simple identity reducer, writes out qualifying phrase and its log
 * likelihood ratio to a text output.
 */
public class LikelyPhraseReducer
    extends Reducer<Text,DoubleWritable,Text,DoubleWritable> {

  @Override
  public void reduce(Text key, Iterable<DoubleWritable> values,
      Context context) throws IOException, InterruptedException {
    for (DoubleWritable value : values) {
      // will only have 1 value in the iterable
      context.write(key, value);
    }
  }
}

Results

The results are definitely better than with the previous implementation, but still contain a lot of junk. Here are some nice phrases it caught (one from each of the three books above).

sherlock holmes 348.64636504741196
sperm whale     327.2002806998291
your majesty    213.70131001002
...

Out of the 5447 two-word grams that were considered, 1358 were marked as likely phrases with a log likelihood ratio threshold of 100 (i.e., the actual log probability exceeds the log probability estimated from the Binomial Distribution by more than 100). Perhaps I should increase the threshold; as you can see, the "nice" results above all score 200 or more.

Another thing I noticed is that a lot of the "likely" phrases contain stopwords, so perhaps I should filter those phrases out. I will post updates as I find out more.

Update 2009-10-28: Putting in a simple filter to remove phrases which contain stopwords (based on my stopword.txt file) resulted in 27 very nice phrases, shown below (a sketch of the filter follows the list):

aye aye           141.51576072034712
baker street      129.34515160313717
captain ahab      168.36076624969655
captain peleg     165.97023802793268
good deal         138.35536009692996
hosmer angel      116.01526169240735
lord st           150.75121049953745
march hare        246.8881927527065
miss hunter       118.52058386372536
miss stoner       110.05045443880385
moby dick         563.8792649078067
mock turtle       381.22084668006727
mr holmes         253.46200914806514
mr starbuck       105.5923146454597
mr windibank      105.90936635039294
project gutenberg 326.82789982895486
sherlock holmes   350.16072531114384
small print       139.89748896049025
sperm whale       329.133237299884
sperm whale's     140.82282422992682
st clair          171.47400592362465
st simon          298.5832451161461
thou art          130.54357132297693
white rabbit      144.84973106411437
white whale       198.02535045342663
years ago         190.3057543422783
young lady        124.60954738818677
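
The stopword filter itself is not shown above since that code is still in flux, but here is a minimal sketch of what it could look like inside LikelyPhraseMapper, assuming the stop words are loaded into a set during setup() from the stopword.txt file mentioned above (the field name, the loading code and the file location are my own, for illustration; StringUtils is the commons-lang class used elsewhere in the code):

  // Hypothetical sketch - not necessarily the exact filter used to
  // produce the list above.
  private static Set<String> stopwords = new HashSet<String>();

  @Override
  public void setup(Context context) 
      throws IOException, InterruptedException {
    // ... existing setup code ...
    BufferedReader reader = new BufferedReader(new FileReader("stopword.txt"));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        stopwords.add(line.trim().toLowerCase());
      }
    } finally {
      reader.close();
    }
  }

  // returns true if any word in the phrase is a stop word
  private boolean containsStopword(String phrase) {
    for (String word : StringUtils.split(phrase, " ")) {
      if (stopwords.contains(word.toLowerCase())) {
        return true;
      }
    }
    return false;
  }

In map(), a phrase for which containsStopword() returns true would simply be skipped before the log likelihood test (this also needs imports for java.util.Set, java.util.HashSet, java.io.BufferedReader and java.io.FileReader).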

On reading a bit more about the Binomial Distribution, I learned that in certain cases it can be approximated by a Gaussian Distribution, which makes it possible to use a cutoff based on a confidence level (i.e., a multiple of the z-score). So I changed the code in LikelyPhraseMapper to compute the mean and standard deviation of the Binomial Distribution for each phrase, then its z-score, and then the distance of the observed probability from the mean as a multiple of the z-score. If the distance exceeds the z value for the 10% level (0.1257 in the snippet below), we consider the independence hypothesis (i.e., that the two words in the phrase are independent) refuted with 10% confidence.

With this approach, I get a larger number of phrases, and these phrases look quite good too. Most of the original 27 phrases are included in this larger set. Right now the code is in a pretty hacked-up state, so I need to do a bit of cleanup before I can post it to the svn repository, but to put the previous paragraph into code, here is the snippet that you would need to stick into LikelyPhraseMapper.

  @Override
  public void map(LongWritable key, PhraseCounterDbWritable value, Context context) 
      throws IOException, InterruptedException {
    ...
    String phrase = value.getPhrase();
    String[] wordsInPhrase = StringUtils.split(phrase, " ");
    long phraseOccurs = value.getOccurs();
    String trailingWord = phrase.substring(phrase.lastIndexOf(' ') + 1);
    long trailingWordOccurs = getOccurrence(trailingWord);
    double pTrailingWord = (double) trailingWordOccurs / (double) nWords;
    BinomialDistribution dist = 
      new BinomialDistributionImpl((int) trailingWordOccurs, pTrailingWord);
    double estPhraseProbability = dist.probability(phraseOccurs);
    double distMean = trailingWordOccurs * pTrailingWord;
    double distStdDev = Math.sqrt(distMean * (1 - pTrailingWord));
    double actPhraseProbability = (double) phraseOccurs / (double) nWords;
    double zScore = (estPhraseProbability - distMean) / distStdDev;
    double distance = Math.abs((actPhraseProbability - distMean) / zScore);
    if (distance >= 0.1257D) { // 10% confidence level
      // if the condition holds, we can refute the independence hypothesis
      // with 10% confidence.
      context.write(new Text(phrase), new DoubleWritable(distance));
    }
  }

I plan to work some more on the other phrase extraction ideas outlined in Dr Konchady's book, so I will do the necessary refactoring to make the algorithms pluggable; a rough sketch of what I have in mind is shown below.
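
This is purely hypothetical for now - the interface and method names below are mine, and nothing like it exists in the code yet:

// Hypothetical sketch - not part of the current code base. Each likelihood
// algorithm (log likelihood ratio, z-score, etc) would implement this
// interface, and LikelyPhraseMapper would delegate to whichever
// implementation is configured for the job.
public interface PhraseLikelihoodStrategy {

  /**
   * Computes a likelihood score for a phrase, given its occurrence count,
   * the occurrence count of its trailing word, and the total number of
   * words in the corpus.
   */
  double score(String phrase, long phraseOccurs, long trailingWordOccurs,
    long numWordsInCorpus);

  /** Returns true if the score qualifies the phrase as likely. */
  boolean isLikely(double score);
}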

Saturday, October 17, 2009

Binomial Probability Distribution with Commons-Math

Sometime back, I wrote about a Hadoop Phrase Extractor that went through a bunch of electronic books from the Gutenberg project and extracted possible phrases by parsing out 3-5 word grams. The extractor works fairly well for phrases of 3 words or more with a simple count threshold. However, for 2 word phrases, a count threshold (i.e. only consider word bigrams which occur more than a specific cutoff value) did not work so well for me.

In his book, Building Search Applications - Lucene, LingPipe and Gate, Dr Manu Konchady touches on using Binomial Distributions to compute the likelihood of a word bigram being a phrase based on its number of occurrences and the number of occurrences of its component words in a document corpus. I decided to explore that a bit further, and I try to explain my understanding of the approach in this post.

Binomial Distribution - theory

Although I learned about Binomial Distributions in high school, I never really understood them beyond being able to solve simple toy problems, and in any case, it's been a while, so I decided to brush up on the theory. The AP* Statistics Tutorial: Binomial Distribution page from StatTrek explains it in great detail, so you may also find that page helpful if you are in a similar situation.

Essentially, for a word bigram, we treat every occurrence of the second word as a trial, and count the trial as a success if that occurrence is preceded by the first word. The number of trials n is the number of occurrences of the second word alone, the number of successes x is the number of bigram occurrences, and the success probability p is the observed probability of the second word in the corpus, so the estimated likelihood of the observation is b(x; n, p) = C(n,x) * p^x * (1-p)^(n-x). This framing reduces our phrase detection problem to a Binomial Distribution problem.

Binomial Distribution Implementation

The commons-math library does not have an implementation for this distribution (this is not true, as I found out later, please see the update below for details), but it provides a Distribution Framework and guidelines for creating your own custom distributions, which I used to create my own subclass. The code is shown below:

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/BinomialDistribution.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import org.apache.commons.math.MathException;
import org.apache.commons.math.distribution.AbstractIntegerDistribution;
import org.apache.commons.math.distribution.DiscreteDistribution;
import org.apache.commons.math.util.MathUtils;

/**
 * Provides various probability density functions for a Binomial
 * Distribution. Definitions and formulae for Binomial Distribution
 * can be found here:
 * {@link http://stattrek.com/Lesson2/Binomial.aspx?Tutorial=AP}
 */
public class BinomialDistribution extends AbstractIntegerDistribution 
    implements DiscreteDistribution {

  private static final long serialVersionUID = -1858690105951636184L;

  private int n;    // number of trials
  private double p; // probability of success on an individual trial
  
  /**
   * Construct a Binomial Distribution experiment instance.
   * @param n the number of trials.
   * @param p the probability of success on an individual trial.
   */
  public BinomialDistribution(int n, double p) {
    super();
    this.n = n;
    this.p = p;
  }

  /**
   * Using logs is useful for very large values of n.
   * @param x the number of successes.
   * @return the log of the probability.
   */  
  public double logProbability(int x) {
    return MathUtils.binomialCoefficientLog(n, x) + 
      (x * Math.log(p)) + ((n - x) * Math.log(1 - p));
  }
  
  /**
   * Computes the probability that the experiment results in 
   * exactly x successes. The computation is done with logarithms
   * internally and converted back to the probability value to
   * prevent overflow.
   * @param x the number of successes, should be an integer.
   * @return probability the probability of exactly x successes.
   */
  @Override
  public double probability(int x) {
    return Math.exp(MathUtils.binomialCoefficientLog(n, x) + 
      (x * Math.log(p)) + ((n - x) * Math.log(1 - p)));
  }

  /**
   * Computes the probability that the experiment results in at 
   * most x (0 to x) successes.
   * @param x the number of successes, should be an integer.
   * @return the probability of at most x successes.
   */
  @Override
  public double cumulativeProbability(int x) throws MathException {
    double cumulativeProbability = 0.0D;
    for (int i = 0; i <= x; i++) {
      cumulativeProbability += probability(i);
    }
    return cumulativeProbability;
  }

  @Override
  protected int getDomainLowerBound(double p) {
    return 0;
  }

  @Override
  protected int getDomainUpperBound(double p) {
    return n;
  }
}

To test the component, I used the problems and solutions described on the StatTrek page. Here is my JUnit test; besides verifying the results, it also shows how the class is used.

// Source: src/test/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/BinomialDistributionTest.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.math.RoundingMode;

import org.apache.commons.math.util.MathUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for Binomial Distribution. The tests solves some problems
 * found in the StatTrek AP Statistics page and compares the results
 * with that provided in the page. Link for the StatTrek page is:
 * {@link http://stattrek.com/Lesson2/Binomial.aspx?Tutorial=AP}
 */
public class BinomialDistributionTest {

  /**
   * Problem: Suppose a die is tossed 5 times. What is the probability
   * of getting exactly 2 fours?
   * @throws Exception if thrown.
   */
  @Test
  public void test1() throws Exception {
    double p = 1.0D / 6.0D;
    BinomialDistribution bd = new BinomialDistribution(5, p);
    double probability = bd.probability(2);
    System.out.println(">> b(x = 2; 5, " +  p + ") = " + probability);
    Assert.assertEquals(0.161, MathUtils.round(probability, 3, 
      RoundingMode.HALF_UP.ordinal()), 0.001D);
  }

  /**
   * Problem: What is the probability of obtaining 45 or fewer heads in
   * 100 tosses of a coin.
   * @throws Exception if thrown.
   */
  @Test
  public void test2() throws Exception {
    BinomialDistribution bd = new BinomialDistribution(100, 0.5D);
    double cumulativeProbability = bd.cumulativeProbability(45);
    System.out.println(">> b(x <= 45; 100, 0.5) = " +
      cumulativeProbability);
    Assert.assertEquals(0.184, MathUtils.round(cumulativeProbability, 3, 
      RoundingMode.HALF_UP.ordinal()), 0.001D);
  }
  
  /**
   * Problem: The probability that a student is accepted into a prestigious
   * college is 0.3. If 5 students from the same school apply, what is the
   * probability that at most 2 are accepted?
   * @throws Exception
   */
  @Test
  public void test3() throws Exception {
    BinomialDistribution bd = new BinomialDistribution(5, 0.3D);
    double cumulativeProbability = bd.cumulativeProbability(2);
    System.out.println(">> b(x <= 2; 5, 0.3) = " +
      cumulativeProbability);
    Assert.assertEquals(0.8369, MathUtils.round(cumulativeProbability, 4,
      RoundingMode.HALF_UP.ordinal()), 0.0001D);
  }
}

Application in Phrase Identification

I want to use this in the context of a Hadoop job that will extract word unigrams and bigrams and store their counts. As a first step, I wanted to write some code that, given parameters I know I can get from a previous stage of the job, computes the likelihood value for a phrase using the problem definition above. Here is a unit test working through the "New York" example from the book.

  @Test
  public void test4() throws Exception {
    // known parameters
    int nNew = 1044;      // number of occurrences of first word
    int nYork = 597;      // number of occurrences of second word
    int nNewYork = 588;   // number of bigram occurrences
    int nWords = 1900000; // total words in document corpus
    // observed probability of second word
    double pYork = (double) nYork / (double) nWords;
    BinomialDistribution bd = new BinomialDistribution(nYork, pYork);
    // our probabilities in data mining situations are usually too small
    // so we should use the log probabilities. The logEstimatedProbability
    // is the log likelihood of "new" preceding the word "york" in the 
    // case where "new" and "york" are assumed to have no relation.
    double logEstimatedProbability = bd.logProbability(nNewYork);
    // compare with the log of the observed probability of the bigram
    double logActualProbability = 
      Math.log((double) nNewYork / (double) nWords);
    // determine whether this is a phrase by a simple test
    boolean likely = (logActualProbability - logEstimatedProbability > 0);
    System.out.println("likely? " + likely);
    Assert.assertTrue(likely);
  }

The method outlined in the book is significantly more complex, and uses a null (independence) hypothesis and a corresponding dependence hypothesis. Likelihood ratios are computed for the cases where "New" precedes "York" and where "New" does not precede "York", for both hypotheses. I traced through the method but could not figure out why the algorithm needs to be so complex, so I decided to do it the way I described above. If you find flaws in this approach, please let me know.

Update 2009-10-23: commons-math does provide a Binomial Distribution implementation (called BinomialDistributionImpl), and has had one since at least version 1.0; I am not sure how I missed it, but unfortunately I did. The DistributionFactory class referenced here seems to have disappeared from commons-math 2.0 (which I am using), so you can instantiate the implementation directly with a BinomialDistribution bd = new BinomialDistributionImpl(n, p) call. However, the current implementation does not use the MathUtils.binomialCoefficientLog() call, which makes it unsuitable for my usage (see the logProbability() method in my implementation). So one possibility is to subclass BinomialDistributionImpl and add the log-based method there. Another way is to simply create a method that takes the n, p and x parameters and returns the logProbability, which is probably the path I will take.
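
Just to make the first option concrete, here is a minimal, untested sketch of such a subclass; the class name is my own, and it simply layers the log-based computation on top of commons-math's BinomialDistributionImpl:

// Sketch only - not checked into the repository.
import org.apache.commons.math.distribution.BinomialDistributionImpl;
import org.apache.commons.math.util.MathUtils;

public class LogBinomialDistribution extends BinomialDistributionImpl {

  private static final long serialVersionUID = 1L;

  public LogBinomialDistribution(int n, double p) {
    super(n, p);
  }

  /**
   * Log of the probability of exactly x successes, computed entirely in
   * log space to avoid underflow for large n.
   */
  public double logProbability(int x) {
    int n = getNumberOfTrials();
    double p = getProbabilityOfSuccess();
    return MathUtils.binomialCoefficientLog(n, x) 
      + (x * Math.log(p)) + ((n - x) * Math.log(1 - p));
  }
}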