Saturday, October 30, 2010

Modeling relational data with Cassandra

We've been using Cassandra in our crawling/indexing system for a while now, but so far almost all I knew about it is that it's there. I've been meaning to take a look at it for a while, but never had a need to, since the schema was already built and wrapper classes were provided for accessing the data. Even now, as a consumer and producer of crawl/index data which ultimately ends up in Cassandra, I still don't have a need to check it out in depth. But I am the curious type, and figured that understanding it may help me design other systems where Cassandra's asynchronous writes, high availability and incremental scalability would come in useful.

Since I learn best by doing, I decided to take a simple example of a subsystem where we store crawl data in a pair of RDBMS tables, and convert it to a Cassandra data model. I also wanted to write some code to migrate the data off the RDBMS tables into Cassandra, and provide some rudimentary functionality to retrieve a record by primary key - basically explore a bit of Cassandra's Java client API.

The RDBMS Model

The RDBMS model consists of two tables set up as master-detail. The master table contains URL level information, and the detail table contains multiple rows per URL, each row corresponding to a (nodeId,score) pair - the nodeIds represent concepts from our taxonomy that match the content, and the score corresponds roughly to the number of times that concept was found in the URL's content. The schemas for the two tables are shown below.

Urls: {
  key: url_id,
  url,
  status,
  created,
  modified
};
Url_Map: {
  key: {
    url_id, 
    node_id
  },
  score,
  created
};

Cassandra Model

Cassandra's building blocks are analogous to RDBMS building blocks, but slightly different - the most obvious difference being that an RDBMS is row-oriented while Cassandra is column-oriented. There are many very good references available on the Internet, including this one, which I found very helpful, so I will just briefly describe my understanding here. If you know nothing about Cassandra or column-oriented databases, you should read up on them first.

Just like an RDBMS server can host multiple databases, a Cassandra server can host multiple Keyspaces. Just like each RDBMS database can store multiple tables (and nested tables in Oracle), a Cassandra Keyspace can store multiple Column Families and Super Column Families. Each Column Family contains a set of key-value pairs, where the key is like a primary key and the value is a set of Columns. A Column is itself a key-value pair (a triple, actually, if you count the timestamp); the key corresponds to the column name in an RDBMS, and the value to the column value. A Super Column Family contains Super Columns, each of which is a key-value pair whose value is a collection of Columns. Because Columns are just key-value pairs, each row can hold a different number of them.

One thing to note is that a Column Family can only contain Columns, and a Super Column Family can only contain Super Columns. Also, Cassandra schema design is driven by the queries it will support, so denormalization is encouraged if it makes sense from a query point of view. With this in mind, I decided to model this data as a single Cassandra Super Column Family as shown below:

Urls: { // Super Column Family
  key: url {
    info: {
      created : '2010-01-01',
      modified : '2010-01-01',
      status : 'crawled',
    },
    scores : {
      123456 : 123.4,  // nodeId : score
      ...
    }
  }
};

Cassandra Installation

I installed Cassandra 0.6.6 (the GA version at the time of writing this). To define the Super Column Family, you add the following Keyspace declaration to the conf/storage-conf.xml file (alongside the default Keyspace declaration).

    <Keyspace Name="MyKeyspace">
      <ColumnFamily Name="Urls" CompareWith="UTF8Type"
        ColumnType="Super" CompareSubColumnsWith="UTF8Type"/>
      <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
      <ReplicationFactor>1</ReplicationFactor>
      <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
    </Keyspace>

To start the Cassandra server, issue the following command at the Unix prompt:

sujit@cyclone:~$ cd $CASSANDRA_HOME
sujit@cyclone:cassandra-0.6.6$ bin/cassandra -f

This starts Cassandra in the foreground, listening for Thrift clients on port 9160.
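
Once it is up, a quick way to verify that a client can reach it is to open a Thrift connection and ask the server for its API version. Here is a minimal sketch, assuming the describe_version() call in the 0.6 Thrift API, and using the same Thrift classes that are imported in the DAO below:

// Sketch: sanity-check connectivity to the local Cassandra node over Thrift.
TTransport transport = new TSocket("localhost", 9160);
TProtocol protocol = new TBinaryProtocol(transport);
Cassandra.Client client = new Cassandra.Client(protocol);
transport.open();
System.out.println("Thrift API version: " + client.describe_version());
transport.close();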

Cassandra DAO

The DAO code is shown below. It provides methods to do CRUD operations on the Urls Super Column Family. I was just going for a broad understanding of the Cassandra Java API here; in a real application there would be additional methods to do application-specific things.

// Source: src/main/java/com/mycompany/cassandra/db/UrlBeanCassandraDao.java
package com.mycompany.cassandra.db;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Deletion;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SuperColumn;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

/**
 * Provides CRUD operations on a Cassandra Database.
 */
public class UrlBeanCassandraDao {

  private static final String CASSANDRA_HOST = "localhost";
  private static final int CASSANDRA_PORT = 9160;
  private static final String CASSANDRA_KEYSPACE = "MyKeyspace";
  private static final ConsistencyLevel CL_ONE = ConsistencyLevel.ONE;
  private static final SimpleDateFormat DATE_FORMATTER = 
    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
  
  private TTransport transport;
  private Cassandra.Client client;
  
  public UrlBeanCassandraDao() {
    transport = new TSocket(CASSANDRA_HOST, CASSANDRA_PORT);
    TProtocol protocol = new TBinaryProtocol(transport);
    client = new Cassandra.Client(protocol);
  }

  public void init() throws Exception {
    transport.open();
  }
  
  public void destroy() {
    transport.close();
  }

  public void create(UrlBean urlBean) throws Exception {
    long now = System.nanoTime();
    // persist the URL master data under a key called "info"
    List<Mutation> mutations = new ArrayList<Mutation>();
    mutations.add(getInfoMutation("info", urlBean, now));
    if (urlBean.getScores().size() > 0) {
      mutations.add(getScoreMutation("scores", urlBean.getScores(), now));
    }
    Map<String,List<Mutation>> row = new HashMap<String,List<Mutation>>();
    row.put("Urls", mutations);
    Map<String,Map<String,List<Mutation>>> data = 
      new HashMap<String,Map<String,List<Mutation>>>();
    data.put(urlBean.getUrl(), row);
    client.batch_mutate(CASSANDRA_KEYSPACE, data, CL_ONE);
  }

  public List<UrlBean> retrieve(String url) throws Exception {
    SlicePredicate predicate = new SlicePredicate();
    SliceRange range = new SliceRange();
    range.setStart(new byte[0]);
    range.setFinish(new byte[0]);
    predicate.setSlice_range(range);
    ColumnParent columnFamily = new ColumnParent("Urls");
    KeyRange keyrange = new KeyRange();
    keyrange.start_key = url;
    keyrange.end_key = url;
    List<UrlBean> urlBeans = new ArrayList<UrlBean>();
    List<KeySlice> slices = client.get_range_slices(
      CASSANDRA_KEYSPACE, columnFamily, predicate, keyrange, CL_ONE);
    for (KeySlice slice : slices) {
      List<ColumnOrSuperColumn> coscs = slice.columns;
      UrlBean urlBean = new UrlBean();
      urlBean.setUrl(slice.key);
      for (ColumnOrSuperColumn cosc : coscs) {
        SuperColumn scol = cosc.super_column;
        String scolName = new String(scol.name, "UTF-8");
        if ("info".equals(scolName)) {
          List<Column> cols = scol.columns;
          for (Column col : cols) {
            String colName = new String(col.name, "UTF-8");
            if ("created".equals(colName)) {
              urlBean.setCreated(DATE_FORMATTER.parse(new String(col.value, "UTF-8")));
            } else if ("modified".equals(colName)) {
              urlBean.setModified(DATE_FORMATTER.parse(new String(col.value, "UTF-8")));
            } else if ("crawStatus".equals(colName)) {
              urlBean.setCrawlStatus(new String(col.value, "UTF-8"));
            }
          }
        } else if ("scores".equals(scolName)) {
          List<Column> cols = scol.columns;
          for (Column col : cols) {
            ScoreBean scoreBean = new ScoreBean();
            scoreBean.setNodeId(new String(col.name, "UTF-8"));
            scoreBean.setScore(Float.valueOf(new String(col.value, "UTF-8")));
            urlBean.addScore(scoreBean);
          }
        }
      }
      urlBeans.add(urlBean);
    }
    return urlBeans;
  }

  public void update(UrlBean urlBean) throws Exception {
    delete(urlBean);
    create(urlBean);
  }
  
  public void delete(UrlBean urlBean) throws Exception {
    SlicePredicate predicate = new SlicePredicate();
    List<byte[]> cols = new ArrayList<byte[]>();
    cols.add("info".getBytes());
    cols.add("scores".getBytes());
    predicate.column_names = cols;
    Deletion deletion = new Deletion();
    deletion.predicate = predicate;
    deletion.timestamp = System.nanoTime();
    Mutation mutation = new Mutation();
    mutation.deletion = deletion;
    List<Mutation> mutations = new ArrayList<Mutation>();
    mutations.add(mutation);
    Map<String,List<Mutation>> row = new HashMap<String,List<Mutation>>();
    row.put("Urls", mutations);
    Map<String,Map<String,List<Mutation>>> data = 
      new HashMap<String,Map<String,List<Mutation>>>();
    data.put("info", row);
    data.put("scores", row);
    client.batch_mutate(CASSANDRA_KEYSPACE, data, CL_ONE);
  }
  
  private Mutation getInfoMutation(String name, UrlBean urlBean, long timestamp) {
    List<Column> cols = new ArrayList<Column>();
    cols.add(getColumn("url", urlBean.getUrl(), timestamp));
    cols.add(getColumn("created", DATE_FORMATTER.format(urlBean.getCreated()), 
      timestamp));
    cols.add(getColumn("modified", DATE_FORMATTER.format(urlBean.getModified()), 
      timestamp));
    cols.add(getColumn("status", urlBean.getCrawlStatus(), timestamp));
    SuperColumn scol = new SuperColumn("info".getBytes(), cols);
    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.super_column = scol;
    Mutation mutation = new Mutation();
    mutation.column_or_supercolumn = cosc;
    return mutation;
  }

  private Mutation getScoreMutation(String name, List<ScoreBean> scores, 
      long timestamp) {
    List<Column> cols = new ArrayList<Column>();
    for (ScoreBean score : scores) {
      cols.add(getColumn(score.getNodeId(), String.valueOf(score.getScore()), 
        timestamp));
    }
    SuperColumn scol = new SuperColumn("scores".getBytes(), cols);
    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.super_column = scol;
    Mutation mutation = new Mutation();
    mutation.column_or_supercolumn = cosc;
    return mutation;
  }

  private Column getColumn(String name, String value, long timestamp) {
    return new Column(name.getBytes(), value.getBytes(), timestamp);
  }
}

As you can see, it's pretty verbose, so there are already a number of initiatives to provide wrappers that expose simpler method signatures. It's also quite simple to write your own.
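
For example, here is a minimal sketch of the kind of helper such a wrapper might expose - a hypothetical insertColumn() method (the name is mine) that hides the ColumnPath plumbing, reusing the client, CASSANDRA_KEYSPACE and CL_ONE members from the DAO above:

  // Sketch (hypothetical): write a single sub-column under a super column
  // in the Urls Super Column Family with one call.
  private void insertColumn(String url, String superColName, String colName,
      String value, long timestamp) throws Exception {
    ColumnPath path = new ColumnPath("Urls");
    path.setSuper_column(superColName.getBytes());
    path.setColumn(colName.getBytes());
    client.insert(CASSANDRA_KEYSPACE, url, path, value.getBytes(),
      timestamp, CL_ONE);
  }

With a handful of helpers like this, the create() method above shrinks considerably, which is roughly what higher-level clients such as Hector do for you.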

You will notice references to UrlBean and ScoreBean in the above code. These are just plain POJOs to hold the data. Here they are, with getters and setters removed for brevity.

// Source: src/main/java/com/mycompany/cassandra/db/UrlBean.java
package com.mycompany.cassandra.db;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class UrlBean {

  private String url;
  private Date created;
  private Date modified;
  private String crawlStatus;
  private List<ScoreBean> scores = new ArrayList<ScoreBean>();
  ... 
  public void addScore(ScoreBean score) {
    scores.add(score);
  }
}
// Source: src/main/java/com/mycompany/cassandra/db/ScoreBean.java
package com.mycompany.cassandra.db;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class ScoreBean {

  private String nodeId;
  private Float score;
  ...  
}

Migration Code

To migrate the data off the two RDBMS tables, I wrote a little loader which loops through the tables and builds the UrlBean, then calls the create() method on the DAO. Here is the code for the loader - nothing to it, really, just some simple JDBC code.

// Source: src/main/java/com/mycompany/cassandra/db/CassandraDbLoader.java
package com.mycompany.cassandra.db;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CassandraDbLoader {

  private static final String DB_URL = "...";
  private static final String DB_USER = "...";
  private static final String DB_PASS = "...";

  private UrlBeanCassandraDao urlBeanCassandraDao;
  
  public void setCassandraCrudService(UrlBeanCassandraDao urlBeanCassandraDao) {
    this.urlBeanCassandraDao = urlBeanCassandraDao;
  }
  
  public void load() throws Exception {
    // database
    Class.forName("oracle.jdbc.OracleDriver");
    Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASS);
    PreparedStatement psMaster = conn.prepareStatement(
      "select url_id, url, status, created, modified from urls");
    PreparedStatement psDetail = conn.prepareStatement(
      "select node_id, score from url_map where url_id = ?");
    ResultSet rsMaster = null;
    // start cassandra client
    urlBeanCassandraDao.init();
    try {
      rsMaster = psMaster.executeQuery();
      while (rsMaster.next()) {
        String url = rsMaster.getString("url");
        System.out.println("Migrating URL: " + url);
        UrlBean urlBean = new UrlBean();
        urlBean.setUrl(url);
        urlBean.setCreated(rsMaster.getDate("created"));
        urlBean.setModified(rsMaster.getDate("modified"));
        urlBean.setCrawlStatus(rsMaster.getString("status"));
        int id = rsMaster.getBigDecimal("url_id").intValue();
        loadScores(psDetail, urlBean, id);
        urlBeanCassandraDao.create(urlBean);
      }
    } finally {
      if (rsMaster != null) {
        try { rsMaster.close(); } catch (Exception e) {}
      }
      try { psMaster.close(); } catch (Exception e) {}
      try { psDetail.close(); } catch (Exception e) {}
      try { conn.close(); } catch (Exception e) {}
      try { urlBeanCassandraDao.destroy(); } catch (Exception e) {}
    }
  }

  private void loadScores(PreparedStatement psDetail, UrlBean urlBean, int id) 
      throws Exception {
    ResultSet rsDetail = null;
    try {
      psDetail.setBigDecimal(1, new BigDecimal(id));
      rsDetail = psDetail.executeQuery();
      while (rsDetail.next()) {
        ScoreBean scoreBean = new ScoreBean();
        scoreBean.setNodeId(rsDetail.getString("node_id"));
        float score = rsDetail.getFloat("score");
        scoreBean.setScore(score);
        urlBean.addScore(scoreBean);
      }
    } finally {
      if (rsDetail != null) {
        try { rsDetail.close(); } catch (Exception e) {}
      }
    }
  }

  public static void main(String[] argv) throws Exception {
    UrlBeanCassandraDao dao = new UrlBeanCassandraDao();
    CassandraDbLoader loader = new CassandraDbLoader();
    loader.setCassandraCrudService(dao);
    loader.load();
  }
}

And to test that these made it in okay, I wrote a little JUnit test that calls the retrieve() method with a given URL, and it comes back fine.
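
The test boils down to something like the following sketch (the URL is a placeholder - substitute one that actually exists in the migrated data):

// Sketch (hypothetical): a minimal JUnit test for UrlBeanCassandraDao.retrieve()
package com.mycompany.cassandra.db;

import java.util.List;

import org.junit.Test;

public class UrlBeanCassandraDaoTest {

  @Test
  public void testRetrieve() throws Exception {
    UrlBeanCassandraDao dao = new UrlBeanCassandraDao();
    dao.init();
    try {
      // placeholder URL - use one that was actually migrated
      List<UrlBean> urlBeans = dao.retrieve("http://www.example.com/some/page");
      for (UrlBean urlBean : urlBeans) {
        System.out.println(urlBean.getUrl() + " : " + 
          urlBean.getScores().size() + " scores");
      }
    } finally {
      dao.destroy();
    }
  }
}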

If you already know about Cassandra or other column-oriented databases, this is probably pretty basic stuff. However, this exercise has given me some understanding of Cassandra and its Java API, and I hope to explore it in more depth at some point in the future.

Thursday, October 21, 2010

A Custom Drupal XMLRPC Service

I have written earlier about using Drupal as an XMLRPC client - via a custom module that hooks into the persistence lifecycle of a Drupal node in order to send XMLRPC requests out to an external publishing system containing an embedded XMLRPC server. Drupal can also act as an XMLRPC server, receiving and acting on XMLRPC requests from external clients.

We have been using one of the built-in services (comment.save from the Comment Services module). However, we recently decided to build an external Comment Moderation tool, since we have outgrown the rather rudimentary comment moderation form available in Drupal. That tool needs services exposed on Drupal to publish, unpublish and delete a comment, which are not available from the Comment Services module.

My initial explorations on Google pointed me to this post on the Riff Blog, and thence to the XMLRPC hook available in core Drupal. However, the results were not too satisfying (it didn't show up on the Services page at /admin/build/services), so I quickly abandoned this path and went looking for something better.

I found my answer in the source code for the Comment Services module - it implements hook_service(), which pointed me to the Services module, which in turn led me to the Services Handbook and, buried in the links in the right navigation toolbar of that page, some information I could actually use.

Interestingly, not only does Drupal allow you to build/install custom services, it also allows for custom servers (such as JSON or SOAP). However, since I already had an XMLRPC server installed for the pre-installed services, I did not explore this option. There is more information about this in Deja Augustine's post here, along with some skeletal code for a simple service.

My example is a bit more involved, but uses the same ideas as Deja's post. Build a custom module that is in the "Services - services" package and depends on the services module (to the best of my knowledge, these are required; when I tried putting it under a different package, it would not show up on the Services page). So although I physically put the code under the sites/all/modules/custom/cmxs directory, the package name is "Services - services". Here is the .info file for my module.

name = cmxs
description = Comment Moderation XMLRPC Services
package = Services - services
dependencies[] = services
core = 6.x

The module code is also quite simple. The hook_service() implementation declares the methods that the service makes available, along with each method's name, input and output parameter names and types, and the callback function that implements it. You can install the module as soon as your hook_service() declarations are done (along with stubs for the callback functions) by going to the Modules (admin/build/modules) page and enabling the new module. The new services will show up on the Services (admin/build/services) page, along with forms to manually test them.

Here is the complete code for my services module:

<?php

define('CMXS_SUCCESS', 0);
define('CMXS_COMMENT_PUBLISHED', 0);
define('CMXS_COMMENT_NOT_PUBLISHED', 1);

/**
 * Implementation of hook_service().
 * Describes the methods that are exposed by this service module.
 */
function cmxs_service() {
  return array (
    // comment_publish
    array (
      '#method' => 'cmxs.comment_publish',
      '#callback' => 'cmxs_comment_publish',
      '#access callback' => 'cmxs_user_access',
      '#args' => array (
        array (
          '#name' => 'cids',
          '#type' => 'string',
          '#description' => t('Comma-separated list of Comment IDs, eg. $cid1,$cid2,...')
        )
      ),
      '#return' => 'int',
      '#help' => t('Publishes the specified comment.')
    ),
    // comment_unpublish
    array (
      '#method' => 'cmxs.comment_unpublish',
      '#callback' => 'cmxs_comment_unpublish',
      '#access callback' => 'cmxs_user_access',
      '#args' => array (
        array (
          '#name' => 'cids',
          '#type' => 'string',
          '#description' => t('Comma-separated list of Comment IDs, eg. $cid1,$cid2,...')
        )
      ),
      '#return' => 'int',
      '#help' => t('Unpublishes the specified comment.')
    ),
    // comment_delete
    array (
      '#method' => 'cmxs.comment_delete',
      '#callback' => 'cmxs_comment_delete',
      '#access callback' => 'cmxs_user_access',
      '#args' => array (
        array (
          '#name' => 'cids',
          '#type' => 'string',
          '#description' => t('Comma-separated list of Comment IDs, eg. $cid1,$cid2,...')
        )
      ),
      '#return' => 'int',
      '#help' => t('Deletes the specified comment.')
    ),
  );
}

/**
 * Implementation of hook_disable().
 * Actions that need to happen when this module is disabled.
 */
function cmxs_disable() {
  cache_clear_all('services:methods', 'cache');
}

/**
 * Implementation of hook_enable().
 * Actions that need to happen when this module is enabled.
 */
function cmxs_enable() {
  cache_clear_all('services:methods', 'cache');
}

/**
 * Custom user access function to short circuit user_access() since
 * we want to bypass Drupal's authentication, since the tool will
 * always send authenticated requests.
 */
function cmxs_user_access() {
  return TRUE;
}

/**
 * Finds the comments corresponding to the cids specified that are in
 * state NOT_PUBLISHED and updates their status to PUBLISHED, then saves 
 * them.
 */
function cmxs_comment_publish($cids) {
  watchdog('cmxs', 'cmxs_comment_publish(cids=' . $cids . ')');
  $comments = _cmxs_find_comments($cids, CMXS_COMMENT_NOT_PUBLISHED);
  foreach ($comments as $comment) {
    $comment->status = CMXS_COMMENT_PUBLISHED;
    comment_save((array) $comment);
    watchdog('cmxs', 'Comment (cid=' . $comment->cid . ') published');
  }
  return CMXS_SUCCESS;
}

/**
 * Finds the comments corresponding to the cids specified that are in
 * state PUBLISHED and updates their status to NOT_PUBLISHED, then saves them.
 */
function cmxs_comment_unpublish($cids) {
  watchdog('cmxs', 'cmxs_comment_unpublish(cids=' . $cids . ')');
  $comments = _cmxs_find_comments($cids, CMXS_COMMENT_PUBLISHED);
  foreach ($comments as $comment) {
    $comment->status = CMXS_COMMENT_NOT_PUBLISHED;
    comment_save((array) $comment);
    watchdog('cmxs', 'Comment (cid=' . $comment->cid . ') unpublished');
  }
  return CMXS_SUCCESS;
}

/**
 * Since deleting a comment requires administrator privileges, we cannot
 * call comment_delete($cid) directly (since our service has no privileges).
 * So we unpublish first, and then delete using a direct SQL call.
 */
function cmxs_comment_delete($cids) {
  watchdog('cmxs', 'cmxs_comment_delete(cids=' . $cids . ')');
  $comments = _cmxs_find_comments($cids);
  foreach ($comments as $comment) {
    $comment->status = CMXS_COMMENT_NOT_PUBLISHED;
    comment_save((array) $comment); // UDXI unpublished called here
    _cmxs_delete_comment($comment->cid);
    watchdog('cmxs', 'Comment (cid=' . $comment->cid . ') deleted');
  }
  return CMXS_SUCCESS;
}

/**
 * Given a comma-separated list of CIDs, return a list of comments that
 * correspond to these CIDs. CIDs that don't correspond to a Comment in
 * Drupal are silently ignored. If the optional parameter status is provided
 * only comments with the specified status are returned.
 */
function _cmxs_find_comments($cids, $status = NULL) {
  $comments = array();
  $cid_array = explode(',', $cids);
  if ($cid_array == FALSE) {
    return $comments;
  } else {
    foreach ($cid_array as $cid) {
      $comment = _comment_load($cid);
      if ($comment != NULL) {
        if ($status !== NULL) { // strict check: 0 (PUBLISHED) is a valid status
          if ($comment->status != $status) {
            continue;
          }
        }
        $comments[] = $comment;
      }
    }
  }
  return $comments;
}

/**
 * Deletes a comment corresponding to the specified cid from the Drupal
 * database. There is no authorization check.
 */
function _cmxs_delete_comment($cid)  {
  $db_result = db_query('DELETE FROM {comments} WHERE cid = %d', $cid);
}

As you can see, the hook_service() is purely declarative. I also have hook_enable() and hook_disable() implementations to make it a bit quicker to develop (changes in method signatures only need a module disable followed by a module enable, with no update.php run required).

I also have a custom access callback which simply returns TRUE. Because I am using non-authenticated XMLRPC requests, and the Drupal comment API for saving the updated comment object, I figured that Drupal would insist (as it should) on an authorized user to do the updates. So the cmxs_user_access() function is there to bypass Drupal's authentication.
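
Just to round this out, here is a rough sketch of how an external tool could invoke one of these methods over XMLRPC - in this case using the Apache XML-RPC client library (the endpoint path and comment IDs are placeholders, not taken from our actual moderation tool):

// Sketch: calling the cmxs.comment_publish service from Java over XMLRPC.
// Assumes the Services module exposes its XMLRPC server at /services/xmlrpc.
import java.net.URL;

import org.apache.xmlrpc.client.XmlRpcClient;
import org.apache.xmlrpc.client.XmlRpcClientConfigImpl;

public class CommentModerationClient {

  public static void main(String[] args) throws Exception {
    XmlRpcClientConfigImpl config = new XmlRpcClientConfigImpl();
    config.setServerURL(new URL("http://drupal.example.com/services/xmlrpc"));
    XmlRpcClient client = new XmlRpcClient();
    client.setConfig(config);
    // cids is a comma-separated list of comment IDs, as declared in hook_service()
    Object status = client.execute("cmxs.comment_publish", new Object[] { "101,102" });
    System.out.println("Return code: " + status);
  }
}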

I have mentioned before about how impressed I am by Drupal's overall design, and the Services module is no exception. Like the rest of Drupal, it follows the convention over configuration philosophy, and once you understand the convention, building a custom service is really quite simple.

Friday, October 08, 2010

Denormalizing Maps with Lucene Payloads

Last week, I tried out Lucene's Payload and SpanQuery features to do some position-based custom scoring of terms. I've been interested in the Payload feature ever since I first read about it, because it looked like something I could use to solve another problem at work...

The problem is to be able to store a mapping of concepts to scores along with a document. Our search uses a medical taxonomy, basically a graph of medical concepts (nodes) and their relationships to each other (edges). During indexing, a document is analyzed and a map of node IDs and scores is created and stored in the index. The score is composed of various components, but for simplicity, it can be thought of as the number of occurrences of a node in the document. So after indexing, we would end up with something like this:
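Document #0 => {p1: 123.0, p2: 2.0, p3: 89.0}
Document #1 => {p2: 91.0, p1: 5.0}
Document #2 => {p3: 56.0, p1: 25.0}
Document #3 => {p4: 98.0, p5: 65.0, p1: 33.0}

(The node IDs and scores above are only illustrative - they are the same dummy values used in the unit test further down.)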

During search, the query is decomposed into concepts using a similar process, and a query consisting of one or more TermQueries (wrapped in a BooleanQuery) is used to pull documents out of the index. In pseudo-SQL, something like this:

  SELECT document FROM index
  WHERE nodeId = nodeID(1)
  ...
  AND/OR nodeId = nodeID(n)
  ORDER BY (score(1) + ... + score(n)) DESC

There are many approaches to efficiently model this sort of situation, and over the years we've tried a few. The approach I am going to describe uses Lucene's Payload feature. Basically, the concept map is "flattened" into the main Document, and the scores are farmed out to a Payload byte array, so we can use the scores for scoring our results.

Obviously, this is nothing new... other people have used Payloads to do very similar things. In fact, a lot of the code that follows is heavily based on the example in this Lucid Imagination blog post.

Indexing

At index time, we flatten our concept map into a whitespace-separated list of key-value pairs, where the key and value in each element are separated by a special character, in our case a "$" sign. So a concept map {p1 => 123.0, p2 => 234.0} would be transformed to "p1$123.0 p2$234.0".
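
The flattening itself is trivial; here is a sketch of a helper (the method name is mine, assuming the concept map is a java.util.Map of node ID to score):

  // Sketch: flatten a concept map into the "key$value key$value ..." form
  // expected by the DelimitedPayloadTokenFilter configured below.
  public static String flattenConceptMap(Map<String,Float> conceptMap) {
    StringBuilder buf = new StringBuilder();
    for (Map.Entry<String,Float> entry : conceptMap.entrySet()) {
      if (buf.length() > 0) {
        buf.append(' ');
      }
      buf.append(entry.getKey()).append('$').append(entry.getValue());
    }
    return buf.toString();
  }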

Lucene provides the DelimitedPayloadTokenFilter, a TokenFilter that parses this string and converts it into the equivalent term and payload pairs, so all we have to build on our own is a custom Analyzer. The IndexWriter uses this Analyzer for the "data" field in the JUnit test (see below).

// Source: src/main/java/com/mycompany/payload/MyPayloadAnalyzer.java
package com.mycompany.payload;

import java.io.Reader;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.WhitespaceTokenizer;
import org.apache.lucene.analysis.payloads.DelimitedPayloadTokenFilter;
import org.apache.lucene.analysis.payloads.FloatEncoder;

public class MyPayloadAnalyzer extends Analyzer {

  @Override
  public TokenStream tokenStream(String fieldName, Reader reader) {
    return new DelimitedPayloadTokenFilter(
      new WhitespaceTokenizer(reader),
      '$', new FloatEncoder());
  }
}

Searching

On the search side, we create a custom Similarity implementation that reads the score from the payload and returns it, and tell our searcher to use this Similarity implementation. We want to use only our concept scores rather than folding them into the full Lucene score, so we indicate that when we create the PayloadTermQuery in the JUnit test (the third constructor argument, includeSpanScore, is set to false).

// Source: src/main/java/com/mycompany/payload/MyPayloadSimilarity.java
package com.mycompany.payload;

import org.apache.lucene.analysis.payloads.PayloadHelper;
import org.apache.lucene.search.DefaultSimilarity;

public class MyPayloadSimilarity extends DefaultSimilarity {

  private static final long serialVersionUID = -2402909220013794848L;

  @Override
  public float scorePayload(int docId, String fieldName,
      int start, int end, byte[] payload, int offset, int length) {
    if (payload != null) {
      return PayloadHelper.decodeFloat(payload, offset);
    } else {
      return 1.0F;
    }
  }
}

The actual search logic is in the JUnit test shown below. Here I build a small index with some dummy data in RAM and query it using a straight PayloadTermQuery and two Boolean queries with embedded PayloadTermQueries.

// Source: src/test/java/com/mycompany/payload/MyPayloadQueryTest.java
package com.mycompany.payload;

import org.apache.commons.lang.StringUtils;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.payloads.AveragePayloadFunction;
import org.apache.lucene.search.payloads.PayloadTermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class MyPayloadQueryTest {

  private static IndexSearcher searcher;
  
  private static String[] data = {
    "p1$123.0 p2$2.0 p3$89.0",
    "p2$91.0 p1$5.0",
    "p3$56.0 p1$25.0",
    "p4$98.0 p5$65.0 p1$33.0"
  };

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    Directory directory = new RAMDirectory();
    IndexWriter writer = new IndexWriter(directory, 
      new MyPayloadAnalyzer(), IndexWriter.MaxFieldLength.UNLIMITED);
    for (int i = 0; i < data.length; i++) {
      Document doc = new Document();
      doc.add(new Field("title", "Document #" + i, Store.YES, Index.NO));
      doc.add(new Field("data", data[i], Store.YES, Index.ANALYZED));
      writer.addDocument(doc);
    }
    writer.close();
    searcher = new IndexSearcher(directory);
    searcher.setSimilarity(new MyPayloadSimilarity());
  }

  @AfterClass
  public static void teardownAfterClass() throws Exception {
    if (searcher != null) {
      searcher.close();
    }
  }
  
  @Test
  public void testSingleTerm() throws Exception {
    PayloadTermQuery p1Query = new PayloadTermQuery(
      new Term("data", "p1"), new AveragePayloadFunction(), false);
    search(p1Query);
  }
  
  @Test
  public void testAndQuery() throws Exception {
    PayloadTermQuery p1Query = new PayloadTermQuery(
      new Term("data", "p1"), new AveragePayloadFunction(), false);
    PayloadTermQuery p2Query = new PayloadTermQuery(
      new Term("data", "p2"), new AveragePayloadFunction(), false);
    BooleanQuery query = new BooleanQuery();
    query.add(p1Query, Occur.MUST);
    query.add(p2Query, Occur.MUST);
    search(query);
  }
  
  @Test
  public void testOrQuery() throws Exception {
    PayloadTermQuery p1Query = new PayloadTermQuery(
      new Term("data", "p1"), new AveragePayloadFunction(), false);
    PayloadTermQuery p2Query = new PayloadTermQuery(
      new Term("data", "p2"), new AveragePayloadFunction(), false);
    BooleanQuery query = new BooleanQuery();
    query.add(p1Query, Occur.SHOULD);
    query.add(p2Query, Occur.SHOULD);
    search(query);
  }
  
  private void search(Query query) throws Exception {
    System.out.println("=== Running query: " + query.toString() + " ===");
    ScoreDoc[] hits = searcher.search(query, 10).scoreDocs;
    for (int i = 0; i < hits.length; i++) {
      Document doc = searcher.doc(hits[i].doc);
      System.out.println(StringUtils.join(new String[] {
        doc.get("title"),
        doc.get("data"),
        String.valueOf(hits[i].score)
      }, "  "));
    }
  }
}

The three tests (annotated with @Test) cover the basic use cases I expect for this search - a single-term search, an AND search, and an OR search. The last two are done by embedding the individual PayloadTermQuery objects in a BooleanQuery. As you can see from the results below, this works quite nicely. This is good news for me, since based on my reading of the LIA2 book, I had (wrongly) concluded that Payloads can only be used with SpanQuery, and that you need special "payload-aware" subclasses of SpanQuery to be able to use them (which is true for SpanQuery itself, BTW).

=== Running query: data:p1 ===
Document #0  p1$123.0 p2$2.0 p3$89.0  123.0
Document #3  p4$98.0 p5$65.0 p1$33.0  33.0
Document #2  p3$56.0 p1$25.0  25.0
Document #1  p2$91.0 p1$5.0  5.0

=== Running query: +data:p1 +data:p2 ===
Document #0  p1$123.0 p2$2.0 p3$89.0  125.0
Document #1  p2$91.0 p1$5.0  96.0

=== Running query: data:p1 data:p2 ===
Document #0  p1$123.0 p2$2.0 p3$89.0  125.0
Document #1  p2$91.0 p1$5.0  96.0
Document #3  p4$98.0 p5$65.0 p1$33.0  16.5
Document #2  p3$56.0 p1$25.0  12.5

Performance

I also read (on the web, can't find the link now) that Payload queries are usually slower than their non-payload aware counterparts, so I decided to do a quick back-of-the-envelope calculation to see what sort of degradation to expect.

I took an existing index containing approximately 30K documents, and its associated (denormalized) concept index, and merged the two into a single new index with the concept map flattened into the document as described above. I ran 5 concept queries, first as a TermQuery (with a custom sort on the concept score field) and then as a PayloadTermQuery, 5 times each, discarding the first run (to eliminate cache warmup overhead), and averaged the wall-clock elapsed times for each query. Here are the results:

Query Term   #-results   TermQuery (ms)   PayloadTermQuery (ms)
2800541      46          0.25             1.5
2790981      39          0.25             1.75
5348177      50          0.75             7.0
2793084      50          0.5              1.75
2800232      50          0.5              0.75

So it appears that on average (excluding outliers), PayloadTermQuery calls are approximately 3-5 times slower than equivalent TermQuery calls. But they do offer a smaller disk (and consequently OS cache) footprint and a simpler programming model, so it remains to be seen if this makes sense for us to use.

Update: 2010-10-11

The situation changes when you factor in the actual document retrieval (i.e., paging through the ScoreDoc array and getting the Documents from the searcher using searcher.doc(ScoreDoc.doc)). It appears that the PayloadTermQuery approach is then consistently faster, but not significantly so.

Query Term   #-results   TermQuery (ms)   PayloadTermQuery (ms)
2800541      46          12.5             9.25
2790981      39          10.0             6.75
5348177      50          9.25             9.0
2793084      50          6.5              6.0
2800232      50          5.75             4.5