Friday, January 20, 2012

Nutch/GORA - Parsing Custom XML

This week, I continue my exploration of Nutch/GORA to see if I can use it to parse and index custom (non-web) XML content. In addition to crawling selected medical sites for our search index, we also rely on content providers who provide us encyclopedic medical content in different areas. These providers periodically provide us data, each in their own custom XML format, which we parse and ingest into our indexes. In this post, I describe how I plan to do this using Nutch/GORA.

Nutch provides the XMLParser_Plugin as an add-on. It can be configured to parse a particular format using XPath expressions. I looked at it briefly, but I abandoned it because it wasn't clear how it would be able to parse multiple XML formats (one of my requirements).

Of course, one could settle on a common XML format, as described in this proposal, and then use this plugin. That is certainly an option, and it allows the parsing work to be farmed out: XML is almost a second language for Java application programmers, unlike the backend types who do the crawling and indexing. The problem with this approach is that it adds an extra conversion step, and the fetched content is no longer the original content we received.

The approach I came up with was to write a Nutch parse plugin that uses the u_idx metadata field (described in my previous post) to decide which parser to spin up for a given XML page. The XML parsers all take a String representing the fetched content and return a map of the parsed fields. This still allows the division of labor I mentioned earlier: the parsers are simple XML parsers, and the plugin deals with the details of writing the WebPage data back into Cassandra.
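As a rough illustration of this design (a Python sketch, not the Java plugin code), the dispatch can be thought of as a lookup from the u_idx value to a parser function that takes the fetched content as a string and returns a dict of fields. The names `parse_prov1`, `PARSERS` and `parse_page`, and the sample document, are hypothetical and exist only for this sketch.

```python
import xml.etree.ElementTree as ET


def parse_prov1(content):
    """Hypothetical parser for one provider: string in, dict of fields out."""
    root = ET.fromstring(content)
    # Collapse all text nodes under the root into one whitespace-normalized string.
    text = " ".join(" ".join(root.itertext()).split())
    return {"title": root.get("title", ""), "content": text}


# Registry keyed by the u_idx metadata value; one entry per provider format.
PARSERS = {"prov1": parse_prov1}


def parse_page(u_idx, content):
    """Pick a parser by u_idx; return None when no parser is registered."""
    parser = PARSERS.get(u_idx)
    return parser(content) if parser is not None else None


# Made-up sample document, only to exercise the dispatch.
sample = '<article title="Asthma"><body><p>Asthma is a lung disease.</p></body></article>'
fields = parse_page("prov1", sample)
```

Supporting another provider format then only means writing one more parser function and registering it under its u_idx value.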

Exposing Content over HTTP

The first step is to expose the provider XML content over an HTTP interface. Right now I am developing/testing in local mode, but at some point I plan on running this in distributed mode, and having the content available over HTTP just makes more sense. So I added another handler to my CherryPy server to serve XML files with content type "application/xml". The updated code is shown below:

#!/usr/bin/python
import os.path
import cherrypy
from cherrypy.lib.static import serve_file

SITES_DIR="/path/to/my/data/directory"

class Root:

  @cherrypy.expose
  def test(self, name):
    return serve_file(os.path.join(SITES_DIR, "test", "%s.html" % (name)), \
      content_type="text/html")

  @cherrypy.expose
  def providers(self, name):
    return serve_file(os.path.join(SITES_DIR, "providers", \
      "%s" % (name.replace("__", "/"))), content_type="application/xml")

if __name__ == '__main__':
  current_dir = os.path.dirname(os.path.abspath(__file__))
  # Set up site-wide config first so we get a log if errors occur.
  cherrypy.config.update({'environment': 'production',
    'log.access_file': 'site.log',
    'log.screen': True,
    "server.socket_host" : "127.0.0.1",
    "server.socket_port" : 8080})
  cherrypy.quickstart(Root(), '/')

And my seed URL file (generated by running a find command followed by a few text transformations in vim) looks like this:

http://localhost:8080/providers/prov1__1__000001.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000002.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000003.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000004.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000005.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000006.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000007.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000008.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000010.xml  u_idx=prov1
...

The XML files themselves are provided in a multi-directory format - path separators in the file name (relative to the root of the local site) become "__" in the URLs, and the handler makes the transformation to read and serve the appropriate file.
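The seed list could also be generated with a small script instead of find and vim. The following is a minimal sketch under a few assumptions: the function names are made up for this example, the base URL is the one from the seed list above, `root_dir` is the local "providers" directory that the handler serves from, and the metadata is separated from the URL by a tab.

```python
import os


def path_to_url(rel_path, base_url="http://localhost:8080/providers"):
    """Turn a path relative to the providers directory into a seed URL."""
    name = rel_path.replace(os.sep, "__")
    return "%s/%s" % (base_url, name)


def url_name_to_path(name):
    """Inverse mapping, equivalent to what the CherryPy handler does."""
    return name.replace("__", "/")


def seed_lines(root_dir, idx):
    """Yield one seed line (URL, tab, u_idx metadata) per XML file under root_dir."""
    for dirpath, _dirnames, filenames in os.walk(root_dir):
        for filename in sorted(filenames):
            if filename.endswith(".xml"):
                full_path = os.path.join(dirpath, filename)
                rel_path = os.path.relpath(full_path, root_dir)
                yield "%s\tu_idx=%s" % (path_to_url(rel_path), idx)
```

One caveat with this naming scheme: a file or directory name that already contains "__" cannot be mapped back unambiguously, so it is only safe when the provider's names never use that sequence.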

Parse Plugin Code

To enable the custom parsing, I added a parse plugin to my "mycompany" plugin project that uses the u_idx value to parse the content with the appropriate XML processor. I decided to use JDOM as my parsing library since it is already provided in the Nutch distribution. Here is my updated plugin.xml file; the definition for the parse plugin is in the last block.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Source: src/plugin/mycompany/plugin.xml -->
<plugin id="mycompany" 
    name="Custom Company-specific plugins"
    version="1.0.0"
    provider-name="My Company">

  <runtime>
    <library name="mycompany.jar">
      <export name="*"/>
    </library>
    <library name="jdom-1.1.jar"/>
  </runtime>

  <requires>
    <import plugin="nutch-extensionpoints"/>
  </requires>

  <extension id="com.mycompany.nutch.indexer.usertags"
      name="User Tag Indexing Filter"
      point="org.apache.nutch.indexer.IndexingFilter">
    <implementation id="mycompany-indexer-usertags"
        class="com.mycompany.nutch.indexer.usertags.UserTagIndexingFilter"/>
  </extension>

  <extension id="com.mycompany.nutch.scoring.usertags"
      name="User Tag Scoring Filter"
      point="org.apache.nutch.scoring.ScoringFilter">
    <implementation id="mycompany-scoring-metadata"
        class="com.mycompany.nutch.scoring.usertags.UserTagScoringFilter"/>
  </extension>

  <extension id="com.mycompany.nutch.parse.xml"
      name="Provider XML Parser Plugin"
      point="org.apache.nutch.parse.Parser">
    <implementation id="mycompany-parse-provider-xml"
        class="com.mycompany.nutch.parse.xml.ProviderXmlParser">
      <parameter name="contentType" value="application/xml"/>
      <parameter name="pathSuffix" value="xml"/>
    </implementation>
  </extension>

</plugin>

The Java code for the parse plugin is shown below:

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/ProviderXmlParser.java
package com.mycompany.nutch.parse.xml;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.parse.Outlink;
import org.apache.nutch.parse.Parse;
import org.apache.nutch.parse.ParseStatusCodes;
import org.apache.nutch.parse.Parser;
import org.apache.nutch.storage.ParseStatus;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.storage.WebPage.Field;
import org.apache.nutch.util.Bytes;

public class ProviderXmlParser implements Parser {

  private static final Log LOG = LogFactory.getLog(ProviderXmlParser.class); 
  private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
  private static final Utf8 IDX_KEY = new Utf8("u_idx");
  
  static {
    FIELDS.add(WebPage.Field.METADATA);
    FIELDS.add(WebPage.Field.OUTLINKS);
  }

  private Configuration conf;
  
  @Override
  public Parse getParse(String url, WebPage page) {
    Parse parse = new Parse();
    parse.setParseStatus(new ParseStatus());
    parse.setOutlinks(new Outlink[0]);
    Map<Utf8,ByteBuffer> metadata = page.getMetadata();
    if (metadata.containsKey(IDX_KEY)) {
      String idx = Bytes.toString(Bytes.toBytes(metadata.get(IDX_KEY)));
      IProviderXmlProcessor processor = 
        ProviderXmlProcessorFactory.getProcessor(idx);
      if (processor != null) {
        try {
          LOG.info("Parsing URL:[" + url + "] with " + 
            processor.getClass().getSimpleName());
          Map<String,String> parsedFields = processor.parse(
              Bytes.toString(Bytes.toBytes(page.getContent())));
          parse.setText(parsedFields.get(ProviderXmlFields.content.name()));
          parse.setTitle(parsedFields.get(ProviderXmlFields.title.name()));
          // set the rest of the metadata back into the page
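          for (String key : parsedFields.keySet()) {
            if (ProviderXmlFields.content.name().equals(key) ||
                ProviderXmlFields.title.name().equals(key)) {
              continue;
            }
            // skip fields the processor could not extract (e.g. missing attributes)
            String value = parsedFields.get(key);
            if (value == null) {
              continue;
            }
            // encode explicitly as UTF-8 rather than the platform default
            page.putToMetadata(new Utf8(key), 
              ByteBuffer.wrap(value.getBytes("UTF-8")));
          }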
          parse.getParseStatus().setMajorCode(ParseStatusCodes.SUCCESS);
        } catch (Exception e) {
          LOG.warn("Parse of URL: " + url + " failed", e);
          LOG.warn("content=[" + Bytes.toString(Bytes.toBytes(
            page.getContent())) + "]");
          parse.getParseStatus().setMajorCode(ParseStatusCodes.FAILED);
        }
      }
    }
    return parse;
  }

  @Override
  public Collection<Field> getFields() {
    return FIELDS;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
}

As you can see, the getParse() method reads the u_idx metadata from the page and, based on that, asks the ProviderXmlProcessorFactory for the appropriate processor. The factory returns the IProviderXmlProcessor implementation registered for that u_idx value (or null if no implementation can be found). The IProviderXmlProcessor takes the content as a string, parses it with custom logic and returns a Map of metadata names and values. The plugin then populates the Parse object and directly updates the WebPage with the metadata. Here is the rest of the code, without too much explanation.

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/ProviderXmlProcessorFactory.java
package com.mycompany.nutch.parse.xml;

import java.util.HashMap;
import java.util.Map;

public class ProviderXmlProcessorFactory {

  private static ProviderXmlProcessorFactory FACTORY = 
    new ProviderXmlProcessorFactory();
  private Map<String,IProviderXmlProcessor> processors = 
    new HashMap<String,IProviderXmlProcessor>();
  
  private ProviderXmlProcessorFactory() {
    processors.put("prov1", new Prov1XmlProcessor());
    // no more for now
  }
  
  public static IProviderXmlProcessor getProcessor(String idx) {
    return FACTORY.processors.get(idx);
  }
}

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/IProviderXmlProcessor.java
package com.mycompany.nutch.parse.xml;

import java.util.Map;

public interface IProviderXmlProcessor {

  public Map<String,String> parse(String content) throws Exception;

}

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/Prov1XmlProcessor.java
package com.mycompany.nutch.parse.xml;

import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;

import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;
import org.xml.sax.InputSource;

public class Prov1XmlProcessor implements IProviderXmlProcessor {

  @Override
  public Map<String,String> parse(String content) throws Exception {
    Map<String,String> parsedFields = new HashMap<String,String>();
    SAXBuilder builder = new SAXBuilder();
    Document doc = builder.build(new InputSource(
      new ByteArrayInputStream(content.getBytes())));
    Element root = doc.getRootElement();
    parsedFields.put(ProviderXmlFields.u_lang.name(), 
      root.getAttributeValue("language"));
    parsedFields.put(ProviderXmlFields.title.name(), 
      root.getAttributeValue("title"));
    parsedFields.put(ProviderXmlFields.u_category.name(), 
      root.getAttributeValue("subContent"));
    parsedFields.put(ProviderXmlFields.u_contentid.name(), 
      root.getAttributeValue("genContentID"));
    Element versionInfo = root.getChild("versionInfo");
    if (versionInfo != null) {
      parsedFields.put(ProviderXmlFields.u_reviewdate.name(), 
        ProviderXmlParserUtils.convertToIso8601(
        versionInfo.getAttributeValue("reviewDate")));
      parsedFields.put(ProviderXmlFields.u_reviewers.name(), 
        versionInfo.getAttributeValue("reviewedBy"));
    }
    parsedFields.put(ProviderXmlFields.content.name(), 
      ProviderXmlParserUtils.getTextContent(root));
    return parsedFields;
  }
}

In addition, I built some standard functions and put them in their own utility class, and an enum which lists the fields available.

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/ProviderXmlParserUtils.java
package com.mycompany.nutch.parse.xml;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jdom.Element;

public class ProviderXmlParserUtils {

  private static final Log LOG = LogFactory.getLog(ProviderXmlParserUtils.class);
  
  /////////// normalize incoming dates to ISO-8601 ///////////
  
  private static final String[] DATE_PATTERNS = new String[] {
    "yyyyMMdd",
    "MM/dd/yyyy",
  };
  private static final String ISO_8601_DATE_PATTERN =
    "yyyy-MM-dd'T'HH:mm:ss.SSS";
  
  public static String convertToIso8601(String date) {
    try {
      Date d = DateUtils.parseDate(date, DATE_PATTERNS);
      // SimpleDateFormat is not thread-safe, so build one per call
      // instead of sharing a static instance across parser threads
      return new SimpleDateFormat(ISO_8601_DATE_PATTERN).format(d);
    } catch (Exception e) {
      LOG.warn("Cannot convert date: " + date + " to ISO-8601 format", e);
      return "";
    }
  }

  ///////////// get text content of XML //////////////////
  
  public static String getTextContent(Element root) {
    StringBuilder buf = new StringBuilder();
    getTextContent_r(buf, root);
    return buf.toString();
  }
  
  @SuppressWarnings("unchecked")
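  private static void getTextContent_r(StringBuilder buf, Element e) {
    // append this element's own text followed by a separator, so that it
    // does not run into the text of the first child element
    buf.append(e.getTextTrim()).append(" ");
    List<Element> children = e.getChildren();
    for (Element child : children) {
      getTextContent_r(buf, child);
    }
  }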
}

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/ProviderXmlFields.java
package com.mycompany.nutch.parse.xml;

public enum ProviderXmlFields {

  // common
  title,
  content,
  // prov1 
  u_lang,
  u_category,
  u_contentid,
  u_reviewdate,
  u_reviewers,
}
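
When prototyping a new provider format, it can help to check the date handling outside of Nutch first. The sketch below is a Python approximation of the date normalization in the utility class above, not a port of it: it tries the same two input patterns in order and falls back to an empty string, but `convert_to_iso8601` is a hypothetical name and Python's strptime is stricter than the Java parser in some edge cases.

```python
from datetime import datetime

# The same two input patterns as DATE_PATTERNS in the Java utility class.
DATE_PATTERNS = ["%Y%m%d", "%m/%d/%Y"]


def convert_to_iso8601(date):
    """Try each known pattern in turn; return "" if none of them matches."""
    for pattern in DATE_PATTERNS:
        try:
            parsed = datetime.strptime(date, pattern)
        except (TypeError, ValueError):
            # TypeError covers a missing (None) value, ValueError a mismatch.
            continue
        # Millisecond precision, mirroring yyyy-MM-dd'T'HH:mm:ss.SSS.
        millis = "%03d" % (parsed.microsecond // 1000)
        return parsed.strftime("%Y-%m-%dT%H:%M:%S.") + millis
    return ""
```

Returning an empty string for unparseable input keeps a single bad review date from failing the parse of the whole document, which matches the behavior of the Java version.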

Note that while I have put all the classes in the same package for convenience, there is nothing preventing someone (and is probably recommended in an environment where you want to move the XML parsing work to a different group) from putting everything other than the actual plugin into a separate project and declaring the JAR from that project as a dependency.

To test it, I repeatedly dropped the webpage from Cassandra, ran inject, generate, fetch and parse, and looked at the Cassandra records using my display_webpage.py script and at error messages in logs/hadoop.log.

The first time I ran this code (well, not the first time, but you get the idea), I got the following exception in the logs/hadoop.log.

2012-01-18 15:28:00,860 INFO  parse.ParserFactory - The parsing plugins: 
[org.apache.nutch.parse.tika.TikaParser] are enabled via the plugin.includes 
system property, and all claim to support the content type application/xml, 
but they are not mapped to it  in the parse-plugins.xml file
2012-01-18 15:28:01,219 ERROR tika.TikaParser - Error parsing http://localhost:
8080/providers/prov1__1__000008.xmlorg.apache.tika.exception.TikaException: XML
parse error at org.apache.tika.parser.xml.XMLParser.parse(XMLParser.java:71)
        at org.apache.nutch.parse.tika.TikaParser.getParse(TikaParser.java:117)
        at org.apache.nutch.parse.ParseCallable.call(ParseCallable.java:36)
        at org.apache.nutch.parse.ParseCallable.call(ParseCallable.java:23)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.lang.Thread.run(Thread.java:680)

This told me that the (preconfigured) Tika XML parser was getting to my data first, which I did not want, so I disabled it in the nutch-site.xml plugin.includes property. My plugin.includes now looks like this:

<!-- From this:
<property>
  <name>plugin.includes</name>
  <value>protocol-http|urlfilter-regex|parse-(html|tika)|\
         index-(basic|anchor)|urlnormalizer-(pass|regex|\
         basic)|scoring-opic|mycompany</value>
  <description/>
</property>

to this: -->

<property>
  <name>plugin.includes</name>
  <value>protocol-http|urlfilter-regex|parse-html|\
         index-(basic|anchor)|urlnormalizer-(pass|regex|\
         basic)|scoring-opic|mycompany</value>
  <description/>
</property>

Indexing

After this, a couple of iterations later, I got a clean parse run. I followed that up with updatedb, then modified my old UserTagIndexingFilter to pull all metadata columns whose key is prefixed with u_ (meaning user-defined). Here is the updated code for the UserTagIndexingFilter.java.

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/indexer/usertags/UserTagIndexingFilter.java
package com.mycompany.nutch.indexer.usertags;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.indexer.IndexingException;
import org.apache.nutch.indexer.IndexingFilter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.storage.WebPage.Field;
import org.apache.nutch.util.Bytes;
import org.apache.nutch.util.TableUtil;

public class UserTagIndexingFilter implements IndexingFilter {

  private static final Log LOG = LogFactory.getLog(UserTagIndexingFilter.class);
  private static final String INHERITABLE_USERTAGS_LIST_PROP = 
    "mycompany.usertags.inheritable";
  
  private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
  
  static {
    FIELDS.add(WebPage.Field.METADATA);
  }
  
  private Configuration conf;
  private Set<String> inheritableUserTags;
  
  @Override
  public NutchDocument filter(NutchDocument doc, String url, 
      WebPage page) throws IndexingException {
    LOG.info("Adding user tags for page:" + url);
    Map<Utf8,ByteBuffer> metadata = page.getMetadata();
    for (Utf8 key : metadata.keySet()) {
      String keyStr = TableUtil.toString(key);
      if (StringUtils.isEmpty(keyStr)) {
        continue;
      }
      if (keyStr.startsWith("u_")) {
        String value = Bytes.toString(
          Bytes.toBytes(metadata.get(key)));
        doc.add(keyStr, value);
      }
    }
    return doc;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Collection<Field> getFields() {
    return FIELDS;
  }
}

I also needed to add the new fields I was pulling out to the conf/solrindex-mapping.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- conf/solrindex-mapping.xml -->
<mapping>
  ...
  <fields>
    <field dest="content" source="content"/>
    <!-- more nutch defined fields here -->
    ...
    <!-- user defined -->
    <field dest="u_idx" source="u_idx"/>
    <field dest="u_contentid" source="u_contentid"/>
    <field dest="u_category" source="u_category"/>
    <field dest="u_lang" source="u_lang"/>
    <field dest="u_reviewdate" source="u_reviewdate"/>
    <field dest="u_reviewers" source="u_reviewers"/>
  </fields>
  <uniqueKey>id</uniqueKey>
</mapping>

and to the Solr schema.xml file (followed by a Solr restart).

<?xml version="1.0" encoding="UTF-8" ?>
<!-- Source: conf/schema.xml (copied to solr's example/solr/conf) -->
<schema name="nutch" version="1.4">
  ...
        <field name="u_idx" type="string" stored="true" indexed="true"/>
        <field name="u_contentid" type="string" stored="true" indexed="true"/>
        <field name="u_category" type="string" stored="true" indexed="true"/>
        <field name="u_lang" type="string" stored="true" indexed="true"/>
        <field name="u_reviewdate" type="string" stored="true" indexed="false"/>
        <field name="u_reviewers" type="string" stored="true" indexed="false"/>
  ...
</schema>

Running the nutch solrindex job now promotes all the XML files (along with the parsed metadata) into the Solr index.
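
One way to spot-check the result is to query Solr for the records of a single provider. The sketch below only builds the query URL. It assumes the Solr example server is listening at its usual http://localhost:8983/solr address (not stated in this post), and `solr_query_url` is a hypothetical helper name.

```python
from urllib.parse import urlencode


def solr_query_url(idx, rows=10, base="http://localhost:8983/solr/select"):
    """Build a select URL that returns documents tagged with the given u_idx."""
    params = [
        ("q", "u_idx:%s" % idx),               # match on the indexed u_idx field
        ("fl", "id,title,u_idx,u_contentid"),  # fields to return
        ("rows", rows),
        ("wt", "json"),
    ]
    return base + "?" + urlencode(params)


url = solr_query_url("prov1")
```

Opening the resulting URL in a browser (or fetching it with urllib.request.urlopen) should list the provider documents if the indexing worked. Note that u_reviewdate and u_reviewers are declared with indexed="false" in the schema above, so they can be returned in results but not searched on.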

Conclusion

The approach described works well when an XML document corresponds to one WebPage record in Cassandra (and one Solr record). But in some cases, we split the file up into sections and process them further. In such cases, it may be better to go with the intermediate XML markup approach described in the proposal above, or even a hybrid approach where the original file goes through the pipeline as described, then goes through an additional external parse phase which produces section files (for example) in the intermediate XML format. I haven't thought this through in great detail yet; I plan on doing so after I am done checking out the basic stuff.
