Saturday, May 30, 2009

Using Neo4J to load and query OWL ontologies

I've written previously about modeling, storing and navigating through ontologies (you can see them here, here, here and here). These were all based on ideas on how I could improve upon ontology systems I had previously encountered at work. As I have no formal background in Semantic Web Programming, most of these implementations were based on tools that I was already familiar with or wanted to get familiar with.

I recently bought a book on Semantic Web Programming (see my review on Amazon here), and I must say it opened up a whole new world for me. Among other things, the book has very good coverage of Jena, a Semantic Web framework for Java, which I had been meaning to look at for a while.

Somewhat unrelated, I also came across Neo4J, a graph database, and it seemed to be a good fit as a data store for an ontology. Prior to this, the ontologies I had seen were stored in a relational database, converted into an in-memory graph, and then serialized to disk using Java serialization for use by applications. This means that the serialized version is a point-in-time snapshot, not a live view of the ontology. Depending on how frequently the ontology is updated, this may not be a big deal. But if the ontology is stored in a graph database to begin with, the backend can keep updating the database and the application always sees the current ontology. That makes things much cleaner, in my opinion.

So I decided to take the OWL file for a sample Wine and Food ontology, and parse it using Jena, then load it into the Neo graph database, and run a few queries against it, to familiarize myself with the Jena and Neo APIs. This post is a result of that effort.

Load Phase

The code for the data loader is shown below. It uses Jena to parse the wine.rdf and food.rdf files and writes them out into a Neo graph database. The Jena parser reads each file into a Model, which exposes an iterator (StmtIterator) over its Statement objects. Each statement is a (subject, predicate, object) Triple, whose three parts correspond to the start node, relationship and end node in a graph database.
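
As a rough illustration of that mapping, here is a minimal in-memory sketch that uses only the JDK. It is not the Neo4J or Jena API: the class TripleGraphSketch and its methods are made up for this example, and the two triples are taken from the log and query output shown later in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a graph store; not the Neo4J or Jena API.
class TripleGraphSketch {

  // One directed, named edge: the predicate plus the node it points to.
  static final class Edge {
    final String predicate;
    final String target;

    Edge(String predicate, String target) {
      this.predicate = predicate;
      this.target = target;
    }
  }

  // Adjacency list keyed by node name; insertion order is preserved.
  private final Map<String, List<Edge>> outgoing =
    new LinkedHashMap<String, List<Edge>>();

  // Subject becomes the start node, predicate the relationship,
  // and object the end node.
  void addTriple(String subject, String predicate, String object) {
    nodeFor(subject).add(new Edge(predicate, object));
    nodeFor(object); // the end node must exist even with no outgoing edges
  }

  // Get-or-create the edge list for a node name.
  private List<Edge> nodeFor(String name) {
    List<Edge> edges = outgoing.get(name);
    if (edges == null) {
      edges = new ArrayList<Edge>();
      outgoing.put(name, edges);
    }
    return edges;
  }

  // Names of the nodes reachable from 'subject' via 'predicate'.
  List<String> targets(String subject, String predicate) {
    List<String> result = new ArrayList<String>();
    List<Edge> edges = outgoing.get(subject);
    if (edges != null) {
      for (Edge edge : edges) {
        if (edge.predicate.equals(predicate)) {
          result.add(edge.target);
        }
      }
    }
    return result;
  }

  int nodeCount() {
    return outgoing.size();
  }

  public static void main(String[] args) {
    TripleGraphSketch graph = new TripleGraphSketch();
    graph.addTriple("LoireRegion", "locatedIn", "FrenchRegion");
    graph.addTriple("CorbansPrivateBinSauvignonBlanc", "hasMaker", "Corbans");
    System.out.println(graph.targets("LoireRegion", "locatedIn"));
  }
}
```

The real loader does the same get-or-create step per node, but against the Neo database and a Lucene index rather than a Map.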

In keeping with the best practices described in the Neo4J Guide (PDF), I also added a pseudo-node representing the start node (also known as the reference node) of the graph, and a pseudo-node for each OWL file. The reference node points to the OWL file pseudo-nodes, and each file node points to the nodes from the statements extracted from that file.

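To query the database given a node name, I used Neo's LuceneIndexService to create a lookup table that maps each node name to its Node. In addition, I wanted to assign a weight to each relationship, so I added a weight property: 0.0 for the pseudo-relationships (CATEGORIZED_AS and CONTAINS) and 1.0 for the relationships derived from the OWL triples.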

// Source: src/main/java/net/sf/jtmt/ontology/graph/loaders/Owl2NeoLoader.java
package net.sf.jtmt.ontology.graph.loaders;

import net.sf.jtmt.ontology.graph.OntologyRelationshipType;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.neo4j.api.core.Direction;
import org.neo4j.api.core.EmbeddedNeo;
import org.neo4j.api.core.NeoService;
import org.neo4j.api.core.NotFoundException;
import org.neo4j.api.core.Relationship;
import org.neo4j.api.core.Transaction;
import org.neo4j.util.index.IndexService;
import org.neo4j.util.index.LuceneIndexService;

import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.Node_URI;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.rdf.model.StmtIterator;

/**
 * Parses an OWL RDF file and populates a graph database directly.
 */
public class Owl2NeoLoader {

  private static final String FIELD_ENTITY_NAME = "name";
  private static final String FIELD_ENTITY_TYPE = "type";
  private static final String FIELD_RELATIONSHIP_NAME = "name";
  private static final String FIELD_RELATIONSHIP_WEIGHT = "weight";
  
  private final Log log = LogFactory.getLog(getClass());
  
  private String filePath;
  private String dbPath;
  private String ontologyName;
  private String refNodeName;
  
  public void setFilePath(String filePath) {
    this.filePath = filePath;
  }
  
  public void setDbPath(String dbPath) {
    this.dbPath = dbPath;
  }
  
  public void setOntologyName(String ontologyName) {
    this.ontologyName = ontologyName;
  }
  
  public void setRefNodeName(String refNodeName) {
    this.refNodeName = refNodeName;
  }
  
  public void load() throws Exception {
    NeoService neoService = null;
    IndexService indexService = null;
    try {
      // set up an embedded instance of neo database
      neoService = new EmbeddedNeo(dbPath);
      // set up index service for looking up node by name
      indexService = new LuceneIndexService(neoService);
      // set up top-level pseudo nodes for navigation
      org.neo4j.api.core.Node refNode = getReferenceNode(neoService);
      org.neo4j.api.core.Node fileNode = getFileNode(neoService, refNode);
      // parse the owl rdf file
      Model model = ModelFactory.createDefaultModel();
      model.read("file://" + filePath);
      // iterate through all triples in the file, and set up corresponding
      // nodes in the neo database.
      StmtIterator it = model.listStatements();
      while (it.hasNext()) {
        Statement st = it.next();
        Triple triple = st.asTriple();
        insertIntoDb(neoService, indexService, fileNode, triple);
      }
    } finally {
      if (indexService != null) {
        indexService.shutdown();
      }
      if (neoService != null) {
        neoService.shutdown();
      }
    }
  }

  /**
   * Get the reference node if already available, otherwise create it.
   * @param neoService the reference to the Neo service.
   * @return a Neo4j Node object reference to the reference node.
   * @throws Exception if thrown.
   */
  private org.neo4j.api.core.Node getReferenceNode(NeoService neoService) 
      throws Exception { 
    org.neo4j.api.core.Node refNode = null;
    Transaction tx = neoService.beginTx(); 
    try {
      refNode = neoService.getReferenceNode();
      if (! refNode.hasProperty(FIELD_ENTITY_NAME)) {
        refNode.setProperty(FIELD_ENTITY_NAME, refNodeName);
        refNode.setProperty(FIELD_ENTITY_TYPE, "Thing");
      }
      tx.success();
    } catch (NotFoundException e) {
      tx.failure();
      throw e;
    } finally {
      tx.finish();
    }
    return refNode;
  }

  /**
   * Creates a single node for the file. This method is called once
   * per file, and the node should not exist in the Neo4j database.
   * So there is no need to check for existence of the node. Once
   * the node is created, it is connected to the reference node.
   * @param neoService the reference to the Neo service.
   * @param refNode the reference to the reference node.
   * @return the "file" node representing the entry-point into the
   * entities described by the current OWL file.
   * @throws Exception if thrown.
   */
  private org.neo4j.api.core.Node getFileNode(NeoService neoService,
      org.neo4j.api.core.Node refNode) throws Exception {
    org.neo4j.api.core.Node fileNode = null;
    Transaction tx = neoService.beginTx();
    try {
      fileNode = neoService.createNode();
      fileNode.setProperty(FIELD_ENTITY_NAME, ontologyName);
      fileNode.setProperty(FIELD_ENTITY_TYPE, "Class");
      Relationship rel = refNode.createRelationshipTo(
        fileNode, OntologyRelationshipType.CATEGORIZED_AS);
      logTriple(refNode, 
        OntologyRelationshipType.CATEGORIZED_AS, fileNode);
      rel.setProperty(
        FIELD_RELATIONSHIP_NAME, 
        OntologyRelationshipType.CATEGORIZED_AS.name());
      rel.setProperty(FIELD_RELATIONSHIP_WEIGHT, 0.0F);
      tx.success();
    } catch (Exception e) {
      tx.failure();
      throw e;
    } finally {
      tx.finish();
    }
    return fileNode;
  }

  /**
   * Inserts selected entities and relationships from Triples extracted
   * from the OWL document by the Jena parser. Only entities which have
   * a non-blank node for the subject and object are used. Further, only
   * relationship types listed in OntologyRelationshipTypes enum are 
   * considered. In addition, if the enum specifies that certain 
   * relationship types have an inverse, the inverse relation is also
   * created here.
   * @param neoService a reference to the Neo service.
   * @param indexService a reference to the Index service (for looking
   * up Nodes by name).
   * @param fileNode a reference to the Node that is an entry point into
   * this ontology. This node will connect to both the subject and object 
   * nodes of the selected triples via a CONTAINS relationship. 
   * @param triple a reference to the Triple extracted by the Jena parser.
   * @throws Exception if thrown.
   */
  private void insertIntoDb(NeoService neoService, 
      IndexService indexService,
      org.neo4j.api.core.Node fileNode, 
      Triple triple) throws Exception {
    Node subject = triple.getSubject();
    Node predicate = triple.getPredicate();
    Node object = triple.getObject();
    if ((subject instanceof Node_URI) &&
        (object instanceof Node_URI)) {
      // get or create the subject and object nodes
      org.neo4j.api.core.Node subjectNode = 
        getEntityNode(neoService, indexService, subject);
      org.neo4j.api.core.Node objectNode =
        getEntityNode(neoService, indexService, object);
      if (subjectNode == null || objectNode == null) {
        return;
      }
      Transaction tx = neoService.beginTx();
      try {
        // hook up both nodes to the fileNode
        if (! isConnected(neoService, fileNode, 
            OntologyRelationshipType.CONTAINS, 
            Direction.OUTGOING, subjectNode)) {
          logTriple(fileNode, 
            OntologyRelationshipType.CONTAINS, subjectNode);
          Relationship rel = fileNode.createRelationshipTo(
            subjectNode, OntologyRelationshipType.CONTAINS);
          rel.setProperty(FIELD_RELATIONSHIP_NAME, 
            OntologyRelationshipType.CONTAINS.name());
          rel.setProperty(FIELD_RELATIONSHIP_WEIGHT, 0.0F);
        }
        if (! isConnected(neoService, fileNode, 
            OntologyRelationshipType.CONTAINS, 
            Direction.OUTGOING, objectNode)) {
          logTriple(fileNode, 
            OntologyRelationshipType.CONTAINS, objectNode);
          Relationship rel = fileNode.createRelationshipTo(
            objectNode, OntologyRelationshipType.CONTAINS);
          rel.setProperty(
            FIELD_RELATIONSHIP_NAME, 
            OntologyRelationshipType.CONTAINS.name());
          rel.setProperty(FIELD_RELATIONSHIP_WEIGHT, 0.0F);
        }
        // hook up subject and object via predicate
        OntologyRelationshipType type = 
          OntologyRelationshipType.fromName(predicate.getLocalName());
        if (type != null) {
          logTriple(subjectNode, type, objectNode);
          Relationship rel = subjectNode.createRelationshipTo(
              objectNode, type);
          rel.setProperty(FIELD_RELATIONSHIP_NAME, type.name());
          rel.setProperty(FIELD_RELATIONSHIP_WEIGHT, 1.0F);
        }
        // create reverse relationship
        OntologyRelationshipType inverseType = 
          OntologyRelationshipType.inverseOf(predicate.getLocalName());
        if (inverseType != null) {
          logTriple(objectNode, inverseType, subjectNode);
          Relationship inverseRel = objectNode.createRelationshipTo(
            subjectNode, inverseType);
          inverseRel.setProperty(
            FIELD_RELATIONSHIP_NAME, inverseType.name());
          inverseRel.setProperty(FIELD_RELATIONSHIP_WEIGHT, 1.0F);
        }
        tx.success();
      } catch (Exception e) {
        tx.failure();
        throw e;
      } finally {
        tx.finish();
      }
    } else {
      return;
    }
  }

  /**
   * Loops through the relationships and returns true if the source
   * and target nodes are connected using the specified relationship
   * type and direction.
   * @param neoService a reference to the NeoService.
   * @param sourceNode the source Node object.
   * @param relationshipType the type of relationship.
   * @param direction the direction of the relationship.
   * @param targetNode the target Node object.
   * @return true or false.
   * @throws Exception if thrown.
   */
  private boolean isConnected(NeoService neoService, 
      org.neo4j.api.core.Node sourceNode,
      OntologyRelationshipType relationshipType, Direction direction,
      org.neo4j.api.core.Node targetNode) throws Exception {
    boolean isConnected = false;
    Transaction tx = neoService.beginTx();
    try {
      for (Relationship rel : sourceNode.getRelationships(
          relationshipType, direction)) {
        org.neo4j.api.core.Node endNode = rel.getEndNode();
        if (endNode.getProperty(FIELD_ENTITY_NAME).equals(
            targetNode.getProperty(FIELD_ENTITY_NAME))) {
          isConnected = true;
          break;
        }
      }
      tx.success();
    } catch (Exception e) {
      tx.failure();
      throw e;
    } finally {
      tx.finish();
    }
    return isConnected;
  }

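  /**
   * Looks up the Neo4j node for a URI entity by its local name (the
   * part after the '#'), creating and indexing it if it does not yet
   * exist. Returns null for URIs that have no '#' fragment.
   * @param neoService a reference to the Neo service.
   * @param indexService a reference to the Index service.
   * @param entity the Jena Node (expected to be a Node_URI).
   * @return the existing or newly created Neo4j Node, or null.
   * @throws Exception if thrown.
   */
  private org.neo4j.api.core.Node getEntityNode(NeoService neoService,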
      IndexService indexService, Node entity) throws Exception {
    String uri = ((Node_URI) entity).getURI();
    if (uri.indexOf('#') == -1) {
      return null;
    }
    String[] parts = StringUtils.split(uri, "#");
    String type = parts[0].substring(0, parts[0].lastIndexOf('/'));
    Transaction tx = neoService.beginTx();
    try {
      org.neo4j.api.core.Node entityNode = 
        indexService.getSingleNode(FIELD_ENTITY_NAME, parts[1]);
      if (entityNode == null) {
        entityNode = neoService.createNode();
        entityNode.setProperty(FIELD_ENTITY_NAME, parts[1]);
        entityNode.setProperty(FIELD_ENTITY_TYPE, type);
        indexService.index(entityNode, FIELD_ENTITY_NAME, parts[1]);
      }
      tx.success();
      return entityNode;
    } catch (Exception e) {
      tx.failure();
      throw e;
    } finally {
      tx.finish();
    }
  }
  
  /**
   * Convenience method to log the triple when it is inserted into the
   * database.
   * @param sourceNode the subject of the triple.
   * @param ontologyRelationshipType the predicate of the triple.
   * @param targetNode the object of the triple.
   */
  private void logTriple(org.neo4j.api.core.Node sourceNode, 
      OntologyRelationshipType ontologyRelationshipType, 
      org.neo4j.api.core.Node targetNode) {
    log.info("(" + sourceNode.getProperty(FIELD_ENTITY_NAME) +
      "," + ontologyRelationshipType.name() + 
      "," + targetNode.getProperty(FIELD_ENTITY_NAME) + ")");
  }
}
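
The name/type split done in getEntityNode() can be tried in isolation. The sketch below uses only the JDK instead of commons-lang StringUtils, so it is an approximation rather than the loader's exact code: the class and method names are made up, the example.org URI is a placeholder, and the guards for an empty fragment and a base without a '/' are my additions.

```java
// Stand-alone sketch of the URI handling in getEntityNode(); JDK only.
class EntityUriSketch {

  // Returns {type, name} for a URI with a non-empty fragment,
  // or null otherwise.
  static String[] splitEntityUri(String uri) {
    int hash = uri.indexOf('#');
    if (hash == -1 || hash == uri.length() - 1) {
      // no fragment (as in the loader) or an empty one (extra guard)
      return null;
    }
    String base = uri.substring(0, hash);
    String name = uri.substring(hash + 1);
    int slash = base.lastIndexOf('/');
    // The loader drops the last path segment of the base to get the type;
    // keeping the whole base when there is no '/' is an extra guard.
    String type = (slash == -1) ? base : base.substring(0, slash);
    return new String[] { type, name };
  }

  public static void main(String[] args) {
    // Placeholder URI, made up for illustration.
    String[] parts = splitEntityUri("http://example.org/ontologies/wine#Dry");
    System.out.println(parts[0] + " | " + parts[1]);
  }
}
```

Only the local name is indexed, so two entities with the same local name in different namespaces would map to the same node.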

The relationship types are listed in the OntologyRelationshipType enum below. The types were found manually by first parsing the Statement objects and finding unique relationships. So it is likely that this enum will need to be expanded if other OWL files need to be parsed.

In addition, I added inverse relationships, which are not available in the OWL file. Here is the code for OntologyRelationshipType.java.

// Source: src/main/java/net/sf/jtmt/ontology/graph/OntologyRelationshipType.java
package net.sf.jtmt.ontology.graph;

import org.neo4j.api.core.RelationshipType;

/**
 * Relationships exposed by the taxonomy.
 */
public enum OntologyRelationshipType implements RelationshipType {
  CATEGORIZED_AS(null, null),  // pseudo-rel
  CONTAINS(null, null),        // pseudo-rel
  ADJACENT_REGION("adjacentRegion", "adjacentRegion"),
  HAS_VINTAGE_YEAR("hasVintageYear", "isVintageYearOf"),
  LOCATED_IN("locatedIn", "regionContains"),
  MADE_FROM_GRAPE("madeFromGrape", "mainIngredient"),
  HAS_FLAVOR("hasFlavor", "isFlavorOf"),
  HAS_COLOR("hasColor", "isColorOf"),
  HAS_SUGAR("hasSugar", "isSugarContentOf"),
  HAS_BODY("hasBody", "isBodyOf"),
  HAS_MAKER("hasMaker", "madeBy"),
  IS_INSTANCE_OF("type", "hasInstance"),
  SUBCLASS_OF("subClassOf", "superClassOf"),
  DISJOINT_WITH("disjointWith", "disjointWith"),
  DIFFERENT_FROM("differentFrom", "differentFrom"),
  DOMAIN("domain", null),
  IS_VINTAGE_YEAR_OF("isVintageYearOf", "hasVintageYear"),
  REGION_CONTAINS("regionContains", "locatedIn"),
  MAIN_INGREDIENT("mainIngredient", "madeFromGrape"),
  IS_FLAVOR_OF("isFlavorOf", "hasFlavor"),
  IS_COLOR_OF("isColorOf", "hasColor"),
  IS_SUGAR_CONTENT_OF("isSugarContentOf", "hasSugar"),
  IS_BODY_OF("isBodyOf", "hasBody"),
  MADE_BY("madeBy", "hasMaker"),
  HAS_INSTANCE("hasInstance", "type"),
  SUPERCLASS_OF("superClassOf", "subClassOf");

  private String name;
  private String inverseName;
  
  OntologyRelationshipType(String name, String inverseName) {
    this.name = name;
    this.inverseName = inverseName;
  }
   
  public static OntologyRelationshipType fromName(String name) {
    for (OntologyRelationshipType type : values()) {
      if (name.equals(type.name)) {
        return type;
      }
    }
    return null;
  }
  
  public static OntologyRelationshipType inverseOf(String name) {
    OntologyRelationshipType rel = fromName(name);
    if (rel != null && rel.inverseName != null) {
      return fromName(rel.inverseName);
    } else {
      return null;
    }
  }
}
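
To see how fromName() and inverseOf() behave, here is a trimmed, stand-alone copy of the enum with only three constants and without the Neo4J RelationshipType interface. The name RelTypeSketch is made up for this example; the lookup logic is the same as above.

```java
// Trimmed copy of OntologyRelationshipType for experimentation only.
enum RelTypeSketch {
  HAS_MAKER("hasMaker", "madeBy"),
  MADE_BY("madeBy", "hasMaker"),
  DOMAIN("domain", null); // no inverse defined

  private final String name;
  private final String inverseName;

  RelTypeSketch(String name, String inverseName) {
    this.name = name;
    this.inverseName = inverseName;
  }

  // Maps an OWL predicate local name to its constant, or null if unknown.
  static RelTypeSketch fromName(String name) {
    for (RelTypeSketch type : values()) {
      if (name.equals(type.name)) {
        return type;
      }
    }
    return null;
  }

  // Returns the constant for the inverse predicate, or null if the
  // predicate is unknown or has no inverse.
  static RelTypeSketch inverseOf(String name) {
    RelTypeSketch rel = fromName(name);
    if (rel != null && rel.inverseName != null) {
      return fromName(rel.inverseName);
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(fromName("hasMaker"));   // the forward relationship
    System.out.println(inverseOf("hasMaker"));  // its inverse
    System.out.println(inverseOf("domain"));    // no inverse
  }
}
```

Predicates that are not in the enum return null from fromName(), which is why the loader silently skips them.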

The loader operates on a single OWL file at a time. To run it, I use the following JUnit test class.

// Source: src/test/java/net/sf/jtmt/ontology/graph/Owl2NeoLoaderTest.java
package net.sf.jtmt.ontology.graph;

import net.sf.jtmt.ontology.graph.loaders.Owl2NeoLoader;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;

/**
 * Test case for Owl2NeoLoader.
 */
public class Owl2NeoLoaderTest {

  private static final String ROOT_NAME = "ConsumableThing";
  
  private final Log log = LogFactory.getLog(getClass());
  
  private static final String[][] SUB_ONTOLOGIES = new String[][] {
    new String[] {"wine.rdf", "Wine"},
    new String[] {"food.rdf", "EdibleThing"}
  };
  
  @Test
  public void testLoading() throws Exception {
    for (String[] subOntology : SUB_ONTOLOGIES) {
      log.info("Now processing " + subOntology[0]);
      Owl2NeoLoader loader = new Owl2NeoLoader();
      loader.setRefNodeName(ROOT_NAME);
      loader.setFilePath(FilenameUtils.concat(
        "/home/sujit/src/jtmt/src/main/resources", subOntology[0]));
      loader.setDbPath("/tmp/neodb");
      loader.setOntologyName(subOntology[1]);
      loader.load();
    }
  }
}

The loader also prints out the triples as it writes them. A partial log (minus the date/time/source data) is shown below.

...
(CorbansPrivateBinSauvignonBlanc,HAS_SUGAR,Dry)
(Dry,IS_SUGAR_CONTENT_OF,CorbansPrivateBinSauvignonBlanc)
(Wine,CONTAINS,Corbans)
(CorbansPrivateBinSauvignonBlanc,HAS_MAKER,Corbans)
(Corbans,MADE_BY,CorbansPrivateBinSauvignonBlanc)
(Wine,CONTAINS,NewZealandRegion)
...

Query Phase

To test the loading, I used the same queries that I ran previously with JGraphT against a Prevayler-backed in-memory graph. I decided to build a query class that encapsulates the Neo4J query code. Here is the code for the query component.

// Source: src/main/java/net/sf/jtmt/ontology/graph/NeoOntologyNavigator.java
package net.sf.jtmt.ontology.graph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.commons.collections15.MultiMap;
import org.apache.commons.collections15.multimap.MultiHashMap;
import org.neo4j.api.core.Direction;
import org.neo4j.api.core.EmbeddedNeo;
import org.neo4j.api.core.NeoService;
import org.neo4j.api.core.Node;
import org.neo4j.api.core.Relationship;
import org.neo4j.api.core.ReturnableEvaluator;
import org.neo4j.api.core.StopEvaluator;
import org.neo4j.api.core.Transaction;
import org.neo4j.api.core.Traverser;
import org.neo4j.api.core.Traverser.Order;
import org.neo4j.util.index.IndexService;
import org.neo4j.util.index.LuceneIndexService;

/**
 * Provides methods to locate nodes and find neighbors in the Neo
 * graph database.
 */
public class NeoOntologyNavigator {

  public static final String FIELD_ENTITY_NAME = "name";
  public static final String FIELD_RELATIONSHIP_NAME = "name";
  public static final String FIELD_RELATIONSHIP_WEIGHT = "weight";

  private class WeightedNode {
    public Node node;
    public Float weight;
    public WeightedNode(Node node, Float weight) {
      this.node = node;
      this.weight = weight;
    }
  };
  
  private String neoDbPath;
  
  private NeoService neoService;
  private IndexService indexService;
  
  /**
   * Ctor for NeoOntologyNavigator
   * @param dbPath the path to the neo database.
   */
  public NeoOntologyNavigator(String dbPath) {
    super();
    this.neoDbPath = dbPath;
  }
  
  /**
   * The init() method should be called by client after instantiation.
   */
  public void init() {
    this.neoService = new EmbeddedNeo(neoDbPath);
    this.indexService = new LuceneIndexService(neoService);
  }
  
  /**
   * The destroy() method should be called by client on shutdown.
   */
  public void destroy() {
    indexService.shutdown();
    neoService.shutdown();
  }
  
  /**
   * Gets the reference to the named Node. Returns null if the node
   * is not found in the database.
   * @param nodeName the name of the node to lookup.
   * @return the reference to the Node, or null if not found.
   * @throws Exception if thrown.
   */
  public Node getByName(String nodeName) throws Exception {
    Transaction tx = neoService.beginTx();
    try {
      Node node = indexService.getSingleNode(FIELD_ENTITY_NAME, nodeName);
      tx.success();
      return node;
    } catch (Exception e) {
      tx.failure();
      throw(e);
    } finally {
      tx.finish();
    }
  }

  /**
   * Return a Map of relationship names to a List of nodes connected
   * by that relationship. The keys are sorted by name, and the list
   * of node values are sorted by the incoming relation weights.
   * @param node the root Node.
   * @return a Map of String to Node List of neighbors.
   */
  public Map<String,List<Node>> getAllNeighbors(Node node)
      throws Exception {
    MultiMap<String,WeightedNode> neighbors = 
      new MultiHashMap<String,WeightedNode>();
    Transaction tx = neoService.beginTx();
    try {
      String nodeName = (String) node.getProperty(FIELD_ENTITY_NAME);
      for (Relationship relationship : node.getRelationships()) {
        String relName = 
          (String) relationship.getProperty(FIELD_RELATIONSHIP_NAME);
        Float relWeight = 
          (Float) relationship.getProperty(FIELD_RELATIONSHIP_WEIGHT);
        if (relWeight == 0.0F) {
          continue;
        }
        Node neighborNode = relationship.getEndNode();
        // if self-loop, ignore
        String neighborNodeName = 
          (String) neighborNode.getProperty(FIELD_ENTITY_NAME);
        if (nodeName.equals(neighborNodeName)) {
          continue;
        }
        neighbors.put(relName, new WeightedNode(neighborNode, relWeight));
      }
      tx.success();
    } catch (Exception e) {
      tx.failure();
      throw e;
    } finally {
      tx.finish();
    }
    // sort each collection of weighted nodes
    for (String relName : neighbors.keySet()) {
      List<WeightedNode> nodes = 
        (List<WeightedNode>) neighbors.get(relName);
      Collections.sort(nodes, new Comparator<WeightedNode>() {
        public int compare(WeightedNode w1, WeightedNode w2) {
          return w2.weight.compareTo(w1.weight);
        }
      });
    }
    // finally sort the keys and upcast WeightedNodes to Nodes
    SortedMap<String,List<Node>> neighborMap = 
      new TreeMap<String,List<Node>>();
    for (String relName : neighbors.keySet()) {
      Collection<WeightedNode> weightedNodes = neighbors.get(relName);
      List<Node> nodes = new ArrayList<Node>();
      for (WeightedNode weightedNode : weightedNodes) {
        nodes.add(weightedNode.node);
      }
      neighborMap.put(relName, nodes);
    }
    return neighborMap;
  }
  
  /**
   * Returns a List of neighbor nodes that is reachable from the specified
   * Node. No ordering is done (since the Traverser framework does not seem
   * to allow this type of traversal, and we want to use the Traverser here).
   * @param node reference to the base node.
   * @param type the relationship type.
   * @return a List of neighbor nodes.
   */
  public List<Node> getNeighborsRelatedBy(Node node,
      OntologyRelationshipType type) throws Exception {
    List<Node> neighbors = new ArrayList<Node>();
    Transaction tx = neoService.beginTx();
    try {
      Traverser traverser = node.traverse(
        Order.BREADTH_FIRST, 
        StopEvaluator.DEPTH_ONE, 
        ReturnableEvaluator.ALL_BUT_START_NODE, 
        type, 
        Direction.OUTGOING);
      for (Iterator<Node> it = traverser.iterator(); it.hasNext();) {
        Node neighbor = it.next();
        neighbors.add(neighbor);
      }
      tx.success();
    } catch (Exception e) {
      tx.failure();
      throw(e);
    } finally {
      tx.finish();
    }
    return neighbors;
  }
}
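
The ordering step in getAllNeighbors() is independent of Neo4J, so it can be sketched with plain JDK classes. The names WeightSortSketch and WeightedName below are made up, and so are the weights: the loader above only ever writes 0.0 or 1.0, so the sort only starts to matter once the relationships carry more varied weights.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// JDK-only sketch of sorting neighbors by descending relationship weight.
class WeightSortSketch {

  // Hypothetical pairing of a node name with the weight of the
  // relationship that leads to it.
  static final class WeightedName {
    final String name;
    final float weight;

    WeightedName(String name, float weight) {
      this.name = name;
      this.weight = weight;
    }
  }

  // Returns the names ordered by weight, highest first. Collections.sort
  // is stable, so entries with equal weights keep their input order.
  static List<String> sortByWeightDescending(List<WeightedName> items) {
    List<WeightedName> copy = new ArrayList<WeightedName>(items);
    Collections.sort(copy, new Comparator<WeightedName>() {
      public int compare(WeightedName a, WeightedName b) {
        return Float.compare(b.weight, a.weight);
      }
    });
    List<String> names = new ArrayList<String>();
    for (WeightedName item : copy) {
      names.add(item.name);
    }
    return names;
  }

  public static void main(String[] args) {
    List<WeightedName> items = new ArrayList<WeightedName>();
    items.add(new WeightedName("A", 1.0f));
    items.add(new WeightedName("B", 2.5f));
    items.add(new WeightedName("C", 1.0f));
    System.out.println(sortByWeightDescending(items));
  }
}
```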

The query client is the JUnit class shown below. Notice that it works at the level of the application, i.e., apart from the Node type, there is no Neo4J-specific code in the "client code".

// Source: src/test/java/net/sf/jtmt/ontology/graph/NeoOntologyNavigatorTest.java
package net.sf.jtmt.ontology.graph;

import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.neo4j.api.core.Node;

/**
 * Test case for NeoDb Navigator.
 */
public class NeoOntologyNavigatorTest {
  
  private final Log log = LogFactory.getLog(getClass());
  private static final String NEODB_PATH = "/tmp/neodb";
  private static NeoOntologyNavigator navigator;
  
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    navigator = new NeoOntologyNavigator(NEODB_PATH);
    navigator.init();
  }
  
  @AfterClass
  public static void teardownAfterClass() throws Exception {
    navigator.destroy();
  }
  
  @Test
  public void testWhereIsLoireRegion() throws Exception {
    log.info("query> where is LoireRegion?");
    Node loireRegionNode = navigator.getByName("LoireRegion");
    if (loireRegionNode != null) {
      List<Node> locations = navigator.getNeighborsRelatedBy(
        loireRegionNode, OntologyRelationshipType.LOCATED_IN);
      for (Node location : locations) {
        log.info(
          location.getProperty(NeoOntologyNavigator.FIELD_ENTITY_NAME));
      }
    }
  }
  
  @Test
  public void testWhatRegionsAreInUsRegion() throws Exception {
    log.info("query> what regions are in USRegion?");
    Node usRegion = navigator.getByName("USRegion");
    if (usRegion != null) {
      List<Node> locations = navigator.getNeighborsRelatedBy(
        usRegion, OntologyRelationshipType.REGION_CONTAINS);
      for (Node location : locations) {
        log.info(
          location.getProperty(NeoOntologyNavigator.FIELD_ENTITY_NAME));
      }
    }
  }
  
  @Test
  public void testWhatAreSweetWines() throws Exception {
    log.info("query> what are Sweet wines?");
    Node sweetNode = navigator.getByName("Sweet");
    if (sweetNode != null) {
      List<Node> sweetWines = navigator.getNeighborsRelatedBy(
        sweetNode, OntologyRelationshipType.IS_SUGAR_CONTENT_OF);
      for (Node sweetWine : sweetWines) {
        log.info(
          sweetWine.getProperty(NeoOntologyNavigator.FIELD_ENTITY_NAME));
      }
    }
  }

  @Test
  public void testShowNeighborsForAReislingWine() throws Exception {
    log.info("query> show neighbors for SchlossVolradTrochenbierenausleseRiesling");
    Node rieslingNode = 
      navigator.getByName("SchlossVolradTrochenbierenausleseRiesling");
    Map<String,List<Node>> neighbors = 
      navigator.getAllNeighbors(rieslingNode);
    for (String relType : neighbors.keySet()) {
      log.info("--- " + relType + " ---");
      List<Node> relatedNodes = neighbors.get(relType);
      for (Node relatedNode : relatedNodes) {
        log.info(
          relatedNode.getProperty(NeoOntologyNavigator.FIELD_ENTITY_NAME));
      }
    }
  }
}

The output of the queries is shown below. As you can see, the first three are similar to the MySQL/Prevayler/JGraphT version described in my earlier posts. The last one is a dump of a named node, which may be useful if we want to build a browsing tool.

query> where is LoireRegion?
FrenchRegion

query> what regions are in USRegion?
TexasRegion
CaliforniaRegion

query> what are Sweet wines?
WhitehallLanePrimavera
SchlossVolradTrochenbierenausleseRiesling
SchlossRothermelTrochenbierenausleseRiesling

query> show neighbors for SchlossVolradTrochenbierenausleseRiesling?
--- HAS_BODY ---
Full
--- HAS_FLAVOR ---
Moderate
--- HAS_MAKER ---
SchlossVolrad
--- HAS_SUGAR ---
Sweet
--- IS_INSTANCE_OF ---
SweetRiesling
--- LOCATED_IN ---
GermanyRegion

I have barely scratched the surface of the Jena API with this, but I think I have exercised quite a bit of the Neo4J API, and I was quite impressed with the latter. One thing I would have liked to have is support for weighted relationships in the Traverser mechanism, so I could sort the relationships by weight, in case of multiple relationships.

My dataset is too small for me to form any opinion about performance and stability, but now that I am familiar with the API, I plan to use Neo4J to hold a (much) larger dataset to see how it compares against our current architecture of RDBMS and serialized graph.

Saturday, May 16, 2009

Parallel Application with Blitz JavaSpaces

Introduction

Late last year, I started looking at the Actor Framework (you can read about my experiences here, here, here and here), to figure out how to write clean multi-threaded applications without too much effort. During that time, I also read about JavaSpaces, a Java™ technology based on the LINDA programming model, which allows a developer to distribute tasks across multiple JVMs in a cluster.

I've been meaning to look at JavaSpaces for a while now, but I've been procrastinating because of its unfamiliarity. But recently I came across an embarrassingly parallel problem for which it seemed to be a perfect fit (based on my limited knowledge), so I decided to try it out with my toy example from the Actor postings.

My Toy Example on JavaSpaces

To quickly recap, my toy example consists of a pipeline of three tasks, the first of which downloads a page from a remote site given its URL, the second indexes the page into an in-memory data structure, and the third writes the index data structure out to disk. Of these, the download task is (expected to be) network intensive and the index task is (expected to be) CPU and memory intensive, and both take much longer to finish than the write task. So the idea is to parallelize the first two tasks across multiple downloaders and indexers, and then gather the results back into a single writer. It would look something like this (using JavaSpaces).

Which Implementation?

The 800-pound gorilla in this space (no pun intended) is GigaSpaces. Their product is the XAP (eXtreme Application Platform), and it provides a simplified API using Spring and Annotations. It also comes with a set of custom Maven2 Mojos and archetypes to get you developing quickly, and enormous amounts of publicly available documentation. However, their (free) community version is really a crippled (single-node) eval good for about 45 days. Nothing wrong with that, of course; it's their product, and they can do what they want with it.

The other implementation I looked at was Blitz, an open source project (mostly) written and maintained by Dan Creswell. It has a few examples in the download, which worked fine for me, since the JavaSpaces API is not that hard. I was also quite impressed with the support - I had a few problems during installation, and after a few hours trying to figure it out myself, I asked on the Blitz users newsgroup, and Dan emailed me back with the solution (I had mistyped something) within the hour.

There is also Outrigger, Sun's reference implementation for JavaSpaces that comes bundled with the Jini starter kit, but I didn't try it, so I don't know enough to say anything about it.

I ended up using Blitz.

Blitz Installation

One of the things that caused me to put this stuff off was the multi-step installation and startup processes for the JavaSpaces server components, described in various articles I read earlier. However, Blitz 2.1 has an all-in-one SWT-based installer which will install Blitz, its Jini dependencies, and also configure the installation based on your JDK version.

After the installation, I tried running the helloworld example (instructions available on the site here) to test the installation, and it turned out that I needed to enable Multicast on my loopback interface for the JavaSpaces server to be found. After that, the example code ran fine, and I was ready to build my application.

Code

The code I show here is based heavily on the example code in the Blitz distribution, some postings on the Blitz mailing list, and some other resources (listed below under References). However, it has some extra features (that are not in the examples), because I felt they were essential. They are:

  • Supports multiple stages - the Hello World example is a single-stage application, i.e., one feeder and one processing unit. My example has multiple stages, which does not add much complexity but is probably closer to real-world scenarios.
  • Supports Transactions - in production systems, exceptions are common and must be dealt with. JavaSpaces provides transactions that a processor can use to put an entry back into the space if it gets an exception while handling it.
  • Supports High/Low Watermarking - generally, the whole point of parallelizing a process is that the producer can submit jobs faster than a single consumer can handle them, so we want to increase the number of consumers. Even so, depending on the size of your Entry object and the size of the JavaSpace's VM, the space can run out of memory if the producer stays ahead of the consumers. I use the technique of high/low watermarking as described here to deal with that.
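
To make the watermarking idea concrete, here is a minimal sketch of the hysteresis logic on its own, with no JavaSpaces dependency. It is not taken from the Blitz examples or the linked article; WatermarkGate, maySubmit and the demo class are hypothetical names, and the backlog is assumed to be the number of entries sent minus the number received.

```java
/**
 * Hysteresis gate for a producer: it closes once the backlog exceeds the
 * high water mark and reopens only after the backlog drops below the low
 * water mark. All names here are hypothetical.
 */
final class WatermarkGate {
  private final long lowWaterMark;
  private final long highWaterMark;
  private boolean paused = false;

  WatermarkGate(long lowWaterMark, long highWaterMark) {
    if (lowWaterMark >= highWaterMark) {
      throw new IllegalArgumentException("low mark must be below high mark");
    }
    this.lowWaterMark = lowWaterMark;
    this.highWaterMark = highWaterMark;
  }

  /** Returns true if the producer may submit, given the current backlog. */
  synchronized boolean maySubmit(long backlog) {
    if (paused) {
      // Stay paused until the consumers have drained below the low mark.
      if (backlog < lowWaterMark) {
        paused = false;
      }
    } else if (backlog > highWaterMark) {
      paused = true;
    }
    return !paused;
  }
}

class WatermarkGateDemo {
  public static void main(String[] args) {
    WatermarkGate gate = new WatermarkGate(2, 5);
    long[] backlogs = {0, 3, 6, 4, 2, 1, 3};
    for (long backlog : backlogs) {
      System.out.println(
        "backlog=" + backlog + " maySubmit=" + gate.maySubmit(backlog));
    }
  }
}
```

The gap between the two marks is what keeps the producer from flapping between paused and running when the backlog hovers around a single threshold.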

Additions to the POM

JavaSpaces servers such as Blitz implement the JINI API, so the only two JAR files that need to be added are the jsk-lib.jar and jsk-platform.jar files. I installed them manually into my local Maven repository from the jini2_1/lib directory of the blitz distribution.

<project ...>
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>com.sun.jini</groupId>
      <artifactId>jsk-lib</artifactId>
      <version>2.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>com.sun.jini</groupId>
      <artifactId>jsk-platform</artifactId>
      <version>2.1</version>
      <scope>compile</scope>
    </dependency>
    ...
  </dependencies>
  ...
</project>

AbstractProcessor.java

In keeping with the advice on JavaSpaces patterns here, most of the JavaSpaces heavy lifting is done using Lookup.java, which I copied from the examples/helloworld directory of the Blitz distribution. I am not going to show the code for Lookup.java; either download the distribution or find it here.

In addition, I moved the common functionality of the processing units, such as getting a JavaSpace and Transaction objects up into the abstract class shown below. It has a single abstract process() method which subclasses must implement.

// Source: src/main/java/com/mycompany/myapp/concurrent/blitz/AbstractProcessor.java
package com.mycompany.myapp.concurrent.blitz;

import java.util.logging.Logger;

import net.jini.core.transaction.Transaction;
import net.jini.core.transaction.TransactionFactory;
import net.jini.core.transaction.server.TransactionManager;
import net.jini.space.JavaSpace;

/**
 * Abstract superclass for the processing unit. The class contains
 * common application level and JavaSpace related functionality,
 * so the subclasses can focus on the business logic. 
 */
public abstract class AbstractProcessor {

  protected Logger logger = 
    Logger.getLogger(this.getClass().getName());
  
  protected final static long DEFAULT_LEASE_TIMEOUT = 120000L; // 2 mins

  /**
   * Simple lookup, returns the default JavaSpace found. More 
   * complex lookups across multiple machines may be possible
   * using LookupLocators. This method could be modified in the
   * future to return an array of JavaSpaces.
   * @return the default JavaSpace.
   */
  protected JavaSpace getSpace() throws Exception {
    Lookup lookup = new Lookup(JavaSpace.class);
    JavaSpace space = (JavaSpace) lookup.getService();
    return space;
  }

  /**
   * Returns a Transaction object to the caller by looking it
   * up from the JavaSpaces server.
   * @return a Transaction object.
   * @throws Exception if one is thrown.
   */
  protected Transaction createTransaction() throws Exception {
    Lookup lookup = new Lookup(TransactionManager.class);
    TransactionManager txManager = 
      (TransactionManager) lookup.getService();
    Transaction.Created trc = TransactionFactory.create(
      txManager, DEFAULT_LEASE_TIMEOUT);
    return trc.transaction;
  }
  
  /**
   * Convenience method to build a Document template based on the
   * desired status.
   * @param status the DocumentStatus of the template.
   * @return the Document template for matching.
   */
  public Document getTemplate(Document.Status status) {
    Document doc = new Document();
    if (status != null) {
      doc.status = status;
    }
    return doc;
  }

  /**
   * This is the method that subclasses will override, and will contain
   * the business logic that needs to be applied to the entry.
   * @throws Exception if one is thrown.
   */
  public abstract void process() throws Exception;
}

Document.java

A JavaSpace stores implementations of the Jini Entry interface. In our case, this is a simple Document bean as shown below. Note that the fields must be public object references (not primitives) and the class needs a public no-argument constructor - these are JavaSpaces requirements.

// Source: src/main/java/com/mycompany/myapp/concurrent/blitz/Document.java
package com.mycompany.myapp.concurrent.blitz;

import net.jini.core.entry.Entry;

/**
 * Represents the Document that will be transformed as it goes
 * through different processors.
 */
public class Document implements Entry {

  private static final long serialVersionUID = 54056132871976348L;

  public static enum Status {New, Downloaded, Indexed, Written};
  
  public Status status;
  public String url;
  public String contents;
  public String indexData;
  
  public Document() {
    super();
  }
  
  @Override
  public String toString() {
    // status is null on templates, where a null field acts as a wildcard,
    // so guard against it before switching
    String statusString = "unknown";
    if (status != null) {
      switch (status) {
        case New: { statusString = "new"; break; }
        case Downloaded: { statusString = "downloaded"; break; }
        case Indexed: { statusString = "indexed"; break; }
        case Written: { statusString = "written"; break; }
        default: {}
      }
    }
    return url + " (" + statusString + ")";
  }
}

Note that my Entry object is just a container of data. My application follows the "dumb-data smart-worker" approach. Another approach is the "smart-data dumb-worker" (aka the barn-raising model), where the Entry knows how to compute itself. In that case, the processors are generic, and just call the execute method on the Entry.
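
For contrast, the "smart-data dumb-worker" approach could look roughly like the sketch below. This is not code from my application: ExecutableTask, DownloadTask, IndexTask and GenericWorker are hypothetical names, the task bodies are stand-ins, and a plain loop stands in for the take/write cycle against the space so the example runs without Jini on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical contract for an entry that knows how to compute itself. */
interface ExecutableTask {
  /** Does this task's work; returns the follow-up task, or null if done. */
  ExecutableTask execute();
}

/** Stand-in download step; a real one would fetch the URL. */
final class DownloadTask implements ExecutableTask {
  final String url;

  DownloadTask(String url) { this.url = url; }

  public ExecutableTask execute() {
    return new IndexTask(url, "contents of " + url);
  }

  @Override
  public String toString() { return "download " + url; }
}

/** Stand-in index step; the end of the line in this sketch. */
final class IndexTask implements ExecutableTask {
  final String url;
  final String contents;

  IndexTask(String url, String contents) {
    this.url = url;
    this.contents = contents;
  }

  public ExecutableTask execute() {
    return null; // nothing further to do
  }

  @Override
  public String toString() { return "index " + url; }
}

/** The generic worker knows nothing about downloading or indexing. */
final class GenericWorker {
  /** Runs a task chain to completion; returns the steps it executed. */
  static List<String> run(ExecutableTask first) {
    List<String> steps = new ArrayList<String>();
    ExecutableTask current = first;
    while (current != null) {
      steps.add(current.toString());
      current = current.execute();
    }
    return steps;
  }

  public static void main(String[] args) {
    System.out.println(run(new DownloadTask("0.html")));
  }
}
```

In this style the worker never changes when a new kind of task is added. The trade-off is that the task classes, along with anything they depend on, must be loadable by every worker.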

Master.java

Each stage of the multi-stage process is represented by a subclass of AbstractProcessor. My initial implementation was four separate classes (one each for Feeder, Downloader, Indexer and Writer) doing the necessary take/write operations on the common JavaSpace. Later, I coalesced the Feeder and Writer into inner classes inside a single Master. This allowed the two classes to communicate with each other for the high/low watermarking feature. In addition, it made it possible to manage them from a single terminal. The code for the Master is shown below. There are a lot of try/catch blocks in here to ensure some degree of fault tolerance.

// Source: src/main/java/com/mycompany/myapp/concurrent/blitz/Master.java
package com.mycompany.myapp.concurrent.blitz;

import java.io.FileWriter;
import java.io.PrintWriter;
import java.rmi.RMISecurityManager;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.jini.core.lease.Lease;
import net.jini.core.transaction.Transaction;
import net.jini.space.JavaSpace;

/**
 * Master process. There should be only one of these in the system.
 * This process contains two threads - a feeder thread and a writer
 * thread. The Feeder thread feeds work into the system, optionally 
 * (not implemented yet) scattering the work into multiple JavaSpaces 
 * across multiple machines. The writer thread will write the results
 * of the processing to local disk, optionally (not implemented yet) 
 * gathering all the results from multiple JavaSpaces across multiple 
 * machines. The two threads communicate with each other in order to
 * avoid overloading the JavaSpace with too many jobs.
 */
public class Master {

  private final Logger logger = Logger.getLogger(this.getClass().getName());
  
  private final static long MAX_ENTRIES_TO_FEED = 25;
  
  private final static long ENTRIES_IN_SPACE_HWM = 5; // high water mark
  private final static long ENTRIES_IN_SPACE_LWM = 2; // low water mark
  private final static long RETRY_INTERVAL_MILLIS = 5000L; // 5 seconds
  
  // volatile: each field is written by one thread and read by another
  private volatile boolean shouldTerminate = false;
  private volatile long numEntriesSent = 0L;
  private volatile long numEntriesReceived = 0L;
  
  public void scatterGather() throws Exception {
    Thread feederThread = new Thread(new Runnable() {
      public void run() {
        try {
          Feeder feeder = new Feeder();
          for (;;) {
            if (shouldTerminate) {
              break;
            }
            if (numEntriesSent >= MAX_ENTRIES_TO_FEED) {
              break;
            }
            if (isSpaceAboveHighWaterMark()) {
              for (;;) {
                logger.info("Space full, pausing");
                pause(RETRY_INTERVAL_MILLIS);
                if (isSpaceBelowLowWaterMark()) {
                  break;
                }
              }
            }
            try { 
              feeder.process(); 
            } catch (Exception e) {
              logger.log(Level.WARNING, 
                "Feeder process error, retrying...", e);
              pause(RETRY_INTERVAL_MILLIS);
              continue;
            }
            numEntriesSent++;
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
    Thread writerThread = new Thread(new Runnable() {
      public void run() {
        try {
          WriteProcessor writer = new WriteProcessor();
          for (;;) {
            if (shouldTerminate) {
              break;
            }
            try {
              writer.process();
            } catch (Exception e) {
              logger.log(Level.WARNING, 
                "Error processing write, retrying...", e);
              pause(RETRY_INTERVAL_MILLIS);
              continue;
            }
            numEntriesReceived++;
          }
          writer.destroy();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      public void run() {
        shouldTerminate = true;
      }
    }));
    feederThread.start();
    writerThread.start();
    feederThread.join();
    writerThread.join();
  }

  private boolean isSpaceAboveHighWaterMark() {
    return (numEntriesSent - numEntriesReceived > ENTRIES_IN_SPACE_HWM);
  }

  private boolean isSpaceBelowLowWaterMark() {
    return (numEntriesSent - numEntriesReceived < ENTRIES_IN_SPACE_LWM);
  }

  public void pause(long intervalMillis) {
    try { Thread.sleep(intervalMillis); }
    catch (InterruptedException e) {
      logger.info("Pause Interrupted");
    }
  }

  /**
   * Models the feeder processing unit.
   */
  private class Feeder extends AbstractProcessor {
    
    private JavaSpace space;
    private int currentIndex = 0;
    
    public Feeder() throws Exception {
      super();
      this.space = getSpace();
    }
    
    @Override
    public void process() throws Exception {
      Document doc = new Document();
      doc.url = currentIndex + ".html";
      doc.status = Document.Status.New; // new, ready to process
      // TODO: scatter using mod(hash(url),number_of_spaces)
      logger.info("Feeding " + doc);
      space.write(doc, null, Lease.FOREVER);
      currentIndex++;
    }
  }

  /**
   * Models the Writer processing unit.
   */
  private class WriteProcessor extends AbstractProcessor {
    
    private JavaSpace space;
    private Document template;
    private PrintWriter writer;
    
    public WriteProcessor() throws Exception {
      super();
      this.space = getSpace();
      this.template = getTemplate(Document.Status.Indexed);
      this.writer = new PrintWriter(new FileWriter("/tmp/docs.txt"), true);
    }
    
    @Override
    public void process() throws Exception {
      Transaction tx = null;
      try {
        tx = createTransaction();
        Document doc = (Document) space.take(template, tx, Lease.FOREVER);
        // other processing here, if applicable, before writing out...
        logger.info("Writing " + doc);
        doc.status = Document.Status.Written;
        writeToFile(writer, doc);
        // ... we don't write this back into the space, because
        // its the end of the line
        tx.commit();
      } catch (Exception e) {
        if (tx != null) {
          tx.abort();
        }
        throw e;
      }
    }

    public void destroy() {
      if (writer != null) {
        writer.flush();
        writer.close();
      }
    }
    
    private void writeToFile(PrintWriter writer, Document doc) {
      writer.println(doc.toString());
      writer.flush();
    }
  }

  /**
   * This is how we are called.
   */
  public static void main(String[] args) {
    if (System.getSecurityManager() == null) {
      System.setSecurityManager(new RMISecurityManager());
    }
    try {
      Master master = new Master();
      master.scatterGather();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

An improvement to this would be to have the Feeder distribute requests to multiple JavaSpaces on multiple machines, each with a local Slave, and have the Writer gather the results back into a single location on disk. This will make the code a bit more complicated though, and I would probably need to change the code to have the Writer register for Events raised by the JavaSpace when it finds a Document with Status.Indexed.
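
The scatter step hinted at in the Feeder's TODO comment, mod(hash(url), number_of_spaces), could be sketched as follows. This is only an illustration under my own assumptions: SpacePartitioner and spaceIndexFor are hypothetical names, and the returned index is assumed to select one JavaSpace from an array obtained elsewhere.

```java
/** Picks which of N spaces a document goes to. Names are hypothetical. */
final class SpacePartitioner {

  /**
   * Maps a URL to an index in [0, numberOfSpaces). Math.floorMod is used
   * rather than % because String.hashCode() can be negative, and % would
   * then yield a negative index.
   */
  static int spaceIndexFor(String url, int numberOfSpaces) {
    if (numberOfSpaces <= 0) {
      throw new IllegalArgumentException("need at least one space");
    }
    return Math.floorMod(url.hashCode(), numberOfSpaces);
  }

  public static void main(String[] args) {
    int numberOfSpaces = 3;
    for (int i = 0; i < 6; i++) {
      String url = i + ".html";
      System.out.println(
        url + " -> space " + spaceIndexFor(url, numberOfSpaces));
    }
  }
}
```

Because the mapping depends only on the URL, the same document always lands in the same space, which may help if a retry needs to find it again. It does not rebalance when the number of spaces changes.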

Slave.java

Similar to the Master, I decided to also coalesce the Downloader and Indexer into a single class in order to make it easier to manage from a single script. It is designed to be colocated with the JavaSpace on the same machine. The code is shown below.

// Source: src/main/java/com/mycompany/myapp/concurrent/blitz/Slave.java
package com.mycompany.myapp.concurrent.blitz;

import java.rmi.RMISecurityManager;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.jini.core.lease.Lease;
import net.jini.core.transaction.Transaction;
import net.jini.space.JavaSpace;

/**
 * Models a slave process. There can be many slave processes in
 * the system, each attached to a single JavaSpace. In the current
 * setup, a slave consists of 1 instance each of the Download and
 * the Index thread, but optionally (not implemented yet) can 
 * consist of a set of m Downloaders and n Indexers where (m,n)
 * are controlled by configuration, and depend on the machine on
 * which it will run. The processing units are coalesced into threads
 * to make it easier to manage - there is no direct communication 
 * between the threads.
 */
public class Slave {

  private final Logger logger = Logger.getLogger(this.getClass().getName());
  private static final long RETRY_INTERVAL_MILLIS = 5000L; // 5 seconds
  
  // volatile: set by the shutdown hook, read by the worker threads
  private volatile boolean shouldTerminate = false;
  
  public void doWork() throws Exception {
    Thread downloadThread = new Thread(new Runnable() {
      public void run() {
        try {
          DownloadProcessor downloader = new DownloadProcessor();
          for (;;) {
            if (shouldTerminate) {
              break;
            }
            try {
              downloader.process();
            } catch (Exception e) {
              logger.log(Level.WARNING, 
                "Download Process failed, retrying...", e);
              pause(RETRY_INTERVAL_MILLIS);
              continue;
            }
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
    Thread indexThread = new Thread(new Runnable() {
      public void run() {
        try {
          IndexProcessor indexer = new IndexProcessor();
          for (;;) {
            if (shouldTerminate) {
              break;
            }
            try {
              indexer.process();
            } catch (Exception e) {
              logger.log(Level.WARNING, 
                "Index process failed, retrying...", e);
              pause(RETRY_INTERVAL_MILLIS);
              continue;
            }
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      public void run() {
        shouldTerminate = true;
      }
    }));
    downloadThread.start();
    indexThread.start();
    downloadThread.join();
    indexThread.join();
  }

  private void pause(long retryIntervalMillis) {
    try { Thread.sleep(retryIntervalMillis); }
    catch (InterruptedException e) {
      logger.info("Pause interrupted");
    }
  }

  /**
   * Models the Download Processing Unit.
   */
  private class DownloadProcessor extends AbstractProcessor {
    
    private JavaSpace space;
    private Document template;
    
    public DownloadProcessor() throws Exception {
      super();
      this.space = getSpace();
      this.template = getTemplate(Document.Status.New);
    }
    
    @Override
    public void process() throws Exception {
      Transaction tx = null;
      try {
        tx = createTransaction();
        Document doc = (Document) space.take(template, tx, Lease.FOREVER);
        // more processing here...
        logger.info("Downloading " + doc);
        doc.status = Document.Status.Downloaded;
        space.write(doc, tx, DEFAULT_LEASE_TIMEOUT);
        tx.commit();
      } catch (Exception e) {
        if (tx != null) {
          tx.abort();
        }
        throw e;
      }
    }
  }
  
  /**
   * Models the Indexing Processing Unit.
   */
  private class IndexProcessor extends AbstractProcessor {

    private JavaSpace space;
    private Document template;
    
    public IndexProcessor() throws Exception {
      super();
      this.space = getSpace();
      this.template = getTemplate(Document.Status.Downloaded);
    }
    
    @Override
    public void process() throws Exception {
      Transaction tx = null;
      try {
        tx = createTransaction();
        Document doc = (Document) space.take(template, tx, Lease.FOREVER);
        logger.info("Indexing " + doc);
        doc.status = Document.Status.Indexed;
        space.write(doc, tx, DEFAULT_LEASE_TIMEOUT);
        tx.commit();
      } catch (Exception e) {
        if (tx != null) {
          tx.abort();
        }
        throw e;
      }
    }
  }

  /**
   * This is how we are called.
   */
  public static void main(String[] args) {
    if (System.getSecurityManager() == null) {
      System.setSecurityManager(new RMISecurityManager());
    }
    try {
      Slave slave = new Slave();
      slave.doWork();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

The example shows a single instance of the Downloader and Indexer, but one possible improvement to take advantage of multiple cores would be to have multiple Download and Index threads in a single Slave instance.
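
One way to picture a Slave with m Downloaders and n Indexers is the sketch below. It is an assumption-laden stand-in rather than a drop-in change to Slave.java: MultiWorkerSlave is a hypothetical class, in-memory BlockingQueues play the role of the JavaSpace, and the "work" is just string tagging.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical slave with configurable worker counts per stage. */
final class MultiWorkerSlave {

  /** Runs the two stages over the URLs and returns the indexed results. */
  static List<String> run(List<String> urls, int numDownloaders,
      int numIndexers) {
    // The queues stand in for entries of each status in the space.
    final BlockingQueue<String> fresh = new LinkedBlockingQueue<String>(urls);
    final BlockingQueue<String> downloaded = new LinkedBlockingQueue<String>();
    final BlockingQueue<String> indexed = new LinkedBlockingQueue<String>();
    ExecutorService pool =
      Executors.newFixedThreadPool(numDownloaders + numIndexers);
    for (int i = 0; i < numDownloaders; i++) {
      pool.execute(() -> pump(fresh, downloaded, "downloaded:"));
    }
    for (int i = 0; i < numIndexers; i++) {
      pool.execute(() -> pump(downloaded, indexed, "indexed:"));
    }
    List<String> results = new ArrayList<String>();
    try {
      for (int i = 0; i < urls.size(); i++) {
        String result = indexed.poll(10, TimeUnit.SECONDS);
        if (result == null) {
          throw new IllegalStateException("timed out waiting for results");
        }
        results.add(result);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while collecting", e);
    } finally {
      pool.shutdownNow(); // interrupts workers blocked in take()
    }
    return results;
  }

  /** Worker loop: take from one queue, tag the item, put on the next. */
  private static void pump(BlockingQueue<String> in,
      BlockingQueue<String> out, String tag) {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        out.put(tag + in.take());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // exit quietly on shutdown
    }
  }

  public static void main(String[] args) {
    List<String> urls = Arrays.asList("0.html", "1.html", "2.html");
    System.out.println(run(urls, 2, 3));
  }
}
```

As with the real Slave, the order in which results come out is not guaranteed once more than one worker handles a stage.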

runpu.sh

Finally, I call both the master and the slave using the following simple shell script.

#!/bin/bash
M2_REPO=$HOME/.m2/repository
BLITZ_HOME=/opt/blitz-2.1
# Classpath: the two Jini JARs plus the application JAR
CLASSPATH="$M2_REPO/com/sun/jini/jsk-lib/2.1/jsk-lib-2.1.jar"
CLASSPATH="$CLASSPATH:$M2_REPO/com/sun/jini/jsk-platform/2.1/jsk-platform-2.1.jar"
CLASSPATH="$CLASSPATH:target/multicore-1.0-SNAPSHOT.jar"

case "$1" in
  master) java -Djava.security.policy="$BLITZ_HOME/config/policy.all" \
          -classpath "$CLASSPATH" \
          com.mycompany.myapp.concurrent.blitz.Master 2>&1 | tee "$1.log"
     ;;
  slave) java -Djava.security.policy="$BLITZ_HOME/config/policy.all" \
          -classpath "$CLASSPATH" \
          com.mycompany.myapp.concurrent.blitz.Slave 2>&1 | tee "$1.log"
     ;;
  *) echo "Usage: $0 {master|slave}"
     ;;
esac

Running the example

I run all the components locally on my laptop. 4 terminal windows are required.

Start Blitz Server

To do this, navigate to your Blitz installation directory (mine is /opt/blitz-2.1) and start Blitz using the supplied shell script. If you are testing your code, it is probably better to start it in transient mode; this requires replacing PersistentMode (the default) with Transient in config/blitz.config.

sujit@sirocco:~$ cd /opt/blitz-2.1
sujit@sirocco:~$ ./blitz.sh
...you will see blitz starting up here

Start Blitz Dashboard

The Blitz dashboard is a simple GUI tool that tells you the server's memory footprint, number of active entries, operations, transactions, etc. It is useful to monitor the progress of your job.

sujit@sirocco:~$ cd /opt/blitz-2.1
sujit@sirocco:~$ ./dashboard.sh
...you will see logging to console and the GUI come up

Start Slave

We want to start the slave first because the slave will block for input from the master. To do this:

sujit@sirocco:~$ cd /apps/multicore
sujit@sirocco:~$ ./runpu.sh slave
...will output to console once master submits jobs to the space
...terminate with ^C

Start Master

Starting the master is similar to starting the slave, only a different parameter.

sujit@sirocco:~$ cd /apps/multicore
sujit@sirocco:~$ ./runpu.sh master
...you will see the master feeding and collecting Document objects.
...terminate with ^C

A partial master log is shown below. As you can see, it pauses when the space gets too full.

May 10, 2009 5:46:13 PM Master$Feeder process INFO: Feeding 0.html (new)
May 10, 2009 5:46:13 PM Master$Feeder process INFO: Feeding 1.html (new)
May 10, 2009 5:46:14 PM Master$Feeder process INFO: Feeding 2.html (new)
May 10, 2009 5:46:14 PM Master$Feeder process INFO: Feeding 3.html (new)
May 10, 2009 5:46:14 PM Master$Feeder process INFO: Feeding 4.html (new)
May 10, 2009 5:46:14 PM Master$Feeder process INFO: Feeding 5.html (new)
May 10, 2009 5:46:14 PM Master$1 run INFO: Space full, pausing
May 10, 2009 5:46:14 PM Master$WriteProcessor process INFO: Writing 0.html (indexed)
May 10, 2009 5:46:14 PM Master$WriteProcessor process INFO: Writing 5.html (indexed)
...

Here is a partial snip of the slave log - this is probably less interesting, it simply reports on the progress as the Document is processed by the processors. One important thing to note here is that order may not be maintained, which is okay in my case.

May 10, 2009 5:46:14 PM Slave$DownloadProcessor process INFO: Downloading 0.html (new)
May 10, 2009 5:46:14 PM Slave$IndexProcessor process INFO: Indexing 0.html (downloaded)
May 10, 2009 5:46:14 PM Slave$DownloadProcessor process INFO: Downloading 5.html (new)
May 10, 2009 5:46:14 PM Slave$IndexProcessor process INFO: Indexing 5.html (downloaded)
...

Open Issues

I noticed that I was getting an Out of Memory Exception when I increased the number of tasks fed in by the Feeder. It looks like it could be a problem with the threads-per-process setting on my Linux distribution, and fixing it seems to involve recompiling glibc and the kernel. The last time I saw this was in the Java 1.2 days, probably because that was the last time I used native threads from within my Java code. JINI does use native threads too, so that may be the reason. I will post updates as I know more about this problem.

Conclusion

I think JavaSpaces is a good strategy to consider if you have to solve problems that can be broken up into relatively independent stages, some of which require lots of resources and some that don't. Unlike MapReduce, where the hardest part is to figure out how to fit your application into the framework, and usually requires algorithm changes and application redesign, partitioning an application with JavaSpaces is much easier and less invasive.

The other thing holding me back was my perception of the difficulty of setting up a JavaSpaces server, but that turned out to be a non-issue. Of course, the example I posted is fairly simple, and it's possible that I will hit issues with a "real" application, but I guess I will cross that bridge when I get to it.

References

JavaSpaces is relatively less popular, compared to, say Servlets or JDBC, so information on the Internet is a bit harder to find. I found a few sites which helped me quite a bit, so I am including them here:

Friday, May 01, 2009

HTTP Search Interface to Lucene using Mule

Introduction

For quite a while now, I've been thinking, off and on, about centralizing our search functionality. Currently, our indexes are deployed locally with the application, which is something of an operations nightmare. As we scale out by increasing the number of machines in our tiers, and introduce brand new tiers with new products, the situation can only get worse. Some time ago, I had built a simple RMI server which would be a central repository of all our indexes (perhaps scaled out horizontally using a load balancer), but that would have needed quite a bit of change to our codebase to perform reasonably, so I abandoned the idea. Other things came up and I forgot about this - from the looks of it, reports of operator nightmares seem to have been grossly exaggerated :-).

Why not Solr?

At this point, most of you would be thinking about Solr, and wonder why I am attempting to reinvent the wheel. Well, for a couple of reasons, actually:

  1. Solr is very customizable, but it offers no customization hook for the one place I need it most. Our search is really a meta-search, aggregating results from multiple internal sources, each of which can be backed by multiple indexes, each of which is built using radically different analyzers. Solr follows the one IndexSearcher per instance model, which is unlikely to change, since its update strategy is based on this assumption. We could probably use Solr's distributed search to get around that, but the performance penalty would be too high.
  2. Unlike Solr, our model of updating indexes is to simply replace them with a freshly built one. Logic to detect the availability of a new index is built into the code, so no application restarts are necessary. I could actually implement this in Solr with a custom RequestHandler, and it would be much simpler than what our code currently does.
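The replace-and-swap update model in the second point can be sketched with the JDK alone. This is only an illustration, not our production code: the class name SwappableIndexSketch, the numeric version stamp (think of a build timestamp), and the opener function are all hypothetical, and a String stands in for a Lucene searcher.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongFunction;

/** Hypothetical holder that publishes a freshly opened resource when a newer build appears. */
final class SwappableIndexSketch<T> {

  /** Pairs a resource with the version stamp it was opened from. */
  private static final class Versioned<T> {
    final long version;
    final T resource;

    Versioned(long version, T resource) {
      this.version = version;
      this.resource = resource;
    }
  }

  private final AtomicReference<Versioned<T>> current = new AtomicReference<>();
  private final LongFunction<T> opener;

  SwappableIndexSketch(long initialVersion, LongFunction<T> opener) {
    this.opener = opener;
    current.set(new Versioned<>(initialVersion, opener.apply(initialVersion)));
  }

  /** Returns the resource that searches should use right now. */
  T get() {
    return current.get().resource;
  }

  /** Returns the version stamp of the active resource. */
  long version() {
    return current.get().version;
  }

  /**
   * Opens and publishes a new resource if latestVersion is newer than the
   * active one. Returns true when a swap happened. Closing the old resource
   * is deliberately left out, since in-flight searches may still be using it.
   */
  synchronized boolean refreshIfNewer(long latestVersion) {
    Versioned<T> active = current.get();
    if (latestVersion <= active.version) {
      return false;
    }
    current.set(new Versioned<>(latestVersion, opener.apply(latestVersion)));
    return true;
  }

  public static void main(String[] args) {
    // A String stands in for the searcher; the version could be a build timestamp.
    SwappableIndexSketch<String> holder =
        new SwappableIndexSketch<>(1L, v -> "index-build-" + v);
    System.out.println(holder.get());
    System.out.println("swapped: " + holder.refreshIfNewer(1L));
    System.out.println("swapped: " + holder.refreshIfNewer(2L));
    System.out.println(holder.get());
  }
}
```

Readers always go through get(), so a swap never requires a restart; a background thread or a request parameter could call refreshIfNewer() whenever a new build is detected.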

Why Mule?

I started thinking about this again recently after I attended a talk by Ken Yagen some weeks ago on Mule ESB at the EBig Java SIG. Instead of using RMI, this time I decided I would build something along the lines of Solr, i.e., an HTTP interface to the indexes, and it seemed like a good way to get familiar with Mule. So here it is...

POM changes

I used the Maven archetype from here, and changed the version parameter to reflect the current version. In addition, I added dependencies on the Mule HTTP Transport and Lucene 2.4. The differences are shown below:

<?xml version="1.0" encoding="UTF-8"?>
<project ...>
  ...
  <properties>
    <mule.version>2.2.1</mule.version>
  </properties>

  <dependencies>
    ...
    <!-- Add support for http -->
    <dependency>
      <groupId>org.mule.transports</groupId>
      <artifactId>mule-transport-http</artifactId>
      <version>${mule.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Add Support for Lucene -->
    <dependency>
      <groupId>org.apache.lucene</groupId>
      <artifactId>lucene-core</artifactId>
      <version>2.4.0</version>
      <scope>compile</scope>
    </dependency>

  </dependencies>
  ...
</project>

The Configuration

Mule uses its own XML configuration. The configuration file shown below contains the details of the entire Mule service. You can find more details about configuring a Mule service here.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Source: src/main/resources/mule-config-spring.xml -->
<mule xmlns="http://www.mulesource.org/schema/mule/core/2.2"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:spring="http://www.springframework.org/schema/beans"
       xmlns:http="http://www.mulesource.org/schema/mule/http/2.2"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://www.mulesource.org/schema/mule/core/2.2 
       http://www.mulesource.org/schema/mule/core/2.2/mule.xsd
       http://www.mulesource.org/schema/mule/http/2.2 
       http://www.mulesource.org/schema/mule/http/2.2/mule-http.xsd">
       
  <!-- Application specific beans -->
  <spring:beans>
    <spring:import resource="classpath:components-spring.xml"/>
    <spring:bean id="propertyConfigurer"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
      <spring:property name="location" 
        value="classpath:mule-config-spring.properties"/>
    </spring:bean>
  </spring:beans>       

  <!-- Connectors -->
  <http:connector name="httpConnector" enableCookies="false" keepAlive="true"/>
  
  <!-- Transformers -->
  <custom-transformer name="requestTransformer" 
    class="org.mule.transport.http.transformers.HttpRequestBodyToParamMap"/>

  <!-- Model -->
  <model name="main">
    <service name="searchService">
      <inbound>
        <http:inbound-endpoint address="http://localhost:8888/search" 
          synchronous="true" contentType="text/xml" 
          transformer-refs="requestTransformer"/>
      </inbound>
      <component>
        <spring-object bean="searchServiceUmo"/>
      </component>
    </service>
  </model>
</mule>

Mule configuration integrates very nicely with Spring's. The only application code in the service is the SearchServiceUmo, which is defined in the components-spring.xml file below, using standard Spring semantics. This file is referenced from the main configuration file using an import.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Source: src/main/resources/components-spring.xml -->
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans.xsd">
  
  <bean id="searchServiceUmo" 
      class="com.mycompany.searchservice.SearchServiceUmo"
      init-method="init" destroy-method="destroy">
    <property name="indexPaths">
      <list>
        <value>/path/to/my/index</value>
      </list>
    </property>
  </bean>
</beans>

The Code

The workhorse class is the SearchServiceUmo. It takes in a Map of request parameters representing a query from a remote client, and executes a Lucene search against a local index. It then returns a List of result beans (a POJO, shown below), converted into an XML stream. One important thing to note is that there is no mention of any Mule API or classes, i.e., the coupling between application code and Mule is only via the XML wiring.

// Source: src/main/java/com/mycompany/searchservice/SearchServiceUmo.java
package com.mycompany.searchservice;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.CachingWrapperFilter;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MultiSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryWrapperFilter;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Searcher;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.BooleanClause.Occur;

import com.thoughtworks.xstream.XStream;

/**
 * User defined search service.
 */
public class SearchServiceUmo {

  private final Log log = LogFactory.getLog(getClass());

  private static final Analyzer ANALYZER = new StandardAnalyzer();
  
  private List<String> indexPaths;
  private Searcher searcher;
  
  public void setIndexPaths(List<String> indexPaths) {
    this.indexPaths = indexPaths;
  }
  
  protected void init() throws Exception {
    if (indexPaths.size() == 0) {
      throw new IllegalArgumentException(
        "At least one index must be specified");
    } else if (indexPaths.size() == 1) {
      this.searcher = new IndexSearcher(indexPaths.get(0));
    } else {
      Searcher[] searchers = new Searcher[indexPaths.size()];
      for (int i = 0; i < searchers.length; i++) {
        searchers[i] = new IndexSearcher(indexPaths.get(i));
      }
      this.searcher = new MultiSearcher(searchers);
    }
  }
  
  protected void destroy() {
    if (searcher != null) {
      try {
        searcher.close();
        searcher = null;
      } catch (Exception e) {
        log.warn("Searcher at " + indexPaths + 
          " could not be closed", e);
      }
    }
  }

  /**
   * For synchronous services, there does not seem to be a way to 
   * apply a transformation on the results returned from a component,
   * so we are doing this in code...it's probably not that big a deal, 
   * since it's only 2 lines of code, but it would be nice if we could 
   * do this using an available Mule component (ObjectToXml is available,
   * but cannot be used without complex shenanigans, as far as I can see).
   * @param params the request parameters as a Map of name-value pairs.
   * @return the response XML string.
   * @throws Exception if thrown.
   */
  public String search(Map<String,Object> params) throws Exception {
    List<SearchResultBean> beans = searchInternal(params);
    String result = new XStream().toXML(beans);
    return result;
  }
  
  private List<SearchResultBean> searchInternal
      (Map<String,Object> params) throws Exception {
    // we could probably write a custom transformer here to get a 
    // parameter object as our argument, which would allow for 
    // multiple params with the same name, and other good stuff, 
    // but we are lazy, so...
    if (params.containsKey("reopen")) {
      // this is for the batch update script
      destroy();
      init();
      return Collections.emptyList();
    } else {
      Query query = buildQuery((String) params.get("query"));
      Filter filter = buildFilter((String) params.get("filter"));
      Sort sort = buildSort((String) params.get("sort"));
      int startIndex = Integer.valueOf((String) params.get("start"));
      int endIndex = Integer.valueOf((String) params.get("end"));
      TopDocs td = searcher.search(query, filter, endIndex, sort);
      ScoreDoc[] sds = td.scoreDocs;
      List<SearchResultBean> results = 
        new ArrayList<SearchResultBean>();
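      // stop at the number of hits actually returned, which can be
      // smaller than endIndex when the query matches few documents
      int lastIndex = Math.min(endIndex, sds.length);
      for (int i = startIndex; i < lastIndex; i++) {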
        Document doc = searcher.doc(sds[i].doc);
        results.add(new SearchResultBean(doc, sds[i].score));
      }
      return results;
    }
  }

  /**
   * Build up the query object using some standard rules. In this case,
   * our rules are (body:${q}) OR (title:${q})^4.0. We used Standard
   * Analyzer to tokenize both body and title at index time, so we must 
   * also use it for query building.
   * @param q the query term.
   * @return the Query object.
   */
  private Query buildQuery(String q) throws Exception {
    BooleanQuery query = new BooleanQuery();
    Query titleQuery = new QueryParser("title", ANALYZER).parse(q);
    titleQuery.setBoost(4.0F);
    query.add(titleQuery, Occur.SHOULD);
    Query bodyQuery = new QueryParser("body", ANALYZER).parse(q);
    query.add(bodyQuery, Occur.SHOULD);
    return query;
  }
  
  /**
   * Some filtering criteria. In our case, we know that our tags contain
   * our filtering criteria, so we use that. The parameter is a comma-
   * separated list of tags. The tags are indexed without tokenizing, so
   * we use a plain TermQuery here.
   * @param tags the tags to filter on.
   * @return a Filter object.
   */
  private Filter buildFilter(String tags) {
    if (StringUtils.isEmpty(tags)) {
      return null;
    }
    BooleanQuery query = new BooleanQuery();
    String[] tagArray = StringUtils.split(tags, ",");
    for (int i = 0; i < tagArray.length; i++) {
      TermQuery tquery = new TermQuery(new Term("tag", tagArray[i]));
      query.add(tquery, Occur.MUST);
    }
    return new CachingWrapperFilter(new QueryWrapperFilter(query));
  }

  /**
   * We always sort by the natural order of the sort fields specified.
   * If a field is prefixed with a '-', then we reverse the natural
   * sort order for that field. 
   * @param sortFields a comma-separated list of sort fields to sort by.
   * @return a Sort object for this search.
   */
  private Sort buildSort(String sortFields) {
    if (StringUtils.isEmpty(sortFields)) {
      return Sort.RELEVANCE;
    }
    String[] sortFieldArray = StringUtils.split(sortFields, ",");
    SortField[] sfs = new SortField[sortFieldArray.length];
    for (int i = 0; i < sortFieldArray.length; i++) {
      if (sortFieldArray[i].startsWith("-")) {
        sfs[i] = new SortField(sortFieldArray[i].substring(1), true);
      } else {
        sfs[i] = new SortField(sortFieldArray[i]);
      }
    }
    return new Sort(sfs);
  }
}
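The service above trusts its request parameters: "start" and "end" must be present and numeric, and the paging window is assumed to fit inside the hits returned. A minimal JDK-only sketch of more defensive handling follows. The class RequestParamsSketch and its method names are hypothetical and not part of the service; the sort parsing only mirrors the "-field means reversed" convention and returns plain strings instead of Lucene SortField objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper showing defensive handling of the search request parameters. */
final class RequestParamsSketch {

  /** Reads an integer parameter, falling back to a default when absent or malformed. */
  static int intParam(Map<String, Object> params, String name, int defaultValue) {
    Object raw = params.get(name);
    if (raw == null) {
      return defaultValue;
    }
    try {
      return Integer.parseInt(raw.toString().trim());
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }

  /** Clamps the window [start, end) to the hits actually available; returns {from, to}. */
  static int[] clampWindow(int start, int end, int available) {
    int to = Math.max(0, Math.min(end, available));
    int from = Math.max(0, Math.min(start, to));
    return new int[] {from, to};
  }

  /** Splits a spec such as "-date,title" into {field, "reverse" or "natural"} pairs. */
  static List<String[]> parseSort(String spec) {
    List<String[]> fields = new ArrayList<>();
    if (spec == null || spec.trim().isEmpty()) {
      return fields;
    }
    for (String token : spec.split(",")) {
      String t = token.trim();
      if (t.isEmpty() || t.equals("-")) {
        continue; // skip empty entries such as "a,,b" or a lone "-"
      }
      boolean reverse = t.startsWith("-");
      fields.add(new String[] {reverse ? t.substring(1) : t, reverse ? "reverse" : "natural"});
    }
    return fields;
  }

  public static void main(String[] args) {
    Map<String, Object> params = new HashMap<>();
    params.put("start", "0");
    params.put("end", "10");
    params.put("sort", "-date,title");

    int start = intParam(params, "start", 0);
    int end = intParam(params, "end", 10);
    int[] window = clampWindow(start, end, 3); // pretend only 3 hits came back
    System.out.println("window: " + window[0] + ".." + window[1]);
    for (String[] field : parseSort((String) params.get("sort"))) {
      System.out.println(field[0] + " -> " + field[1]);
    }
  }
}
```

Returning an empty window for an out-of-range page keeps the response a well-formed (empty) list rather than an exception surfaced through the HTTP endpoint.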

The SearchResultBean is a POJO. I have removed the getters and setters to keep the code short. Use your IDE to generate them. Note that if you want to deserialize the XML back into this bean on the client side, the bean must exist on the client's CLASSPATH as well.

// Source: src/main/java/com/mycompany/searchservice/SearchResultBean.java
package com.mycompany.searchservice;

import java.io.Serializable;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.lucene.document.Document;

/**
 * Simple POJO to hold the contents of a search result. This should 
 * be available on the client side as well, in order for XStream to 
 * be able to deserialize this into a SearchResultBean.
 */
public class SearchResultBean implements Serializable {
  
  private static final long serialVersionUID = -2701792004759978895L;
  
  private String id;
  private String title;
  private String summary;
  private String[] tags;
  private String url;
  private float score;
  
  public SearchResultBean(Document doc, float score) {
    this.id = doc.get("id");
    this.title = doc.get("title");
    this.summary = doc.get("summary");
    this.tags = doc.getValues("tags");
    this.url = doc.get("url");
    this.score = score;
  }

  // ... getters and setters removed for brevity

  @Override
  public String toString() {
    return ReflectionToStringBuilder.toString(this);
  }
}

The Main class is not needed if you are using a Mule installation. In my case, this allows me to start up the Mule service from within my IDE, and is adapted from the template in the archetype.

// Source: src/main/java/com/mycompany/searchservice/Main.java
package com.mycompany.searchservice;

import org.apache.log4j.BasicConfigurator;
import org.mule.api.MuleContext;
import org.mule.api.MuleException;
import org.mule.api.config.ConfigurationBuilder;
import org.mule.api.config.ConfigurationException;
import org.mule.api.context.MuleContextFactory;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.config.spring.SpringXmlConfigurationBuilder;
import org.mule.context.DefaultMuleContextFactory;

/**
 * Launcher for the Mule based search service.
 */
public class Main {

  public static void main(String[] args) {
    BasicConfigurator.configure();
    MuleContext context = null;
    String[] resources = {"mule-config-spring.xml"};
    try {
      MuleContextFactory factory = new DefaultMuleContextFactory();
      ConfigurationBuilder builder = 
        new SpringXmlConfigurationBuilder(resources);
      context = factory.createMuleContext(builder);
      context.start();
      System.out.println("Starting Mule Instance");
    } catch (ConfigurationException e) {
      e.printStackTrace();
    } catch (InitialisationException e) {
      e.printStackTrace();
    } catch (MuleException e) {
      e.printStackTrace();
    }
  }
}

The Obligatory Screenshot

...just to show you how it all works. Typing in the URL:

http://localhost:8888/search?query=maven&start=0&end=10

Returns a screenful of XML as shown below:

Client code could call this using a simple HTTP Client, which would deserialize the XML (perhaps using XStream) back into a List of SearchResultBean objects, and use it as required in the application. In the case of a more Mule aware organization, the client would probably also be a Mule service.
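For a client that does not want XStream or the bean on its CLASSPATH, a rough JDK-only sketch is shown here. The class SearchClientSketch and its methods are hypothetical. It assumes the response is a root element whose child elements each carry title, url and score children, which is roughly what XStream's default output for a List of beans looks like; the sample XML in main is made up for illustration and is not captured from the service.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

/** Hypothetical client for the search endpoint, using only JDK classes. */
final class SearchClientSketch {

  /** Builds the request URL, encoding the query so spaces and symbols survive. */
  static String buildUrl(String base, String query, int start, int end)
      throws IOException {
    return base + "?query=" + URLEncoder.encode(query, "UTF-8")
        + "&start=" + start + "&end=" + end;
  }

  /** Fetches the response body as a UTF-8 string (assumes the service replies in UTF-8). */
  static String fetch(String url) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
    try (InputStream in = conn.getInputStream()) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    } finally {
      conn.disconnect();
    }
  }

  /** Extracts {title, url, score} from each child element of the root. */
  static List<String[]> parseResults(String xml) throws Exception {
    Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
        .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    List<String[]> results = new ArrayList<>();
    NodeList children = doc.getDocumentElement().getChildNodes();
    for (int i = 0; i < children.getLength(); i++) {
      Node node = children.item(i);
      if (node.getNodeType() != Node.ELEMENT_NODE) {
        continue; // skip whitespace text nodes between beans
      }
      Element bean = (Element) node;
      results.add(new String[] {
          text(bean, "title"), text(bean, "url"), text(bean, "score")});
    }
    return results;
  }

  /** Returns the text of the first descendant with the given name, or null if absent. */
  private static String text(Element parent, String name) {
    NodeList matches = parent.getElementsByTagName(name);
    return matches.getLength() == 0 ? null : matches.item(0).getTextContent();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(buildUrl("http://localhost:8888/search", "maven", 0, 10));
    // Made-up sample response; against a running service use fetch(url) instead.
    String sample = "<list><com.mycompany.searchservice.SearchResultBean>"
        + "<title>Sample title</title><url>http://example.com/1</url>"
        + "<score>1.5</score>"
        + "</com.mycompany.searchservice.SearchResultBean></list>";
    for (String[] row : parseResults(sample)) {
      System.out.println(row[0] + " | " + row[1] + " | " + row[2]);
    }
  }
}
```

The trade-off is that this client only reads the fields it names, whereas deserializing with XStream gives back complete SearchResultBean objects at the cost of sharing the class.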

Conclusion

As you can see, Mule provides a lot of components and XML wiring features that make it easy for the application developer to concentrate on the business logic and leave the integration details to Mule. However, while I was building this application, I realized that it would be more pragmatic (and easier) to just build a simple web application wrapper.

Obviously, this is not a reflection on the quality of Mule software. In a shop that is using Mule more heavily, this would probably be an ideal approach. However, ingesting the Mule elephant (sorry, mixing metaphors here) to just take advantage of its HTTP connector and a couple of transformers seems like a bit of overkill. Developers here are very familiar with Spring and Lucene, so building a simple web application is far simpler than learning the Mule architecture and all its components.

Another thing I noticed is that there seems to be more emphasis on asynchronous messaging in Mule, and perhaps rightly so. In my case, I would have liked to be able to wire up a transformer after my component runs, perhaps in an outbound component, but since my service is synchronous, I can only configure my inbound endpoint, which does not allow transformers after the component is run. I ultimately ended up putting the post-transformation in the service code itself. Of course, Mule is a work in progress, so I am sure the functionality will show up in a later version if it doesn't exist already. If you know how to achieve this, please let me know.

References

I found the following sites helpful during my development. If you want to try out something along similar lines, you will probably find them helpful too.

  • Maven Archetype for Mule Projects from the Morning Java blog. The archetype provides a simple example of a Mule service which is very helpful. It is based on Mule 2.0.0-RC2, but a simple version change in the POM got me set up with the current Mule version (2.2.1 at the time of this writing). In addition, I had to add in the dependency to Lucene and the Mule HTTP Connector (see the POM snippets above).
  • Mule Instance Configuration from the Mule documentation.

  • This page provides some information about modeling synchronous request-response style messaging in Mule.
  • A very informative article from InfoQ, written by Jackie Wheeler.
  • This discussion thread provided me with insight about how to handle HTTP connectors in the application.
  • I've been meaning to look at Solr for a while now, and I finally did it before starting on this application, to see if I could use Solr. The Solr Getting Started Guide was very helpful to set up a simple Solr instance which I could experiment with while going through the Solr code and documentation.