Sunday, June 14, 2020

Dask, map_partitions, and almost Embarrassingly Parallel Processes


I have recently started using Dask for a new project. Dask is a Python library for parallel computing, similar to Apache Spark. It allows you to write parallel code that takes advantage of multiple CPUs on your laptop, or multiple worker nodes in a cluster, with little or no change to the code. Until a few months ago, I had heard of Dask but didn't really know what it was about. That changed when the folks at SaturnCloud offered me a chance to evaluate their platform, with a view to seeing whether it would be interesting enough for me to recommend to my employer. SaturnCloud's platform provides a notebook interface on top of Dask clusters, much like Databricks provides a notebook environment over Spark clusters. While I was personally quite impressed by the platform, we are long-time users of Databricks, and we have built up a lot of expertise (and software) with it as a company. In addition, even though we have many Python users who use PySpark on our Databricks platform, we also have a significant number of users who prefer Scala or Java. So it wouldn't have been a good match for us.

I spent about a week, on and off, on their platform, trying to replicate a small algorithm I had recently built for our Databricks platform. I found the platform quite intuitive and easy to use, and not very different from working with Databricks and Jupyter notebooks. In order to learn about Dask, I used the book Data Science with Python and Dask by Jesse C. Daniel. Probably because of its focus on data scientists, the book focuses almost exclusively on the Dask DataFrame API, which is just one of the four high-level APIs (Array, Bag, DataFrame, and ML) and two low-level APIs (Delayed and Futures) offered by Dask, as shown on the architecture diagram in the blog post Introduction to Dask: Insights on NYC Parking large dataset using Dask by Shubham Goel. In any case, the book is a good starting point if you want to start using Dask, although your pipelines (like mine) might be a bit DataFrame-centric in the beginning, until you figure out other approaches.

Although I was no longer evaluating SaturnCloud, I found Dask to be really cool, and I decided to learn more about it by using it in an upcoming project. The project was to annotate documents in the CORD-19 Dataset, for search and NLP use, with third-party annotations from the Termite NER engine from SciBite Labs and the SciSpacy UMLS model from AllenAI. The first set of annotations is in the form of JSON annotations built into the original CORD-19 dataset, and the second is in the form of a SciSpacy NER model with a two-step candidate generation and entity linkage process. In both cases we are working on individual documents in a corpus, so you would assume that the task would be embarrassingly parallel, and a great fit for a parallel processing environment such as Dask.

The interesting thing is that, without exception, all the pipelines I have built so far in this project are almost, but not quite, embarrassingly parallel. The main obstacle to a purely embarrassingly parallel process is the performance of the storage components in the pipeline. In my case, the two storage components are a Solr index and a PostgreSQL database. While it is possible to issue a commit with every record in both cases, doing so slows down processing drastically. The other option, waiting for the whole process to finish before committing, is also not practical. The other problem is that large pre-trained ML models take time to load into memory before they can be used, so it is not practical to load the model once per row either. A solution to both problems is the Dask DataFrame map_partitions call. Like its counterpart in Spark, it lets you run setup code before, and teardown code after, the rows of each partition are processed. In this post, I will describe some of my use cases and how I used Dask DataFrame's map_partitions to handle them.

So, just as background, Dask splits an input DataFrame into partitions and assigns them to workers in the Dask cluster for processing. The map_partitions call allows you to specify a handler that acts on each partition. The handler receives the partition as a pandas DataFrame, so it can simply apply your per-row operation to each row in the partition, with any setup and teardown code placed around that. A typical calling sequence with map_partitions would look something like this.

import dask.dataframe as dd
import dask.bag as db
from dask.diagnostics import ProgressBar

def handle_row(row, *resources):
    # do something with row, using any partition-level resources passed in
    result = 0  # placeholder for the per-row result
    return result

def handle_partition(part):
    # add partition level setup code here
    result = part.apply(lambda row: handle_row(row), axis=1)
    # add partition level teardown code here
    return result

df = dd.read_csv("...")
with ProgressBar():
    results = df.map_partitions(handle_partition)
    results.compute()

Recipe #1: Loading an index from CSV and JSON

In this recipe, the CORD-19 dataset (April 2020) is provided as a combination of a CSV metadata file and a corpus of about 80,000 JSON files split into multiple subdirectories. The idea is to read the metadata file as a Dask DataFrame, then for each row, locate the JSON file and parse out the text and other metadata from it. The fields in the metadata row are combined with the fields extracted from the JSON file and written to a Solr index. Periodically, we commit the rows written to the Solr index.

The (pseudo) code below shows the use of map_partitions as a convenient way to group the records into a set of "commit-units".

def handle_row(row):
    # combine the CSV metadata with fields parsed from the row's JSON file
    meta_fields = extract_metadata(row)
    content_fields = parse_file(row.filename)
    index_fields = merge_fields(meta_fields, content_fields)
    write_to_solr(index_fields)
    return 0

def handle_partition(part):
    result = part.apply(handle_row, axis=1)
    # one Solr commit per partition, i.e. per "commit-unit"
    commit_solr()
    return result

df = dd.read_csv("metadata.csv")
with ProgressBar():
    results = df.map_partitions(handle_partition)
    results.compute()
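
The parse_file and merge_fields helpers are not shown above. A minimal sketch of what they might look like follows, assuming the CORD-19 JSON layout in which each file carries a paper_id, a metadata.title, and a body_text list of paragraphs with a text field. The output field names and the sample document are made up for illustration, and the real helpers may well extract more.

```python
import json
import os
import tempfile

def parse_file(filepath):
    """Extract the id, title and body text from one CORD-19 style JSON file."""
    with open(filepath, "r", encoding="utf-8") as f:
        doc = json.load(f)
    paragraphs = [p["text"] for p in doc.get("body_text", [])]
    return {
        "paper_id": doc.get("paper_id"),
        "title": doc.get("metadata", {}).get("title", ""),
        "body": "\n".join(paragraphs),
    }

def merge_fields(meta_fields, content_fields):
    """Combine CSV metadata with parsed content; CSV values win on conflict."""
    return {**content_fields, **meta_fields}

# tiny stand-in document, written to a temp file for the demo
sample = {
    "paper_id": "abc123",
    "metadata": {"title": "A sample paper"},
    "body_text": [{"text": "First paragraph."}, {"text": "Second paragraph."}],
}
path = os.path.join(tempfile.mkdtemp(), "abc123.json")
with open(path, "w", encoding="utf-8") as f:
    json.dump(sample, f)

index_fields = merge_fields({"journal": "Some Journal"}, parse_file(path))
```

Letting the CSV values win on conflict is an arbitrary choice here; the opposite order would be just as easy to justify.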

Recipe #2: reading JSON, writing to DB

The second recipe involves reading the annotations provided by SciBite Labs and storing them in a database table. The annotations are from their Termite annotation system, and identify entities such as genes, proteins, drugs, human phenotypes (indications), etc. The annotations are embedded inside the original JSON files provided by the CORD-19 dataset. Unfortunately, the release schedules seem to be slightly different, so the annotation files (I used version 1.2) did not match the CORD-19 file list. So I ran my Dask pipeline against the files themselves, generating a file list and creating a Dask Bag, then mapping each file path to a JSON row suitable for converting to a Dask DataFrame. My map_partitions call sends each partition to a function that creates a database connection and passes each row, along with the connection, to another function that parses the annotations out of the JSON file and writes them to the database, using the filename as the key. On returning to the handle_partition function after processing each row in the partition, the connection is committed and closed.

import glob
import dask.bag as db

def handle_row(row, conn):
    annotations = extract_annotations(row.filepath)
    insert_annotations_to_db(annotations, conn)
    return 0

def handle_partition(part):
    # one connection and one commit per partition
    conn = connect_to_db()
    result = part.apply(lambda row: handle_row(row, conn), axis=1)
    conn.commit()
    conn.close()
    return result

# build the file list from the files on disk rather than from the metadata CSV
filepaths = list(glob.iglob("CORD19/**/*.json", recursive=True))

df = (db.from_sequence(filepaths, partition_size=100)
      .map(lambda fp: { "filepath": fp })
      .to_dataframe())
with ProgressBar():
    results = df.map_partitions(handle_partition)
    results.compute()
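
The per-partition connection handling can be tried out without a database server or a Dask cluster. The sketch below is only an illustration of the pattern: sqlite3 stands in for PostgreSQL, plain Python lists stand in for Dask partitions, and the annotations table and its columns are invented for the example.

```python
import os
import sqlite3
import tempfile

def handle_partition(rows, db_path):
    # one connection and one commit per partition ("commit unit")
    conn = sqlite3.connect(db_path)
    try:
        conn.executemany(
            "INSERT INTO annotations (filepath, entity) VALUES (?, ?)", rows)
        conn.commit()
    finally:
        # close the connection even if an insert fails
        conn.close()
    return len(rows)

# create an empty table in a throwaway database file
db_path = os.path.join(tempfile.mkdtemp(), "annotations.db")
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE annotations (filepath TEXT, entity TEXT)")
conn.commit()
conn.close()

# two "partitions" of (filepath, entity) rows
partitions = [
    [("a.json", "ACE2"), ("a.json", "remdesivir")],
    [("b.json", "fever")],
]
written = [handle_partition(rows, db_path) for rows in partitions]

conn = sqlite3.connect(db_path)
total = conn.execute("SELECT COUNT(*) FROM annotations").fetchone()[0]
conn.close()
```

If a partition fails midway, only that partition's uncommitted rows are lost, which is the trade-off between committing per row and committing once at the end.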

Recipe #3: sentence splitting, writing to DB

In this recipe, I want to generate sentences out of each document text using the sentence segmentation functionality in the spaCy English model. Documents are provided in JSON format, so we will read our CSV file of metadata, use the filepath to locate the file, parse it, and extract the body, which we then pass to the sentence splitter. Output sentences are written to the database. Here, we will use our map_partitions hook for two things: to open and close the database connection, and to instantiate the spaCy English model. We have already seen the database connection in Recipe #2, so no surprises there.

The problem with loading the English model at the partition level is that loading takes time and a fair amount of memory, so it is not really feasible to do this for every partition. The first thing I tried was to make the model smaller. Since the sentence segmenter uses only the parser component, I disabled the tagger and NER components, but that didn't help much; the pipeline would hang or crash within a few minutes of starting up. I also learned that the sentence segmenter has a 1 MB input size limit (1,000,000 characters per text by default), and that quite a few files were larger. So I added some chunking logic, and changed the model call to use batching (nlp.pipe instead of nlp), so that the chunks are segmented as a batch. Next, I moved the sentence segmentation component into its own server using Flask (and later Gunicorn). This lasted longer, but would inexplicably crash after processing 30-40% of the texts. I initially suspected that the client was overwhelming the server, so I switched to multiple workers using Gunicorn and used requests.Session to reuse the connection, but that didn't help either. In the end I didn't use this technique for this recipe, so I will cover these details in Recipe #5, where I did use it.

Eventually I was able to load the model per worker rather than per partition using the technique described in this comment. This ran much longer than before, but I still couldn't finish the job. Finally, because I was running out of time with all the failed starts, I settled for doing multiple partial jobs, where I would remove the documents that had already been split and rerun the job. I ended up with approximately 22M sentences from the corpus.

The code for this is shown below. Note that the ProgressBar has been replaced by a call to progress: get_worker is part of the Dask distributed library, so the pipeline now runs on the distributed scheduler, where the local diagnostics ProgressBar class does not work.

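import spacy
from dask.distributed import get_worker, progress

# assumes a dask.distributed Client has already been created

def handle_row(row, conn, nlp):
    text = read_file(row.filepath)
    # texts above the 1,000,000 character limit are split into chunks first
    if len(text) > 1000000:
        texts = chunk(text)
    else:
        texts = [text]
    # nlp.pipe yields one Doc per chunk; collect the sentence text from each
    sents = [sent.text for doc in nlp.pipe(texts) for sent in doc.sents]
    save_to_db(row.filepath, sents, conn)
    return 0

def handle_partition(part):
    worker = get_worker()
    conn = connect_to_db()
    # load the model once per worker and cache it on the worker object
    try:
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load("en_core_web_sm", disable=["tagger", "ner"])
        worker.nlp = nlp
    result = part.apply(lambda row: handle_row(row, conn, nlp), axis=1)
    conn.commit()
    conn.close()
    return result

df = dd.read_csv("metadata.csv")
results = df.map_partitions(handle_partition)
results = results.persist()
progress(results)
results.compute()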
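
The chunk helper called from handle_row is not shown in the listing. One possible version is sketched below, under the assumption that splitting at the last space before the limit is acceptable. The max_chars parameter name is made up for the example, and the actual chunking logic used in the pipeline may differ.

```python
def chunk(text, max_chars=1_000_000):
    """Split text into pieces of at most max_chars, breaking at a space when possible."""
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + max_chars, len(text))
        if end < len(text):
            # back up to the last space inside the window, if there is one
            cut = text.rfind(" ", start, end)
            if cut > start:
                end = cut + 1
        chunks.append(text[start:end])
        start = end
    return chunks

# small demo with a tiny limit so the behaviour is easy to see
pieces = chunk("aaa bbb ccc", max_chars=5)
unbroken = chunk("aaaaaaa", max_chars=3)
```

Joining the chunks back together reproduces the original text exactly. A sentence that straddles a chunk boundary will still be cut in two, so this only limits the damage rather than avoiding it.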

Recipe #4: annotating sentences with UMLS candidate spans, writing to DB

This is similar to Recipe #3 in the sense that we read a directory of CSV files, each containing approximately 5000 sentences, into a Dask DataFrame, and load the SciSpacy model (en_core_sci_md) to find candidate spans that match biomedical entities in the Unified Medical Language System (UMLS) Metathesaurus. Matches are written out to the database. As with Recipe #3, the database connection is opened and closed per partition, and the model is set up per worker. However, unlike Recipe #3, the handle_partition function does not delegate to handle_row; instead, it breaks up the rows in the partition into batches and operates on one batch at a time. Also notice that we are committing per batch rather than per partition. I find this kind of flexibility to be one of the coolest things about Dask. This pipeline produced slightly under 113M candidate entities.

import pandas as pd
import spacy
from dask.distributed import get_worker, progress

batch_size = 100  # assumed value for illustration; the actual value is not stated

def handle_batch(batch, conn, nlp):
    # each batch item is a (doc_id, sent_id, sent_text) tuple
    docs = nlp.pipe([b[2] for b in batch])
    num_ents = 0
    for (doc_id, sent_id, _), doc in zip(batch, docs):
        for ent_id, ent in enumerate(doc.ents):
            save_to_db((doc_id, sent_id, ent_id, ent), conn)
            num_ents += 1
    # commit per batch rather than per partition
    conn.commit()
    return num_ents

def handle_partition(part):
    worker = get_worker()
    conn = connect_to_db()
    # load the SciSpacy model once per worker and cache it on the worker object
    try:
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load("en_core_sci_md", disable=["tagger", "parser"])
        worker.nlp = nlp
    result, batch = [], []
    for _, row in part.iterrows():
        if len(batch) == batch_size:
            result.append(handle_batch(batch, conn, nlp))
            batch = []
        batch.append((row.doc_id, row.sent_id, row.sent_text))
    # flush the final, possibly smaller, batch
    if len(batch) > 0:
        result.append(handle_batch(batch, conn, nlp))
    conn.close()
    # return a pandas object (entities written per batch) rather than a plain list
    return pd.Series(result)

df = dd.read_csv("sentences/sents-*", names=["doc_id", "sent_id", "sent_text"])
results = df.map_partitions(handle_partition)
results = results.persist()
progress(results)
results.compute()
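
The batching loop inside handle_partition can also be factored out into a small generator, which keeps the flush-the-last-batch logic in one place. The sketch below is one way to do it; the batched name and the sample rows are illustrative and not taken from the pipeline.

```python
from itertools import islice

def batched(rows, batch_size):
    """Yield successive lists of at most batch_size items from rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

# seven (doc_id, sent_id, sent_text) tuples, grouped three at a time
rows = [(f"doc{i}", i, f"sentence {i}") for i in range(7)]
sizes = [len(b) for b in batched(rows, 3)]
```

With such a helper, the body of handle_partition would reduce to a single loop over the batches, calling handle_batch on each one.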

Recipe #5: resolving candidate spans against UMLS, writing to DB

The final recipe I would like to share in this post reads the sentences from the directory of sentence files, then for each partition of sentences, extracts the candidate entities and attempts to link them to entities from the UMLS Metathesaurus. The UMLS concepts linked to the candidate entities are written back to the database. The concept and semantic type (a sort of classification hierarchy of concepts) metadata are also written out to separate tables in a normalized manner. As you can see, the sentences (doc_id, sent_id) only act as a starting point to group some database computations, so it might have been better to use dd.read_sql() instead, but that requires a single-column primary key, which I didn't have.

The UMLS dictionary is called the UMLS Knowledge Base and is about 0.7MB in size. Loading it once per worker reliably caused the pipeline to crash with messages that point to an out of memory situation. So at this point, I figured that my only option would be to have this run in its own server and have my pipeline consume it over HTTP. That would allow me to have more workers on the Dask side as well. My theory about my previous failures with using this setup during sentence splitting was that it was somehow being caused by large POST payloads or the server running out of memory because of excessively large batches. Since my input sizes (text spans) were more consistent this time around, I had more confidence that it would work, and it did.

A few notes on the server setup. I used Flask to load the UMLS Knowledge Base and exposed an HTTP POST API that took a batch of candidate spans and returned the associated concepts along with their metadata. I serve this through Gunicorn with 4 workers (see this tutorial for details), which introduces some degree of redundancy. Gunicorn also monitors the workers, so it will restart a worker if it fails. For debugging purposes, I also send the doc_id, sent_id, and ent_id as URL query parameters so you can see them in the access log.

On the client side, I call the service using a requests Session, which allows me some degree of connection reuse. This is useful since my pipeline is going to be hammering away at the server for the next 30 hours. If a request encounters a server error, the client sleeps for a second before continuing, so as to give the server some breathing room to restart a worker if one has died, for example.

With these changes, my only reason to use the map_partitions hook is to open and close the connection to the database. The code ended up marking up my 113M candidate entities with approximately 166M concepts (so approximately 1.5 annotations per candidate span), and approximately 120K unique UMLS concepts.

Here is the client-side code. The server side, which parses the request and returns the response, is fairly trivial, and the linking code is based heavily on the code in this SciSpacy Entity Linking test case.

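import json
import time
import requests

def handle_row(row, conn):
    headers = { "content-type" : "application/json" }
    # sent as query parameters so they show up in the server access log
    params = {
        "doc_id": row.doc_id,
        "sent_id": row.sent_id
    }
    # load_candidate_spans is a hypothetical helper (not in the original listing)
    # that returns (ent_id, text) pairs for this sentence
    ent_spans = load_candidate_spans(row.doc_id, row.sent_id, conn)
    data = json.dumps([{"id": ent_id, "text": text} for ent_id, text in ent_spans])
    try:
        with requests.Session() as sess:
            resp = sess.post("http://path/to/server", headers=headers, params=params, data=data)
            resp.raise_for_status()
    except requests.RequestException:
        # give the server a moment to restart a failed worker, then move on
        time.sleep(1)
        return -1
    spans = parse_response(resp.json())
    save_links(spans, conn)
    save_concept_metadata(spans, conn)
    conn.commit()
    return 0

def handle_partition(part):
    conn = connect_to_db()
    result = part.apply(lambda row: handle_row(row, conn), axis=1)
    conn.commit()
    conn.close()
    return result

df = dd.read_csv("sentences/sents-*", names=["doc_id", "sent_id", "sent_text"])
results = df.map_partitions(handle_partition)
results = results.persist()
progress(results)
results.compute()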
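
The listing above gives up on a row after a single failed request and marks it with -1. A variant that retries the same request a few times before giving up could look like the sketch below. This is not the code used in the pipeline: call_with_retry, its parameters, and the flaky_request stand-in are all invented for illustration, and the sleep function is injectable only so the demo does not actually wait.

```python
import time

def call_with_retry(send, max_attempts=3, wait_secs=1.0, sleep=time.sleep):
    """Call send() until it succeeds or max_attempts is reached."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return send()
        except Exception as e:
            last_error = e
            # pause between attempts so the server can restart a failed worker
            if attempt < max_attempts:
                sleep(wait_secs)
    raise last_error

# stand-in for an HTTP call that fails twice, then succeeds
attempts = []

def flaky_request():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("server worker restarting")
    return {"status": "ok"}

response = call_with_retry(flaky_request, sleep=lambda secs: None)
```

In handle_row, the send argument would wrap the sess.post call, and a row would only be marked as failed once all attempts are exhausted.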

I hope this was useful. I have used the Spark RDD mapPartitions call in the past, which functions similarly, but for simpler use cases. The almost embarrassingly parallel situation seems to be quite common, and map_partitions seems to be an effective tool for dealing with it. I figured these examples might be helpful to illustrate various ways in which a pipeline can be designed to take advantage of map_partitions functionality, as well as to spark ideas for more creative ones. Of course, as I work my way through these use cases, I am beginning to understand the power of Dask and its other APIs as well. One other API that can be useful in this sort of situation is the low-level Delayed API, which allows one to bypass the rigid call structure enforced by the DataFrame API. I hope to use that in the future.

2 comments (moderated to prevent spam):

Alex UK said...

Hello Sujit,
I used the same approach and kept running out of memory/time.
The challenge is that Kaggle doesn't provide engineering tooling to make processing on this scale easier: to make sure you are not re-processing entities using sets or cbloom filter inside Redis is very effective to use.
Also to span candidate CUI from UMLS it's very easy to create Aho-Corasick automation by parsing MRXW_ENG table from UMLS (like `select WD, CUI from MRXW_ENG`, but you don't even need database as default UMLSMethatesaurus.zip have all tables in tabular format).
I liked dask dataframe, but kept having problems with the dump/load scispacy model - it's 9GB RAM. I now shard computation using Redis Cluster and Redis Gears, I don't want to run out of memory ever!
Thank you for your contribution, I will keep reading.

Sujit Pal said...

Hi Alex, thanks for the pointer to Redis, I have used basic Redis for toy problems, but Redis Cluster and Gears are new to me. I got past the issue by serving the model over HTTP (described in the post), but I think Redis might be more robust. And totally agree that SciSpacy's entity linking step is about as smart as Aho-Corasick dictionary matching.