Friday, February 17, 2012

Some Dev Pycassa Scripts for Nutch-GORA with Cassandra

Over the last few weeks, I've been tinkering with Nutch-GORA with the Cassandra store. During that time, I've built a couple of simple (but useful) Pycassa scripts that I would like to share here, with the hope that it helps someone doing similar stuff.

But first, a little digression... Building these scripts forced me to look beyond the clean JavaBean-style view of WebPage that the GORA ORM provides to Nutch-GORA. It has also led me to some insights about Column databases and Cassandra, which I would also like to share, because I believe it will help you understand the scripts more easily. If you are already familiar (conceptually, at least) with how Cassandra stores its data, then feel free to skip this bit, you probably have a better mental model of this already than I can describe. But to me, it was an Eureka moment, and a lot of things which I didn't quite understand when I read this post long ago fell into place.

Consider the gora-cassandra-mapping.xml available in nutch/conf. It describes the schema of the WebPage object in terms of two column families and a super-column family, like so:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?xml version="1.0" encoding="UTF-8"?>
<gora-orm>
    <keyspace name="webpage" cluster="Test Cluster" host="localhost">
        <family name="f"/>
        <family name="p"/>
        <family name="sc" type="super"/>
    </keyspace>
    <class keyClass="java.lang.String" name="org.apache.nutch.storage.WebPage">
        <!-- fetch fields -->
        <field name="baseUrl" family="f" qualifier="bas"/>
        <field name="status" family="f" qualifier="st"/>
        <field name="prevFetchTime" family="f" qualifier="pts"/>
        <field name="fetchTime" family="f" qualifier="ts"/>
        <field name="fetchInterval" family="f" qualifier="fi"/>
        <field name="retriesSinceFetch" family="f" qualifier="rsf"/>
        <field name="reprUrl" family="f" qualifier="rpr"/>
        <field name="content" family="f" qualifier="cnt"/>
        <field name="contentType" family="f" qualifier="typ"/>
        <field name="modifiedTime" family="f" qualifier="mod"/>
        <!-- parse fields -->
        <field name="title" family="p" qualifier="t"/>
        <field name="text" family="p" qualifier="c"/>
        <field name="signature" family="p" qualifier="sig"/>
        <field name="prevSignature" family="p" qualifier="psig"/>
        <!-- score fields -->
        <field name="score" family="f" qualifier="s"/>
        <!-- super columns -->
        <field name="markers" family="sc" qualifier="mk"/>
        <field name="inlinks" family="sc" qualifier="il"/>
        <field name="outlinks" family="sc" qualifier="ol"/>
        <field name="metadata" family="sc" qualifier="mtdt"/>
        <field name="headers" family="sc" qualifier="h"/>
        <field name="parseStatus" family="sc" qualifier="pas"/>
        <field name="protocolStatus" family="sc" qualifier="prs"/>
    </class>
</gora-orm>

JSON is the best way to visualize the structure of a Column database, so a JSON-like view of a single record (generated by one of the scripts I am about to describe later) would look something like this. A JavaBean representation of this JSON-like structure is what you see from within Nutch-GORA code.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
webpage: {
   key: "localhost:http:8080/provider/prov1__1__002385.xml" ,
   f: {
     bas : "http://localhost:8080/provider/prov1__1__002385.xml" ,
     cnt : "Original Content as fetched goes here",
     fi : "2592000" ,
     s : "3.01568E-4" ,
     st : "2" ,
     ts : "1338943926088" ,
     typ : "application/xml" ,
   },
   p: {
     c : "Parsed Content (after removing XML tags, etc) goes here",
     psig : "fffd140178fffdbfffd724fffd13fffd79553b" ,
     sig : "fffd140178fffdbfffd724fffd13fffd79553b" ,
     t : "Ions" ,
   },
   sc: {
     h : {
       Content-Length : "3263" ,
       Content-Type : "application/xml" ,
       Date : "Tue, 07 Feb 2012 00:52:06 GMT" ,
       Last-Modified : "Fri, 20 Jan 2012 00:04:19 GMT" ,
       Server : "CherryPy/3.1.2" ,
     }
     il : {
       http://localhost:8080/provider_index/prov1-sitemap.xml : \
         "http://localhost:8080/provider/prov1__1__002385.xml" ,
     }
     mk : {
       __prsmrk__ : "1328561300-1185343131" ,
       _ftcmrk_ : "1328561300-1185343131" ,
       _gnmrk_ : "1328561300-1185343131" ,
       _idxmrk_ : "1328561300-1185343131" ,
       _updmrk_ : "1328561300-1185343131" ,
     }
     mtdt : {
       _csh_ : "0000" ,
       u_category : "SpecialTopic" ,
       u_contentid : "002385" ,
       u_disp : "M" ,
       u_idx : "prov1" ,
       u_lang : "en" ,
       u_reviewdate : "2009-08-09T00:00:00.000Z" ,
     }
     pas : {
       majorCode : "1" ,
       minorCode : "0" ,
     }
     prs : {
       code : "1" ,
       lastModified : "0" ,
     }
   }
}

