Saturday, March 17, 2018

Accessing ML models in Spark from various NLP toolkits


NLTK users know that a lot of its functionality, even seemingly basic features like sentence and word tokenization, depends on machine learning models pre-trained on default corpora. These models are distributed as a separate download because of their size. Making them available to your code is simple -- a single, one-time nltk.download() call, as described on this page.
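
For reference, this is what the single-machine workflow looks like (a minimal sketch; "punkt" and "averaged_perceptron_tagger" are the models behind the tokenization and POS tagging used later in this post):

import nltk

# one-time downloads; later calls detect the existing data and return quickly
nltk.download("punkt")
nltk.download("averaged_perceptron_tagger")

text = "The cat sat on the mat. It seemed quite pleased with itself."
for sent in nltk.sent_tokenize(text):
  print(nltk.pos_tag(nltk.word_tokenize(sent)))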

The situation is slightly more complicated in the case of a distributed environment such as Apache Spark. The general idea there is that you partition your data processing across multiple nodes in a cluster and then bring the processed datasets back together. We use the web-based Databricks analytics platform on top of Spark, which gives us, among other things, a notebook-based development environment that hides some of the boilerplate associated with straight Spark code. Databricks notebooks support Python, but NLTK does not come pre-installed.

In order to use NLTK to process text within Databricks, you need to install NLTK on your cluster. That's not too hard as long as you have the necessary permissions; the process is described on this Databricks documentation page. The PyPI package name I used was "nltk==3.2.5". This makes NLTK available to the master node and all the worker nodes in your cluster.
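
Depending on your Databricks runtime, you may also be able to do a notebook-scoped install from code instead of going through the cluster UI -- a hedged sketch, assuming dbutils.library is available on your runtime (older runtimes need the UI route described above):

# notebook-scoped library install; only needed if you do not install NLTK
# as a cluster library through the Databricks UI
dbutils.library.installPyPI("nltk", version="3.2.5")
dbutils.library.restartPython()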

Generally the very first step in analyzing text data is to tokenize it into sentences and words, and as I mentioned earlier, this needs the appropriate ML model to be available on the workers. Back when I first started working on this, a colleague mentioned that he just added the nltk.download() command to the map call, so it was called for each record on the worker. He accompanied this hint with a brilliant piece of insight -- the nltk.download() call checks whether the download has already happened, so subsequent calls after the first one are just pass-throughs.
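
In other words, something along the lines of the sketch below (not his actual code; the function name and the choice of the "punkt" model are mine):

# per-record approach: nltk.download() runs inside the map function, so it
# is invoked for every record, but after the first call on a worker it just
# detects the existing data and returns immediately
def tokenize_line(line):
  import nltk
  nltk.download("punkt")    # model needed for sentence/word tokenization
  image_id, caption_text = line.split("\t")
  tokens = [word for sent in nltk.sent_tokenize(caption_text)
                 for word in nltk.word_tokenize(sent)]
  return (image_id, tokens)

captions_rdd = sc.textFile("/path/to/input/text/file").map(tokenize_line)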

I thought about this a bit and realized that I could make the process even more efficient by calling nltk.download() once per partition using mapPartitions instead of once per record using map. So that's what I did: the code below loads the NLTK models once per partition, then tokenizes the text into sentences and words. Also, once loaded, these models are available to subsequent calls made within the same partition, as shown by the POS tagging done in a subsequent map call.

def download_and_tokenize(iter):
  import nltk

  def tokenize(line):
    # each input line is a tab-separated (image_id, caption_text) pair
    image_id, caption_text = line.split("\t")
    tokens = []
    for sent in nltk.sent_tokenize(caption_text):
      for word in nltk.word_tokenize(sent):
        tokens.append(word)
    return (image_id, tokens)

  # executed once per partition; a pass-through if the models are already
  # present on this worker
  nltk.download("all")

  # executed once per record within the partition
  for line in iter:
    yield tokenize(line)


def postag(rec):
  # the models downloaded above are already on this worker, so importing
  # nltk here does not trigger another download
  import nltk
  id, tokens = rec
  tokentags = nltk.pos_tag(tokens)
  return (id, tokentags)


captions_rdd = (sc.textFile("/path/to/input/text/file")
  .mapPartitions(download_and_tokenize)
  .map(postag)
)
captions_rdd.take(10)

Of late, the SpaCy NLP library has become more popular, and I think for good reason: it is faster, has more functionality, and is being actively developed based on user feedback. Like NLTK, SpaCy does not come pre-installed on Databricks; you can install it through the PyPI loader using the package name "spacy==2.0.9". The main problem with trying to use SpaCy the same way as NLTK was that I did not know of a Python analog to nltk.download(). SpaCy uses a two-step process -- a "python -m spacy download en" call on the command line followed by a spacy.load("en") call in Python, as described on the SpaCy Models and Languages page. While this works very nicely in a single-user environment, the only way I could think of to do this in Spark was to log in separately to each of the workers and download the model on each, obviously not the most desirable approach in an automated notebook environment.
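
For reference, the single-user workflow described on that page looks roughly like this (a minimal sketch against SpaCy 2.0.x):

# run once on the command line to download the English model:
#   python -m spacy download en
import spacy

nlp = spacy.load("en")
doc = nlp("The cat sat on the mat.")
print([(token.text, token.pos_) for token in doc])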

I had some spare cycles, so I went digging in the code and found that the "python -m spacy ..." call corresponds to identically named functions in the spacy.cli package. This allowed me to use SpaCy in Databricks using code like that shown below. The idea is the same as for NLTK: we download the model and load it into SpaCy once per partition. Just like nltk.download(), the spacy.cli.download() call checks for the existence of the model and its dependencies using the pip installer. In addition, my code also checks for existence itself, so it bypasses the download() call altogether after the first time on each worker. Unlike NLTK, SpaCy batches up the basic operations into a single call for performance, as shown in the SpaCy lightning tour code examples, so we don't have a separate POS tagging step here. But the model should be accessible to subsequent map calls here as well, just as in the NLTK case.

def download_tokenize_and_postag(iter):
  import os
  import spacy

  def tokenize_and_postag(line):
    # each input line is a tab-separated (image_id, caption_text) pair
    image_id, caption_text = line.split("\t")
    doc = nlp(caption_text)
    token_tags = []
    for token in doc:
      token_tags.append((token.text, token.pos_))
    return (image_id, token_tags)

  # executed once per partition: download the "en" model only if it is not
  # already present on this worker, then load it (parser disabled)
  model_dir = spacy.util.get_data_path()
  if not os.path.exists(os.path.join(model_dir.as_posix(), "en")):
    spacy.cli.download("en")
  nlp = spacy.load("en", parser=False)

  # executed once per record within the partition
  for line in iter:
    yield tokenize_and_postag(line)


captions_rdd = (sc.textFile("/path/to/input/text/file")
  .mapPartitions(download_tokenize_and_postag)
)
captions_rdd.take(10)

Java-based toolkits are easier to work with, at least with respect to model files, since they often embed their models into their JARs and access them as Resources instead of Files. The models are thus automatically distributed to the workers along with the code when the JAR file is attached to the cluster, so the user of these toolkits does not have to do anything special to use them. As an example, here is some code to do tokenization and POS tagging using the Spark-NLP library from John Snow Labs. The code is based on Spark-NLP 2.11-1.2.3 and is part of the pipeline described on their quickstart page.

import com.johnsnowlabs.nlp._
import com.johnsnowlabs.nlp.annotators._
import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel
import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach

import org.apache.spark.sql.functions._
import org.apache.spark.ml.Pipeline

case class Caption(id: String, text: String)

val captionDF = sc.textFile("/path/to/input/text/file")
  .map(line => {
    val Array(id, text) = line.split("\t")
    Caption(id, text)
  })
  .toDF

val assembleDoc = new DocumentAssembler()
  .setInputCol("text")
  .setOutputCol("document")

val sentTokenize = new SentenceDetectorModel()
  .setInputCols(Array("document"))
  .setOutputCol("sentence")

val wordTokenize = new RegexTokenizer()
  .setInputCols(Array("sentence"))
  .setOutputCol("token")
  .setPattern("[^ \\(\\)\\/%]+")     // default pattern \S+ is too loose

val posTagger = new PerceptronApproach()
  .setInputCols(Array("sentence", "token"))
  .setOutputCol("pos")
  .setCorpusPath("/anc-pos-corpus/1400.txt")   // see spark-nlp issue 41

val finishDoc = new Finisher()
  .setInputCols("token")
  .setCleanAnnotations(false)

val pipeline = new Pipeline()
  .setStages(Array(
    assembleDoc,
    sentTokenize,
    wordTokenize,
    posTagger,
    finishDoc
))
val transformedDF = pipeline.fit(captionDF)
  .transform(captionDF)

Notice that the sentence tokenizer and POS tagger both use models, but no mention is made of loading them up front. The POS tagger does have to explicitly set a corpus path; if you look at the file in the repository, you will see it is just a POS-tagged dataset, so presumably the tagger trains itself inline on startup. The SentenceDetectorModel, on the other hand, seems to use a pre-trained model, which is also loaded once on startup. This happens once per worker JVM as part of the object's initialization, so this mechanism is even more performant than using mapPartitions().

Lastly, I wanted to mention yet another approach to doing NLP tasks on Spark that we use internally. The approach is similar to the one SpaCy uses -- we have a set of annotators, each of which does a set of tasks. For example, our GeniaAnnotator uses models trained against the GENIA corpus, and outputs sentence, phrase and word boundaries, POS tags and lemmas. An example of annotations output by the annotator is shown below.


These annotations convert unstructured text into structured annotations that can be consumed by downstream applications in a language-agnostic manner. The annotation-building framework has too many hooks into internal systems to be effectively open-sourced, but we do plan on providing exemplar output from our annotators for the OA-STM-Corpus, our hand-annotated mini-corpus of scientific open access articles. Our team has also open-sourced AnnotationQuery, a framework that allows you to compose interesting queries over these annotations, either locally or on Spark.

4 comments (moderated to prevent spam):

Anonymous said...

Hi,
Do you have any idea how to call spacy on each row in the dataframe?

def run_spacy(txt):
  nlp = spacy.load('en')
  phrases = ""
  chunkDoc = nlp(txt)
  for chunk in chunkDoc.noun_chunks:
    phrases = phrases + ' ' + chunk.text.lower().strip().replace("\n", "").replace(' ', '_')
  return phrases


udfSpacy = udf(run_spacy, ArrayType(StringType()))
dataDF1 = dataDF.withColumn("length", udfSpacy(dataDF.TEXT))

Rawen Noer said...

How do you embed code in your posts? I have written an article on the spaCy library but was not able to add highlighted code.

Sujit Pal said...

Hi Rawen, I use pygments to post-process the code snippet with the lexer appropriate for the language and then paste the HTML output into the contents.

Sujit Pal said...

Hi Anonymous, the code you provided above looks like it should work, provided the "en" model is available on the workers. One way to do that is to use the mapPartitions pattern I described in the post (there is one for spacy too, where the loading is done once per partition); a rough sketch of how that could look for your dataframe is shown below. Another way is to set up the cluster configuration during startup, where you specify what gets installed on the workers -- see this Databricks documentation page.
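
Here is the sketch (untested; it assumes the dataDF dataframe and TEXT column names from your snippet, and that spacy is installed on the cluster):

def extract_phrases(rows):
  import os
  import spacy
  # same once-per-partition pattern as in the post: download the "en" model
  # only if it is not already on this worker, then load it once
  model_dir = spacy.util.get_data_path()
  if not os.path.exists(os.path.join(model_dir.as_posix(), "en")):
    spacy.cli.download("en")
  nlp = spacy.load("en")
  for row in rows:
    doc = nlp(row["TEXT"])
    phrases = [chunk.text.lower().strip().replace("\n", "").replace(" ", "_")
               for chunk in doc.noun_chunks]
    yield (row["TEXT"], phrases)

phrasesDF = dataDF.rdd.mapPartitions(extract_phrases).toDF(["TEXT", "phrases"])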