We've been using Cassandra in our crawling/indexing system for a while now, but so far almost all I knew about it was that it's there. I've been meaning to take a look at it for a while, but never had a need to, since the schema was already built and wrapper classes were provided for accessing the data. Even now, as a consumer and producer of crawl/index data which ultimately ends up in Cassandra, I still don't have a need to check it out in depth. But I am the curious type, and figured that understanding it may help me design other systems where Cassandra's asynchronous writes, high availability and incremental scalability would come in useful.
Since I learn best by doing, I decided to take a simple example of a subsystem where we store crawl data in a pair of RDBMS tables, and convert it to a Cassandra data model. I also wanted to write some code to migrate the data off the RDBMS tables into Cassandra, and provide some rudimentary functionality to retrieve a record by primary key - basically explore a bit of Cassandra's Java client API.
The RDBMS Model
The RDBMS model consists of two tables set up as master-detail. The master table contains URL level information, and the detail table contains multiple rows per URL, each row corresponding to a (nodeId,score) pair - the nodeIds represent concepts from our taxonomy that match the content, and the score corresponds roughly to the number of times the concept was found in the URL's content. The schema for the two tables is shown below.
Urls: {
  key: url_id,
  url,
  status,
  created,
  modified
};
Url_Map: {
  key: {
    url_id,
    node_id
  },
  score,
  created
};
Cassandra Model
Cassandra's building blocks are analogous to RDBMS building blocks, but slightly different - the obvious difference is that RDBMS is row-based and Cassandra is column-based. There are many very good references available on the Internet, including this one which I found very helpful, so I will just briefly describe my understanding here. If you know nothing about Cassandra or column oriented databases, you should read up.
Just as an RDBMS server can host multiple databases, a Cassandra server can host multiple Keyspaces. Just as each RDBMS database can store multiple tables (and nested tables in Oracle), a Cassandra keyspace can store Column Families and Super Column Families. Each Column Family contains a set of key-value pairs, where the key is like a primary key and the value is a set of Columns. A Column is itself a name-value pair (a triple, actually, if you count the timestamp): the name corresponds to the column name in an RDBMS and the value to the column value. A Super Column Family contains SuperColumns, each of which is also a name-value pair, but here the value is a set of Columns. Because each Column carries its own name, different rows can hold different numbers of Columns.
One thing to note is that a Column Family can only contain Columns, and a Super Column Family can only contain Super Columns. Also, Cassandra schema design is driven by the queries it will support, so denormalization is encouraged if it makes sense from a query point of view. With this in mind, I decided to model this data as a single Cassandra Super Column Family as shown below:
Urls: { // Super Column Family
  key: url {
    info: {
      created : '2010-01-01',
      modified : '2010-01-01',
      status : 'crawled',
    },
    scores : {
      123456 : 123.4, // nodeId : score
      ...
    }
  }
};
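The nested layout above can be mimicked with plain Java maps, which may help in seeing how a row key, a super column name and a column name together address one value. This is only an in-memory sketch: the class name UrlsModelSketch and its methods are made up for illustration and have nothing to do with the Cassandra client API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory stand-in for the Urls Super Column Family (illustrative only). */
class UrlsModelSketch {

  // row key (url) -> super column name -> column name -> column value
  private final Map<String, Map<String, Map<String, String>>> rows =
      new LinkedHashMap<>();

  /** Stores one column value under the given row key and super column. */
  public void put(String url, String superColumn, String column, String value) {
    rows.computeIfAbsent(url, k -> new LinkedHashMap<>())
        .computeIfAbsent(superColumn, k -> new LinkedHashMap<>())
        .put(column, value);
  }

  /** Returns the columns of one super column, or an empty map if absent. */
  public Map<String, String> get(String url, String superColumn) {
    Map<String, Map<String, String>> row = rows.get(url);
    if (row == null) {
      return Collections.emptyMap();
    }
    Map<String, String> columns = row.get(superColumn);
    if (columns == null) {
      return Collections.emptyMap();
    }
    return Collections.unmodifiableMap(columns);
  }
}
```

Calling put("http://example.com/a", "scores", "123456", "123.4") and then get("http://example.com/a", "scores") returns a one-entry map, which is roughly what the scores super column holds for that URL. Unlike the real thing, the sketch keeps no timestamps.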
Cassandra Installation
I installed Cassandra 0.6.6 (the GA version at the time of writing this). To add the Super Column Family, you have to add the following definition to the conf/storage-conf.xml file (under the default Keyspace declaration).
<Keyspace Name="MyKeyspace">
  <ColumnFamily Name="Urls" CompareWith="UTF8Type"
                ColumnType="Super" CompareSubColumnsWith="UTF8Type"/>
  <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
  <ReplicationFactor>1</ReplicationFactor>
  <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>
To start the Cassandra server, you need to issue the following command on the Unix prompt:
sujit@cyclone:~$ cd $CASSANDRA_HOME
sujit@cyclone:cassandra-0.6.6$ bin/cassandra -f
This will start Cassandra in the foreground, listening for Thrift clients on port 9160.
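Before pointing any client code at the server, it can be handy to confirm that something is actually accepting connections on that port. The following is a minimal sketch using only the JDK; the class name PortCheck and the method isListening are hypothetical, and a successful connect only shows that a TCP listener exists, not that it is Cassandra.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Tiny TCP reachability probe (illustrative helper, not part of Cassandra). */
class PortCheck {

  /** Returns true if a TCP connection to host:port succeeds within the timeout. */
  public static boolean isListening(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      // connection refused, timed out, or host not resolvable
      return false;
    }
  }
}
```

With the server started as above, PortCheck.isListening("localhost", 9160, 1000) would be expected to return true.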
Cassandra DAO
The DAO code is shown below. It provides methods to do CRUD operations on the Urls Super Column Family. I was just going for a broad understanding of the Cassandra Java API here; in a real application there will be additional methods to do application specific things.
// Source: src/main/java/com/mycompany/cassandra/db/UrlBeanCassandraDao.java
package com.mycompany.cassandra.db;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Deletion;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SuperColumn;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

/**
 * Provides CRUD operations on a Cassandra Database.
 */
public class UrlBeanCassandraDao {

  private static final String CASSANDRA_HOST = "localhost";
  private static final int CASSANDRA_PORT = 9160;
  private static final String CASSANDRA_KEYSPACE = "MyKeyspace";
  private static final ConsistencyLevel CL_ONE = ConsistencyLevel.ONE;
  private static final Charset UTF8 = Charset.forName("UTF-8");
  // SimpleDateFormat is not thread-safe; this DAO assumes single-threaded use
  private static final SimpleDateFormat DATE_FORMATTER =
    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

  private TTransport transport;
  private Cassandra.Client client;
  private long lastTimestamp = 0L;

  public UrlBeanCassandraDao() {
    transport = new TSocket(CASSANDRA_HOST, CASSANDRA_PORT);
    TProtocol protocol = new TBinaryProtocol(transport);
    client = new Cassandra.Client(protocol);
  }

  public void init() throws Exception {
    transport.open();
  }

  public void destroy() {
    transport.close();
  }

  public void create(UrlBean urlBean) throws Exception {
    long now = nextTimestamp();
    // persist the URL master data under a key called "info"
    List<Mutation> mutations = new ArrayList<Mutation>();
    mutations.add(getInfoMutation("info", urlBean, now));
    if (urlBean.getScores().size() > 0) {
      mutations.add(getScoreMutation("scores", urlBean.getScores(), now));
    }
    // batch_mutate takes: row key -> column family name -> mutations
    Map<String,List<Mutation>> row = new HashMap<String,List<Mutation>>();
    row.put("Urls", mutations);
    Map<String,Map<String,List<Mutation>>> data =
      new HashMap<String,Map<String,List<Mutation>>>();
    data.put(urlBean.getUrl(), row);
    client.batch_mutate(CASSANDRA_KEYSPACE, data, CL_ONE);
  }

  public List<UrlBean> retrieve(String url) throws Exception {
    // an empty start and finish selects every super column in the row
    SlicePredicate predicate = new SlicePredicate();
    SliceRange range = new SliceRange();
    range.setStart(new byte[0]);
    range.setFinish(new byte[0]);
    predicate.setSlice_range(range);
    ColumnParent columnFamily = new ColumnParent("Urls");
    // identical start and end keys restrict the range query to one row
    KeyRange keyrange = new KeyRange();
    keyrange.start_key = url;
    keyrange.end_key = url;
    List<UrlBean> urlBeans = new ArrayList<UrlBean>();
    List<KeySlice> slices = client.get_range_slices(
      CASSANDRA_KEYSPACE, columnFamily, predicate, keyrange, CL_ONE);
    for (KeySlice slice : slices) {
      List<ColumnOrSuperColumn> coscs = slice.columns;
      UrlBean urlBean = new UrlBean();
      urlBean.setUrl(slice.key);
      for (ColumnOrSuperColumn cosc : coscs) {
        SuperColumn scol = cosc.super_column;
        String scolName = string(scol.name);
        if ("info".equals(scolName)) {
          List<Column> cols = scol.columns;
          for (Column col : cols) {
            String colName = string(col.name);
            if ("created".equals(colName)) {
              urlBean.setCreated(DATE_FORMATTER.parse(string(col.value)));
            } else if ("modified".equals(colName)) {
              urlBean.setModified(DATE_FORMATTER.parse(string(col.value)));
            } else if ("status".equals(colName)) {
              // must match the column name written in getInfoMutation()
              urlBean.setCrawlStatus(string(col.value));
            }
          }
        } else if ("scores".equals(scolName)) {
          List<Column> cols = scol.columns;
          for (Column col : cols) {
            // column name is the nodeId, column value is the score
            ScoreBean scoreBean = new ScoreBean();
            scoreBean.setNodeId(string(col.name));
            scoreBean.setScore(Float.valueOf(string(col.value)));
            urlBean.addScore(scoreBean);
          }
        }
      }
      urlBeans.add(urlBean);
    }
    return urlBeans;
  }

  public void update(UrlBean urlBean) throws Exception {
    delete(urlBean);
    create(urlBean);
  }

  public void delete(UrlBean urlBean) throws Exception {
    // remove both super columns from the row for this URL
    SlicePredicate predicate = new SlicePredicate();
    List<byte[]> cols = new ArrayList<byte[]>();
    cols.add(bytes("info"));
    cols.add(bytes("scores"));
    predicate.column_names = cols;
    Deletion deletion = new Deletion();
    deletion.predicate = predicate;
    deletion.timestamp = nextTimestamp();
    Mutation mutation = new Mutation();
    mutation.deletion = deletion;
    List<Mutation> mutations = new ArrayList<Mutation>();
    mutations.add(mutation);
    Map<String,List<Mutation>> row = new HashMap<String,List<Mutation>>();
    row.put("Urls", mutations);
    Map<String,Map<String,List<Mutation>>> data =
      new HashMap<String,Map<String,List<Mutation>>>();
    // the outer map is keyed by row key, which is the URL
    data.put(urlBean.getUrl(), row);
    client.batch_mutate(CASSANDRA_KEYSPACE, data, CL_ONE);
  }

  private Mutation getInfoMutation(String name, UrlBean urlBean, long timestamp) {
    List<Column> cols = new ArrayList<Column>();
    cols.add(getColumn("url", urlBean.getUrl(), timestamp));
    cols.add(getColumn("created", DATE_FORMATTER.format(urlBean.getCreated()),
      timestamp));
    cols.add(getColumn("modified", DATE_FORMATTER.format(urlBean.getModified()),
      timestamp));
    cols.add(getColumn("status", urlBean.getCrawlStatus(), timestamp));
    SuperColumn scol = new SuperColumn(bytes(name), cols);
    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.super_column = scol;
    Mutation mutation = new Mutation();
    mutation.column_or_supercolumn = cosc;
    return mutation;
  }

  private Mutation getScoreMutation(String name, List<ScoreBean> scores,
      long timestamp) {
    List<Column> cols = new ArrayList<Column>();
    for (ScoreBean score : scores) {
      cols.add(getColumn(score.getNodeId(), String.valueOf(score.getScore()),
        timestamp));
    }
    SuperColumn scol = new SuperColumn(bytes(name), cols);
    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.super_column = scol;
    Mutation mutation = new Mutation();
    mutation.column_or_supercolumn = cosc;
    return mutation;
  }

  private Column getColumn(String name, String value, long timestamp) {
    return new Column(bytes(name), bytes(value), timestamp);
  }

  // Cassandra resolves conflicting writes by timestamp, and clients
  // conventionally send microseconds since the epoch. System.nanoTime() is
  // not wall-clock time, so the value is derived from currentTimeMillis()
  // and bumped when needed, so that the delete in update() always carries
  // an older timestamp than the create that follows it.
  private synchronized long nextTimestamp() {
    long micros = System.currentTimeMillis() * 1000L;
    lastTimestamp = Math.max(micros, lastTimestamp + 1);
    return lastTimestamp;
  }

  // encode and decode with an explicit charset so writes and reads agree
  private static byte[] bytes(String s) {
    return s.getBytes(UTF8);
  }

  private static String string(byte[] b) {
    return new String(b, UTF8);
  }
}
As you can see, it's pretty verbose, so there are already a number of initiatives to provide wrappers that expose simpler method signatures. It's also quite simple to write your own.
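As a small taste of what such a home-grown wrapper might look like, here is a sketch of a helper that builds the nested map batch_mutate expects, keyed first by row key and then by column family name. The class BatchSketch and the method singleRowBatch are hypothetical names, and the type parameter M merely stands in for the Thrift Mutation class so that the sketch compiles without the Cassandra jars.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Builds the map shape used for a one-row batch (illustrative helper). */
class BatchSketch {

  /**
   * Returns row key -> column family name -> mutations for a single row.
   * M stands in for org.apache.cassandra.thrift.Mutation.
   */
  public static <M> Map<String, Map<String, List<M>>> singleRowBatch(
      String rowKey, String columnFamily, List<M> mutations) {
    Map<String, List<M>> byColumnFamily = new HashMap<>();
    // copy the list so later changes by the caller do not leak into the batch
    byColumnFamily.put(columnFamily, new ArrayList<>(mutations));
    Map<String, Map<String, List<M>>> byRowKey = new HashMap<>();
    byRowKey.put(rowKey, byColumnFamily);
    return byRowKey;
  }
}
```

In the DAO, the map-building lines in create() and delete() could then shrink to a single call along the lines of client.batch_mutate(CASSANDRA_KEYSPACE, BatchSketch.singleRowBatch(urlBean.getUrl(), "Urls", mutations), CL_ONE). Having one place that decides what the outer key is also makes it harder to key the batch by something other than the URL by mistake.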
You will notice references to UrlBean and ScoreBean in the above code. These are just plain POJOs to hold the data. Here they are, with getters and setters removed for brevity.
// Source: src/main/java/com/mycompany/cassandra/db/UrlBean.java
package com.mycompany.cassandra.db;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class UrlBean {

  private String url;
  private Date created;
  private Date modified;
  private String crawlStatus;
  private List<ScoreBean> scores = new ArrayList<ScoreBean>();
  ...
  public void addScore(ScoreBean score) {
    scores.add(score);
  }
}
// Source: src/main/java/com/mycompany/cassandra/db/ScoreBean.java
package com.mycompany.cassandra.db;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class ScoreBean {

  private String nodeId;
  private Float score;
  ...
}
Migration Code
To migrate data off the two RDBMS tables, I wrote a little loader which loops through the tables and builds the UrlBean, then calls the create() method on the DAO. Here is the code for the loader - nothing to it, really, just some simple JDBC code.
// Source: src/main/java/com/mycompany/cassandra/db/CassandraDbLoader.java
package com.mycompany.cassandra.db;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CassandraDbLoader {

  private static final String DB_URL = "...";
  private static final String DB_USER = "...";
  private static final String DB_PASS = "...";

  private UrlBeanCassandraDao urlBeanCassandraDao;

  public void setCassandraCrudService(UrlBeanCassandraDao urlBeanCassandraDao) {
    this.urlBeanCassandraDao = urlBeanCassandraDao;
  }

  public void load() throws Exception {
    // database
    Class.forName("oracle.jdbc.OracleDriver");
    Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASS);
    PreparedStatement psMaster = null;
    PreparedStatement psDetail = null;
    ResultSet rsMaster = null;
    try {
      psMaster = conn.prepareStatement(
        "select url_id, url, status, created, modified from urls");
      psDetail = conn.prepareStatement(
        "select node_id, score from url_map where url_id = ?");
      // start cassandra client
      urlBeanCassandraDao.init();
      rsMaster = psMaster.executeQuery();
      while (rsMaster.next()) {
        String url = rsMaster.getString("url");
        System.out.println("Migrating URL: " + url);
        UrlBean urlBean = new UrlBean();
        urlBean.setUrl(url);
        // getTimestamp() keeps the time of day, which getDate() would drop
        urlBean.setCreated(rsMaster.getTimestamp("created"));
        urlBean.setModified(rsMaster.getTimestamp("modified"));
        urlBean.setCrawlStatus(rsMaster.getString("status"));
        BigDecimal id = rsMaster.getBigDecimal("url_id");
        loadScores(psDetail, urlBean, id);
        urlBeanCassandraDao.create(urlBean);
      }
    } finally {
      // release everything, even if a query or a Cassandra write failed
      try { if (rsMaster != null) rsMaster.close(); } catch (Exception e) {}
      try { if (psMaster != null) psMaster.close(); } catch (Exception e) {}
      try { if (psDetail != null) psDetail.close(); } catch (Exception e) {}
      try { conn.close(); } catch (Exception e) {}
      try { urlBeanCassandraDao.destroy(); } catch (Exception e) {}
    }
  }

  private void loadScores(PreparedStatement psDetail, UrlBean urlBean,
      BigDecimal id) throws Exception {
    ResultSet rsDetail = null;
    try {
      psDetail.setBigDecimal(1, id);
      rsDetail = psDetail.executeQuery();
      while (rsDetail.next()) {
        ScoreBean scoreBean = new ScoreBean();
        scoreBean.setNodeId(rsDetail.getString("node_id"));
        float score = rsDetail.getFloat("score");
        scoreBean.setScore(score);
        urlBean.addScore(scoreBean);
      }
    } finally {
      if (rsDetail != null) {
        try { rsDetail.close(); } catch (Exception e) {}
      }
    }
  }

  public static void main(String[] argv) throws Exception {
    UrlBeanCassandraDao dao = new UrlBeanCassandraDao();
    CassandraDbLoader loader = new CassandraDbLoader();
    loader.setCassandraCrudService(dao);
    loader.load();
  }
}
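The loader above runs one detail query per URL. A possible alternative, which I have not tried against the real tables, would be to read the output of a single join of the two tables and group the flat rows by URL in memory. The sketch below shows only the grouping step; the class ScoreGrouper, its Row type and the idea of a joined result set are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Groups flat (url, nodeId, score) rows by url (illustrative helper). */
class ScoreGrouper {

  /** One flattened row of a hypothetical urls/url_map join. */
  public static final class Row {
    final String url;
    final String nodeId;
    final float score;

    public Row(String url, String nodeId, float score) {
      this.url = url;
      this.nodeId = nodeId;
      this.score = score;
    }
  }

  /** Returns url -> (nodeId -> score), keeping first-seen order of urls. */
  public static Map<String, Map<String, Float>> groupByUrl(List<Row> rows) {
    Map<String, Map<String, Float>> grouped = new LinkedHashMap<>();
    for (Row row : rows) {
      grouped.computeIfAbsent(row.url, k -> new LinkedHashMap<>())
             .put(row.nodeId, row.score);
    }
    return grouped;
  }
}
```

Each entry of the result corresponds to one scores super column in the Cassandra model. Note that with an inner join, URLs without any scores would not show up at all, so an outer join (with a null check on the node id) would be needed to migrate those as well.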
And to test that these made it in okay, I wrote a little JUnit test that calls the retrieve() method with a given URL, and it comes back fine.
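Part of what such a test exercises is that values survive the trip into bytes and back, since the DAO stores dates and scores as UTF-8 strings. That part can be checked without a running server. The following is a minimal sketch with a hypothetical ColumnCodec class; it uses the same date pattern as the DAO but pins the time zone to UTC and the locale to Locale.ROOT, which is my assumption here and not what the DAO does.

```java
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Encodes and decodes column values as UTF-8 strings (illustrative helper). */
class ColumnCodec {

  private static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS";

  // a fresh formatter per call, because SimpleDateFormat is not thread-safe
  private static SimpleDateFormat formatter() {
    SimpleDateFormat format = new SimpleDateFormat(PATTERN, Locale.ROOT);
    format.setTimeZone(TimeZone.getTimeZone("UTC"));
    return format;
  }

  public static byte[] encodeDate(Date date) {
    return formatter().format(date).getBytes(StandardCharsets.UTF_8);
  }

  public static Date decodeDate(byte[] value) {
    String text = new String(value, StandardCharsets.UTF_8);
    try {
      return formatter().parse(text);
    } catch (ParseException e) {
      throw new IllegalArgumentException("Not a date column value: " + text, e);
    }
  }

  public static byte[] encodeScore(float score) {
    return String.valueOf(score).getBytes(StandardCharsets.UTF_8);
  }

  public static float decodeScore(byte[] value) {
    return Float.parseFloat(new String(value, StandardCharsets.UTF_8));
  }
}
```

A test can then assert that decodeDate(encodeDate(d)) equals d and that decodeScore(encodeScore(s)) equals s. The pattern keeps milliseconds, so a java.util.Date round-trips exactly.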
If you already know about Cassandra or other column oriented databases, this is probably pretty basic stuff. However, this exercise has given me some understanding of Cassandra and its Java API, and I hope to use it in more detail at some point in the future.