The scripts, however, would see zero to three maps (columns are optional, remember), all three accessible by the key ${key} and a secondary key. The secondary key is the column family (or super column family) name, "f", "p" or "sc". So WebPage[$key]["f"] would retrieve a map of columns and the values for the particular WebPage's f-column family. The super-column family provides an additional level of nesting, ie, the WebPage[$key]["sc"] returns a map of column families.

So the WebPage that GORA produces is actually built out of three independent entities, all accessible with the same key. There, thats it, end of brilliant insight :-). Hope that wasn't too much of a disappointment.

Display Records

The first script is a script that scans the WebPage keyspace, and for each key, gets the columns from the "f", "p" and "sc" column families and displays them in the JSON like structure shown above. Without any parameters, it produces a ump of the entire WebPage (to standard out). You can also get a list of all the keys by passing in a "-s" switch. Alternatively, you can dump out a single WebPage record by passing in the key as a parameter.

So you can dump out the database, list all the keys, or dump out a single record. Its useful to be able to "see" a record during development, for example, to see that you've got all the fields right. Heres the code:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/python

import getopt
import sys

import pycassa
from pycassa.pool import ConnectionPool
from pycassa.util import OrderedDict

def print_map(level, dict):
  for key in dict.keys():
    value = dict[key]
    if type(value) == type(OrderedDict()):
      print indent(level), key, ": {"
      print_map(level+1, value)
      print indent(level), "}"
    elif key == "sig" or key == "psig" or key == "_csh_":
      # these don't render well even though we do decode
      # unicode to utf8, so converting to hex
      print indent(level), key, ":", quote(to_hex_string(value)), ","
    else:
      print indent(level), key, ":", quote(value), ","
    
def to_hex_string(s):
  chars = []
  for i in range(0, len(s)):
    chars.append(hex(ord(s[i:i+1]))[2:])
  return "".join(chars)

def quote(s):
  if not(s.startswith("\"") and s.endswith("\"")):
    return "".join(["\"", unicode(s).encode("utf8"), "\""])
  else:
    return s

def indent(level):
  return ("." * level * 2)

def usage(message=None):
  print "Usage: %s [-h|-s] [key]" % (sys.argv[0])
  print "-h|--help: show this message"
  print "-s|--summary: show only keys"
  sys.exit(-1)
  
def main():
  try:
    (opts, args) = getopt.getopt(sys.argv[1:], "sh", \
      ["summary", "help"])
  except getopt.GetoptError:
    usage()
  show_summary = False
  for opt in opts:
    (k, v) = opt
    if k in ["-h", "--help"]:
      usage()
    if k in ["-s", "--summary"]:
      show_summary = True
  key = "" if len(args) == 0 else args[0]
  if not show_summary:
    print "webpage: {"
  level = 1
  pool = ConnectionPool("webpage", ["localhost:9160"])
  f = pycassa.ColumnFamily(pool, "f")
  for fk, fv in f.get_range(start=key, finish=key):
    print indent(level), "key:", quote(fk), ","
    if show_summary == True:
      continue
    print indent(level), "f: {"
    if type(fv) == type(OrderedDict()):
      print_map(level+1, fv)
    else:
      print indent(level+1), fk, ":", quote(fv), ","
    print indent(level), "},"
    p = pycassa.ColumnFamily(pool, "p")
    print indent(level), "p: {"
    for pk, pv in p.get_range(start=fk, finish=fk):
      if type(pv) == type(OrderedDict()):
        print_map(level+1, pv)
      else:
        print indent(level+1), pk, ":", quote(pv), ","
    print indent(level), "},"
    sc = pycassa.ColumnFamily(pool, "sc")
    print indent(level), "sc: {"
    for sck, scv in sc.get_range(start=fk, finish=fk):
      if type(scv) == type(OrderedDict()):
        print_map(level+1, scv)
      else:
        print indent(level+1), sck, ":", quote(scv), ","
    print indent(level), "}"
  if not show_summary:
    print "}"

if __name__ == "__main__":
  main()

Reset Status Marks

This script resets the marks that Nutch puts into the WebPage[$key]["sc"]["mk"] column family after each stage (generate, fetch, parse, updatedb). This is useful if you want to redo some operation. Without this, the only way (that I know of anyway) is to drop the keyspace, then redo the steps till that point. This works okay for smaller datasets, but becomes old really quick when dealing with even moderately sized datasets (I am working with a 6000 page collection).

Even though I am fetching off a local (CherryPy) server, it takes a while. I guess I could have just gone with a faster HTTPD server, but this script saved me a lot of time while I was debugging my parsing plugin code, by allowing me to reset and redo the parse and updatedb steps over and over until I got it right. Heres the code:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#!/usr/bin/python

import sys

import pycassa
from pycassa.pool import ConnectionPool

