Saturday, January 04, 2014

Akka Content Ingestion Pipeline, Part V


The title for this post is a bit misleading, since the post is not really about Akka or Content Ingestion. It describes a Cassandra CQL DAO for delsym, that is, a component of the content ingestion pipeline that I have been discussing under this title. The Cassandra CQL DAO is meant to be used instead of the current MongoDB layer. I chose MongoDB originally because a document-oriented database is a natural fit for a document pipeline. I built the Cassandra CQL interface later, mainly because we use Cassandra at work and I wanted to see if a column-oriented database presented any special challenges. Plus, I wanted to learn Cassandra CQL, and this seemed a good way to do it.

Cassandra CQL is a DSL that looks almost like SQL, so it is easy for people with an RDBMS background to learn and use. Under the covers, Cassandra stores the data in column format using various translation rules (described by John Berryman in this YouTube presentation if you are interested). This approach gives you the best of both worlds - the performance of a column database and the simplicity and convenience of a familiar user interface. SAP HANA is another database that uses this strategy (i.e., SQL interface, column storage).

Setup involves downloading and untarring the distribution to a given location, configuring and creating directories for logs, data, saved_caches and commitlog, and starting the cassandra daemon (with "cd $CASSANDRA_HOME; bin/cassandra -f"). In another terminal, we need to create the delsymdb keyspace (the equivalent of a database in an RDBMS) and the documents table. The primary key on URL is set up in the CREATE TABLE. The block below shows this being done in the CQL shell (invoked with "cd $CASSANDRA_HOME; bin/cqlsh").

cqlsh> create keyspace delsymdb
   ...   with replication = {'class':'SimpleStrategy', 
   ...   'replication_factor':1};
cqlsh> use delsymdb;
cqlsh:delsymdb> create table documents (
            ...   url text,
            ...   depth int,
            ...   content text,
            ...   fetchmeta map<text,text>,
            ...   fts timestamp,
            ...   text_content text,
            ...   parsemeta map<text,text>,
            ...   pts timestamp,
            ...   its timestamp,
            ...   primary key(url));

I use the Datastax Java driver to access this database from my Scala code. I followed the examples in the Instant Cassandra Query Language book, both to learn CQL (on the CQL shell) and to write the code. I also looked at the code in cassandra-samples. My DAO code is shown below. It extends the same BaseDbDao class as MongoDbDao does, so it exposes the same methods.

// Source: src/main/scala/com/mycompany/delsym/daos/CassandraDbDao.scala
package com.mycompany.delsym.daos

import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer

import com.datastax.driver.core.Cluster
import com.typesafe.config.ConfigFactory

class CassandraDbDao extends BaseDbDao {

  val conf = ConfigFactory.load()
  val host = conf.getString("delsym.cassandradb.host")
  val dbname = conf.getString("delsym.cassandradb.dbname")
  val tablename = conf.getString("delsym.cassandradb.collname")
  
  val cluster = Cluster.builder()
                       .addContactPoint(host)
                       .build()
  val session = cluster.connect()
  session.execute("USE %s".format(dbname))
  
  override def insertFetched(url: String, depth: Int, 
      fetchMetadata: Map[String,Any], 
      content: String): Either[FailResult,Unit] = {
    try {
      // stop here if the delete fails, instead of discarding the error
      deleteByUrl(url) match {
        case Left(f) => return Left(f)
        case _ => {}
      }
      val fmeta = fetchMetadata.map(kv => 
          "'%s':'%s'".format(kv._1, kv._2.toString))
        .mkString(", ")
      val insfetch = "insert into documents(" + 
        "url, depth, content, fetchmeta, fts) values (" +
        "'%s', %d, '%s', {%s}, '%s');"
        .format(url, depth, content, fmeta, iso8609(new Date()))
      session.execute(insfetch)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error inserting fetch data", e))
    }
  }
  
  override def insertParsed(url: String, text: String, 
      parseMetadata: Map[String,Any]): 
      Either[FailResult,Unit] = {
    try {
      val pmeta = parseMetadata.map(kv => 
        "'%s':'%s'".format(kv._1, kv._2.toString))
        .mkString(", ")
      val insparse = "insert into documents(" +
        "url, text_content, parsemeta, pts) values (" +
        "'%s', '%s', {%s}, '%s');"
        .format(url, text, pmeta, iso8609(new Date()))
      session.execute(insparse)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error inserting parse data", e))
    }
  }
  
  override def insertIndexed(url: String): 
      Either[FailResult,Unit] = {
    try {
      val insindex = "insert into documents(url, its) " +
        "values('%s', '%s')"
        .format(url, iso8609(new Date()))
      session.execute(insindex)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error inserting index data", e))
    }
  }
  
  override def getByUrl(url: String, fields: List[String]):
      Either[FailResult,Map[String,Any]] = {
    try {
      val fldlist = if (fields.isEmpty) "*" 
                    else fields.mkString(",") 
      val query = "select %s from %s where url = '%s'"
        .format(fldlist, tablename, url) 
      val results = session.execute(query)
      val row = results.one()
      val colnames = if (fields.isEmpty)
        row.getColumnDefinitions()
          .asList()
          .map(coldef => coldef.getName())
          .toList
      else fields
      val colvals = ArrayBuffer[(String,Any)]()
      colnames.foreach(colname => colname match {
        case "url" => 
          colvals += (("url", row.getString("url")))
        case "content" => 
          colvals += (("content", row.getString("content")))
        case "depth" => 
          colvals += (("depth", row.getInt("depth")))
        case "fetchmeta" => {
          val fmeta = row.getMap("fetchmeta", 
            classOf[String], classOf[String])
          fmeta.map(kv => 
            colvals += (("f_" + kv._1, kv._2)))
        }
        case "fts" => 
          colvals += (("fts", row.getDate("fts")))
        case "text_content" => 
          colvals += (("textContent", row.getString("text_content")))
        case "parsemeta" => {
          val pmeta = row.getMap("parsemeta", 
            classOf[String], classOf[String])
          pmeta.map(kv => 
            colvals += (("p_" + kv._1, kv._2)))
        }
        case "pts" => 
          colvals += (("pts", row.getDate("pts")))
        case "its" => 
          colvals += (("its", row.getDate("its")))
        case _ => {}
      })
      Right(colvals.toMap)
    } catch {
      case e: NullPointerException => Right(Map.empty) 
      case e: Exception =>  
        Left(FailResult(e.getMessage(), e))
    }
  }
  
  override def close(): Unit = cluster.shutdown()

  def deleteByUrl(url: String): Either[FailResult,Unit] = {
    try {
      val delquery = 
        "delete from documents where url = '%s';"
        .format(url)
      session.execute(delquery)
      Right()
    } catch {
      case e: Exception => 
        Left(FailResult("Error deleting by URL: [" + 
          url + "]", e))
    }
  }
  
  def iso8609(d: Date): String = {
    lazy val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    sdf.format(d)
  }
}

As you can see, the code for the getByUrl() method is considerably more verbose than its MongoDB counterpart. This is because the driver does not provide a generalized method to get an object; you must call the getter that matches the column's data type (for example, getString, getInt, etc). A better approach may have been to use ColumnDefinitions (similar to the ResultSetMetaData in regular JDBC), and maybe I will in a later iteration once I understand it better. For now, any time we alter our table to add or delete columns, we will need to update this method.
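One way to soften the maintenance cost is to keep the per-column knowledge in a lookup table, so that a new column becomes one more entry rather than one more case branch. The following is only a sketch of that idea: RowLike is a hypothetical stand-in for the driver's Row (the real getters have different signatures), and RowFlattening, single and prefixed are names made up for illustration. Only a few of the columns are listed.

```scala
object RowFlattening {
  // Hypothetical stand-in for the driver's Row, reduced to what this sketch needs.
  trait RowLike {
    def getString(name: String): String
    def getInt(name: String): Int
    def getMap(name: String): Map[String, String]
  }

  // An extractor turns one column into zero or more (key, value) pairs.
  type Extractor = RowLike => Seq[(String, Any)]

  // Scalar column: emits exactly one pair under the given output key.
  def single(key: String)(get: RowLike => Any): Extractor =
    r => Seq(key -> get(r))

  // Map column: emits one pair per entry, with the key prefixed.
  def prefixed(col: String, prefix: String): Extractor =
    r => r.getMap(col).toSeq.map { case (k, v) => (prefix + k, v) }

  // Adding a column to the table means adding one entry here.
  val extractors: Map[String, Extractor] = Map(
    "url" -> single("url")(_.getString("url")),
    "depth" -> single("depth")(_.getInt("depth")),
    "text_content" -> single("textContent")(_.getString("text_content")),
    "fetchmeta" -> prefixed("fetchmeta", "f_"),
    "parsemeta" -> prefixed("parsemeta", "p_")
  )

  // Columns without an extractor are skipped, like the `case _` branch above.
  def flatten(row: RowLike, columns: Seq[String]): Map[String, Any] =
    columns.flatMap(c => extractors.get(c).toSeq.flatMap(ex => ex(row))).toMap
}
```

The flattening convention is the same as in getByUrl(): scalar columns keep one key each, while map columns are expanded into f_ and p_ prefixed keys.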

Also, an additional method deleteByUrl() has to be defined. This is used by the insertFetched() method to first delete all columns for the page before new information is added in. A CQL INSERT only writes the columns it names and leaves any other columns already stored for that key untouched, so without the delete, stale parse and index data from an earlier crawl would survive a re-fetch. This was not necessary in MongoDB, since the entire document would be replaced during insert. Another issue is that the metadata for fetch and parse can no longer be persisted in their native form - since the data type is map<text,text>, the values must be Strings.
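The difference between the two write behaviours can be shown with a toy in-memory model. This is not how either database is implemented, and UpsertModel and its methods are hypothetical names. It only contrasts a column-wise write, as in a CQL INSERT, with a whole-record replace as used in the MongoDB layer.

```scala
object UpsertModel {
  type Row = Map[String, Any]

  // Column-wise write: only the named columns change, the rest of the
  // existing row (if any) is kept.
  def upsert(table: Map[String, Row], url: String, cols: Row): Map[String, Row] =
    table.updated(url, table.getOrElse(url, Map.empty[String, Any]) ++ cols)

  // Whole-record write: the previous record is discarded.
  def replace(table: Map[String, Row], url: String, doc: Row): Map[String, Row] =
    table.updated(url, doc)
}
```

In this model, re-fetching a page with upsert() leaves an old text_content or pts value in place, which is the situation the deleteByUrl() call in insertFetched() avoids.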

CQL also supports PreparedStatements, and I tried them (based on examples in cassandra-samples), but I could not get them to work - no errors were thrown, the inserts just did not happen. I ended up composing the CQL in code and then just passing it to session.execute() - the added benefit of this approach is that you can try the statement on the CQL shell if it doesn't work, where the error messages are usually more informative. Another thing to note is that single and double quotes are not interchangeable in CQL as they are in some SQL dialects: string literals take single quotes, while double quotes are reserved for identifiers.
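One caveat with composing statements by string formatting is that a single quote inside a URL, page body or metadata value ends the literal early and breaks the statement. A minimal sketch of the escaping follows, assuming hypothetical helper names (CqlLiterals, quote and mapLiteral are not part of the DAO above or of the driver). CQL escapes a single quote inside a string literal by doubling it.

```scala
object CqlLiterals {
  // Wraps a value in single quotes, doubling any embedded single quote.
  def quote(s: String): String = "'" + s.replace("'", "''") + "'"

  // Renders metadata as a map<text,text> literal. Values are flattened
  // with toString, so their original types are lost. Keys are sorted only
  // to make the generated statement text stable.
  def mapLiteral(meta: Map[String, Any]): String =
    meta.toSeq
      .sortBy(_._1)
      .map { case (k, v) => quote(k) + ":" + quote(v.toString) }
      .mkString("{", ", ", "}")
}
```

With helpers like these, the format strings in the DAO would use plain %s placeholders and receive already-quoted values, for example quote(url) and mapLiteral(fetchMetadata).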

The test below expects a running Cassandra instance with the database and table created as described above. It runs the DAO methods and verifies the results by looking up data in the table at the end of each call.

// Source: src/test/scala/com/mycompany/delsym/daos/CassandraDbDaoTest.scala
package com.mycompany.delsym.daos

import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll

class CassandraDbDaoTest extends FunSuite
                         with BeforeAndAfterAll {

  val cassDao = new CassandraDbDao()
  
  override def afterAll() = cassDao.close()
  
  test("get non-existent record") {
    val url = "http://www.google.com"
    cassDao.deleteByUrl(url)
    val result = 
      cassDao.getByUrl(url, List.empty)
    result match {
      case Left(f) => fail("unexpected exception")
      case Right(r) => {
        Console.println("result=[" + r + "]")
        assert(r != null)
        assert(r.isEmpty)
      }
    }
  }
  
  test("insert after fetch") {
    val url = "http://www.google.com"
    val content = "<HTML>Some arbitrary content</HTML>"
    val fetchMetadata = List(
       ("feed_title", "Some arbitrary title"),
       ("feed_summary", "Jack the giant killer")).toMap
    cassDao.insertFetched(url, 0, fetchMetadata, content)
    cassDao.getByUrl(url, List.empty) match {
      case Right(row) => {
        Console.println("row (after fetch, #-cols:" + 
          row.size + ") = " + row)
        assert(row != null)
        assert(! row.isEmpty)
        assert(row.size == 9)
      }
      case Left(f) => fail("unexpected exception")
    }
  }
  
  test("insert after parse") {
    val url = "http://www.google.com"
    val textContent = "Some arbitrary content"
    val parseMetadata = List(
       ("title", "The real title"),
       ("author", "Jack G Killer")).toMap
    cassDao.insertParsed(url, textContent, parseMetadata)
    cassDao.getByUrl(url, List.empty) match {
      case Right(row) => {
        Console.println("row (after parse, #-cols:" + 
          row.size + ") = " + row)
        assert(row != null)
        assert(! row.isEmpty)
        assert(row.size == 11)
      }
      case Left(f) => fail("unexpected exception")
    }
  }
  
  test("insert after index") {
    val url = "http://www.google.com"
    cassDao.insertIndexed(url)
    cassDao.getByUrl(url, List.empty) match {
      case Right(row) => {
        Console.println("row (after index, #-cols:" + 
          row.size + ") = " + row)
        assert(row != null)
        assert(! row.isEmpty)
        assert(row.size == 11)
      }
      case Left(f) => fail("unexpected exception")
    }
  }

  test("get inserted record, selected fields") {
    val url = "http://www.google.com"
    val result = cassDao.getByUrl(url, List("pts", "parsemeta"))
    result match {
      case Right(row) => {
        Console.println("partial row get = " + row)
        assert(row != null)
        assert(! row.isEmpty)
        assert(row.size == 3)
      }
      case Left(f) => {
        f.e.printStackTrace()
        fail("unexpected exception", f.e)
      }
    }
  }
}

And that's all I have for today. Hopefully in the next post I will talk about something else :-).
