Thursday, November 26, 2015

SoDA - A Dictionary Based Entity Recognition Tool


Last month I presented a talk at Spark Summit Europe 2015 about a system I have been working on for a while. The system provides a dictionary-based Entity Recognition microservice built with Solr, SolrTextTagger and OpenNLP. You can find the Abstract, Slides and Video for the talk here. In this post, I describe why I built it and what we are using it for.

Background


My employer, the Reed-Elsevier (RELX) Group, is the world's leading provider of Science and Technology Information. Our charter is to build data and information solutions that help our users (usually STM researchers) achieve better results. Our group at Elsevier Labs is building a Machine Reading Pipeline to distill information from our books and journals into rich domain-specific Knowledge Graphs that could hopefully be used to make new inferences about the state of our world.

Knowledge graphs (like any other graph) consist of vertices and edges. The vertices represent concepts in the STM universe, and the edges represent the relationships between those concepts. The concepts at the nodes may be generic, such as "surgeon", or may be specific entities such as "Dr. Jane Doe". In order to build knowledge graphs, we need a way to recognize and extract concepts and entities from the text, a process known as entity recognition.

Technology


The easiest way to get started with entity recognition is to use pre-trained statistical Named Entity Recognizers (NERs) available in off-the-shelf Natural Language Processing (NLP) libraries. However, these NERs are trained to recognize a very small and general class of entities such as names of people and places, organizations, etc. While there is value in recognizing these classes, we are typically interested in finding more specific subclasses of these classes (such as universities rather than just any organization) or completely different classes (such as protein names).

Further, STM content is very diverse. While there may be some overlap, entities of interest in one subject (say math) are typically very different from entities of interest in another (say biology). Fortunately, well-curated vocabularies exist for most STM disciplines, which we can leverage in our entity recognition efforts.

Because of this, our approach to NER is dictionary based. Dictionary-based entity matching is a process where snippets of text are matched against a dictionary of terms that represent entities. While this approach may not be as resilient to previously unseen entities as the statistical approach described earlier, it requires no manual tagging, and given enough data, achieves comparable coverage. Dictionary-based matching can also be used to create training data to build custom statistical NERs tailored for different domains, thus achieving the best of both worlds.

Dictionary-based matching techniques are usually based on the Aho-Corasick algorithm, in which the dictionary is held in a compact in-memory data structure against which input text is streamed, matching all dictionary entries simultaneously. The problem with this technique is that it breaks down for large dictionaries, since the corresponding memory requirements also become large. Duplicating the dictionary on all nodes of a Spark cluster could be difficult because of its size.

Solution


Our solution is called the Solr Dictionary Annotator (SoDA). It is an HTTP REST microservice that allows a client to post a block of text and get back a list of annotations. Annotations are structured objects that contain the entity identifier, the matched text, the beginning and ending character offsets of the matched text in the input text block, and the confidence of the match. Clients can specify how accurate the match should be.

For exact and case-insensitive matching, SoDA piggybacks on a recent development from the Lucene community. Michael McCandless, a Lucene/Solr committer, figured out a way to build finite-state transducers (FSTs) with Lucene in a very memory-efficient manner, taking advantage of the fact that the index already stores terms in a sorted manner. David Smiley, another Solr committer, realized that FSTs could be used for text tagging, and built the SolrTextTagger plugin for Solr. In keeping with Lucene's tradition of memory-efficiency and speed, he introduced some more strategies to keep the memory footprint low without significantly impacting retrieval speed. The original implementation used a GATE-based Aho-Corasick algorithm that needed 80GB of RAM to store the dictionary, while the SolrTextTagger version consumed only 198MB.

For fuzzy matching, SoDA uses OpenNLP, another open source project, to chunk incoming text into phrases. Depending on the fuzziness of the matching desired, different analysis chains are applied to the incoming phrases, and they are matched against pre-normalized dictionary entries stored in the index. We borrow several ideas from the Python library FuzzyWuzzy from SeatGeek.

SoDA exposes a JSON over HTTP interface, so it's language- and platform-agnostic. You compose a JSON request document containing the text to be linked and the type of matching required, and send it to the REST endpoint URL via HTTP POST (some parameterless services like the status service are accessible over HTTP GET). The server responds with another JSON document containing the entities found in the text and metadata around these entities.
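
To make the request/response cycle concrete, here is a minimal client sketch in Scala that uses only the JDK's HttpURLConnection. The endpoint path and the JSON field names in it are illustrative assumptions, not the actual SoDA API; consult the project documentation for the real contract.

import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}
import java.net.{HttpURLConnection, URL}

object SodaClientSketch extends App {
    // Hypothetical annotation endpoint; the real path may differ.
    val url = new URL("http://localhost:8080/soda/annotate")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)

    // Hypothetical request document: the text to annotate and the match type.
    val request = """{"text": "Treatment of diabetes mellitus with insulin", "matching": "exact"}"""
    val writer = new OutputStreamWriter(conn.getOutputStream, "UTF-8")
    writer.write(request)
    writer.close()

    // The response is a JSON document listing the annotations found.
    val reader = new BufferedReader(new InputStreamReader(conn.getInputStream, "UTF-8"))
    Stream.continually(reader.readLine()).takeWhile(_ != null).foreach(println)
    reader.close()
}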

Implementation


At its very core, SoDA is a Spring/Scala based web application that exposes a JSON over HTTP interface on the front end and communicates with a Solr index on the back end. A variety of matching strategies are supported, from exact and case-insensitive matching to completely fuzzy matching. The diagram below shows the components that make up the SoDA application. The client is a Spark Notebook in the Databricks cloud, where the rest of our NLP pipeline also runs.


SolrTextTagger is used to serve the exact case-sensitive and case-insensitive entity matches, and OpenNLP is used to chunk incoming text to match against the underlying Solr index for the fuzzy matches. Horizontal scalability (with a linear increase in throughput) is achieved by duplicating these components and putting them behind a load balancer.

Conclusion


Our experiments indicate that we can achieve a sustained annotation rate of 30-35 docs/second against a dictionary with 8M+ entries, where each document is about 100MB on average, with SoDA and Solr running on 2 r3.2xlarge machines behind a load balancer. We have been using SoDA for a few months now, and it has already proven itself as a useful component in our pipeline.

My employer has been kind enough to allow me to release SoDA to the open source community. It's available on GitHub here under an Apache 2.0 license. If you are looking for dictionary-based entity recognition functionality and liked what you have read so far, I encourage you to download it and give it a try. I look forward to hearing your feedback.

Sunday, October 11, 2015

Hyperparameter Optimization using Monte Carlo Methods


I recently built a classifier using Random Forests. I used the RandomForestClassifier from Scikit-Learn for this. The final model is quite small, trained on about 150 rows and 40 features. The hyperparameters I chose to optimize in this case were n_estimators (number of trees in the forest), criterion (gini impurity or entropy, which measures the quality of splits) and max_leaf_nodes (maximum number of leaf nodes).

My initial attempt (as always so far) was a brute-force grid search over the hyperparameter space. I specified the data points I wanted for each hyperparameter and then constructed a Cartesian product of points in hyperparameter space. For each point, I measured the mean average precision (MAP) using 5-fold cross validation against the training set, enumerated the results, and finally chose the smallest possible model with the largest MAP.

The (partial) code to do this is provided below to illustrate the approach described above. Full code is provided at the end of the post.

def manual_explore(X, y, bounds):
    scores = {}
    params = list(itertools.product(*bounds))
    for param_tuple in params:
        params = list(param_tuple)
        score = get_cv_score(X, y, params)
        print_score(params, score)
        scores[param_tostring(params)] = score
    return scores

bounds_manual = [
    [10, 30, 100, 200],     # n_estimators
    ["gini", "entropy"],    # criteria
    [32, 50, 100, 150]      # max_leaf_nodes
]
scores_man = manual_explore(X, y, bounds_manual)

The plot below shows the MAP values as points in the 3D hyperparameter space. The colors from blue (cyan) to red (pink) represent low and high MAP scores respectively. Clearly, this approach explores the hyperparameter space quite systematically.


And here are the results in tabular form. The highest MAP (0.98209) was achieved with n_estimators=30 and criterion=gini, regardless of the max_leaf_nodes setting.

n_estimators | criterion | max_leaf_nodes | MAP
10 | gini | 32 | 0.92794
10 | gini | 50 | 0.92794
10 | gini | 100 | 0.92794
10 | gini | 150 | 0.92794
10 | entropy | 32 | 0.92542
10 | entropy | 50 | 0.92542
10 | entropy | 100 | 0.92542
10 | entropy | 150 | 0.92542
30 | gini | 32 | 0.98209
30 | gini | 50 | 0.98209
30 | gini | 100 | 0.98209
30 | gini | 150 | 0.98209
30 | entropy | 32 | 0.94292
30 | entropy | 50 | 0.94292
30 | entropy | 100 | 0.94292
30 | entropy | 150 | 0.94292
100 | gini | 32 | 0.97140
100 | gini | 50 | 0.97140
100 | gini | 100 | 0.97140
100 | gini | 150 | 0.97140
100 | entropy | 32 | 0.96334
100 | entropy | 50 | 0.96334
100 | entropy | 100 | 0.96334
100 | entropy | 150 | 0.96334
200 | gini | 32 | 0.97140
200 | gini | 50 | 0.97140
200 | gini | 100 | 0.97140
200 | gini | 150 | 0.97140
200 | entropy | 32 | 0.95696
200 | entropy | 50 | 0.95696
200 | entropy | 100 | 0.95696
200 | entropy | 150 | 0.9569

Somewhere around this time (but after I proposed the model built with the best hyperparameter combination above), I read about Bayesian Hyperparameter Search on the Natural Language Processing Blog. Digging around a bit more, I learned about two Python packages, Spearmint and HyperOpt, that allow you to automatically discover optimal hyperparameters. This FastML Blog Post describes a case study where the author uses Spearmint to discover the optimal learning rate for a Neural Network.

I figured it may be interesting to try and find the optimal hyperparameters for my Random Forest classifier using one of these packages. I tried to install Spearmint locally but wasn't able to run the tutorial example, so I looked at HyperOpt instead. The hyperopt-sklearn project provides a way to use HyperOpt functionality using Scikit-Learn idioms, but the documentation is a bit sparse - not surprising, since these are being actively developed. In any case (primarily thanks to this post by Marco Altini), by this time I had a few ideas of my own about how to build a DIY (Do It Yourself) version of this, so that's what I ended up doing. This post describes the implementation.

The basic idea is to select the boundaries of the hyperparameter space you want to optimize within, then start off with a random point in this space and compute the MAP of the model trained with these hyperparameters using 5-fold cross validation. Changing a single parameter at a time to move from the current point i to a candidate point j, you compute the acceptance probability as:

    a_ij = min(1, MAP_j / MAP_i)

If the new MAP is higher than the previous one (i), we always move to the new point (j); otherwise we move to j with probability a_ij. In case we don't move, we "reset" the point by generating a random point that may change more than one parameter. We do this iteratively until convergence or for a specified number of iterations, whichever occurs sooner.

The (partial) code for the above algorithm is provided below. As before, some of the function calls are not shown here. The full code is provided at the end of the post.

def auto_explore(X, y, bounds, max_iters, tol, threshold=None):
    change_all_params = False
    prev_scores = {}
    num_iters = 0
    # get initial point
    params = get_random_point(bounds)
    prev_score = get_cv_score(X, y, params)
    prev_scores[param_tostring(params)] = prev_score
    print_score(params, prev_score)
    while (True):
        if num_iters > max_iters:
            break
        pivot = get_random_pos(bounds)
        if change_all_params:
            params = get_random_point(bounds)
        else:
            params = change_single_param(bounds, params, pivot)
        if prev_scores.has_key(param_tostring(params)):
            continue
        score = get_cv_score(X, y, params)
        prev_scores[param_tostring(params)] = score
        print_score(params, score)
        if prev_score <= score and score - prev_score < tol:
            # convergence
            if threshold is None:
                break
            else:
                if score > threshold:
                    break
        # hill-climbing
        move_prob = min([1, score / prev_score])
        num_iters += 1
        prev_score = score
        if move_prob == 1:
            # if new score better than old score, keep going
            change_all_params = False
            continue
        else:
            # only keep with single change with prob move_prob
            mr = random.random()
            if mr > move_prob:
                change_all_params = False
                continue
            else:
                change_all_params = True
                continue
    return prev_scores

Results for a couple of runs are shown below. As you can see, convergence to a local maximum in the hyperparameter space is very quick with this approach. However, both of these runs converge to a point with a MAP of 0.97140, which is lower than the best result we got from our brute-force grid search.






To get around this, I introduced a new parameter that sets an "impossible" threshold to force the algorithm to continue for the maximum number of iterations. Because the points found with this approach tend to be good local maxima, chances are that the global maximum will be among them.

Here are the MAP scores plotted in the hyperparameter space for this run. As you can see, the algorithm quickly moves to the higher-MAP areas of the space (compare the amount of pink on this plot with the one from the Cartesian-product exploration I did earlier). It also finds some points with the "best" MAP that was found by our previous approach. Not surprisingly, they are situated around the same location in hyperparameter space.


And here are the same results in tabular form. The highest MAP (0.98209) is achieved at several points, all with criterion=gini and n_estimators between 32 and 43.

n_estimators | criterion | max_leaf_nodes | MAP
10 | entropy | 32 | 0.92542
41 | entropy | 32 | 0.94000
41 | gini | 32 | 0.98209
112 | gini | 32 | 0.97140
184 | gini | 140 | 0.97140
152 | gini | 140 | 0.97140
21 | gini | 140 | 0.94907
111 | gini | 95 | 0.97140
50 | gini | 95 | 0.96058
102 | gini | 89 | 0.97140
194 | gini | 89 | 0.97140
43 | gini | 89 | 0.98209
156 | gini | 89 | 0.97140
174 | gini | 133 | 0.97140
123 | gini | 133 | 0.97140
88 | gini | 133 | 0.97140
116 | gini | 133 | 0.97140
29 | gini | 133 | 0.97584
136 | gini | 133 | 0.97140
68 | gini | 68 | 0.96239
157 | gini | 123 | 0.97140
51 | gini | 123 | 0.96058
119 | gini | 99 | 0.97140
32 | gini | 99 | 0.98209
182 | gini | 99 | 0.97140
76 | gini | 73 | 0.97140
167 | gini | 73 | 0.97140
118 | gini | 73 | 0.97140
137 | gini | 73 | 0.97140
10 | gini | 73 | 0.92794
48 | gini | 55 | 0.96334
39 | gini | 55 | 0.98209
161 | gini | 55 | 0.97140
119 | gini | 100 | 0.97140
126 | gini | 100 | 0.97140
135 | gini | 100 | 0.97140
105 | gini | 100 | 0.97140
19 | gini | 100 | 0.94606
141 | gini | 113 | 0.97140
83 | gini | 113 | 0.96515
83 | gini | 77 | 0.96515
13 | gini | 77 | 0.92271
15 | gini | 35 | 0.94374
68 | gini | 35 | 0.96239
66 | gini | 35 | 0.96239
149 | gini | 35 | 0.96552
144 | gini | 35 | 0.97140
180 | gini | 35 | 0.97140
114 | gini | 35 | 0.97140
123 | gini | 35 | 0.97140
148 | gini | 35 | 0.96552
59 | gini | 62 | 0.96239
45 | gini | 53 | 0.97140
106 | gini | 53 | 0.97140
91 | gini | 53 | 0.96515
118 | gini | 99 | 0.97140
184 | gini | 99 | 0.97140
199 | gini | 99 | 0.97140
28 | gini | 99 | 0.96959
47 | gini | 55 | 0.96515
19 | gini | 37 | 0.94606
85 | gini | 79 | 0.96515
69 | gini | 79 | 0.96239
116 | gini | 98 | 0.97140
176 | gini | 98 | 0.97140
57 | gini | 98 | 0.96864
199 | gini | 149 | 0.97140
171 | gini | 149 | 0.97140
107 | gini | 149 | 0.96864
22 | gini | 39 | 0.95404
66 | gini | 67 | 0.96239
122 | gini | 67 | 0.97140
46 | gini | 67 | 0.97140
79 | gini | 67 | 0.97140
60 | gini | 67 | 0.96864
21 | gini | 39 | 0.94907
77 | gini | 73 | 0.96515
18 | gini | 73 | 0.95709
124 | gini | 103 | 0.97140
90 | gini | 103 | 0.96515
73 | gini | 71 | 0.96239
194 | gini | 146 | 0.97140
23 | gini | 146 | 0.96103
105 | gini | 91 | 0.97140
62 | gini | 91 | 0.96239
44 | gini | 53 | 0.96683
167 | gini | 53 | 0.97140
134 | gini | 53 | 0.97140
76 | gini | 53 | 0.97140
16 | gini | 53 | 0.95651
84 | gini | 78 | 0.97140
40 | gini | 78 | 0.98209
115 | gini | 78 | 0.97140
103 | gini | 89 | 0.97140
141 | gini | 89 | 0.97140
191 | gini | 89 | 0.97140
123 | gini | 89 | 0.97140
131 | gini | 89 | 0.97140
89 | gini | 89 | 0.96515
196 | gini | 147 | 0.97140
152 | gini | 147 | 0.97140
133 | gini | 147 | 0.9714

Here is the full code for the experiment. The input data (not provided) is a TSV file containing 3 columns - the internal record ID (to tie model results back to the application), the label (0 or 1) and a comma-separated list of 40 features.

# -*- coding: utf-8 -*-
from __future__ import division
from mpl_toolkits.mplot3d import Axes3D
import itertools
import matplotlib.pyplot as plt
import numpy as np
import random
from sklearn.cross_validation import KFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import average_precision_score

def get_cv_score(X, y, params):
    test_accs = []
    kfold = KFold(n=X.shape[0], n_folds=5, random_state=24)
    for train, test in kfold:
        Xtrain, Xtest, ytrain, ytest = X[train], X[test], y[train], y[test]
        clf = RandomForestClassifier(n_estimators=params[0], 
                                     criterion=params[1], 
                                     max_depth=4, 
                                     max_leaf_nodes=params[2], 
                                     n_jobs=-1, random_state=24)
        clf.fit(Xtrain, ytrain)
        ypred = clf.predict(Xtest)
        test_accs.append(average_precision_score(ytest, ypred))
    return sum(test_accs) / len(test_accs)

def print_score(params, score):
    sparams = []
    for param in params:
        sparams.append(str(param))
    sparams.append("%.5f" % (score))
    print(",".join(sparams))
    
def manual_explore(X, y, bounds):
    scores = {}
    params = list(itertools.product(*bounds))
    for param_tuple in params:
        params = list(param_tuple)
        score = get_cv_score(X, y, params)
        print_score(params, score)
        scores[param_tostring(params)] = score
    return scores

def get_random_point(bounds):
    r = random.random()
    rps = []
    for i in range(len(bounds)):
        bound = bounds[i]
        if type(bound[0]) == str:
            str_bounds = np.linspace(0, 1, len(bound))[1:]
            for i in range(str_bounds.shape[0]):
                if r < str_bounds[i]:
                    rp = bound[i]
                    break
        else:
            rp = bound[0] + int(r * (bound[1] - bound[0]))
        rps.append(rp)
    return rps

def get_random_pos(bounds):
    rpos_bounds = np.linspace(0, 1, num=len(bounds))[1:]
    r = random.random()
    pos = None
    for i in range(rpos_bounds.shape[0]):
        if r < rpos_bounds[i]:
            pos = i
            break
    return pos
    
def change_single_param(bounds, curr, pos):
    rpoint = get_random_point(bounds)
    curr[pos] = rpoint[pos]
    return curr

def param_tostring(params):
    sparams = []
    for param in params:
        if type(param) != str:
            sparams.append(str(param))
        else:
            sparams.append(param)
    return ",".join(sparams)
    
def add_to_prev_params(prev_params, param):
    prev_params.add(param_tostring(param))

def already_seen(param, prev_params):
    return param_tostring(param) in prev_params

def auto_explore(X, y, bounds, max_iters, tol, threshold=None):
    change_all_params = False
    prev_scores = {}
    num_iters = 0
    # get initial point
    params = get_random_point(bounds)
    prev_score = get_cv_score(X, y, params)
    prev_scores[param_tostring(params)] = prev_score
    print_score(params, prev_score)
    while (True):
        if num_iters > max_iters:
            break
        pivot = get_random_pos(bounds)
        if change_all_params:
            params = get_random_point(bounds)
        else:
            params = change_single_param(bounds, params, pivot)
        if prev_scores.has_key(param_tostring(params)):
            continue
        score = get_cv_score(X, y, params)
        prev_scores[param_tostring(params)] = score
        print_score(params, score)
        if prev_score <= score and score - prev_score < tol:
            # convergence
            if threshold is None:
                break
            else:
                if score > threshold:
                    break
        # hill-climbing
        move_prob = min([1, score / prev_score])
        num_iters += 1
        prev_score = score
        if move_prob == 1:
            # if new score better than old score, keep going
            change_all_params = False
            continue
        else:
            # only keep with single change with prob move_prob
            mr = random.random()
            if mr > move_prob:
                change_all_params = False
                continue
            else:
                change_all_params = True
                continue
    return prev_scores

def plot_scatter(scores):
    xx = []
    yy = []
    zz = []
    colors = []
    for k in scores.keys():
        kcols = k.split(",")
        xx.append(int(kcols[0]))
        yy.append(1 if kcols[1] == "gini" else 0)
        zz.append(int(kcols[2]))
        colors.append(scores[k])
    fig = plt.figure()
    ax = fig.add_subplot(111, projection="3d")
    ax.scatter(xx, yy, zz, c=colors, marker='o', cmap="cool")
    ax.set_xlabel("num_estimators")
    ax.set_ylabel("criterion")
    ax.set_zlabel("max_leaf_nodes")
    plt.show()
    
################################ main ###############################

# read data from file
trainset = open("/path/to/training_data.tsv", 'rb')
feature_mat = []
label_vec = []
for line in trainset:
    _, label, features = line.strip().split("\t")
    feature_mat.append([float(x) for x in features.split(',')])
    label_vec.append(int(label))
trainset.close()

X = np.matrix(feature_mat)
y = np.array(label_vec)

# manual exploration
bounds_manual = [
    [10, 30, 100, 200],     # n_estimators
    ["gini", "entropy"],    # criteria
    [32, 50, 100, 150]      # max_leaf_nodes
]
scores_man = manual_explore(X, y, bounds_manual)
plot_scatter(scores_man)

# automatic exploration
bounds_auto = [
    [10, 200], # n_estimators
    ["gini", "entropy"],  #criteria
    [32, 150]  # max_leaf_nodes
]
tol = 1e-6
max_iters = 100
scores_auto = auto_explore(X, y, bounds_auto, max_iters, tol, 0.99)
plot_scatter(scores_auto)

That's all I have for today, hope you enjoyed it. This approach to optimization seems to be quite effective compared to what I have been doing so far, and I hope to use it more going forward.

Sunday, September 27, 2015

Sentence Similarity using Word2Vec and Word Movers Distance


Sometime back, I read about the Word Mover's Distance (WMD) in the paper From Word Embeddings to Document Distances by Kusner, Sun, Kolkin and Weinberger. The WMD is a distance function that measures the distance between two texts as the cumulative sum of the minimum distances each word in one text must move in vector space to reach the closest word in the other text. In the paper, the authors provide some examples where WMD is calculated against a Word2Vec vector space. Since Word2Vec word embeddings preserve aspects of the word's context, it's a good way to capture semantic meaning (or difference in meaning) when calculating WMD.

The paper reminded me of a similar (in intent) algorithm that I had implemented earlier and written about in my post Computing Semantic Similarity for Short Sentences. There, we captured the semantic meaning using an external semantic network (Wordnet).

Since the problems were so similar, I figured that it might be interesting to compute the WMD for the sentence pairs in this paper and see how they match up with intuition. I already had lying around a dump of the GoogleNews vectors (pretrained vectors over about 100B words of Google News) from a previous project. The paper described results over a dataset of just 16 short sentence pairs, so I decided to do this interactively on Spark using a Databricks notebook. We use Databricks at work and it's ideal for this kind of quick and dirty ad-hoc work.

First we load up our 16 sentence pairs. The input is 3 columns - sentence#1, sentence#2 and the original score, tab separated. Since we don't care about the original score, we discard it and convert the input to a pair.

Since we want to compare words across sentences in the same pair, it makes sense to have these words in the same worker when they are compared, so we add an index key to each sentence pair. The output of this cell is an RDD that looks like ((sentence1: String, sentence2: String), index: Long).

import org.apache.spark.storage.StorageLevel

val sentencePairs = sc.textFile("sentence_pairs.txt")
    .map(line => {
        val Array(s1, s2, _) = line.split('\t')
        (s1, s2)
    })
    .zipWithIndex
    .persist(StorageLevel.MEMORY_AND_DISK)
sentencePairs.count()

WMD between two sentences (or between any two blobs of text) is computed as the sum of the distances between closest pairs of words in the texts. The words are pre-processed to remove stop words, so the next cell pulls in a list of English stopwords which I convert to a Set and broadcast to the Worker boxes.

val stopwords = sc.textFile("stopwords.txt").collect.toSet
val bStopwords = sc.broadcast(stopwords)

We now split up both sentences into words (removing punctuation and splitting on whitespace), removing stopwords from each, then flatMap-ing them to the format (index: Long, (word1: String, word2: String)). This gives us a list of 71 word pairs.

def getWordPairs(id: Long, s1: String, s2: String, stopwords: Set[String]): 
        List[(Long, (String, String))] = {
    val w1s = s1.toLowerCase
          .replaceAll("\\p{Punct}", "")
          .split(" ")
          .filter(w => !stopwords.contains(w))
    val w2s = s2.toLowerCase
          .replaceAll("\\p{Punct}", "")
          .split(" ")
          .filter(w => !stopwords.contains(w))
    val wpairs = for (w1 <- w1s; w2 <- w2s) yield (id, (w1, w2))
    wpairs.toList
}

val wordPairs = sentencePairs.flatMap(ssi => 
    getWordPairs(ssi._2, ssi._1._1, ssi._1._2, bStopwords.value))
wordPairs.count()

Next we ingest the Word2Vec vectors. I've used Gensim's Word2Vec module to convert the original Word2Vec binary format to TSV. The format of this dataset is (word: String, comma-separated list of 300 vector elements).

val w2vs = sc.textFile("GoogleNews-vectors-negative300.tsv")
    .map(line => {
        val Array(word, vector) = line.split('\t')
        (word, vector)
    })

Next, we join the wordPairs against the w2vs RDD on the RHS and the LHS words to get the 300 dimensional word2vec vector for the RHS and LHS word respectively. We do a lot of moving things around so I have used case matching instead of the less intuitive underscore syntax to represent tuple elements and subelements. Note that we need to hang on to the left word because we want to find the word that is closest to each left word.

import breeze.linalg._

def dist(lvec: String, rvec: String): Double = {
    val lv = DenseVector(lvec.split(',').map(_.toDouble))
    val rv = DenseVector(rvec.split(',').map(_.toDouble))
    math.sqrt(sum((lv - rv) :* (lv - rv)))
}

val wordVectors = wordPairs.map({case (idx, (lword, rword)) => 
        (rword, (idx, lword))})
    .join(w2vs)    // (rword, ((idx, lword), rvec))
    .map({case (rword, ((idx, lword), rvec)) => (lword, (idx, rvec))})
    .join(w2vs)    // (lword, ((idx, rvec), lvec))
    .map({case (lword, ((idx, rvec), lvec)) => ((idx, lword), (lvec, rvec))})
    .map({case ((idx, lword), (lvec, rvec)) => 
        ((idx, lword), List(dist(lvec, rvec)))}) 
    .persist(StorageLevel.MEMORY_AND_DISK)

I used Euclidean Distance in Word2Vec space for distance between words. I also tried using Cosine Distance (1 - Cosine Similarity) with similar results. We then sum all the shortest distances across all LHS words to get the WMD for the sentence pair.

val bestWMDs = wordVectors.reduceByKey((a, b) => a ++ b)
    .mapValues(dists => dists.sortWith(_ < _).head)  // dist to closest word
    .map({case ((idx, lword), wmd) => (idx, wmd)})
    .reduceByKey((a, b) => a + b)                    // sum all wmds for sent
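
As an aside, swapping in the cosine distance mentioned above only requires replacing the dist function; here is a minimal sketch of that variant, assuming the same Breeze imports as the earlier block and non-zero vectors:

// Cosine distance = 1 - cosine similarity, over the same comma-separated
// vector string format that dist() expects.
def cosineDist(lvec: String, rvec: String): Double = {
    val lv = DenseVector(lvec.split(',').map(_.toDouble))
    val rv = DenseVector(rvec.split(',').map(_.toDouble))
    val cosim = sum(lv :* rv) /
        (math.sqrt(sum(lv :* lv)) * math.sqrt(sum(rv :* rv)))
    1.0 - cosim
}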

Finally, we join these WMD scores back into the original dataset using the pair index that we originally generated using zipWithIndex.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class SentencePair(s1: String, s2: String, wmd: Double)
val results = sentencePairs.map(_.swap)
    .join(bestWMDs)
    .map({case (id, ((s1, s2), wmd)) => SentencePair(s1, s2, wmd)})
val resultsDF = sqlContext.createDataFrame(results)
    .orderBy($"s1".asc, $"wmd".asc)
display(resultsDF)

The results are shown below. The sentences are sorted by the LHS sentence first, then by WMD (lowest WMD first so we can easily see the closest sentence pairs first and compare them to pairs that are not as close).

LHS Sentence | RHS Sentence | WMD
A glass of cider. | A full cup of apple juice. | 2.2169259719396095
Canis familiaris are animals. | Dogs are common pets. | 1.859694788966317
Dogs are animals. | They are common pets. | 1.4537090848972198
I have a hammer. | Take some nails. | 1.1578027104196844
I have a hammer. | Take some apples. | 1.3028564676146912
I have a pen. | Where is ink? | 1.020277185488236
I have a pen. | Where do you live? | 1.3924941078355293
I like that bachelor. | I like that unmarried man. | 1.176742725809037
It is a dog. | That must be your dog. | 0
It is a dog. | It is a pig. | 1.04864558369858
It is a dog. | It is a log. | 1.3798001799052624
John is very nice. | Is John very nice? | 0
Red alcoholic drink. | Fresh orange juice. | 3.1161814560971166
Red alcoholic drink. | A bottle of wine. | 3.386809492524872
Red alcoholic drink. | Fresh apple juice. | 3.505168296314785
Red alcoholic drink. | An English dictionary. | 4.106139922327307

As you can see, the scoring seems largely correct. For example, it finds that a "glass of cider" and a "cup of apple juice" are quite similar, even though there are no shared words (except for stopwords). Similarly, "I have a hammer" is more similar to "Take some nails" than to "Take some apples". The only intuitively incorrect result in this set is that "Red alcoholic drink" comes out as more similar to "Fresh orange juice" than to "A bottle of wine". However, "A bottle of wine" is still ranked as more similar to "Red alcoholic drink" than "Fresh apple juice" and "An English dictionary" are. So overall, it seems to work on my limited dataset.

In my case, I already have two sentences and I just have to find the distance between them. In cases where you have to find the closest sentence, the complexity of the algorithm is O(p^3 log p), where p is the number of unique words in the documents. One suggestion is to prune the number of candidate RHS sentences by thresholding on the Word Centroid Distance (WCD) or the relaxed WMD (see the paper for details) between the two sentences, and only run the full WMD on the pruned set of sentence pairs, as sketched below.
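
The pruning step might look something like the rough sketch below. This is not part of the pipeline above: lhsCentroids and rhsCentroids are hypothetical RDD[(Long, DenseVector[Double])] collections holding the mean word vector for each side of a candidate pair, keyed by pair index, and the threshold value is purely illustrative.

val wcdThreshold = 3.0  // illustrative cutoff, would need tuning
val prunedPairIds = lhsCentroids.join(rhsCentroids)  // (idx, (lcent, rcent))
    .filter({ case (idx, (lcent, rcent)) =>
        val diff = lcent - rcent
        math.sqrt(sum(diff :* diff)) < wcdThreshold  // Word Centroid Distance
    })
    .keys  // only these pair ids get the full WMD computation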

Sunday, August 30, 2015

Categorizing Medical Transcripts using DMOZ and Word2Vec


Sometime back, a colleague mentioned during a conversation that his PhD dissertation involved using DMOZ for clustering query terms. The technique seemed interesting in relation to a problem we were trying to solve at the time, but it also got me thinking that perhaps this idea could be useful for categorizing documents as well. Since DMOZ provides a comprehensive hierarchy of web categories, and links to representative documents in each of these categories, the category (or categories) of an unseen document can be determined by computing the similarity of this document against the representative documents. Word2Vec vectors can be used to reduce both reference and test documents to the same latent space for comparison. To test this idea out, I used a subset of DMOZ categories under medical specialties to categorize a small collection of Medical Transcription documents I had crawled some time back. This post describes the effort.

DMOZ data is available for download as a pair of large RDF files - structure.rdf and content.rdf. The structure.rdf file contains the categories defined as path-like strings. Categories are defined using the Topic tag and nested using narrow and symbolic tags. The content.rdf file also contains these path-like categories defined using the Topic tag and provides links to representative web pages for these categories using the link tag. Since we are only interested in Medical Specialties, we used the following code to parse the two RDFs in a streaming manner (SAX) and extract the links from them. Each link is then sent to Boilerpipe's Article Extractor, which strips out the HTML markup and removes irrelevant text from the page (using a combination of rules and machine learning).

// Source: src/main/scala/dmozcat/preprocessing/ReferencePageExtractor.scala
package dmozcat.preprocessing

import java.io.File
import java.io.FileWriter
import java.io.PrintWriter
import java.net.URL
import java.util.concurrent.FutureTask
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

import javax.xml.parsers.SAXParserFactory

import org.xml.sax.Attributes
import org.xml.sax.helpers.DefaultHandler

import de.l3s.boilerpipe.extractors.ArticleExtractor

import scala.collection.mutable.ArrayBuffer

object ReferencePageExtractor extends App {
    val dataDir = "data"
    val structXml = new File(dataDir, "structure.rdf.u8")
    val contentXml = new File(dataDir, "content.rdf.u8")
    val outputCsv = new File(dataDir, "ref-pages.csv")
    val extractor = new ReferencePageExtractor(structXml, contentXml, outputCsv)
    extractor.extract()
}

class ReferencePageExtractor(structXml: File, contentXml: File, outputCsv: File) {
    
    def extract(): Unit = {
        val factory = SAXParserFactory.newInstance()
        // parse structure RDF to get list of topics
        val structParser = factory.newSAXParser()
        val structHandler = new DmozStructHandler()
        structParser.parse(structXml, structHandler)
        val topicSet = structHandler.topicSet()
        // parse content RDF to get list of URLs for each topic
        val contentParser = factory.newSAXParser()
        val contentHandler = new DmozContentHandler(topicSet)
        contentParser.parse(contentXml, contentHandler)
        val contentUrls = contentHandler.contentUrls()
        // download pages and write to file
        val writer = new PrintWriter(new FileWriter(outputCsv), true)
        contentUrls.foreach(topicUrl => {
            val topic = topicUrl._1
            val url = topicUrl._2
            val downloadTask = new FutureTask(new Callable[String]() {
                def call(): String = {
                    try {
                        val text = ArticleExtractor.INSTANCE
                            .getText(new URL(url))
                            .replaceAll("\n", " ")
                        Console.println("Downloading %s for topic: %s"
                            .format(url, topic))
                        text
                    } catch {
                        case e: Exception => "__ERROR__"
                    }
                }
            })
            new Thread(downloadTask).start()
            try {
                val text = downloadTask.get(60, TimeUnit.SECONDS)
                if (!text.equals("__ERROR__")) {
                    writer.println("%s\t%s\t%s".format(topic, url, text))
                } else {
                    Console.println("Download Error, skipping")
                }
            } catch {
                case e: TimeoutException => Console.println("Timed out, skipping")
            }
        })
        writer.flush()
        writer.close()
    }
}

class DmozStructHandler extends DefaultHandler {
    
    val contentTopics = Set("narrow", "symbolic")
    
    var isRelevant = false
    val topics = ArrayBuffer[String]()
    
    def topicSet(): Set[String] = topics.toSet
    
    override def startElement(uri: String, localName: String, 
            qName: String, attrs: Attributes): Unit = {
        if (!isRelevant && qName.equals("Topic")) {
            val numAttrs = attrs.getLength()
            val topicName = (0 until numAttrs)
                .filter(i => attrs.getQName(i).equals("r:id"))
                .map(i => attrs.getValue(i))
                .head
            if (topicName.equals("Top/Health/Medicine/Medical_Specialties"))
                isRelevant = true
        }
        if (isRelevant && contentTopics.contains(qName)) {
            val numAttrs = attrs.getLength()
            val contentTopicName = (0 until numAttrs)
                .filter(i => attrs.getQName(i).equals("r:resource"))
                .map(i => attrs.getValue(i))
                .map(v => if (v.indexOf(':') > -1)
                    v.substring(v.indexOf(':') + 1) else v)
                .head
            topics += contentTopicName
        }
    }
    
    override def endElement(uri: String, localName: String, 
            qName: String): Unit = {
        if (isRelevant && qName.equals("Topic")) isRelevant = false
    }
}

class DmozContentHandler(topics: Set[String]) extends DefaultHandler {
    
    var isRelevant = false
    var currentTopicName: String = null
    val contents = ArrayBuffer[(String, String)]()
    
    def contentUrls(): List[(String, String)] = contents.toList
    
    override def startElement(uri: String, localName: String, 
            qName: String, attrs: Attributes): Unit = {
        if (!isRelevant && qName.equals("Topic")) {
            val numAttrs = attrs.getLength()
            val topicName = (0 until numAttrs)
                .filter(i => attrs.getQName(i).equals("r:id"))
                .map(i => attrs.getValue(i))
                .head
            if (topics.contains(topicName)) {
                isRelevant = true
                currentTopicName = topicName
            }
        }
        if (isRelevant && qName.equals("link")) {
            val numAttrs = attrs.getLength()
            val link = (0 until numAttrs)
                .filter(i => attrs.getQName(i).equals("r:resource"))
                .map(i => attrs.getValue(i))
                .head
            contents += ((currentTopicName, link))
        }
    }
    
    override def endElement(uri: String, localName: String, 
            qName: String): Unit = {
        if (isRelevant && qName.equals("Topic")) {
            isRelevant = false
            currentTopicName = null
        }
    }
}

The output of this code is a tab-separated file containing the category name, page URL, and the extracted text. There are 464 records in all and 44 unique categories. The data looks something like this.

Top/Health/Medicine/Medical_Specialties/Aerospace_Medicine  http://www.civilavmed.com/  Home How are you as an AME adapting to the new ...
Top/Health/Medicine/Medical_Specialties/Aerospace_Medicine  http://www.asma.org/        Sunday, September 20, 2015 8:00 AM ICASM 2015 The Congress ...
...

The next step is to vectorize the reference text (the third column in the file shown above). We use OpenNLP's models to segment the text into sentences and extract noun phrases, using the excellent Scala interface provided by the NLP Tools Project from the University of Washington.

One of the artifacts published by the Word2Vec project is the set of vectors for around 3 million words and phrases trained on the Google News dataset. This information is released as a binary file, and we use Gensim's Word2Vec module to convert it to a TSV format that our code can use. Using the noun phrases retrieved with OpenNLP, we construct unigrams, bigrams and trigrams and look them up in this word vector dictionary. The vectors for the ngrams so found are added and normalized to build a single vector for each category.

// Source: src/main/scala/dmozcat/vectorize/TextVectorizer.scala
package dmozcat.vectorize

import scala.Array.canBuildFrom

import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

import breeze.linalg.DenseVector
import breeze.linalg.InjectNumericOps
import breeze.linalg.norm

object TextVectorizer {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
        
        // arguments
        val awsAccessKey = args(0)
        val awsSecretKey = args(1)
        val inputFile = args(2)
        val word2vecFile = args(3)
        val outputDir = args(4)
        
        val conf = new SparkConf()
        conf.setAppName("TextVectorizer")
        
        val sc = new SparkContext(conf)
        
        sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKey)
        sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretKey)
        
        val input = sc.textFile(inputFile)
            .map(line => parseLine(line))
            .mapPartitions(p => extractNGrams(p)) // (key, ngram)
            .map(kv => (kv._2, kv._1))            // (ngram, key)
            
        val wordVectors = sc.textFile(word2vecFile)
            .map(line => {
                val Array(word, vecstr) = line.split("\t")
                val vector = new DenseVector(vecstr.split(",").map(_.toDouble))
                (word.toLowerCase, vector)        // (ngram, vector)
            })
        
        // join input to wordVectors by word
        val inputVectors = input.join(wordVectors)  // (ngram, (key, vector))
            .map(nkv => (nkv._2._1, nkv._2._2))     // (key, vector)
            .aggregateByKey((0, DenseVector.zeros[Double](300)))(
                (acc, value) => (acc._1 + 1, acc._2 + value),
                (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
            .mapValues(countVec => 
                (1.0D / countVec._1) * countVec._2) // (key, mean(vector))
            
        // save document (id, vector) pair as flat file
        inputVectors.map(kvec => {
            val key = kvec._1
            val value = (kvec._2 / norm(kvec._2, 2))
                .toArray
                .map("%.5f".format(_))
                .mkString(",")
            "%s\t%s".format(key, value)
        }).saveAsTextFile(outputDir)
    }
        
    def parseLine(line: String): (String, String) = {
        val cols = line.split("\t")
        val key = cols.head
        val text = cols.last
        (key, text)
    }
    
    def extractNGrams(p: Iterator[(String, String)]): 
            Iterator[(String, String)] = {
        val t2n = new NGramExtractor()
        p.flatMap(keyText => t2n.ngrams(keyText))
    }
}

The actual vectorization logic is encapsulated inside the NGramExtractor, whose code is shown below. If you have used the OpenNLP API directly, you will appreciate the convenience of using the NLPTools API. Plus, since the models are captured as dependencies, you no longer have to explicitly deal with the OpenNLP model files. The only downside of the NLPTools API is the number of dependencies you must specify in your build file - this is because NLPTools unifies access to a bunch of underlying tools, each of which has a different license, so having different JAR files for each is how NLPTools deals with this.

// Source: src/main/scala/dmozcat/vectorize/NGramExtractor.scala
package dmozcat.vectorize

import edu.knowitall.tool.sentence.OpenNlpSentencer
import edu.knowitall.tool.postag.OpenNlpPostagger
import edu.knowitall.tool.tokenize.OpenNlpTokenizer
import edu.knowitall.tool.chunk.OpenNlpChunker

import scala.collection.mutable.ArrayBuffer

class NGramExtractor {

    val sentencer = new OpenNlpSentencer
    val postagger = new OpenNlpPostagger
    val tokenizer = new OpenNlpTokenizer
    val chunker = new OpenNlpChunker
    
    def ngrams(keyText: (String, String)): List[(String, String)] = {
        val key = keyText._1
        val text = keyText._2
        // segment text into sentences
        val sentences = sentencer.segment(text)
        // extract noun phrases from sentences
        val nounPhrases = sentences.flatMap(segment => {
            val sentence = segment.text
            val chunks = chunker.chunk(sentence)
            chunks.filter(chunk => chunk.chunk.endsWith("-NP"))
                .map(chunk => (chunk.string, chunk.chunk))
                .foldLeft(List.empty[String])((acc, x) => x match {
                    case (s, "B-NP") => s :: acc
                    case (s, "I-NP") => acc.head + " " + s :: acc.tail
                }).reverse
        })
        // extract ngrams (n=1,2,3) from noun phrases
        val ngrams = nounPhrases.flatMap(nounPhrase => {
            val words = nounPhrase.toLowerCase.split(" ")
            words.size match {
                case 0 => List()
                case 1 => words
                case 2 => words ++ words.sliding(2).map(_.mkString("_"))
                case _ => words ++ 
                    words.sliding(2).map(_.mkString("_")) ++
                    words.sliding(3).map(_.mkString("_"))
            }
        })
        ngrams.map(ngram => (key, ngram)).toList
    }
}

The text from the documents to be categorized is similarly vectorized by extracting the noun phrases and looking up vectors for their unigrams, bigrams and trigrams against the Word2Vec dictionary. In order to use the same code to do this, we preprocess our documents from a set of files in a directory into a single file containing the tab-separated filename and corresponding text.

// Source: src/main/scala/dmozcat/preprocessing/TestDataReformatter.scala
package dmozcat.preprocessing

import java.io.File
import scala.io.Source
import java.io.PrintWriter
import java.io.FileWriter

object TestDataReformatter extends App {
    
    val inputDir = new File("/Users/palsujit/Projects/med_data/mtcrawler/texts")
    val outputFile = new PrintWriter(new FileWriter(new File("/tmp/testdata.csv")), true)
    inputDir.listFiles().foreach(inputFile => {
        val filename = inputFile.getName()
        val text = Source.fromFile(inputFile)
            .getLines
            .map(line => if (line.endsWith(".")) line else line + ".")
            .mkString(" ")
            .replaceAll("\\s+", " ")
        outputFile.println("%s\t%s".format(filename, text))
    })
    outputFile.flush()
    outputFile.close()
}

We vectorize the reference and test datasets in two separate Amazon EMR jobs. Amazon officially announced support for Spark on EMR a couple of months ago (at the Spark Summit in San Francisco in June this year), and I've been meaning to try it out. Their instructions for running Spark on EMR are very detailed and worked perfectly for me; the only thing you need to know is to switch to the "advanced option" when defining your cluster.

Essentially Spark on EMR works in client mode. The first step invokes an Amazon custom job to execute a Hadoop dfs command to copy your JAR from S3 to the driver box. The next step actually runs your JAR against the Spark cluster. The JAR file needs to have (in this case) the OpenNLP models baked in, which can be done by invoking "sbt assembly".

The output of the job run against the file of reference text is a file of (category name, category vector) pairs, and a file of (filename, vector) pairs when run against the test dataset.

Next, we compute the cosine similarity of each document vector (in word2vec space) against each of the category vectors (also in word2vec space). Since there are only 44 category vectors, we convert this to a 44x300 matrix (the word2vec vectors provided are 300 dimensional) and broadcast it to the workers. Figuring out the best category is simply a matter of multiplying this matrix by the document vector. Both the matrix entries and the vector are normalized by their L2 norms, so the result is a vector of cosine similarities. We then find the top N (3) similarities, and their associated category names.

// Source: src/main/scala/dmozcat/vectorize/KNearestCategories.scala
package dmozcat.vectorize

import scala.Array.canBuildFrom
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import breeze.linalg.DenseVector
import breeze.linalg.norm
import breeze.linalg.DenseMatrix

object KNearestCategories {
    
    def main(args: Array[String]): Unit = {
        
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
        
        // arguments
        val awsAccessKey = args(0)
        val awsSecretKey = args(1)
        val refVectorsDir = args(2)
        val testVectorsDir = args(3)
        val bestcatsFile = args(4)
        
        val conf = new SparkConf()
        conf.setAppName("KNearestCategories")
        
        val sc = new SparkContext(conf)
        
        sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKey)
        sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretKey)
    
        // read reference vectors and broadcast them to workers for
        // replicated join
        val refVectors = sc.textFile(refVectorsDir)
            .map(line => {
                val Array(catStr, vecStr) = line.split("\t")
                val cat = catStr.split("/").last
                val vec = new DenseVector(vecStr.split(",").map(_.toDouble))
                val l2 = norm(vec, 2.0)
                (cat, vec / l2)
            }).collect
        val categories = refVectors.map(_._1)
        // we want to take the Array[DenseVector[Double]] and convert
        // it to DenseMatrix[Double] so we can do matrix-vector multiplication
        // for computing similarities later
        val nrows = categories.size
        val ncols = refVectors.map(_._2).head.length
        val catVectors = refVectors.map(_._2)
            .reduce((a, b) => DenseVector.vertcat(a, b))
            .toArray
        val catMatrix = new DenseMatrix[Double](ncols, nrows, catVectors).t
        // broadcast it
        val bCategories = sc.broadcast(categories)
        val bCatMatrix = sc.broadcast(catMatrix) 
        
        // read test vectors representing each test document
        val testVectors = sc.textFile(testVectorsDir)
            .map(line => {
                val Array(filename, vecStr) = line.split("\t")
                val vec = DenseVector(vecStr.split(",").map(_.toDouble))
                val l2 = norm(vec, 2.0)
                (filename, vec / l2)
            })
            .map(fileVec => 
                (fileVec._1, topCategories(fileVec._2, 3,
                    bCatMatrix.value, bCategories.value)))
            
        testVectors.map(fileCats => "%s\t%s".format(
                fileCats._1, fileCats._2.mkString(", ")))
            .coalesce(1)
            .saveAsTextFile(bestcatsFile)
    }

    def topCategories(vec: DenseVector[Double], n: Int,
            catMatrix: DenseMatrix[Double],
            categories: Array[String]): List[String] = {
        val cosims = catMatrix * vec
        cosims.toArray.zipWithIndex
            .sortWith((a, b) => a._1 > b._1)
            .take(n)                           // argmax(n)
            .map(simIdx => (categories(simIdx._2), simIdx._1)) // (cat,sim)
            .map(catSim => "%s (%.3f)".format(catSim._1, catSim._2))
            .toList
    }
}

This is also a Spark job executed on EMR. One thing I learned was that you cannot use broadcast variables with Scala objects that extend App because of scoping issues, as discussed in this StackOverflow page and this Spark JIRA. The final output looks like this:

2022.txt  Pain_Management (0.875), Podiatry (0.831), Sports_Medicine (0.826)
1928.txt  Pain_Management (0.858), Behavioral_Medicine (0.843), Podiatry (0.840)
1296.txt  Surgery (0.864), Podiatry (0.849), Radiotherapy (0.832)
0996.txt  Cardiology (0.864), Pain_Management (0.847), Radiotherapy (0.846)
2000.txt  Pain_Management (0.751), Cardiology (0.736), Radiotherapy (0.735)
0853.txt  Radiotherapy (0.773), Cardiology (0.767), Podiatry (0.734)
2228.txt  Pain_Management (0.918), Podiatry (0.904), Surgery (0.889)
0361.txt  Surgery (0.859), Podiatry (0.848), Pain_Management (0.843)
0916.txt  Cardiology (0.823), Radiotherapy (0.820), Pain_Management (0.801)
3078.txt  Palliative_Care (0.903), Surgery (0.887), Critical_Care (0.885)

That's all I have for today. It was a lot of fun playing with Word2Vec and the KnowItAll APIs for OpenNLP, as well as using Spark on EMR. Nowadays people consider DMOZ slightly outdated compared to Wikipedia, but it provides a nice hierarchical topic taxonomy that can be used effectively for this kind of work. I hope you enjoyed reading it, and that it gave you some ideas for categorizing your own data. All the code for this post is available on GitHub here.