Sunday, June 14, 2020

Dask, map_partitions, and almost Embarrassingly Parallel Processes


I have recently started using Dask for a new project. Dask is a Python library for parallel computing, similar to Apache Spark. It allows you to write parallel code that takes advantage of multiple CPUs on your laptop, or multiple worker nodes in a cluster, with little or no change to the code. Until a few months ago, I had heard of Dask, but I didn't really know what it was about. That changed when the folks at SaturnCloud offered me a chance to evaluate their platform a couple of months ago, with a view to seeing whether it would be interesting enough for me to recommend to my employer. SaturnCloud's platform provides a notebook interface on top of Dask clusters, much like Databricks provides a notebook environment over Spark clusters. While I was personally quite impressed by the platform, we are long time users of Databricks, and as a company we have built up a lot of expertise (and software) around it. In addition, even though many of our Python users work with PySpark on our Databricks platform, a significant number of users prefer Scala or Java. So it wouldn't have been a good match for us.

I spent about a week, on and off, on their platform, trying to replicate a small algorithm I had recently built for our Databricks platform, and I found it quite intuitive and easy to use, and not very different from working with Databricks and Jupyter notebooks. In order to learn all about Dask, I used the book Data Science with Python and Dask by Jesse C. Daniel. Probably because of its focus on Data Scientists, the book concentrates almost exclusively on the Dask DataFrame API, which is just one of the four high level APIs (Array, Bag, DataFrame, and ML) and two low level APIs (Delayed and Futures) offered by Dask, as shown in the architecture diagram in the blog post Introduction to Dask: Insights on NYC Parking large dataset using Dask by Shubham Goel. In any case, the book is a good starting point if you want to start using Dask, although your pipelines (like mine) might be a bit DataFrame centric in the beginning, until you figure out other approaches.

Although I was no longer evaluating SaturnCloud, I found Dask to be really cool, and I decided to learn more about it by using it in an upcoming project. The project was to annotate documents in the CORD-19 dataset with third-party annotations, from the Termite NER engine from SciBite Labs and the SciSpacy UMLS model from AllenAI, for search and NLP use. The first set of annotations comes in the form of JSON annotations built into the original CORD-19 dataset, and the second in the form of a SciSpacy NER model with a two step candidate generation and entity linking process. In both cases we are working on individual documents in a corpus, so you would assume that the task would be embarrassingly parallel, and a great fit for a parallel processing environment such as Dask.

The interesting thing is that, without exception, all the pipelines I have built so far in this project are almost, but not quite, embarrassingly parallel. The main obstacle to pure embarrassing parallelism is performance issues around the storage components in the pipeline. In my case, the two storage components are a Solr index and a PostgreSQL database. While it is possible to issue a commit with every record in both cases, doing so slows down processing drastically. The other option, waiting for the whole process to finish before committing, is also not practical. A second problem is that large pre-trained ML models tend to take time to load into memory before they can be used, so it is not practical to load the model once per row either. A solution to both problems is the Dask DataFrame map_partitions call. Like Spark's mapPartitions, it lets you run setup and teardown code around the processing of each partition of data. In this post, I will describe some of my use cases and how I used Dask DataFrame's map_partitions to handle them.

So, just as background, Dask splits up an input DataFrame into partitions, and assigns them to workers in the Dask cluster for processing. The map_partitions call allows you to specify a handler that would act on each partition. By default, it would just execute the operations you specified on each row in the partition. A typical calling sequence with map_partitions would look something like this.

import dask.dataframe as dd
from dask.diagnostics import ProgressBar

def handle_row(row, ...):
    # do something with row
    return result

def handle_partition(part):
    # add partition level setup code here
    result = part.apply(lambda row: handle_row(row, ...), axis=1)
    # add partition level teardown code here
    return result

df = dd.read_csv("...")
with ProgressBar():
    results = df.map_partitions(lambda part: handle_partition(part))
    results.compute()
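
One gotcha worth mentioning: when Dask cannot infer the structure of what handle_partition returns, map_partitions may warn or guess wrong, and you can pass the meta argument to describe the output explicitly. A minimal sketch, assuming the handler returns one integer status code per row:

# telling Dask the output is a Series of int status codes avoids a guessing step
results = df.map_partitions(lambda part: handle_partition(part),
                            meta=("result", "int64"))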

Recipe #1: Loading an index from CSV and JSON

In this recipe, the CORD-19 dataset (April 2020) is provided as a combination of a CSV metadata file and a corpus of about 80,000 JSON files split into multiple subdirectories. The idea is to read the metadata file as a Dask DataFrame, then for each row, locate the JSON file and parse out the text and other metadata from it. The combination of fields in the metadata row and the fields extracted from the JSON file are written to a Solr index. Periodically, we commit the rows written to the Solr index.

The (pseudo) code below shows the use of map_partitions as a convenient way to group the records into a set of "commit-units".

import dask.dataframe as dd
from dask.diagnostics import ProgressBar

def handle_row(row):
    # extract_metadata, parse_file, merge_fields, and write_to_solr
    # are application specific helpers (pseudocode)
    meta_fields = extract_metadata(row)
    content_fields = parse_file(row.filename)
    index_fields = merge_fields(meta_fields, content_fields)
    write_to_solr(index_fields)
    return 0

def handle_partition(part):
    result = part.apply(lambda row: handle_row(row), axis=1)
    commit_solr()    # one commit per partition, our "commit-unit"
    return result

df = dd.read_csv("metadata.csv")
with ProgressBar():
    results = df.map_partitions(lambda part: handle_partition(part))
    results.compute()
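
For completeness, here is a minimal sketch of what write_to_solr and commit_solr might look like using the pysolr client. The collection URL is an assumption for illustration, not the actual one from my pipeline.

import pysolr

# hypothetical Solr collection URL -- substitute your own
solr = pysolr.Solr("http://localhost:8983/solr/cord19", always_commit=False)

def write_to_solr(index_fields):
    # index_fields is a dict mapping Solr field names to values; with
    # always_commit=False, documents are not searchable until commit
    solr.add([index_fields])

def commit_solr():
    # make everything added since the last commit searchable
    solr.commit()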

Recipe #2: Reading JSON, writing to DB

The second recipe involves reading the annotations provided by SciBite Labs and storing them in a database table. The annotations come from their Termite annotation system, and identify entities such as genes, proteins, drugs, human phenotypes (indications), and so on. The annotations are embedded inside the original JSON files provided with the CORD-19 dataset. Unfortunately, the release schedules seem to be slightly different, so the annotation files (I used version 1.2) did not match the CORD-19 file list. So I ran my Dask pipeline against the files themselves, generating a file list, creating a Dask Bag from it, and then mapping each entry to a JSON record suitable for converting to a Dask DataFrame. I map_partitions each partition to a function that creates a database connection, and then delegates each row to another function that parses the annotations out of the JSON file and writes them to the database, using the filename as the key. On returning to the handle_partition function after processing the rows in the partition, the connection is committed and closed.

import glob

import dask.bag as db
from dask.diagnostics import ProgressBar

def handle_row(row, conn):
    # extract_annotations and insert_annotations_to_db are
    # application specific helpers (pseudocode)
    annotations = extract_annotations(row.filepath)
    insert_annotations_to_db(annotations, conn)
    return 0

def handle_partition(part):
    conn = connect_to_db()
    result = part.apply(lambda row: handle_row(row, conn), axis=1)
    conn.commit()    # one commit per partition
    conn.close()
    return result

filepaths = []
for filepath in glob.iglob("CORD19/**/*.json", recursive=True):
    filepaths.append(filepath)

df = (db.from_sequence(filepaths, partition_size=100)
      .map(lambda fp: { "filepath": fp })
      .to_dataframe())
with ProgressBar():
    results = df.map_partitions(lambda part: handle_partition(part))
    results.compute()
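
A minimal sketch of the database helpers, assuming PostgreSQL accessed through psycopg2. The connection parameters and the annotations table layout are made up for illustration:

import psycopg2

def connect_to_db():
    # hypothetical connection parameters -- substitute your own
    return psycopg2.connect(host="localhost", dbname="cord19",
                            user="cord19", password="secret")

def insert_annotations_to_db(annotations, conn):
    # assumed layout: each annotation is a (filepath, ent_type, ent_text) tuple
    cur = conn.cursor()
    for filepath, ent_type, ent_text in annotations:
        cur.execute(
            "INSERT INTO annotations (filepath, ent_type, ent_text) "
            "VALUES (%s, %s, %s)",
            (filepath, ent_type, ent_text))
    cur.close()    # the caller commits once per partition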

Recipe #3: Sentence splitting, writing to DB

In this recipe, I want to generate sentences from each document's text using the Sentence Segmentation functionality in the spaCy English model. Documents are provided in JSON format, so we will read our CSV file of metadata, use the filepath to locate each file, parse it, and extract the body, which we then pass to the sentence splitter. Output sentences are written to the database. Here, we will use our map_partitions hook for two things -- opening and closing the database connection, and instantiating the spaCy English model. We have already seen the database connection handling in Recipe #2, so no surprises there.

The problem with instantiating the English model at the partition level is that loading it takes time, as well as a fair amount of memory, so doing it once per partition is not really feasible. The first thing I tried was to make the model smaller. Since the Sentence Segmenter uses only the parser component, I disabled the tagger and NER components, but that didn't help much; the pipeline would hang or crash within a few minutes of starting up. I also learned that the sentence segmenter has a 1MB input size limit, and that quite a few files were larger than that. So I added some chunking logic (see the sketch below), and changed the model call to use batching (nlp.pipe instead of nlp), so that chunks would be segmented in batches. I also tried moving the Sentence Segmentation component into its own server using Flask (and later Gunicorn). This lasted longer, but would inexplicably crash after processing 30-40% of the texts. I initially suspected that the client was overwhelming the server, so I switched to multiple workers using Gunicorn and used requests.Session to reuse connections, but that didn't help either. Ultimately I didn't end up using this technique for this recipe, so I will cover those details in Recipe #5, where I did use it.
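
The chunking logic might look something like the sketch below, which splits on paragraph boundaries to stay under spaCy's default 1,000,000 character limit. This assumes no single paragraph exceeds the limit; it is an illustration, not my exact code.

def chunk(text, max_len=1000000):
    # accumulate paragraphs until adding one more would exceed max_len
    chunks, current, current_len = [], [], 0
    for para in text.split("\n\n"):
        if current and current_len + len(para) > max_len:
            chunks.append("\n\n".join(current))
            current, current_len = [], 0
        current.append(para)
        current_len += len(para) + 2    # +2 for the paragraph separator
    if current:
        chunks.append("\n\n".join(current))
    return chunks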

Ultimately I was able to load the model once per worker, rather than per partition, using the technique described in this comment. This ran for much longer than before, but I still couldn't finish the job. In the end, because I was running out of time after all the false starts, I settled for doing multiple partial jobs, where I would remove the documents that had already been split and rerun the job on the remainder. I ended up with approximately 22M sentences from the corpus.

The code for this is shown below. Note that the ProgressBar has been replaced by a call to progress, since get_worker is part of the Dask distributed library, and the local diagnostics ProgressBar class does not work with the distributed scheduler.

import spacy

import dask.dataframe as dd
from dask.distributed import get_worker, progress

def handle_row(row, conn, nlp):
    # read_file, chunk, save_to_db, and connect_to_db are
    # application specific helpers (pseudocode)
    text = read_file(row.filepath)
    # spaCy has a default 1M character input limit, so larger
    # texts are broken up into smaller chunks first
    if len(text) > 1000000:
        texts = chunk(text)
    else:
        texts = [text]
    # nlp.pipe batches the chunks through the model
    docs = nlp.pipe(texts)
    sents = [sent.text for doc in docs for sent in doc.sents]
    save_to_db(row.filepath, sents, conn)
    return 0

def handle_partition(part):
    worker = get_worker()
    conn = connect_to_db()
    try:
        # reuse the model if this worker has already loaded it
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load("en_core_web_sm", disable=["tagger", "ner"])
        worker.nlp = nlp
    result = part.apply(lambda row: handle_row(row, conn, nlp), axis=1)
    conn.commit()
    conn.close()
    return result

df = dd.read_csv("metadata.csv")
results = df.map_partitions(lambda part: handle_partition(part))
results = results.persist()
progress(results)
results.compute()

Recipe #4: Annotating sentences with UMLS candidate spans, writing to DB

This is similar to Recipe #3, in the sense that we read a directory of CSV files, each containing approximately 5,000 sentences, into a Dask DataFrame, and load the SciSpacy model (en_core_sci_md) to find candidate spans that match biomedical entities in the Unified Medical Language System (UMLS) Metathesaurus. Matches are written out to the database. As in Recipe #3, the database connection is opened and closed per partition, and the model is set up per worker. However, unlike Recipe #3, the handle_partition function does not delegate to handle_row; instead, it breaks the rows in the partition into batches and operates on each batch as a unit. Also notice that we are committing per batch rather than per partition. I find this kind of flexibility to be one of the coolest things about Dask. This pipeline produced slightly under 113M candidate entities.

import spacy

import dask.dataframe as dd
from dask.distributed import get_worker, progress

BATCH_SIZE = 100    # sentences per nlp.pipe call (value here is illustrative)

def handle_batch(batch, conn, nlp):
    # batch is a list of (doc_id, sent_id, sent_text) tuples
    docs = nlp.pipe([b[2] for b in batch])
    num_ents = 0
    for i, doc in enumerate(docs):
        doc_id, sent_id = batch[i][0], batch[i][1]
        for ent_id, ent in enumerate(doc.ents):
            save_to_db((doc_id, sent_id, ent_id, ent), conn)
            num_ents += 1
    conn.commit()    # note: committing per batch, not per partition
    return num_ents

def handle_partition(part):
    worker = get_worker()
    conn = connect_to_db()
    try:
        # reuse the model if this worker has already loaded it
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load("en_core_sci_md", disable=["tagger", "parser"])
        worker.nlp = nlp
    result, batch = [], []
    for _, row in part.iterrows():
        batch.append((row.doc_id, row.sent_id, row.sent_text))
        if len(batch) == BATCH_SIZE:
            result.append(handle_batch(batch, conn, nlp))
            batch = []
    if len(batch) > 0:
        result.append(handle_batch(batch, conn, nlp))
    conn.close()
    return result

df = dd.read_csv("sentences/sents-*", names=["doc_id", "sent_id", "sent_text"])
results = df.map_partitions(lambda part: handle_partition(part))
results = results.persist()
progress(results)
results.compute()

Recipe #5: Resolving candidate spans against UMLS, writing to DB

The final recipe I would like to share in this post reads the sentences from the directory of sentence files, and then, for each partition of sentences, extracts the candidate entities and attempts to link them to entities in the UMLS Metathesaurus. The UMLS concepts linked to the candidate entities are written back to the database. The concept and semantic type (a sort of classification hierarchy for concepts) metadata are also written out to separate tables in a normalized manner. As you can see, the sentences (doc_id, sent_id) only act as a starting point to group some database computations, so it might have been better to use dd.read_sql_table() instead, but that requires a single indexable column to partition on, which I didn't have.
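
For reference, reading directly from the database would look something like the hypothetical call below; the table name, connection URI, and index column are made up, and the index column requirement is what ruled this out for me.

import dask.dataframe as dd

# hypothetical: partition the sentences table on a numeric index column
df = dd.read_sql_table("sentences",
                       "postgresql://cord19:secret@localhost/cord19",
                       index_col="sent_pk")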

The UMLS dictionary is called the UMLS Knowledge Base and is about 0.7GB in size. Loading it once per worker reliably crashed the pipeline with messages pointing to an out-of-memory situation. At this point, I figured my only option was to run it in its own server and have my pipeline consume it over HTTP. That would also allow me to have more workers on the Dask side. My theory about my earlier failures with this setup during sentence splitting was that they were somehow caused by large POST payloads, or by the server running out of memory because of excessively large batches. Since my input sizes (text spans) were more consistent this time around, I had more confidence that it would work, and it did.

A few tidbits of information about the server setup. I used Flask to load the UMLS Knowledge Base, and exposed an HTTP POST API that takes a batch of candidate spans and returns the associated concepts along with their metadata. I serve this through Gunicorn with 4 workers (see this tutorial for details), which introduces some degree of redundancy. Gunicorn also monitors the workers, so it will restart a worker if it fails. For debugging purposes, I also send the doc_id, sent_id, and ent_id as query parameters, so they show up in the access log.
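
A minimal sketch of what such a Flask endpoint might look like. The endpoint path, the payload format, and the load_umls_linker and link_spans helpers (wrapping the SciSpacy candidate generation and linking code) are assumptions for illustration:

from flask import Flask, jsonify, request

app = Flask(__name__)

# load the UMLS Knowledge Base once, when the server worker starts up
# (load_umls_linker is an assumed helper wrapping the SciSpacy linker)
linker = load_umls_linker()

@app.route("/link", methods=["POST"])
def link():
    # expects a JSON list of {"id": ..., "text": ...} candidate spans
    spans = request.get_json()
    results = [link_spans(linker, span["id"], span["text"])
               for span in spans]
    return jsonify(results)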

On the client side, I call the service using a requests.Session, which gives me some degree of connection reuse. This is useful since my pipeline is going to be hammering away at the server for the next 30 hours or so. In case a request encounters a server error, the client sleeps for a second before trying again, to give the server some breathing room to replace a worker if one dies. Here is the client side code (the server side code around parsing the request and returning the response is fairly trivial, and the linking code is based heavily on the code in this SciSpacy Entity Linking test case).

With these changes, my only reason to use the map_partitions hook is to open and close the connection to the database. The code ended up marking up my 113M candidate entities with approximately 166M concept annotations (about 1.5 per candidate span), covering approximately 120K unique UMLS concepts.

import json
import time

import requests

import dask.dataframe as dd
from dask.distributed import progress

def post_spans(sess, params, data):
    headers = { "content-type": "application/json" }
    return sess.post("http://path/to/server", headers=headers,
                     params=params, data=data)

def handle_row(row, conn, sess):
    params = {
        "doc_id": row.doc_id,
        "sent_id": row.sent_id
    }
    # candidate spans for this sentence, saved to the DB in Recipe #4
    # (load_candidate_spans is an assumed helper)
    ent_spans = load_candidate_spans(row.doc_id, row.sent_id, conn)
    data = json.dumps([{"id": id, "text": text} for id, text in ent_spans])
    try:
        resp = post_spans(sess, params, data)
    except requests.exceptions.RequestException:
        # give the server a second of breathing room, then retry once
        time.sleep(1)
        try:
            resp = post_spans(sess, params, data)
        except requests.exceptions.RequestException:
            return -1
    spans = parse_response(resp.json())
    save_links(spans, conn)
    save_concept_metadata(spans, conn)
    conn.commit()
    return 0

def handle_partition(part):
    conn = connect_to_db()
    # one Session per partition gives us some connection reuse
    with requests.Session() as sess:
        result = part.apply(lambda row: handle_row(row, conn, sess), axis=1)
    conn.commit()
    conn.close()
    return result

df = dd.read_csv("sentences/sents-*", names=["doc_id", "sent_id", "sent_text"])
results = df.map_partitions(lambda part: handle_partition(part))
results = results.persist()
progress(results)
results.compute()

I hope this was useful. I have used the Spark RDD mapPartitions call in the past, which functions similarly, but for simpler use cases. The almost embarrassingly parallel situation seems to be quite common, and map_partitions seems to be an effective tool for dealing with it. I figured these examples might be helpful to illustrate various ways in which a pipeline can be designed to take advantage of the map_partitions functionality, as well as spark ideas for more creative ones. Of course, as I have worked my way through these use cases, I have begun to understand the power of Dask and its other APIs as well. One API that can be useful in these sorts of situations is the low level Delayed API, which allows one to bypass the rigid call structure enforced by the DataFrame API. I hope to use that in the future.

Saturday, June 06, 2020

Bayesian Analysis with Python Exercise Solutions


In my quest to learn more about Bayesian Analysis, I spent the last few weeks working through the book Bayesian Analysis with Python - 2nd Edition by Osvaldo Martin. In their great wisdom, Amazon now only allows reviews from people who have been spending "real" money on Amazon, as opposed to those like me using "fake" money in the form of gift cards and credit card rewards. So I guess I will have to hold off posting my review of this great book until my fake money runs out. Meanwhile, you can read other people's reviews on Amazon. In this post, I would like to share my notebooks where I have worked out the exercises for Chapters 2-7 (there are 9 chapters; the first is introductory, and the last two cover areas only indirectly related to Bayesian Analysis, or more specifically, Bayesian analysis using Markov Chain Monte Carlo (MCMC) techniques). Here are my notebooks on Github. No guarantees, of course, since I am learning as well.


The author Osvaldo Martin is one of the developers of the ArviZ project, an exploratory analysis tool for Bayesian models. The ArviZ project was spun off from the PyMC3 project, and many PyMC3 calls, such as pm.traceplot(), are actually calls to az.plot_trace() under the hood. According to the ArviZ website, it also supplies functionality for other Bayesian libraries, such as PyStan, Pyro, and TF Probability. Martin is also a PyMC3 maintainer, and the book reflects that. Compared with my previous foray into PyMC3, where I was trying to convert the JAGS models taught in the two Coursera courses from the University of California, Santa Cruz (UCSC) into equivalent PyMC3 models using online documentation and other Internet resources, this time the focus was on exploring the various code features offered by PyMC3 and ArviZ.
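
As a quick illustration of the relationship (a sketch with made up data, not an example from the book):

import arviz as az
import pymc3 as pm

# toy coin-flip model with made up observations
data = [0, 1, 1, 0, 1, 1, 1, 0]
with pm.Model() as model:
    theta = pm.Beta("theta", alpha=1., beta=1.)
    y = pm.Bernoulli("y", p=theta, observed=data)
    trace = pm.sample(1000, tune=1000)

# pm.traceplot(trace) and az.plot_trace(trace) produce the same plot
az.plot_trace(trace)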

The book is billed as an introductory one on the PyMC3 site, but I thought it had good followup material for someone who has already taken the two UCSC Coursera courses. The focus of those courses was on the theory, explaining why you would choose one distribution over another, along with linear and logistic regression, and some exposure to hierarchical and mixture models. This book offers a more code-heavy exposition of all these subjects, plus an introduction to Dirichlet processes as part of the Mixture Models chapter, and an introduction to Gaussian processes.

Of course, I still have much more to learn about all this. Just today, I came across a link to Nicole Carlson's PyMC3 talk at PyData Chicago 2016, where she walks through some toy problems with PyMC3. She introduces the idea of using Theano shared variables for inputs and outputs, so you can use the model in a more Scikit-Learn like way, training with one dataset and evaluating with another. She also covers the newer PyMC3 Variational Inference API (ADVI), an alternative to the older MCMC API, and shows how to use the output of an ADVI model as input to an MCMC sampling model. Finally, she describes serializing a PyMC3 model by pickling the trace and using it as a generative model. I also found the notebooks for her subsequent talk at PyData Chicago 2017, where she describes her work to wrap PyMC3 models in a more familiar Scikit-Learn like interface.
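
The shared variable pattern she describes looks roughly like the sketch below, with toy data, and following the prediction recipe from the PyMC3 documentation rather than her exact code:

import numpy as np
import pymc3 as pm
import theano

# toy regression data
X_train = np.random.randn(100)
y_train = 2.0 * X_train + 0.5 * np.random.randn(100)
X_test = np.linspace(-3, 3, 50)

# wrap inputs and outputs in shared variables so they can be swapped later
x_shared = theano.shared(X_train)
y_shared = theano.shared(y_train)

with pm.Model() as model:
    w = pm.Normal("w", mu=0., sigma=10.)
    sigma = pm.HalfNormal("sigma", sigma=1.)
    y = pm.Normal("y", mu=w * x_shared, sigma=sigma, observed=y_shared)
    trace = pm.sample(1000, tune=1000)

# swap in the test inputs (and dummy outputs of matching shape),
# then sample from the posterior predictive distribution
x_shared.set_value(X_test)
y_shared.set_value(np.zeros(len(X_test)))
with model:
    ppc = pm.sample_posterior_predictive(trace)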

Another recommendation from the PyMC3 site, as well as from this book, was the "puppy book" Doing Bayesian Data Analysis by John Kruschke. Fortunately, that is available in my employer's online library, so I guess that is another book I won't buy from Amazon with fake or real money.

That's pretty much all I had for today. Obviously the best way to learn this stuff is to try doing it yourself, but I am sharing these notebooks on Github in case someone gets stuck and can benefit from them. If you do end up using them, please let me know what you think.