The title of this post is a bit misleading, since the post is not really about Akka or content ingestion. It describes a Cassandra CQL DAO for delsym, i.e., a component of the content ingestion pipeline that I have been discussing under this title. The Cassandra CQL DAO is meant to be used instead of the current MongoDB layer. I chose MongoDB originally because a document-oriented database is a natural fit for a document pipeline. I built the Cassandra CQL interface later, mainly because we use Cassandra at work and I wanted to see if a column-oriented database presented any special challenges. I also wanted to learn Cassandra CQL, and this seemed like a good way to do it.
Cassandra CQL is a DSL that looks almost like SQL, so it is easy for people with an RDBMS background to learn and use. Under the covers, Cassandra stores the data in column format using various translation rules (described by John Berryman in this YouTube presentation, if you are interested). This approach gives you the best of both worlds: the performance of a column database and the simplicity and convenience of a familiar user interface. SAP HANA is another database that uses this strategy (i.e., SQL interface, column storage).
Setup involves downloading and untarring the distribution to a given location, creating and configuring directories for logs, data, saved_caches and commitlog, and starting the Cassandra daemon (with "cd $CASSANDRA_HOME; bin/cassandra -f"). In another terminal, we need to create the delsymdb keyspace (the equivalent of a database in an RDBMS) and the documents table. The primary key on url is set up in the CREATE TABLE statement. The block below shows this being done in the CQL shell (invoked with "cd $CASSANDRA_HOME; bin/cqlsh"):
cqlsh> create keyspace delsymdb
... with replication = {'class':'SimpleStrategy',
... 'replication_factor':1};
cqlsh> use delsymdb;
cqlsh:delsymdb> create table documents (
... url text,
... depth int,
... content text,
... fetchmeta map<text,text>,
... fts timestamp,
... text_content text,
... parsemeta map<text,text>,
... pts timestamp,
... its timestamp,
... primary key(url));
I use the Datastax Java driver to access this database from my Scala code. I followed the examples in the Instant Cassandra Query Language book, both to learn CQL (on the CQL shell) and to write the code. I also looked at the code in cassandra-samples. My DAO code is shown below. It extends the same BaseDbDao class as MongoDbDao does, so the methods are identical.
// Source: src/main/scala/com/mycompany/delsym/daos/CassandraDbDao.scala
package com.mycompany.delsym.daos
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import com.datastax.driver.core.Cluster
import com.typesafe.config.ConfigFactory
class CassandraDbDao extends BaseDbDao {
val conf = ConfigFactory.load()
val host = conf.getString("delsym.cassandradb.host")
val dbname = conf.getString("delsym.cassandradb.dbname")
val tablename = conf.getString("delsym.cassandradb.collname")
val cluster = Cluster.builder()
.addContactPoint(host)
.build()
val session = cluster.connect()
session.execute("USE %s".format(dbname))
override def insertFetched(url: String, depth: Int,
fetchMetadata: Map[String,Any],
content: String): Either[FailResult,Unit] = {
try {
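// Remove any existing row first, and stop here if the delete failed
// (without the return, the Left would be silently discarded)
deleteByUrl(url) match {
case Left(f) => return Left(f)
case _ => {}
}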
val fmeta = fetchMetadata.map(kv =>
"'%s':'%s'".format(kv._1, kv._2.toString))
.mkString(", ")
val insfetch = "insert into documents(" +
"url, depth, content, fetchmeta, fts) values (" +
"'%s', %d, '%s', {%s}, '%s');"
.format(url, depth, content, fmeta, iso8609(new Date()))
session.execute(insfetch)
Right()
} catch {
case e: Exception =>
Left(FailResult("Error inserting fetch data", e))
}
}
override def insertParsed(url: String, text: String,
parseMetadata: Map[String,Any]):
Either[FailResult,Unit] = {
try {
val pmeta = parseMetadata.map(kv =>
"'%s':'%s'".format(kv._1, kv._2.toString))
.mkString(", ")
val insparse = "insert into documents(" +
"url, text_content, parsemeta, pts) values (" +
"'%s', '%s', {%s}, '%s');"
.format(url, text, pmeta, iso8609(new Date()))
session.execute(insparse)
Right()
} catch {
case e: Exception =>
Left(FailResult("Error inserting parse data", e))
}
}
override def insertIndexed(url: String):
Either[FailResult,Unit] = {
try {
val insindex = "insert into documents(url, its) " +
"values('%s', '%s')"
.format(url, iso8609(new Date()))
session.execute(insindex)
Right()
} catch {
case e: Exception =>
Left(FailResult("Error inserting index data", e))
}
}
override def getByUrl(url: String, fields: List[String]):
Either[FailResult,Map[String,Any]] = {
try {
// use a String (not a Char) so that fldlist is typed as String
val fldlist = if (fields.isEmpty) "*"
else fields.mkString(",")
val query = "select %s from %s where url = '%s'"
.format(fldlist, tablename, url)
val results = session.execute(query)
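val row = results.one()
// one() returns null when no row matches the URL
if (row == null) return Right(Map.empty[String,Any])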
val colnames = if (fields.isEmpty)
row.getColumnDefinitions()
.asList()
.map(coldef => coldef.getName())
.toList
else fields
val colvals = ArrayBuffer[(String,Any)]()
colnames.foreach(colname => colname match {
case "url" =>
colvals += (("url", row.getString("url")))
case "content" =>
colvals += (("content", row.getString("content")))
case "depth" =>
colvals += (("depth", row.getInt("depth")))
case "fetchmeta" => {
val fmeta = row.getMap("fetchmeta",
classOf[String], classOf[String])
fmeta.map(kv =>
colvals += (("f_" + kv._1, kv._2)))
}
case "fts" =>
colvals += (("fts", row.getDate("fts")))
case "text_content" =>
colvals += (("textContent", row.getString("text_content")))
case "parsemeta" => {
val pmeta = row.getMap("parsemeta",
classOf[String], classOf[String])
pmeta.map(kv =>
colvals += (("p_" + kv._1, kv._2)))
}
case "pts" =>
colvals += (("pts", row.getDate("pts")))
case "its" =>
colvals += (("its", row.getDate("its")))
case _ => {}
})
Right(colvals.toMap)
} catch {
case e: NullPointerException => Right(Map.empty)
case e: Exception =>
Left(FailResult(e.getMessage(), e))
}
}
override def close(): Unit = cluster.shutdown()
def deleteByUrl(url: String): Either[FailResult,Unit] = {
try {
val delquery =
"delete from documents where url = '%s';"
.format(url)
session.execute(delquery)
Right()
} catch {
case e: Exception =>
Left(FailResult("Error deleting by URL: [" +
url + "]", e))
}
}
def iso8609(d: Date): String = {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
sdf.format(d)
}
}
As you can see, the code for the getByUrl() method is considerably more verbose than its MongoDB counterpart. This is because the driver does not provide a generalized method to get an object; you must specify the data type (for example, getString, getInt, etc.). A better approach may have been to use ColumnDefinitions (similar to ResultSetMetaData in regular JDBC), and maybe I will in a later iteration once I understand it better. For now, any time we alter the table to add or delete columns, we will need to update this method.
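Short of a ColumnDefinitions-based reader, one way to shrink that maintenance cost is to replace the long match with a lookup table from column name to extractor function. The sketch below is an assumption about how that could look, not code from delsym: StubRow is a hypothetical stand-in for the driver's Row (so the example runs without Cassandra), and ColumnTable, scalar and prefixed are made-up names.

```scala
// Driver-independent sketch. StubRow is a hypothetical stand-in for the
// driver's Row so that the example runs without a Cassandra instance.
object ColumnTable {
  case class StubRow(scalars: Map[String, Any],
                     maps: Map[String, Map[String, String]])

  type Extractor = StubRow => Seq[(String, Any)]

  // A scalar column yields one entry, optionally under a different name.
  def scalar(col: String, outName: String): Extractor =
    row => Seq(outName -> row.scalars.getOrElse(col, null))

  // A map column yields one entry per key, each key prefixed.
  def prefixed(col: String, prefix: String): Extractor =
    row => row.maps.getOrElse(col, Map.empty[String, String]).toSeq
      .map { case (k, v) => (prefix + k, v) }

  // Adding a column to the table means adding one line here.
  val extractors: Map[String, Extractor] = Map(
    "url"          -> scalar("url", "url"),
    "depth"        -> scalar("depth", "depth"),
    "text_content" -> scalar("text_content", "textContent"),
    "fetchmeta"    -> prefixed("fetchmeta", "f_"),
    "parsemeta"    -> prefixed("parsemeta", "p_"))

  // Unknown column names are skipped, as in the DAO's match.
  def extract(row: StubRow, cols: Seq[String]): Map[String, Any] =
    cols.flatMap(c => extractors.get(c).toSeq.flatMap(ex => ex(row))).toMap
}
```

With the real driver, each extractor would call the typed getter (getString, getInt, getMap) on the Row instead of reading from StubRow.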
Also, an additional method deleteByUrl() has to be defined. This is used by the insertFetched() method to first delete all columns for the page before new information is added. This was not necessary in MongoDB, since the entire document would be replaced during insert. Another issue is that the metadata for fetch and parse can no longer be persisted in its native form: since the data type is map<text,text>, every value must be a String.
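The DAO flattens metadata values with a plain toString, which fails on null values and gives dates a format that depends on the JVM's defaults. A slightly more careful conversion might look like the sketch below. It is not part of delsym: MetaText and stringify are hypothetical names, and the UTC date format is my assumption.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Hypothetical helper (not part of delsym): flatten metadata values to
// strings so that they fit a map<text,text> column.
object MetaText {

  // Format dates in UTC so the stored text does not depend on the JVM's
  // default time zone. A new formatter is built per call because
  // SimpleDateFormat is not thread-safe.
  private def formatDate(d: Date): String = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
    sdf.format(d)
  }

  // Drop null values, format dates explicitly, toString everything else.
  def stringify(meta: Map[String, Any]): Map[String, String] =
    meta.collect {
      case (k, d: Date)        => k -> formatDate(d)
      case (k, v) if v != null => k -> v.toString
    }
}
```

The conversion is one-way: everything read back from the table is a String, so a caller that needs the original Int or Date has to parse it again.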
CQL also supports PreparedStatements (there are examples in cassandra-samples), but I could not get them to work: no errors were thrown, the inserts just did not happen. I ended up composing the CQL in code and passing it to session.execute(). An added benefit of this approach is that if a statement doesn't work, you can try it in the CQL shell, where the error messages are usually more informative. Another thing to note is that single and double quotes are not interchangeable, as they are in some SQL dialects: string literals in CQL must be enclosed in single quotes.
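Composing CQL by string formatting has one sharp edge that the DAO does not handle: a single quote inside the page content or a metadata value ends the literal early, and the statement no longer parses. CQL escapes a single quote inside a string literal by doubling it. The helpers below are a minimal sketch of that rule. CqlLiterals, quote and mapLiteral are hypothetical names that are not in delsym, and the sketch assumes every value is written as a text literal.

```scala
// Hypothetical helpers (not part of delsym) for building CQL text
// literals when composing statements by hand.
object CqlLiterals {

  // Wrap a value in single quotes, doubling any embedded single quote,
  // which is how CQL escapes a quote inside a string literal.
  def quote(s: String): String = "'" + s.replace("'", "''") + "'"

  // Render a metadata map as a CQL map<text,text> literal such as
  // {'k':'v'}. Values are stringified with toString, as in the DAO,
  // so a null value would still throw here.
  def mapLiteral(meta: Map[String, Any]): String =
    meta.map { case (k, v) => quote(k) + ":" + quote(v.toString) }
      .mkString("{", ", ", "}")
}
```

For example, CqlLiterals.quote("it's") yields 'it''s'. In the DAO, the '%s' placeholders would become plain %s, with each argument passed through quote() first.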
The test below expects a running Cassandra instance with the database and table created as described above. It runs the DAO methods and verifies the results by looking up data in the table at the end of each call.
// Source: src/test/scala/com/mycompany/delsym/daos/CassandraDbDaoTest.scala
package com.mycompany.delsym.daos
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
class CassandraDbDaoTest extends FunSuite
with BeforeAndAfterAll {
val cassDao = new CassandraDbDao()
override def afterAll() = cassDao.close()
test("get non-existent record") {
val url = "http://www.google.com"
cassDao.deleteByUrl(url)
val result =
cassDao.getByUrl(url, List.empty)
result match {
case Left(f) => fail("unexpected exception")
case Right(r) => {
Console.println("result=[" + r + "]")
assert(r != null)
assert(r.isEmpty)
}
}
}
test("insert after fetch") {
val url = "http://www.google.com"
val content = "<HTML>Some arbitrary content</HTML>"
val fetchMetadata = List(
("feed_title", "Some arbitrary title"),
("feed_summary", "Jack the giant killer")).toMap
cassDao.insertFetched(url, 0, fetchMetadata, content)
cassDao.getByUrl(url, List.empty) match {
case Right(row) => {
Console.println("row (after fetch, #-cols:" +
row.size + ") = " + row)
assert(row != null)
assert(! row.isEmpty)
assert(row.size == 9)
}
case Left(f) => fail("unexpected exception")
}
}
test("insert after parse") {
val url = "http://www.google.com"
val textContent = "Some arbitrary content"
val parseMetadata = List(
("title", "The real title"),
("author", "Jack G Killer")).toMap
cassDao.insertParsed(url, textContent, parseMetadata)
cassDao.getByUrl(url, List.empty) match {
case Right(row) => {
Console.println("row (after parse, #-cols:" +
row.size + ") = " + row)
assert(row != null)
assert(! row.isEmpty)
assert(row.size == 11)
}
case Left(f) => fail("unexpected exception")
}
}
test("insert after index") {
val url = "http://www.google.com"
cassDao.insertIndexed(url)
cassDao.getByUrl(url, List.empty) match {
case Right(row) => {
Console.println("row (after index, #-cols:" +
row.size + ") = " + row)
assert(row != null)
assert(! row.isEmpty)
assert(row.size == 11)
}
case Left(f) => fail("unexpected exception")
}
}
test("get inserted record, selected fields") {
val url = "http://www.google.com"
val result = cassDao.getByUrl(url, List("pts", "parsemeta"))
result match {
case Right(row) => {
Console.println("partial row get = " + row)
assert(row != null)
assert(! row.isEmpty)
assert(row.size == 3)
}
case Left(f) => {
f.e.printStackTrace()
fail("unexpected exception", f.e)
}
}
}
}
And that's all I have for today. Hopefully in the next post I will talk about something else :-).