marks = {
  "generate" : "_gnmrk_",
  "fetch" : "_ftcmrk_",
  "parse" : "__prsmrk__",
  "updatedb" : "_updmrk_",
}

def usage(msg=None):
  print "Usage: %s stage batch-id|-all" % (sys.argv[0])
  print "Stages: %s" % (marks.keys())
  sys.exit(-1)

def main():
  if len(sys.argv) != 3:
    usage()
  mark = None
  try:
    mark = marks[sys.argv[1]]
  except KeyError:
    usage("Unknown stage: %s" % (sys.argv[1]))
  batchid = sys.argv[2]
  pool = ConnectionPool("webpage", ["localhost:9160"])
  sc = pycassa.ColumnFamily(pool, "sc")
  for sck, scv in sc.get_range(start="", finish=""):
    reset = False
    if batchid != "-all":
      # make sure the mark is what we say it is before deleting
      try:
        rbatchid = scv["mk"][mark]
        if batchid == rbatchid:
          reset = True
      except KeyError:
        continue
    else:
      reset = True
    if reset == True:
      print sck
      print "Reset %s for key: %s" % (mark, sck)
      sc.remove(sck, columns=[mark], super_column="mk")

if __name__ == "__main__":
  main()

To call this you supply the stage (one of generate, fetch, parse or updatedb) as the first argument, and either a batch ID or -all for all batch IDs, and the script will "reset" the appropriate WebPage[$key]["sc"]["mk"] by deleting the appropriate column for all the WebPage column families in the keyspace.

And that's it for today. Hope you find the scripts helpful.

6 comments:

  1. Hi Sujit. Since you brought your blog posts on the Nutch/Gora + Cassandra umbrella topic to my attention I've been reading with interest. I can't help but wonder if you consider this stuff use case specific, or whether you think that any aspect of your concepts could be thought of as more generically applicable.
    To put this into context, consider the aspirations that Gora has
    - Data Persistence : There are improvements to be made, but this has mostly been addressed for our key stores, with mor3e in the pipeline.
    Data Access : Base on our logmanager class, this has been addressed and can be extended for use case specific requirements
    Indexing : gora-solr in the pipeline
    Analysis : If we go on the project description for this sub-topic, we have a lot of work to do here... does your work fit in to this category?
    MapReduce support : Currently supported, but some work to do with SqlStore.

    What do you think about the Anlysis step above?

    ReplyDelete
  2. Hi Lewis. Yes, what I describe here is really an application I am working on that will use Nutch/GORA/Cassandra to build a pipeline that can ingest content in multiple formats from various sources, and write them out to Solr so they become searchable. So yes, this /is/ use case specific - it /can/ be considered generically applicable in the sense that others may use some of the ideas in here to build their own application - but what I have here is definitely application level, not framework level.

    Regarding analysis, at some point in the near future, I would like to use the data in Cassandra with Mahout. Further out, I would like to integrate a taxonomy mapping stage using UIMA into the pipeline also (I have parts of this done) so I can support semantic concept-based searching on Solr using this pipeline.

    ReplyDelete
  3. OK I understand.
    regarding your latter comment
    {bq}
    I would like to integrate a taxonomy mapping stage using UIMA into the pipeline also (I have parts of this done) so I can support semantic concept-based searching on Solr using this pipeline.
    {bq}
    I was actually working on something similar about a year or so ago. In my case it was not an automated mapping implementation, instead a manual operation that consisted of using GATE [0] for named entity recognition, the taxonomies were then built manually and could be used within the old Nutch 1.2 ontology plugin [1]. Porting this to Solr would require a custom request handler implementation which would match terms based on taxonomy/ontology classes/subclasses. One large restriction I found with this is that this level of search does not enable you to include predicate relationships in your queries so it is only efficient for querying datasets for class/subclass entities... sometimes we require more detail, however as you mention this is application specific and can/should be altered to suit requirements.

    [0] http://gate.ac.uk/
    [1] http://svn.apache.org/viewvc/nutch/tags/release-1.2/src/plugin/ontology/

    ReplyDelete
  4. Hi Lewis, thanks for the pointer to the nutch ontology plugin, that may be what I need instead of a full mapping stage...

    What we do currently involves writing out concepts and scores (based mainly on TF but also has other factors like relationships, ancestry, etc) in the payload into the index, then mapping the query against the taxonomy at query time, and matching the two. We support both subtype/supertype as well as relationship mappings for search (using custom handlers that look up the taxonomy graph at runtime, so a query for "acne medication" becomes a query for concept_for(acne) AND (concept_for(find_related_concept(acne, has_medication)[0] OR find_related_concept(acne, has_medication)[1] OR ...).

    ReplyDelete
  5. related with Cassandra Database, you can download this article here http://repository.gunadarma.ac.id/bitstream/123456789/2989/1/PERBAN~1.PDF

    ReplyDelete
  6. Thanks Hanum, although you have to read Malay (which I can't) to understand the document.

    ReplyDelete

Comments are moderated to prevent spam.