Saturday, December 19, 2020

First steps with Pytorch Lightning

Some time back, Quora routed a "Keras vs. Pytorch" question to me, which I decided to ignore because it seemed too much like flamebait. A couple of weeks back, after discussions with colleagues and (professional) acquaintances who had tried out libraries like Catalyst, Ignite, and Lightning, I decided to get on the Pytorch boilerplate elimination train as well, and tried out Pytorch Lightning. As I did so, my thoughts inevitably went back to the Quora question, and I came to the conclusion that, in their current form, the two libraries and their respective ecosystems are more similar than they are different, and that there is no technological reason to choose one over the other. Allow me to explain.

Neural networks learn using Gradient Descent. The central idea behind Gradient Descent is the weight update rule w ← w - η ∇L(w) (from the same linked Gradient Descent article), and the code that repeatedly applies it is referred to as the "training loop". Of course, there are other aspects of neural networks, such as model and data definition, but the training loop is where the differences in the earlier versions of the two libraries, and their subsequent convergence, are most apparent. So I will mostly talk about the training loop here.

Keras was initially conceived of as a high level API over the low level graph based APIs from Theano and Tensorflow. Graph APIs allow the user to first define the computation graph and then execute it. Once the graph is defined, the library will attempt to build the most efficient representation for the graph before execution. This makes the execution more efficient, but adds a lot of boilerplate to the code, and makes it harder to debug if something goes wrong. The biggest success of Keras in my opinion is its ability to hide the graph API almost completely behind an elegant API. In particular, its "training loop" looks like this:

model.compile(optimizer=optimizer, loss=loss_fn, metrics=[train_acc])
model.fit(Xtrain, ytrain, epochs=epochs, batch_size=batch_size)

Of course, the fit method has many other parameters as well, but at its most complex, it is a single line call. And, this is probably all that is needed for most simple cases. However, as networks get slightly more complex, with maybe multiple models or loss functions, or custom update rules, the only option for Keras used to be to drop down to the underlying Tensorflow or Theano code. In these situations, Pytorch appears really attractive, with the power, simplicity, and readability of its training loop.

dataloader = DataLoader(Xtrain, batch_size=batch_size)
for epoch in range(epochs):
    for batch in dataloader:
        X, y = batch
        logits = model(X)
        loss = loss_fn(logits, y)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # aggregate metrics
        train_acc(logits, y)

        # evaluate validation loss, etc.

However, with the release of Tensorflow 2.x, which included Keras as its default API through the tf.keras package, it is now possible to do something identical with Keras and Tensorflow as well.

dataset = tf.data.Dataset.from_tensor_slices((Xtrain, ytrain)).batch(batch_size)
for epoch in range(epochs):
    for batch in dataset:
        X, y = batch
        with tf.GradientTape() as tape:
            logits = model(X)
            loss = loss_fn(y_pred=logits, y_true=y)
        grads = tape.gradient(loss, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # aggregate metrics
        train_acc(logits, y)

In both cases, developers accept having to deal with some amount of boilerplate in return for additional power and flexibility. The approach taken by each of the three Pytorch add-on libraries I listed earlier, including Pytorch Lightning, is to create a Trainer object. The trainer models the training loop as an event loop with hooks into which specific functionality can be injected as callbacks. Functionality in these callbacks is executed at specific points in the training loop. So a partial LightningModule subclass for our use case would look something like this; see the Pytorch Lightning documentation or my code examples below for more details.

class MyLightningModel(pl.LightningModule):
    def __init__(self, args):
        # same as Pytorch nn.Module subclass __init__()

    def forward(self, x):
        # same as Pytorch nn.Module subclass forward()

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self.forward(x)
        loss = loss_fn(logits, y)
        acc = self.train_acc(logits, y)
        return loss

    def configure_optimizers(self):
        return self.optimizer

model = MyLightningModel(args)
trainer = pl.Trainer(gpus=1)
trainer.fit(model, dataloader)

If you think about it, this event loop strategy used by Lightning's trainer.fit() is pretty much how Keras manages to convert its training loop to a single line model.fit() call as well, its many parameters acting as the callbacks that control the training behavior. Pytorch Lightning is just a bit more explicit (and okay, a bit more verbose) about it. In effect, both libraries have solutions that address the other's pain points, so the only reason you would choose one or the other is personal or corporate preference.

In addition to the training, validation, and test step callbacks, there are hooks that are called at the end of each step and each epoch, for example training_step_end() and training_epoch_end(). Another nice side effect of adopting something like Pytorch Lightning is that you get some of the default functionality of the event loop for free. For example, logging is done to Tensorboard by default, and progress bars are controlled using TQDM. Finally (and this is the raison d'être for Pytorch Lightning from the point of view of its developers), it helps you organize your Pytorch code.
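As an illustration, here is a minimal sketch of what a validation step and an epoch-end hook might look like as additional methods on the MyLightningModel class above. This is written against the 1.x Lightning API; loss_fn and torch are assumed to be defined/imported elsewhere, and the logged metric names are made up.

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self.forward(x)
        loss = loss_fn(logits, y)
        # self.log() sends the metric to the default (Tensorboard) logger
        self.log("val_loss", loss)
        return loss

    def training_epoch_end(self, outputs):
        # outputs collects whatever training_step returned over the epoch
        losses = [o["loss"] if isinstance(o, dict) else o for o in outputs]
        self.log("avg_train_loss", torch.stack(losses).mean())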

To get familiar with Pytorch Lightning, I took three of my old notebooks, each dealing with training one major type of Neural Network architecture (from the old days) -- fully connected, convolutional, and recurrent networks -- and converted them to use Pytorch Lightning. You may find them useful to look at, in addition to Pytorch Lightning's extensive documentation; links are included below.

Monday, November 30, 2020

Word Sense Disambiguation using BERT as a Language Model

The BERT (Bidirectional Encoder Representations from Transformers) model was proposed in the paper BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding (Devlin, et al., 2019). BERT is the encoder part of the encoder-decoder Transformer architecture proposed in Attention is all you need (Vaswani, et al., 2017). The BERT model is pre-trained on two tasks against a large corpus of text in a self-supervised manner -- first, to predict masked words in a sentence, and second, to predict whether one sentence follows another -- tasks called Masked Language Modeling and Next Sentence Prediction respectively. These pre-trained models can be further fine-tuned for tasks as diverse as classification, sequence prediction, and question answering.

Since the release of BERT, the research community has done a lot of work around Transformers and BERT-like architectures, so much so that HuggingFace has its enormously popular transformers library dedicated to helping people work efficiently and easily with popular Transformer architectures. Among other things, the HuggingFace transformers library provides a unified interface to working with different kinds of Transformer architectures (with slightly different details), as well as weights for many pre-trained Transformer models.

Most of my work with transformers so far has been around fine-tuning them for Question Answering and Sequence Prediction. I recently came across a blog post Examining BERT's raw embeddings by Ajit Rajasekharan, where he describes how one can use a plain BERT model (pre-trained only, no fine-tuning required) and later a BERT Masked Language Model (MLM), as a Language Model, to help with Word Sense Disambiguation (WSD).

The idea is rooted in the model's ability to produce contextual embeddings for the words in a sentence. A pre-trained model has learned enough about the language it is trained on to produce different embeddings for a homonym based on the different sentence contexts it appears in. For example, a pre-trained model would produce a different vector representation for the word "bank" if it is used in the context of a bank robbery versus a river bank. This is different from how older word embeddings such as word2vec work, where a word has a single embedding regardless of the sentence context in which it appears.

An important point here is that there is no fine-tuning, we will leverage the knowledge inherent in the pre-trained models for our WSD experiments, and use these models in inference mode.

In this post, I will summarize these ideas from Ajit Rajasekharan's blog post, and provide Jupyter notebooks with implementations of these ideas using the HuggingFace transformers library.

WSD using raw BERT embeddings

Our first experiment uses a pre-trained BERT model initialized with the weights of a bert-base-cased model. We extract a matrix of "base" embeddings for each word in the model's vocabulary. We then pass in sentences containing our ambiguous word into the pre-trained BERT model, and capture the input embedding and output embedding for our ambiguous word. Our first sentence uses the word "bank" in the context of banking, and our second sentence uses it in the context of a river bank.

We then compute the cosine similarity between the embedding (input and output) for our ambiguous word against all the words in the vocabulary, and plot the histogram of cosine similarities. We notice that in both cases, the histogram shows a long tail, but the histogram for the output embedding seems to have a shorter tail, perhaps because there is less uncertainty once the context is known.

We then identify the words in the vocabulary whose embeddings are most similar (cosine similarity) to the embedding for our ambiguous word. As expected, the similar words for both input embeddings relate to banking (presumably because this may be the dominant usage of the word in the language). For the output embeddings, also as expected, similar words for our ambiguous word relate to banking in the first sentence, and rivers in the second.
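To make the mechanics concrete, here is a minimal sketch (not the notebook code itself) of capturing the input and output embeddings for "bank" with a recent version of the HuggingFace transformers library; the sentence is made up.

import torch
from transformers import BertTokenizer, BertModel

tokenizer = BertTokenizer.from_pretrained("bert-base-cased")
model = BertModel.from_pretrained("bert-base-cased")
model.eval()

# "base" embedding matrix for every token in the model's vocabulary
vocab_embeddings = model.get_input_embeddings().weight.detach()    # (vocab_size, hidden_size)

sentence = "The robber made off with a large sum from the bank vault."
inputs = tokenizer(sentence, return_tensors="pt")
bank_pos = inputs.input_ids[0].tolist().index(tokenizer.convert_tokens_to_ids("bank"))

with torch.no_grad():
    outputs = model(**inputs)

input_emb = vocab_embeddings[inputs.input_ids[0, bank_pos]]     # context-free input embedding
output_emb = outputs.last_hidden_state[0, bank_pos]             # contextual output embedding

# cosine similarity of the contextual embedding against every vocabulary embedding
sims = torch.nn.functional.cosine_similarity(output_emb.unsqueeze(0), vocab_embeddings)
top_ids = sims.topk(10).indices
print(tokenizer.convert_ids_to_tokens(top_ids.tolist()))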

The notebook WSD Using BERT Raw Embeddings contains the implementation described above.

WSD using BERT MLM

In our second experiment, we mask out the word "bank" in our two sentences and replace it with the [MASK] token. We then pass these sentences through a BERT Masked Language Model (MLM) initialized with weights from a bert-base-cased model. The output of the MLM is a 3-dimensional tensor of logits, where the first dimension is the number of sentences in the batch (1), the second dimension is the number of tokens in the input sentence, and the third dimension is the size of the vocabulary. Effectively, the output provides unnormalized scores for predictions across the entire vocabulary at each token position in the input.

As before, we identify the logits corresponding to our masked position in the input (and output) sequence, then compute the softmax of the logits to convert them to probabilities. We then extract the top k (k=20) terms with the highest probabilities.

Again, as expected, predictions for the masked word are predominantly around banking for the first sentence, and predominantly around rivers for the second sentence.
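Here too, a minimal sketch of the idea (with an assumed sentence, not the exact notebook code) using a recent version of the HuggingFace transformers library might look like this.

import torch
from transformers import BertTokenizer, BertForMaskedLM

tokenizer = BertTokenizer.from_pretrained("bert-base-cased")
model = BertForMaskedLM.from_pretrained("bert-base-cased")
model.eval()

sentence = "The fishermen pulled their boats up onto the [MASK] of the river."
inputs = tokenizer(sentence, return_tensors="pt")
mask_pos = inputs.input_ids[0].tolist().index(tokenizer.mask_token_id)

with torch.no_grad():
    logits = model(**inputs).logits          # (1, num_tokens, vocab_size)

# convert the logits at the masked position to probabilities and take the top k predictions
probs = torch.softmax(logits[0, mask_pos], dim=-1)
top = probs.topk(20)
for token_id, prob in zip(top.indices.tolist(), top.values.tolist()):
    print(tokenizer.convert_ids_to_tokens(token_id), round(prob, 4))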

The notebook WSD Using BERT Masked Language Model contains the implementation described above.

So that's all I had for today. Even though I understood the idea in Ajit Rajasekharan's blog post at a high level, and had even attempted something similar for WSD using non-contextual word embeddings (using the average of word embeddings across a span of text around the ambiguous word), it was interesting to actually go into the transformer model and figure out how to make things work. I hope you found it interesting as well.

Saturday, November 14, 2020

ODSC trip report and Keras Tutorial

I attended ODSC (Open Data Science Conference) West 2020 at the end of last month. I also presented Keras from Soup to Nuts -- an example driven tutorial there, a 3-hour tutorial on Keras. Like other conferences this year, the event was all-virtual. Having attended one other all-virtual conference this year (Knowledge Discovery and Data Mining (KDD) 2020) and being part of organizing another (an in-house conference), I can appreciate how much work it took to pull it off. As with the other conferences, I continue to be impressed at how effortless it all appears to be from the point of view of both speaker and attendee, so kudos to the ODSC organizers and volunteers for a job well done!

In this post, I want to cover my general impressions about the conference for readers of this blog. The content seems similar to PyData, except that not all talks here are Python (or Julia or R) related. As with PyData, the content is mostly targeted at data scientists in industry, with a few talks that are more academic, based on the presenter's own research. I think there is also more coverage of the career related aspects of Data Science than at PyData. I also thought that there was more content here than in typical PyData conferences -- the conference was 4 days long (Monday to Friday) and multi-track, with workshops and presentations. The variety of content feels a bit like KDD but with less academic rigor. Overall, the content is high-quality, and if you enjoy attending PyData conferences, you will find more than enough talks and workshops here to hold your interest through the duration of the conference.

Pricing is also a bit steep compared to KDD and PyData, although there seem to be deep discounts available if you qualify. You have to contact the organizers for details about the discounts. Fortunately I didn't have to worry about that since I was presenting and my ticket was complimentary.

Like KDD and unlike PyData, ODSC also does not share talk recordings with the public after the conference. Speakers sometimes do share their slides and github repositories, so hopefully you will find these resources for the talks I list below. Because my internal conference (the one I was part of the organizing team for) was scheduled the very next week, I could not spend as much time at ODSC as I would have liked, so there were many talks I would have liked to attend but didn't. Here is the full schedule (until the link is repurposed for the 2021 conference).

As I mentioned earlier, I also presented a 3-hour tutorial on Keras, so I wanted to cover that in slightly greater detail for readers here as well. As implied by the name and the talk abstract, the tutorial tries to teach participants enough Keras to become advanced Keras programmers, and assumes only some Python programming experience as a pre-requisite. Clearly 3 hours is not enough time, so the notebooks are deliberately short on theory and heavy on examples. I organized the tutorial into three 45-minute sessions, with exercises at the end of the first two, but we ended up just running through the exercise solutions instead because of time constraints.

The tutorial materials are just a collection of Colab notebooks that are available at my sujitpal/keras-tutorial-odsc2020 github repository. The project README provides additional information about what each notebook contains. Each notebook is numbered with the session and sequence within each session. There are two notebooks called exercise 1 and 2, and corresponding solution notebooks titled exercise_1_solved and exercise_2_solved.

Keras started life as an easy to use high level API to Theano and Tensorflow, but has since been subsumed into Tensorflow 2.x as its default API. I was among those who learned Keras in its first incarnation, when certain things were just impossible to do in Keras, and the only option was to drop down to Tensorflow 1.x's two-step model (create the compute graph and then run it with data). In many cases, Pytorch provided simpler ways to do the same thing, so for complex models I found myself increasingly gravitating towards Pytorch. I did briefly look at Keras (now tf.keras) and Tensorflow 2.0-alpha while co-authoring the Deep Learning with Tensorflow 2 and Keras book, but the software was new and there was not a whole lot of information available at the time.

My point in mentioning all this is to acknowledge that I ended up learning a bit of advanced Keras myself while building the last few notebooks. Depending on where you are with Keras, you might find them interesting as well. Some of the interesting examples covered (according to me) are Sequence to Sequence models with and without attention, using transformers from the HuggingFace Transformers library in your Keras models, using Cyclic Learning Rates and the LR Finder, and distributed training across multiple GPUs and TPUs. I am actually quite pleasantly surprised at how much more you can do with tf.keras relative to the underlying Tensorflow framework, and I think you will be too (if you aren't already).

Saturday, October 31, 2020

Entities from CORD-19 using Dask, SciSpaCy, and Saturn Cloud

It's been a while since I last posted here, but I recently posted on our Elsevier Labs blog, and I wanted to point folks here to that. The post, titled How Elsevier Accelerated COVID-19 research using Dask and Saturn Cloud, describes some work I did to extract biomedical entities from the CORD-19 dataset using Dask and trained Named Entity Recognition (NER) and Named Entity Recognition and Linking (NERL) models from SciSpaCy, on the Saturn Cloud platform.

At a high level, the pipeline takes documents from the CORD-19 dataset as input, decomposes them into sentences, and passes each sentence through one of nine trained SciSpaCy models (4 NER and 5 NERL) to extract spans of text representing different kinds of biomedical entities, such as CHEMICAL, DISEASE, GENE, PROTEIN, etc., as well as entities listed in various well-known biomedical ontologies (such as UMLS, MeSH, etc). The output is provided in tabular format as Parquet files consumable from many platforms, including Dask and Spark.
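For example, reading the Parquet output back into a Dask DataFrame is a one-liner; the path below is a placeholder, not the actual bucket layout.

import dask.dataframe as dd

# path is illustrative; see the blog post / repository README for the real layout
entities = dd.read_parquet("s3://bucket/path/to/cord19-entities/")
print(entities.columns)
print(entities.head())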

The pipeline described in the post was developed and executed on the Saturn Cloud platform. Saturn Cloud is a Platform as a Service (PaaS) that provides a Jupyter Notebooks (or Jupyter Labs) development environment on top of Amazon Web Services (AWS). It provides a custom Dask scheduler that allows you to scale out to a cluster of workers, as well as RAPIDS on GPU boxes for vertical scaling (scaling up), although I didn't use RAPIDS for this work.

Before I started working with Saturn Cloud, I was trying to develop the same pipeline (also using Dask) on a single AWS EC2 box (a t2.2xlarge with 8 vCPUs and 32 GB RAM). However, after the first few steps, I rapidly began to hit the resource constraints of a single machine, leading me to some interesting workarounds I describe here. Once I moved to Saturn Cloud, these problems largely went away because I could now scale out the processing across a cluster of machines. In addition, the code got simpler because I no longer needed to work around resource constraints imposed by my single machine environment. My Saturn Cloud notebooks are available at my Github repository sujitpal/saturn-scipacy under an Apache 2.0 license. The README.md provides additional details about how the notebooks are organized, and the format of the output files.

We built two pipelines. The first is what we call a "full" pipeline: its input is a dated CORD-19 dataset, and it extracts the different entities using the nine models and outputs them as structured files. The second is an incremental pipeline, which takes entities from a previous version of the CORD-19 dataset and the current dataset, figures out document additions and deletions between the two, and generates entities only for the added documents while deleting entities corresponding to the deleted documents. The incremental pipeline completes much faster than the full pipeline, typically under an hour compared to about 15 hours.

More interesting than the details of the processing, however, is the fact that we have made the output freely available at this requester-pays bucket on AWS. You will need an AWS account to access the data. The 2020-08-28 folder represents entities extracted by the full pipeline from the September 28 2020 version of CORD-19, and the 2020-09-28 folder represents entities extracted by the incremental pipeline from the October 28 2020 version. Each dataset is about 30-35 GB in size.

Because these are relatively large datasets, it is generally advisable to bring the code to the data rather than the other way round. So you probably want to keep the data in the cloud, maybe even within AWS (in which case you won't need to pay any network charges).

We believe the entity data would be useful for NLP based biomedical work (in-silico biology and pharma). Since the input to the pipeline, as well as the models, were both in the public domain, we thought it was only fitting that the output of the pipeline also be in the public domain. We hope it helps to advance the state of scientific knowledge around COVID-19 and helps in humanity's fight against the pandemic. If you happen to use the data in an academic setting, we would appreciate you citing it as Pal, Sujit (2020), “CORD-19 SciSpaCy Entity Dataset”, Mendeley Data, V2, doi: 10.17632/gk9njn3pth.2.


Saturday, August 08, 2020

Disambiguating SciSpacy + UMLS entities using the Viterbi algorithm

The SciSpacy project from AllenAI provides a language model trained on biomedical text, which can be used for Named Entity Recognition (NER) of biomedical entities using the standard SpaCy API. Unlike the entities found using SpaCy's language models (at least the English one), where entities have types such as PER, GEO, ORG, etc., SciSpacy entities have the single type ENTITY. In order to further classify them, SciSpacy provides Entity Linking (NEL) functionality through its integration with various ontology providers, such as the Unified Medical Language System (UMLS), Medical Subject Headings (MeSH), RxNorm, Gene Ontology (GO), and Human Phenotype Ontology (HPO).

The NER and NEL processes are decoupled. The NER process finds candidate entity spans, and these spans are matched against the respective ontologies, which may result in a span matching zero or more ontology entries. Each candidate span is then linked to all of the ontology entries it matched.

I tried annotating the COVID-19 Open Research Dataset (CORD-19) against UMLS using the SciSpacy integration described above, and I noticed significant ambiguity in the linking results. Specifically, annotating approximately 22 million sentences in the CORD-19 dataset results in 113 million candidate entity spans, which get linked to 166 million UMLS concepts, i.e., on average, each candidate span resolves to 1.5 UMLS concepts. However, the distribution is Zipfian, with approximately 46.87% of entity spans resolving to a single concept, and a long tail of entity spans being linked to up to 67 UMLS concepts.

In this post, I will describe a strategy to disambiguate the linked entities. Based on limited testing, this chooses the correct concept about 73% of the time. 

The strategy is based on the intuition that an ambiguously linked entity span is more likely to resolve to a concept that is closely related to the concepts for the other non-ambiguously linked entity spans in the sentence. In other words, the best target label to choose for an ambiguous entity is the one that is semantically closest to the labels of the other entities in the sentence. Or even more succinctly, and with apologies to John Firth, an entity is known by the company it keeps.

The NER and NEL processes provided by the SciSpacy library allow us to reduce a sentence to a collection of entity spans, each of which maps to zero or more UMLS concepts. Each UMLS concept maps to one or more Semantic Types, which represent high level subject categories. So essentially, a sentence can be reduced to a graph of semantic types using the following steps.

Consider the sentence below. The NER step identifies candidate spans, which are listed in the table that follows.
The fact that viral antigens could not be demonstrated with the used staining is not the result of antibodies present in the cat that already bound to these antigens and hinder binding of other antibodies.
The NEL step then attempts to match these spans against the UMLS ontology. The results of the matching are shown below. As noted earlier, each UMLS concept maps to one or more semantic types, and these are shown here as well.
   
| Entity-ID | Entity Span | Concept-ID | Concept Primary Name | Semantic Type Code | Semantic Type Name |
|-----------|-------------|------------|----------------------|--------------------|--------------------|
| 1 | staining | C0487602 | Staining method | T059 | Laboratory Procedure |
| 2 | antibodies | C0003241 | Antibodies | T116 | Amino Acid, Peptide, or Protein |
|   |            |          |            | T129 | Immunologic Factor |
| 3 | cat | C0007450 | Felis catus | T015 | Mammal |
|   |     | C0008169 | Chloramphenicol O-Acetyltransferase | T116 | Amino Acid, Peptide, or Protein |
|   |     |          |                                     | T126 | Enzyme |
|   |     | C0325089 | Family Felidae | T015 | Mammal |
|   |     | C1366498 | Chloramphenicol Acetyl Transferase Gene | T028 | Gene or Genome |
| 4 | antigens | C0003320 | Antigens | T129 | Immunologic Factor |
| 5 | binding | C1145667 | Binding action | T052 | Activity |
|   |         | C1167622 | Binding (Molecular Function) | T044 | Molecular Function |
| 6 | antibodies | C0003241 | Antibodies | T116 | Amino Acid, Peptide, or Protein |
|   |            |          |            | T129 | Immunologic Factor |

The sequence of entity spans, each mapped to one or more semantic type codes, can be represented by a graph of semantic type nodes as shown below. Here, each vertical grouping corresponds to an entity position. The BOS node is a special node representing the beginning of the sequence. Based on our intuition above, entity disambiguation is now just a matter of finding the most likely path through the graph.



Of course, "most likely" implies that we need to know the probabilities for transitioning between semantic types. We can think of the graph as a Markov Chain, and consider the probability of each node in the graph as being determined only by its previous node. Fortunately, this information is already available as a result of the NER + NEL process for the entire CORD-19 dataset, where approximately half of the entity spans mapped unambiguously to a single UMLS concept. Most concepts map to a single semantic type, but in cases where they map to multiple, we consider them as separate records. We compute pairwise transition probabilities across semantic types for these unambiguously linked pairs across the CORD-19 dataset and create our transition matrix. In addition, we also create a matrix of emission probabilities that identify the probabilities of resolving to a concept given a semantic type. 

Using the transition probabilities, we can traverse each path in the graph from starting to ending position, computing the path probability as the product of transition probabilities (or for computational reasons, the sum of log-probabilities) of the edges. However, better methods exist, such as the Viterbi algorithm, which allows us to save on repeated computation of common edge sequences across multiple paths. This is what we used to compute the most likely path through our semantic type graph. 

The Viterbi algorithm consists of two phases -- forward and backward. In the forward phase, we move left to right, computing the log-probability of each transition at each step, as shown by the vectors below each position in the figure. When computing the transition from multiple nodes to a single node (such as the one from [T129, T116] to [T126]), we compute the score for both paths and choose the maximum value.

In the backward phase, we move from right to left, choosing the maximum probability node at each step. This is shown in the figure as boxed entries. We can then lookup the appropriate semantic type and return the most likely sequence of semantic types (shown in bold in the bottom of the figure). 
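To make the forward and backward phases concrete, here is a minimal generic Viterbi sketch over log transition probabilities. The data structures (a dict of log probabilities keyed by type pairs, and a list of candidate semantic types per entity position) are illustrative, not the code in the gist linked below.

def viterbi(candidate_types, log_trans, bos="BOS"):
    # forward phase: best log-probability and backpointer for each candidate type at each position
    scores = [{t: log_trans.get((bos, t), float("-inf")) for t in candidate_types[0]}]
    backptrs = [{t: None for t in candidate_types[0]}]
    for i in range(1, len(candidate_types)):
        score_i, back_i = {}, {}
        for t in candidate_types[i]:
            best_prev, best_score = None, float("-inf")
            for p in candidate_types[i - 1]:
                s = scores[i - 1][p] + log_trans.get((p, t), float("-inf"))
                if s > best_score:
                    best_prev, best_score = p, s
            score_i[t], back_i[t] = best_score, best_prev
        scores.append(score_i)
        backptrs.append(back_i)
    # backward phase: start from the best final type and follow the backpointers
    best = max(scores[-1], key=scores[-1].get)
    path = [best]
    for i in range(len(candidate_types) - 1, 0, -1):
        best = backptrs[i][best]
        path.append(best)
    return list(reversed(path))

# candidate semantic types per entity position for the example sentence above
candidate_types = [["T059"], ["T116", "T129"], ["T015", "T116", "T126", "T028"],
                   ["T129"], ["T052", "T044"], ["T116", "T129"]]
best_types = viterbi(candidate_types, log_trans)   # log_trans built from the transition matrix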

However, our objective is to return disambiguated concept linkages for entities. Given a disambiguated semantic type and multiple possibilities indicated by SciSpacy's linking process, we use the emission probabilities to choose the most likely concept to apply at the position. The result for our example is shown in the table below.

| Entity-ID | Entity Span | Concept-ID | Concept Primary Name | Semantic Type Code | Semantic Type Name | Correct? |
|-----------|-------------|------------|----------------------|--------------------|--------------------|----------|
| 1 | staining | C0487602 | Staining method | T059 | Laboratory Procedure | N/A* |
| 2 | antibodies | C0003241 | Antibodies | T116 | Amino Acid, Peptide, or Protein | Yes |
| 3 | cat | C0008169 | Chloramphenicol O-Acetyltransferase | T116 | Amino Acid, Peptide, or Protein | No |
| 4 | antigens | C0003320 | Antigens | T129 | Immunologic Factor | N/A* |
| 5 | binding | C1145667 | Binding action | T052 | Activity | Yes |
| 6 | antibodies | C0003241 | Antibodies | T116 | Amino Acid, Peptide, or Protein | Yes |

(*N/A: non-ambiguous mappings)

I thought this might be an interesting technique to share, hence writing about it. In addition, in the spirit of reproducibility, I have also provided the following artifacts for your convenience.
  1. Code: This github gist contains code that illustrates NER + NEL on an input sentence using SciSpacy and its UMLS integration, and then applies my adaptation of the Viterbi method (as described in this post) to disambiguate ambiguous entity linkages.
  2. Data: I have also provided the transition and emission matrices, and their associated lookup tables, for convenience, as these can be time consuming to generate from scratch from the CORD-19 dataset.
As always, I appreciate your feedback. Please let me know if you find flaws with my approach, and/or if you know of a better approach for entity disambiguation.

Sunday, June 14, 2020

Dask, map_partitions, and almost Embarrassingly Parallel Processes


I have recently started using Dask for a new project. Dask is a Python library for parallel computing, similar to Apache Spark. Dask allows you to write parallel code to take advantage of multiple CPUs on your laptop, or multiple worker nodes in a cluster, with little or no change to the code. Up until a few months ago, I had heard of Dask, but I didn't really know what it was about. That changed when the folks at SaturnCloud offered me a chance to evaluate their platform a couple of months ago, with a view to seeing if the platform would be interesting enough for me to recommend to my employer. SaturnCloud's platform provides a notebook interface on top of Dask clusters, much like Databricks provides a notebook environment over Spark clusters. While I was personally quite impressed by the platform, we are long time users of Databricks, and we have built up a lot of expertise (and software) with it as a company. In addition, even though we have many Python users who use PySpark on our Databricks platform, we also have a significant number of users who prefer Scala or Java. So it wouldn't have been a good match for us.

I spent about a week, on and off, on their platform, trying to replicate a small algorithm I had recently built for our Databricks platform, and I found the platform quite intuitive and easy to use, and not very different from working with Databricks and Jupyter notebooks. In order to learn all about Dask, I used the book Data Science with Python and Dask by Jesse C. Daniel. Probably because of its focus on Data Scientists, the book focuses almost exclusively on the Dask DataFrame API, which is just one of the four high level APIs (Array, Bag, DataFrame, and ML) and two low level APIs (Delayed and Futures) offered by Dask, as shown in the architecture diagram in the blog post Introduction to Dask: Insights on NYC Parking large dataset using Dask by Shubham Goel. In any case, the book is a good starting point if you want to start using Dask, although your pipelines (like mine) might be a bit DataFrame centric in the beginning, until you figure out other approaches.

Although I was no longer evaluating SaturnCloud, I found Dask to be really cool, and I decided to learn more about it by using it in an upcoming project. The project was to annotate documents in the CORD-19 Dataset, using third-party annotations from the Termite NER engine from SciBite Labs and the SciSpacy UMLS model from AllenAI, for search and NLP use. The first set of annotations comes in the form of JSON annotations built into the original CORD-19 dataset, and the second in the form of a SciSpacy NER model with a two step candidate generation and entity linkage process. In both cases we are working on individual documents in a corpus, so you would assume that the task would be embarrassingly parallel, and a great fit for a parallel processing environment such as Dask.

The interesting thing is that, without exception, all the pipelines I have built so far in this project are almost, but not quite, embarrassingly parallel. The main problem that prevents them from being purely embarrassingly parallel is performance issues around storage components in the pipeline. In my case, the two storage components are a Solr index and a PostgreSQL database. While it is possible to issue commits with every record in both cases, doing so slows down the processing drastically. The other option, waiting for the process to finish before committing, is also not practical. The other problem is that large pre-trained ML models tend to take time to load into memory before they can be used, so it is not practical to load the model once per row either. A solution to both problems is the Dask DataFrame map_partitions call. Like the one in Spark, it allows you to declare a block of code that is executed before and after each partition of data. In this post, I will describe some of my use cases and how I used Dask DataFrame's map_partitions to handle them.

So, just as background, Dask splits up an input DataFrame into partitions, and assigns them to workers in the Dask cluster for processing. The map_partitions call allows you to specify a handler that would act on each partition. By default, it would just execute the operations you specified on each row in the partition. A typical calling sequence with map_partitions would look something like this.

import dask.dataframe as dd
import dask.bag as db
from dask.diagnostics import ProgressBar

def handle_row(row, ...):
    # do something with row
    return result

def handle_partition(part):
    # add partition level setup code here
    result = part.apply(lambda row: handle_row(row, ...), axis=1)
    # add partition level teardown code here
    return result

df = dd.read_csv("...")
with ProgressBar():
    results = df.map_partitions(lambda part: handle_partition(part))
    results.compute()

Recipe #1: Loading an index from CSV and JSON

In this recipe, the CORD-19 dataset (April 2020) is provided as a combination of a CSV metadata file and a corpus of about 80,000 JSON files split into multiple subdirectories. The idea is to read the metadata file as a Dask DataFrame, then for each row, locate the JSON file and parse out the text and other metadata from it. The combination of fields in the metadata row and the fields extracted from the JSON file are written to a Solr index. Periodically, we commit the rows written to the Solr index.

The (pseudo) code below shows the use of map_partitions as a convenient way to group the records into a set of "commit-units".

def handle_row(row):
    meta_fields = extract_metadata(row)
    content_fields = parse_file(row.filename)
    index_fields = merge_fields(meta_fields, content_fields)
    write_to_solr(index_fields)

def handle_partition(part):
    result = part.apply(lambda row: handle_row(row), axis=1)
    commit_solr()
    return result

df = dd.read_csv("metadata.csv")
with ProgressBar():
    results = df.map_partitions(lambda part: handle_partition(part))
    results.compute()

Recipe #2: reading JSON, writing to DB

The second recipe involves reading the annotations provided by SciBiteLabs and storing them in a database table. The annotations are from their Termite annotation system, and identify entities such as genes, proteins, drugs, human phenotypes (indications), etc. The annotations are embedded inside the original JSON files provided by the CORD-19 dataset. Unfortunately, the release schedules seem to be slightly different, so the annotation files (I used version 1.2) did not match the CORD-19 file list. So I ran my Dask pipeline against the files themselves, generating a file list, creating a Dask Bag from it, and then mapping each entry to a JSON row suitable for converting to a Dask DataFrame. The map_partitions call routes each partition to a function that creates a database connection, then applies another function to each row, which parses the annotations out of the JSON file and writes them out to the database, using the filename as the key. On returning to the handle_partition function after processing each row in the partition, the connection is committed and closed.

import glob

def handle_row(row, conn):
    annotations = extract_annotations(row.filepath)
    insert_annotations_to_db(annotations, conn)
    return 0

def handle_partition(part):
    conn = connect_to_db()
    result = part.apply(lambda row: handle_row(row, conn), axis=1)
    conn.commit()
    conn.close()
    return result

filepaths = []
for filepath in glob.iglob("CORD19/**/*.json", recursive=True):
    filepaths.append(filepath)

df = (db.from_sequence(filepaths, partition_size=100)
      .map(lambda fp: { "filepath": fp })
      .to_dataframe())
with ProgressBar():
    results = df.map_partitions(lambda part: handle_partition(part))
    results.compute()

Recipe #3: sentence splitting, writing to DB

In this recipe, I want to generate sentences out of each document text using the Sentence Segmentation functionality in the spaCy English model. Documents are provided in JSON format, so we will read our CSV file of metadata, use the filepath to locate the file, parse it, and extract the body, which we then pass to the sentence splitter. Output sentences are written to the database. Here, we will use our map_partitions hook for two things -- to open and close the database connection, as well as instantiate the Spacy English model. We have already seen the database connection in Recipe #2, so no surprises there.

The problem with instantiating the English model at the partition level is that it takes time and a fair amount of memory to load, so it is not really feasible to do this per partition. The first thing I tried was to make the model smaller. Since the Sentence Segmenter only uses the parser component, I disabled the tagger and NER components, but that didn't help much; the pipeline would hang or crash within a few minutes of starting up. I also learned that the sentence segmenter has a 1 MB input size limit, and that there were quite a few files that were larger. So I added some chunking logic, and changed the model call to use batching (nlp.pipe instead of nlp), so that chunks would be segmented in parallel. In order to make it work, I first moved the Sentence Segmentation component into its own server using Flask (and later Gunicorn). This lasted longer, but would inexplicably crash after processing 30-40% of the texts. I initially suspected that the client was overwhelming the server, so I switched to multiple workers using Gunicorn and used requests.Session to reuse the connection, but that didn't help either. Ultimately I didn't end up using this technique for this recipe, so I will cover these details in Recipe #5, where I did use it.

Ultimately I was able to load the model per worker rather than per partition, using the technique described in this comment. This ran much longer than previously, but I still couldn't finish the job. In the end, because I was running out of time with all the failed starts, I settled for doing multiple partial jobs, where I would remove the documents that had already been split and rerun the job. I ended up with approximately 22M sentences from the corpus.

The code for this is shown below. Note that the ProgressBar has been replaced by a call to progress, since get_worker is part of the Dask distributed library, and the local diagnostics ProgressBar class no longer works.

import spacy
from dask.distributed import get_worker, progress

def handle_row(row, conn, nlp):
    text = read_file(row.filepath)
    if len(text) > 1000000:
        texts = chunk(text)
    else:
        texts = [text]
    sents = nlp.pipe(texts)
    save_to_db(row.filepath, sents, conn)
    return 0

def handle_partition(part):
    worker = get_worker()
    conn = connect_to_db()
    try:
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load("en_core_web_sm", disable=["tagger", "ner"])
        worker.nlp = nlp
    result = part.apply(lambda row: handle_row(row, conn, nlp), axis=1)
    conn.commit()
    conn.close()
    return result

df = dd.read_csv("metadata.csv")
results = df.map_partitions(lambda part: handle_partition(part))
results = results.persist()
progress(results)
results.compute()

Recipe #4: annotating sentence with UMLS candidate spans, writing to DB

This is similar to Recipe #3, in the sense that we read a directory of CSV files, each containing approximately 5000 sentences, into a Dask DataFrame, and load the SciSpacy model (en_core_sci_md) to find candidate spans that match biomedical entities in the Unified Medical Language System (UMLS) Metathesaurus. Matches are written out to the database. As with Recipe #3, the database connection is opened and closed per partition, and the model is set up per worker. However, unlike Recipe #3, the handle_partition function does not delegate directly to handle_row; instead, it breaks the rows in the partition into batches and operates on each batch. Also notice that we are committing per batch rather than per partition. I find this kind of flexibility to be one of the coolest things about Dask. This pipeline produced slightly under 113M candidate entities.

batch_size = 32    # number of sentences per nlp.pipe batch; value is illustrative

def handle_batch(batch, conn, nlp):
    docs = nlp.pipe([b[2] for b in batch])
    for i, doc in enumerate(docs):
        doc_id, sent_id = batch[i][0], batch[i][1]
        for ent_id, ent in enumerate(doc.ents):
            save_to_db((doc_id, sent_id, ent_id, ent), conn)
    conn.commit()
    return len(batch)

def handle_partition(part):
    worker = get_worker()
    conn = connect_to_db()
    try:
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load("en_core_sci_md", disable=["tagger", "parser"])
        worker.nlp = nlp
    result, batch = [], []
    for _, row in part.iterrows():
        if len(batch) % batch_size == 0 and len(batch) > 0:
            batch_results = handle_batch(batch, conn, nlp)
            result.append(batch_results)
            batch = []
        batch.append((row.doc_id, row.sent_id, row.sent_text))
    if len(batch) > 0:
        batch_results = handle_batch(batch, conn, nlp)
        result.append(batch_results)
    conn.close()
    return result

df = dd.read_csv("sentences/sents-*", names=["doc_id", "sent_id", "sent_text"])
results = df.map_partitions(lambda part: handle_partition(part))
results = results.persist()
progress(results)
results.compute()

Recipe #5: resolving candidate spans against UMLS, writing to DB

The final recipe I would like to share in this post reads the sentences from the directory of sentence files, then for each partition of sentences, extracts the candidate entities and attempts to link them to entities from the UMLS Metathesaurus. The UMLS concepts linked to the candidate entities are written back to the database. The concept and semantic type (a sort of classification hierarchy of concepts) metadata are also written out to separate tables in a normalized manner. As you can see, the sentences (doc_id, sent_id) only act as a starting point to group some database computations, so it might have been better to use dd.read_sql() instead, but that requires a single column primary key, which I didn't have.

The UMLS dictionary is called the UMLS Knowledge Base and is about 0.7 GB in size. Loading it once per worker reliably caused the pipeline to crash with messages that pointed to an out of memory situation. So at this point, I figured that my only option would be to have this run in its own server and have my pipeline consume it over HTTP. That would allow me to have more workers on the Dask side as well. My theory about my previous failures with using this setup during sentence splitting was that they were somehow being caused by large POST payloads or the server running out of memory because of excessively large batches. Since my input sizes (text spans) were more consistent this time around, I had more confidence that it would work, and it did.

A few tidbits of information around the server setup. I used Flask to load the UMLS Knowledge Base and exposed an HTTP POST API that took a batch of candidate spans and returned the associated concepts along with their metadata. I serve this through Gunicorn with 4 worker threads (see this tutorial for details), so that introduces some degree of redundancy. Gunicorn also monitors the workers so it will restart a worker if it fails. For debugging purposes, I also send the doc_id, sent_id, and ent_id as GET parameters so you can see them on the access log.
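A minimal sketch of what such a Flask endpoint might look like is shown below; load_umls_linker and link_span are hypothetical stand-ins for the SciSpacy knowledge base loading and linking code, and the route name and module layout are made up.

from flask import Flask, jsonify, request

# hypothetical helpers wrapping the SciSpacy candidate generation / linking code
from linker import load_umls_linker, link_span

app = Flask(__name__)
kb = load_umls_linker()    # load the (large) UMLS Knowledge Base once per server worker

@app.route("/link", methods=["POST"])
def link():
    spans = request.get_json()    # [{"id": ..., "text": ...}, ...]
    results = [{"id": span["id"], "concepts": link_span(kb, span["text"])}
               for span in spans]
    return jsonify(results)

# served with something like: gunicorn -w 4 server:app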

On the client side, I call the service using a Session, which allows me some degree of connection reuse. This is useful since my pipeline is going to be hammering away at the server for the next 30 hours. In case a request encounters a server error, the client sleeps for a second before trying again, so as to give the server some breathing room to restart a worker if it dies, for example. Here is the code (client side only; the server side code around parsing the request and returning the response is fairly trivial, and the linking code is based heavily on the code in this SciSpacy Entity Linking test case).

With these changes, my only reason to use the map_partitions hook is to open and close the connection to the database. The code ended up marking up my 113M candidate entities with approximately 166M concepts (so approximately 1.5 annotations per candidate span), and approximately 120K unique UMLS concepts.

import json
import time

import requests

def handle_row(row, conn):
    headers = { "content-type": "application/json" }
    params = {
        "doc_id": row.doc_id,
        "sent_id": row.sent_id
    }
    # candidate entity spans for this sentence (pseudo-helper, like the other helpers here)
    ent_spans = get_candidate_spans(row.doc_id, row.sent_id, conn)
    data = json.dumps([{"id": id, "text": text} for id, text in ent_spans])
    try:
        with requests.Session() as sess:
            resp = sess.post("http://path/to/server", headers=headers, params=params, data=data)
    except:
        time.sleep(1)
        return -1
    spans = parse_response(resp.json())
    save_links(spans)
    save_concept_metadata(spans)
    conn.commit()
    return 0

def handle_partition(part):
    conn = connect_to_db()
    result = part.apply(lambda row: handle_row(row, conn), axis=1)
    conn.commit()
    conn.close()
    return result

df = dd.read_csv("sentences/sents-*", names=["doc_id", "sent_id", "sent_text"])
results = df.map_partitions(lambda part: handle_partition(part))
results = results.persist()
progress(results)
results.compute()

I hope this was useful. I have used the Spark RDD mapPartitions call in the past, which functions similarly, but for simpler use cases. The almost embarrassingly parallel situation seems to be quite common, and map_partitions seems to be an effective tool for dealing with it. I figured these examples might be helpful to illustrate the various ways in which a pipeline can be designed to take advantage of the map_partitions functionality, and perhaps to spark ideas for more creative ones. Of course, as I worked my way through these use cases, I began to understand the power of Dask and its other APIs as well. One other API that can be useful in these sorts of situations is the low level Delayed API, which allows one to bypass the rigid call structure enforced by the DataFrame API. I hope to use that in the future.

Saturday, June 06, 2020

Bayesian Analysis with Python Exercise Solutions


In my quest to learn more about Bayesian Analysis, I spent the last few weeks working through the book Bayesian Analysis with Python - 2nd Edition by Osvaldo Martin. In their great wisdom, Amazon now only allows reviews by people who have been spending "real" money on Amazon, as opposed to those like me using "fake" money in the form of gift cards and credit card rewards. So I guess I will have to hold off posting my review of this great book until my fake money runs out. Meanwhile, you can read other people's reviews on Amazon. In this post, I would like to share my notebooks where I have worked out the exercises for Chapters 2-7 (there are 9 chapters; the first is introductory, and the last two cover areas only indirectly related to Bayesian Analysis, or more specifically, to Bayesian analysis using Markov Chain Monte Carlo (MCMC) techniques). Here are my notebooks on Github. No guarantees, of course, since I am learning as well.


The author Osvaldo Martin is one of the developers of the ArviZ project, an exploratory analysis tool for Bayesian models. The ArviZ project was spun off from the PyMC3 project, and many PyMC3 calls such as pm.traceplot() are actually calls to az.plot_trace() under the hood. According to the ArviZ website, it also supplies functionality for other Bayesian libraries, such as PyStan, Pyro, and TF Probability. Martin is also a PyMC3 maintainer, and the book reflects that. Compared with my previous foray into PyMC3, where I was trying to convert JAGS models taught in the two Coursera courses from the University of California, Santa Cruz (UCSC), into equivalent PyMC3 models using online documentation and other Internet resources, this time the focus was on exploring various code features offered by PyMC3 and ArviZ.

The book is billed as an introductory one on the PyMC3 site, but I thought it had good followup material for someone who has already taken the two UCSC Coursera courses. The focus of those courses was on the theory: explaining why you would choose one distribution over another, linear and logistic regression, and some exposure to hierarchical and mixture models. This book contains a more code-heavy exposition of all these subjects, plus an introduction to Dirichlet processes as part of the Mixture models chapter, and an introduction to Gaussian processes.

Of course, I still have much more to learn about all this. Just today, I came across a link to Nicole Carlson's PyMC3 talk at PyData Chicago 2016, where she walks through some toy problems with PyMC2. She introduces the ideas of using Theano shared variables for input and output here, so you can use the model in a more Scikit-Learn like way, training with one dataset and evaluating with another. She also covers the newer PyMC3 Variational Inference API (ADVI), an alternative to its older MCMC API, and how to use the output of the ADVI model as input to an MCMC sampling model. In addition, she also describes serializing a PyMC3 model by pickling the trace and using it as a generative model. I also found the notebooks for her subsequent talk at PyData Chicago 2017, where she describes her work to wrap PyMC3 models in a more familiar Scikit-Learn like interface.
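As far as I understand it, the shared variable pattern she describes boils down to something like the sketch below -- a toy linear regression with made-up data, not her code.

import numpy as np
import pymc3 as pm
import theano

# made-up data
X_train, y_train = np.random.rand(100), np.random.rand(100)
X_test = np.random.rand(20)

# wrap the inputs in Theano shared variables so they can be swapped out later
X_shared = theano.shared(X_train)
y_shared = theano.shared(y_train)

with pm.Model() as model:
    alpha = pm.Normal("alpha", mu=0, sigma=1)
    beta = pm.Normal("beta", mu=0, sigma=1)
    sigma = pm.HalfNormal("sigma", sigma=1)
    mu = alpha + beta * X_shared
    y_obs = pm.Normal("y_obs", mu=mu, sigma=sigma, observed=y_shared)
    trace = pm.sample(1000, tune=1000)

# "predict" on new data, Scikit-Learn style, by swapping the shared values
X_shared.set_value(X_test)
y_shared.set_value(np.zeros(len(X_test)))    # placeholder, only the shape matters here
with model:
    post_pred = pm.sample_posterior_predictive(trace)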

Another recommendation from the PyMC3 site, as well as from this book, was the "puppy book" Doing Bayesian Data Analysis by John Kruschke. Fortunately, that is available in my employer's online library, so I guess that is another book I won't buy from Amazon with fake or real money.

That's pretty much all I had for today. Obviously the best way to learn this stuff is to try doing it yourself, but I am sharing these notebooks on Github in case someone gets stuck and can benefit from them. In case you do end up using them, please let me know what you think.

Saturday, May 09, 2020

Fun and Learn with Manning LiveProjects


The pandemic has forced most people indoors. With it, there has been a corresponding rise in online education companies offering courses to help you update your skills. Most of them follow the so-called "freemium" model, where you can watch the course videos and do the exercises, but if you want certification or support, you have to pay. In the past, I have aggressively taken advantage of these free offers, and have learned a lot in the process, so I am very grateful for the freemium model and hope it continues to exist, but nowadays I find myself being a bit more selective than I used to be. However, recently I came across a very interesting product being offered by Manning.com -- a "liveProject" on Discovering Disease Outbreaks from News Headlines, which promises to give the consumer hands-on exposure to Pandas, Scikit-Learn, text extraction, and KMeans and DBSCAN clustering as they work through the project.

In all fairness, while the idea is somewhat uncommon, it is not completely novel. Kaggle was there first, with their Beginner Datasets and Machine Learning Projects. However, there is one important difference -- a Manning liveProject is broken into steps, each of which has high level instructions on the prescribed approach to solve that step, supplemented by educational material excerpted from one of Manning's books. I thought it was a really cool idea to repurpose existing content and open it up to a potentially different demographic. In that sense, it reminds me of the Google Places API, created by combining the maps that power Google Maps with the location feedback from consumers using it.

In any case, the project setup is to discover one or more disease outbreaks from newspaper headlines collected over some time frame, and plot them on a map to discover clusters. If the outbreak spans multiple geographical areas, it can be classified as a pandemic. I signed up primarily because (a) most of the clustering I have done so far involves topics and terms in text, so geographic clustering seemed new and cool to me, and (b) my son is an aspiring data scientist, and I figured that maybe we could do a bit of pair programming and learn together. However, the project turned out to be quite interesting and I got sucked in, and I ended up optimizing for (a) more than for (b). Oh well :-).

I forked the project template provided by one of the instructors, and implemented the steps of the project as Jupyter notebooks, and finally wrote up my project report (mandatory deliverable for the liveProject) as the README.md file for my fork. Steps are listed under the Methods section. At a high level, the transition from a list of newspaper headlines to disease clusters on a map (World and US) involved the following steps:

The project provides around 650 newspaper headlines captured from various news agencies over an unspecified time interval, so it reflects the state of the world for some snapshot. We tag the country and city in the headlines using regular expressions. Specifically, we build regexes out of the list of countries and cities in the GeoNamesCache library, and run them against the headlines, capturing the city and country names found in each headline; a sketch of this step appears below. Of the 650 headlines, 634 could be fully resolved with both country and city names, 1 with only a country name, and 15 for which neither country nor city could be found. The resolved city and country names are used to look up the latitude and longitude coordinates for each of the 634 cities, again using GeoNamesCache. The other headlines are dropped from further analysis.
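Here is a minimal sketch of that tagging step, assuming the geonamescache package's get_cities() and get_countries() accessors and a couple of made-up headlines; the actual notebooks are linked at the end of the post.

import re
from geonamescache import GeonamesCache

gc = GeonamesCache()
city_names = [city["name"] for city in gc.get_cities().values()]
country_names = [country["name"] for country in gc.get_countries().values()]

# longest names first, so "San Juan" matches before "San"; \b keeps matches on word boundaries
city_regex = re.compile(r"\b(" + "|".join(
    re.escape(name) for name in sorted(city_names, key=len, reverse=True)) + r")\b")
country_regex = re.compile(r"\b(" + "|".join(
    re.escape(name) for name in sorted(country_names, key=len, reverse=True)) + r")\b")

headlines = ["Zika Outbreak Hits Miami", "Authorities are Worried about the Spread of Dengue in Delhi"]
for headline in headlines:
    city_match = city_regex.search(headline)
    country_match = country_regex.search(headline)
    print(headline, "|", city_match.group(0) if city_match else None,
          "|", country_match.group(0) if country_match else None)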

The coordinates of the cities are then plotted on a world map (Figure 1), and it looks like there are disease outbreaks all over the place during that time frame. Note that the project also asks us to look specifically at the United States, but in order to keep the blog post short, we don't talk about that here. You can find those visualizations in the notebooks.


Clustering them using the K-Means algorithm helps somewhat, but basically clusters the points by longitude -- the first cluster is the Americas, the second is Europe, Africa and West Asia, and the third is South Asia and Australia.


Clustering the headlines with the density based method DBSCAN produces more fine grained clusters.


The distance measure used in the clustering above was standard Euclidean distance, which is more suitable for a flat earth. For a spherical earth, a better distance measure would be the Great Circle Distance. Using that distance measure, and standard hyperparameters for DBSCAN, we get a clustering that is even more fine grained.
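A minimal sketch of DBSCAN with the haversine (great circle) metric in scikit-learn looks something like this; the coordinates and the eps value are illustrative, not the project's actual settings.

import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN

# one row per geocoded headline, with latitude and longitude in degrees (made-up values)
df = pd.DataFrame({"latitude": [25.77, 19.43, -0.18], "longitude": [-80.19, -99.13, -78.47]})

EARTH_RADIUS_KM = 6371.0
coords = np.radians(df[["latitude", "longitude"]].values)

# haversine distances are in radians, so eps = (cluster radius in km) / earth radius
clusterer = DBSCAN(eps=250 / EARTH_RADIUS_KM, min_samples=3, metric="haversine")
df["cluster"] = clusterer.fit_predict(coords)
print(df)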


At this point, it looks like most of the United States and Western Europe is afflicted by one major disease or another. Given that the visualizations clearly indicated disease clusters, we wanted to find if these were all about the same disease or different diseases. We then extracted and manually looked at the "most representative" newspaper headlines (i.e., headlines that had coordinates closest to the centroids of each cluster), looking for readily identifiable diseases, then looking at the surrounding words, then using these words to look for more diseases. Using this strategy, we were able to get a count of headlines for each disease. It turned out that even though different diseases were being talked about, the dominant one was the Zika virus.

So, we filtered out the newspaper headlines about the Zika virus (around 200 of them), and reclustered them by latitude and longitude using DBSCAN with the Great Circle Distance, and we got this.


Based on this visualization, we see that the biggest outbreak seems to be in the central part of the Americas, with two big clusters in Southern United States, Mexico, and Ecuador in South America. There is also a significant cluster in South East Asia, and one in North India, and smaller outbreaks in Western Asia. Since our pretend client is the World Health Organization (WHO), we are supposed to make a recommendation, and the recommendation is that this is a pandemic since the outbreak is across multiple countries.

This was a fun exercise, and I learned about map based clustering and visualization, which I had never used before. I think the liveProject idea is very powerful and has lots of potential. If you are curious about the code, the notebooks are here.