Showing posts with label remoting. Show all posts

Wednesday, January 01, 2014

Akka Content Ingestion Pipeline, Part IV


Happy New Year! Over the last three posts, I've described an Akka-based Content Ingestion Pipeline modelled after the NutchGORA pipeline. This pipeline was my vehicle for learning Akka, something I had been planning to do for a while. In this post (the final installment in this series, at least so far), I explore how to distribute the application horizontally across multiple machines (scale out).

Akka is distributed by design. The code I've built so far can be regarded as a single-server version of a distributed system. According to the Akka remoting documentation (emphasis mine):

Everything in Akka is designed to work in a distributed setting: all interactions of actors use purely message passing and everything is asynchronous. This effort has been undertaken to ensure that all functions are available equally when running within a single JVM or on a cluster of hundreds of machines. The key for enabling this is to go from remote to local by way of optimization instead of trying to go from local to remote by way of generalization.

Akka can work with remote Actors in two ways: by looking them up in a remote ActorSystem, or by creating them in the remote ActorSystem. I use the latter approach. The workers do the heavy lifting in the pipeline, so scaling out to handle more incoming requests means increasing the number of workers or making them faster, and both can be done by giving the workers their own dedicated hardware.

The architecture diagram has been updated with the distribution boundaries, which are indicated by the gray boxes below. The master node is the large gray box at the top; it contains the REST Interface, the Controller and the Router actors. The worker nodes (there can be an array of nodes for each worker class) wrap the Fetch, Parse and Index worker arrays.


Each of these nodes is wrapped in an Akka ActorSystem, which other ActorSystems can reach through a URI. So in addition to the HTTP interface that the master node exposes to the outside world, it also listens on a host:port and has a name that other Akka ActorSystems can use to communicate with it.
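An ActorSystem address has the form akka.tcp://<system-name>@<host>:<port>. The Controller code further down uses Akka's AddressFromURIString to turn such strings into Address objects; purely to illustrate the structure, the JDK's java.net.URI can split one the same way. AkkaAddress is a hypothetical helper written for this sketch, not part of Akka.

```java
import java.net.URI;

// Hypothetical helper: splits an Akka remoting address of the form
// akka.tcp://<system>@<host>:<port> using only java.net.URI.
// This is an illustration, not Akka's AddressFromURIString.
final class AkkaAddress {
  final String protocol; // e.g. "akka.tcp"
  final String system;   // ActorSystem name, e.g. "remote"
  final String host;     // e.g. "127.0.0.1"
  final int port;        // e.g. 2553

  private AkkaAddress(String protocol, String system, String host, int port) {
    this.protocol = protocol;
    this.system = system;
    this.host = host;
    this.port = port;
  }

  static AkkaAddress parse(String uri) {
    URI u = URI.create(uri);
    // the user-info part (before '@') carries the ActorSystem name
    if (u.getScheme() == null || u.getUserInfo() == null
        || u.getHost() == null || u.getPort() == -1) {
      throw new IllegalArgumentException("Not an Akka address: " + uri);
    }
    return new AkkaAddress(u.getScheme(), u.getUserInfo(), u.getHost(), u.getPort());
  }

  @Override
  public String toString() {
    return protocol + "://" + system + "@" + host + ":" + port;
  }

  public static void main(String[] args) {
    AkkaAddress a = AkkaAddress.parse("akka.tcp://remote@127.0.0.1:2553");
    System.out.println(a.system + " " + a.host + " " + a.port);
  }
}
```

Note that the system name is part of the address: two ActorSystems on the same host:port can never be confused, and a message addressed to the wrong name or host string will not reach its target.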

For testing, I configured the pipeline with just two ActorSystems: the master node, listening on 127.0.0.1:2552 and identified by the URI akka.tcp://DelSym@127.0.0.1:2552, and one remote node, listening on 127.0.0.1:2553 and identified by the URI akka.tcp://remote@127.0.0.1:2553. Here is some code to create a named remote Akka ActorSystem (the name is supplied on the command line) using configuration parameters from the remote.conf file.

// Source: src/main/scala/com/mycompany/delsym/remote/RemoteAkka.scala
package com.mycompany.delsym.remote

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RemoteAkka extends App {

  val name = if (args.isEmpty) "remote" else args(0)
  
  val conf = ConfigFactory.load("remote")
  val host = conf.getString("akka.remote.netty.tcp.hostname")
  val port = conf.getInt("akka.remote.netty.tcp.port")
  
  val system = ActorSystem(name, conf)
  Console.println("Remote system [%s] listening on %s:%d"
    .format(name, host, port))
  
  sys.addShutdownHook {
    Console.println("Shutting down Remote Akka")
    system.shutdown
  }
}

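The remote.conf file looks like this. It is meant to be used to start up ActorSystems on multiple nodes in a network; on a real network, the hostname would be set to an address the other machines can reach, rather than 127.0.0.1.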

// Source: src/main/resources/remote.conf
akka {
  log-dead-letters-during-shutdown = off
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2553
    }
  }
}

I then added a property in application.conf to specify a list of ActorSystem URIs for each router. The routers are round-robin routers, so giving them a list of ActorSystem URIs causes them to cycle through the URIs when creating remote actors, distributing those actors evenly across the remote ActorSystems. The Controller actor (which instantiates the routers) has been modified to create local workers if the node list is empty and remote workers otherwise. The updated code for the Controller is shown below:

// Source: src/main/scala/com/mycompany/delsym/actors/Controller.scala
package com.mycompany.delsym.actors

import scala.collection.JavaConversions.asScalaBuffer
import scala.concurrent.duration.DurationInt

import com.mycompany.delsym.daos.HtmlOutlinkFinder
import com.mycompany.delsym.daos.MockOutlinkFinder
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigList

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.AddressFromURIString
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.actorRef2Scala
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RoundRobinRouter
import akka.routing.RouterConfig

class Controller extends Actor with ActorLogging {

  override val supervisorStrategy = OneForOneStrategy(
      maxNrOfRetries = 10,
      withinTimeRange = 1.minute) {
    case _: Exception => SupervisorStrategy.Restart
  }
  
  val reaper = context.actorOf(Props[Reaper], name="reaper")

  val conf = ConfigFactory.load()
  val numFetchers = conf.getInt("delsym.fetchers.numworkers")
  val fetchNodes = conf.getList("delsym.fetchers.nodes")
  
  val numParsers = conf.getInt("delsym.parsers.numworkers")
  val parseNodes = conf.getList("delsym.parsers.nodes")
  
  val numIndexers = conf.getInt("delsym.indexers.numworkers")
  val indexNodes = conf.getList("delsym.indexers.nodes")
  
  val testUser = conf.getBoolean("delsym.testuser")
  val outlinkFinder = if (testUser) new MockOutlinkFinder()
                      else new HtmlOutlinkFinder()
  
  val queueSizes = scala.collection.mutable.Map[String,Long]()
  
  val fetchers = context.actorOf(Props[FetchWorker]
    .withRouter(buildRouter(numFetchers, fetchNodes)), 
    name="fetchers")
  reaper ! Register(fetchers)
  queueSizes += (("fetchers", 0L))

  val parsers = context.actorOf(Props[ParseWorker]
    .withRouter(buildRouter(numParsers, parseNodes)), 
    name="parsers")
  reaper ! Register(parsers)
  queueSizes += (("parsers", 0L))
  
  val indexers = context.actorOf(Props[IndexWorker]
    .withRouter(buildRouter(numIndexers, indexNodes)),
    name="indexers")
  reaper ! Register(indexers)
  queueSizes += (("indexers", 0L))

  def receive = {
    case m: Fetch => {
      increment("fetchers")
      fetchers ! m
    }
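    case m: FetchComplete => {
      decrement("fetchers")
      if (m.fwd) {
        // count forwarded work too, so that the matching
        // ParseComplete does not drive the counter negative
        increment("parsers")
        parsers ! Parse(m.url)
      }
    }
    case m: Parse => {
      increment("parsers")
      parsers ! m
    }
    case m: ParseComplete => {
      decrement("parsers")
      // foreach rather than map: only the side effect is wanted
      outlinks(m.url).foreach(outlink => {
        increment("fetchers")
        fetchers ! Fetch(outlink._1, outlink._2, outlink._3)
      })
      if (m.fwd) {
        increment("indexers")
        indexers ! Index(m.url)
      }
    }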
    case m: Index => {
      increment("indexers")
      indexers ! m
    }
    case m: IndexComplete => {
      decrement("indexers")
    }
    case m: Stats => {
      sender ! queueSize()
    }
    case m: Stop => {
      reaper ! Stop(0)
    }
    case _ => log.info("Unknown message received.")
  }
  
  def buildRouter(n: Int, nodes: ConfigList): RouterConfig = {
    if (nodes.isEmpty) RoundRobinRouter(n)
    else {
      val addrs = nodes.unwrapped()
        .map(node => node.asInstanceOf[String])
        .map(node => AddressFromURIString(node))
        .toSeq
      RemoteRouterConfig(RoundRobinRouter(n), addrs)
    }
  }
  
  def queueSize(): Stats = Stats(queueSizes.toMap)
  
  def outlinks(url: String): 
      List[(String,Int,Map[String,String])] = {
    outlinkFinder.findOutlinks(url) match {
      case Right(triples) => triples
      case Left(f) => List.empty
    }
  }
  
  def increment(key: String): Unit = {
    queueSizes += ((key, queueSizes(key) + 1))
  }
  
  def decrement(key: String): Unit = {
    queueSizes += ((key, queueSizes(key) - 1))
  }
}

The documentation indicates that a better approach would be to declare the routers in configuration, so that the local configuration would differ from the distributed one. I did not do this because my test case refers to the routers as /controller/* while the actual code refers to them as /api/controller/* (I should probably change the test code, but I was too lazy). In any case, changing from a local to a remote router configuration is simply a matter of wrapping the router configuration in a RemoteRouterConfig (the buildRouter function in the code above), so this approach works fine too.
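As a rough picture of what RemoteRouterConfig does with the node list, the plain-Java sketch below deals n routees out over the node addresses in turn (routee i goes to node i mod the number of nodes), and keeps everything local when the list is empty, mirroring the two branches of buildRouter. It is an illustration under that assumption, not Akka code: RoutePlanner and LOCAL are made-up names, and the second address used in main is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical planner mimicking how a round-robin remote router
// spreads n routees over a list of node addresses. Not Akka code.
final class RoutePlanner {
  static final String LOCAL = "local"; // stand-in for "same ActorSystem"

  /** Returns the node that each routee (by index) would be deployed on. */
  static List<String> place(int numRoutees, List<String> nodes) {
    List<String> placement = new ArrayList<>();
    for (int i = 0; i < numRoutees; i++) {
      // empty node list: every routee stays local, as in buildRouter
      placement.add(nodes.isEmpty() ? LOCAL : nodes.get(i % nodes.size()));
    }
    return placement;
  }

  /** Counts routees per node, preserving first-seen node order. */
  static Map<String, Integer> countPerNode(List<String> placement) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (String node : placement) {
      counts.merge(node, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String> nodes = Arrays.asList(
        "akka.tcp://remote@127.0.0.1:2553",
        "akka.tcp://remote2@127.0.0.1:2554"); // second node is hypothetical
    System.out.println(countPerNode(place(5, nodes)));
    System.out.println(countPerNode(place(5, Collections.emptyList())));
  }
}
```

With two nodes and numworkers = 5, the first node would host three workers and the second two; the RoundRobinRouter then cycles incoming messages across those five routees regardless of where they live.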

Going from local to remote also requires you to think about serialization. I have chosen to use Java serialization, and I have configured Akka (via the application.conf file) to use it automatically for my messages. In addition, the distributed configuration of the master ActorSystem declares its own address and sets the provider to the remote ActorRef provider. The other important difference is the non-empty nodes list under the delsym namespace for each of the fetchers, parsers and indexers. The remote configuration is shown below:

// Source: src/main/resources/application.conf.remote
akka {
  loglevel = INFO
  stdout-loglevel = INFO
  // already inside the akka block, so the key is just "loggers"
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  log-dead-letters-during-shutdown = off
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    serializers {
      java = "akka.serialization.JavaSerializer"
    }
    serialization-bindings {
      "com.mycompany.delsym.actors.DelsymMessage" = java
    }
  }
  // remoting settings belong under akka.remote, not akka.actor.remote
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}

spray {
  can {
    server {
      server-header = "DelSym REST API"
    }
  }
}

delsym {
  testuser = true
  fetchers {
    numworkers = 5
    refreshIntervalDays = 30
    numRetries = 3
    nodes = ["akka.tcp://remote@127.0.0.1:2553"]
  }
  parsers {
    numworkers = 5
    nodes = ["akka.tcp://remote@127.0.0.1:2553"]
  }
  indexers {
    numworkers = 5
    nodes = ["akka.tcp://remote@127.0.0.1:2553"]
  }
  mongodb {
    host = "127.0.0.1"
    port = 27017
    dbname = "delsymdb"
    collname = "documents"
  }
  solr {
    server = "http://127.0.0.1:8983/solr/collection1/"
    dbfieldnames = "_id,url,p_title,p_author,textContent"
    solrfieldnames = "id,url,title,author,text"
    commitInterval = 10
  }
  rest {
    host = "127.0.0.1"
    port = 8080
    timeout = 1
  }
}
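The serialization-bindings entry above routes every message that implements com.mycompany.delsym.actors.DelsymMessage through Akka's JavaSerializer, so every message type must be java.io.Serializable (Scala case classes are, by default). The plain-Java sketch below shows the byte round trip such a message goes through; DelsymMessage and FetchMessage here are hypothetical Java stand-ins for the Scala types in the repo, whose actual fields may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical marker interface, standing in for the Scala trait
// bound to the "java" serializer in application.conf.
interface DelsymMessage extends Serializable {}

// Hypothetical message; the real Fetch case class may differ.
final class FetchMessage implements DelsymMessage {
  private static final long serialVersionUID = 1L;
  final String url;
  final int depth;

  FetchMessage(String url, int depth) {
    this.url = url;
    this.depth = depth;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FetchMessage)) return false;
    FetchMessage f = (FetchMessage) o;
    return url.equals(f.url) && depth == f.depth;
  }

  @Override
  public int hashCode() {
    return Objects.hash(url, depth);
  }
}

final class JavaSerDemo {
  // Serialize to bytes, roughly what happens before a remote send.
  static byte[] toBytes(DelsymMessage msg) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(msg);
    }
    return bos.toByteArray();
  }

  // Deserialize on the receiving side.
  static DelsymMessage fromBytes(byte[] bytes)
      throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (DelsymMessage) ois.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    FetchMessage sent = new FetchMessage("http://www.example.com/", 0);
    DelsymMessage received = fromBytes(toBytes(sent));
    System.out.println(sent.equals(received)); // true
  }
}
```

Deserialization looks classes up by name, so the message classes must be on the classpath of every ActorSystem involved, master and remote alike.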

There are now three versions of application.conf in the DelSym repo on GitHub. You will have to link to the correct one depending on whether you want to run the mock tests, or run in local (all actors in a single ActorSystem) or remote (multiple ActorSystems) mode.

The effort to build the code for this part of the pipeline was mostly conceptual, i.e., understanding how the different components fit together. I found the following Akka reference pages very useful. The pages are all for Akka version 2.2.3 (the latest stable version at the time), which I used for this work; the default pages that show up (in response to a Google search, for example) are for version 2.3, which is still in development. The 2.3 code is different enough for this detail to be annoying, so I mention it here.


In addition, I found the akka-sample-remote-scala project useful, although the pattern shown there is slightly different from what I used. Another useful source was the Remoting chapter of the Akka Essentials book.

I was able to run the ActorFlowTest unit test with mock workers (minus the asserts, since the workers now update counters on the remote ActorSystem, which the test can no longer inspect) and verify from the logs that the fetching, parsing and indexing happen on my remote ActorSystem at 127.0.0.1:2553. The code also exits cleanly, which suggests that DeathWatch works fine with remote workers. However, I saw lots of messages sent to the dead-letter mailbox that I haven't been able to figure out yet (they are not application messages). I will post an update here (and a bugfix to the DelSym GitHub repo) once I do.

Friday, May 01, 2009

HTTP Search Interface to Lucene using Mule

Introduction

For quite a while now, I've been thinking, off and on, about centralizing our search functionality. Currently, our indexes are deployed locally with the application, which is something of an operations nightmare. As we scale out by adding machines to our tiers, and introduce brand new tiers for new products, the situation can only get worse. Some time ago, I built a simple RMI server to act as a central repository for all our indexes (perhaps scaled out horizontally behind a load balancer), but it would have needed quite a few changes to our codebase to perform reasonably, so I abandoned the idea. Other things came up and I forgot about it; from the looks of it, reports of operations nightmares seem to have been grossly exaggerated :-).

Why not Solr?

At this point, most of you are probably thinking about Solr and wondering why I am attempting to reinvent the wheel. Well, for a couple of reasons, actually:

  1. Solr is very customizable, but it offers no customization hook in the one place I need it most. Our search is really a meta-search, aggregating results from multiple internal sources, each of which can be backed by multiple indexes, each of which is built using radically different analyzers. Solr follows a one-IndexSearcher-per-instance model, which is unlikely to change, since its update strategy is based on that assumption. We could probably use Solr's distributed search to get around that, but the performance penalty would be too high.
  2. Unlike Solr, our model for updating indexes is to simply replace them with freshly built ones. Logic to detect the availability of a new index is built into the code, so no application restarts are necessary. I could actually implement this in Solr with a custom RequestHandler, much more simply than it is currently implemented in our code.

Why Mule?

I started thinking about this again recently, after attending a talk on Mule ESB by Ken Yagen at the EBig Java SIG some weeks ago. Instead of using RMI, this time I decided to build something along the lines of Solr, i.e., an HTTP interface to the indexes, and it seemed like a good way to get familiar with Mule. So here it is...

POM changes

I used the Maven archetype from here, and changed the version parameter to reflect the current Mule version. In addition, I added dependencies on the Mule HTTP Transport and Lucene 2.4. The differences are shown below:

<?xml version="1.0" encoding="UTF-8"?>
<project ...>
  ...
  <properties>
    <mule.version>2.2.1</mule.version>
  </properties>

  <dependencies>
    ...
    <!-- Add support for http -->
    <dependency>
      <groupId>org.mule.transports</groupId>
      <artifactId>mule-transport-http</artifactId>
      <version>${mule.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Add Support for Lucene -->
    <dependency>
      <groupId>org.apache.lucene</groupId>
      <artifactId>lucene-core</artifactId>
      <version>2.4.0</version>
      <scope>compile</scope>
    </dependency>

  </dependencies>
  ...
</project>

The Configuration

Mule uses its own XML configuration. The configuration file shown below contains the details of the entire Mule service. You can find more details about configuring a Mule service here.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Source: src/main/resources/mule-config-spring.xml -->
<mule xmlns="http://www.mulesource.org/schema/mule/core/2.2"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:spring="http://www.springframework.org/schema/beans"
       xmlns:http="http://www.mulesource.org/schema/mule/http/2.2"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://www.mulesource.org/schema/mule/core/2.2 
       http://www.mulesource.org/schema/mule/core/2.2/mule.xsd
       http://www.mulesource.org/schema/mule/http/2.2 
       http://www.mulesource.org/schema/mule/http/2.2/mule-http.xsd">
       
  <!-- Application specific beans -->
  <spring:beans>
    <spring:import resource="classpath:components-spring.xml"/>
    <spring:bean id="propertyConfigurer"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
      <spring:property name="location" 
        value="classpath:mule-config-spring.properties"/>
    </spring:bean>
  </spring:beans>       

  <!-- Connectors -->
  <http:connector name="httpConnector" enableCookies="false" keepAlive="true"/>
  
  <!-- Transformers -->
  <custom-transformer name="requestTransformer" 
    class="org.mule.transport.http.transformers.HttpRequestBodyToParamMap"/>

  <!-- Model -->
  <model name="main">
    <service name="searchService">
      <inbound>
        <http:inbound-endpoint address="http://localhost:8888/search" 
          synchronous="true" contentType="text/xml" 
          transformer-refs="requestTransformer"/>
      </inbound>
      <component>
        <spring-object bean="searchServiceUmo"/>
      </component>
    </service>
  </model>
</mule>
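The requestTransformer (HttpRequestBodyToParamMap) is what turns the incoming HTTP request into the Map<String,Object> that SearchServiceUmo.search() receives. To illustrate the idea only (this is not Mule's implementation, and QueryParams is a hypothetical name), here is a plain-JDK sketch that decodes a query string into such a map. When a name repeats, it keeps the first value, a choice made for this sketch to match the single-value lookups in searchInternal.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a request-to-param-map transformer:
// decodes "a=1&b=two+words" into {a=1, b=two words}. Not Mule code.
final class QueryParams {
  static Map<String, Object> parse(String query) {
    Map<String, Object> params = new LinkedHashMap<>();
    if (query == null || query.isEmpty()) {
      return params;
    }
    for (String pair : query.split("&")) {
      if (pair.isEmpty()) continue;
      int eq = pair.indexOf('=');
      String name = decode(eq < 0 ? pair : pair.substring(0, eq));
      String value = eq < 0 ? "" : decode(pair.substring(eq + 1));
      // keep the first occurrence; searchInternal expects single values
      params.putIfAbsent(name, value);
    }
    return params;
  }

  private static String decode(String s) {
    try {
      return URLDecoder.decode(s, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 not supported", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("query=maven&start=0&end=10"));
  }
}
```

A bare parameter such as reopen (no "=value") still shows up as a key with an empty value, which is all the containsKey("reopen") check in searchInternal needs.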

Mule configuration integrates very nicely with Spring's. The only application code in the service is the SearchServiceUmo, which is defined in the components-spring.xml file below, using standard Spring semantics. This file is referenced from the main configuration file using an import.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Source: src/main/resources/components-spring.xml -->
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans.xsd">
  
  <bean id="searchServiceUmo" 
      class="com.mycompany.searchservice.SearchServiceUmo"
      init-method="init" destroy-method="destroy">
    <property name="indexPaths">
      <list>
        <value>/path/to/my/index</value>
      </list>
    </property>
  </bean>
</beans>

The Code

The workhorse class is the SearchServiceUmo. It takes a Map of request parameters representing a query from a remote client, and executes a Lucene search against one or more local indexes. It then returns a List of result beans (a POJO, shown below), converted into an XML string. One important thing to note is that there is no mention of any Mule API or classes, i.e., the application code is coupled to Mule only through the XML wiring.

// Source: src/main/java/com/mycompany/searchservice/SearchServiceUmo.java
package com.mycompany.searchservice;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.CachingWrapperFilter;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MultiSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryWrapperFilter;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Searcher;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.BooleanClause.Occur;

import com.thoughtworks.xstream.XStream;

/**
 * User defined search service.
 */
public class SearchServiceUmo {

  private final Log log = LogFactory.getLog(getClass());

  private static final Analyzer ANALYZER = new StandardAnalyzer();
  
  private List<String> indexPaths;
  private Searcher searcher;
  
  public void setIndexPaths(List<String> indexPaths) {
    this.indexPaths = indexPaths;
  }
  
  protected void init() throws Exception {
    if (indexPaths.size() == 0) {
      throw new IllegalArgumentException(
        "At least one index must be specified");
    } else if (indexPaths.size() == 1) {
      this.searcher = new IndexSearcher(indexPaths.get(0));
    } else {
      Searcher[] searchers = new Searcher[indexPaths.size()];
      for (int i = 0; i < searchers.length; i++) {
        searchers[i] = new IndexSearcher(indexPaths.get(i));
      }
      this.searcher = new MultiSearcher(searchers);
    }
  }
  
  protected void destroy() {
    if (searcher != null) {
      try {
        searcher.close();
        searcher = null;
      } catch (Exception e) {
        log.warn("Searcher at " + indexPaths + 
          " could not be closed", e);
      }
    }
  }

  /**
   * For synchronous services, there does not seem to be a way to 
   * apply a transformation on the results returned from a component,
   * so we are doing this in code. It's probably not that big a deal, 
   * since it's only 2 lines of code, but it would be nice if we could 
   * do this using an available Mule component (ObjectToXml is available,
   * but cannot be used without complex shenanigans, as far as I can see).
   * @param params the request parameters as a Map of name-value pairs.
   * @return the response XML string.
   * @throws Exception if thrown.
   */
  public String search(Map<String,Object> params) throws Exception {
    List<SearchResultBean> beans = searchInternal(params);
    String result = new XStream().toXML(beans);
    return result;
  }
  
  private List<SearchResultBean> searchInternal
      (Map<String,Object> params) throws Exception {
    // we could probably write a custom transformer here to get a 
    // parameter object as our argument, which would allow for 
    // multiple params with the same name, and other good stuff, 
    // but we are lazy, so...
    if (params.containsKey("reopen")) {
      // this is for the batch update script
      destroy();
      init();
      return Collections.emptyList();
    } else {
      Query query = buildQuery((String) params.get("query"));
      Filter filter = buildFilter((String) params.get("filter"));
      Sort sort = buildSort((String) params.get("sort"));
      int startIndex = Integer.valueOf((String) params.get("start"));
      int endIndex = Integer.valueOf((String) params.get("end"));
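      TopDocs td = searcher.search(query, filter, endIndex, sort);
      ScoreDoc[] sds = td.scoreDocs;
      // there may be fewer hits than requested, so clamp the upper
      // bound to avoid an ArrayIndexOutOfBoundsException
      int lastIndex = Math.min(endIndex, sds.length);
      List<SearchResultBean> results = 
        new ArrayList<SearchResultBean>();
      for (int i = startIndex; i < lastIndex; i++) {
        Document doc = searcher.doc(sds[i].doc);
        results.add(new SearchResultBean(doc, sds[i].score));
      }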
      return results;
    }
  }

  /**
   * Build up the query object using some standard rules. In this case,
   * our rules are (body:${q}) OR (title:${q})^4.0. We used Standard
   * Analyzer to tokenize both body and title at index time, so we must 
   * also use it for query building.
   * @param q the query term.
   * @return the Query object.
   */
  private Query buildQuery(String q) throws Exception {
    BooleanQuery query = new BooleanQuery();
    Query titleQuery = new QueryParser("title", ANALYZER).parse(q);
    titleQuery.setBoost(4.0F);
    query.add(titleQuery, Occur.SHOULD);
    Query bodyQuery = new QueryParser("body", ANALYZER).parse(q);
    query.add(bodyQuery, Occur.SHOULD);
    return query;
  }
  
  /**
   * Some filtering criteria. In our case, we know that our tags contain
   * our filtering criteria, so we use that. The parameter is a comma-
   * separated list of tags. The tags are indexed without tokenizing, so
   * we use a plain TermQuery here.
   * @param tags the tags to filter on.
   * @return a Filter object.
   */
  private Filter buildFilter(String tags) {
    if (StringUtils.isEmpty(tags)) {
      return null;
    }
    BooleanQuery query = new BooleanQuery();
    String[] tagArray = StringUtils.split(tags, ",");
    for (int i = 0; i < tagArray.length; i++) {
      TermQuery tquery = new TermQuery(new Term("tag", tagArray[i]));
      query.add(tquery, Occur.MUST);
    }
    return new CachingWrapperFilter(new QueryWrapperFilter(query));
  }

  /**
   * We always sort by the natural order of the sort fields specified.
   * If a field is prefixed with a '-', then we reverse the natural
   * sort order for that field. 
   * @param sortFields a comma-separated list of sort fields to sort by.
   * @return a Sort object for this search.
   */
  private Sort buildSort(String sortFields) {
    if (StringUtils.isEmpty(sortFields)) {
      return Sort.RELEVANCE;
    }
    String[] sortFieldArray = StringUtils.split(sortFields, ",");
    SortField[] sfs = new SortField[sortFieldArray.length];
    for (int i = 0; i < sortFieldArray.length; i++) {
      if (sortFieldArray[i].startsWith("-")) {
        sfs[i] = new SortField(sortFieldArray[i].substring(1), true);
      } else {
        sfs[i] = new SortField(sortFieldArray[i]);
      }
    }
    return new Sort(sfs);
  }
}
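The reopen branch in searchInternal() calls destroy() and then init(), so a search that arrives in between finds a null searcher, and a failed init() leaves no searcher at all. A generic alternative is to open the new resource first and only then swap it in. The sketch below shows that ordering with an AtomicReference; SwappableResource is a hypothetical name, and in this service the Supplier would wrap whatever opens the Lucene searcher (rethrowing IOException as UncheckedIOException, since Supplier cannot throw checked exceptions).

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical holder that swaps in a freshly opened resource (e.g. a
// searcher over a newly built index) without restarting the service.
final class SwappableResource<T extends Closeable> {
  private final Supplier<T> opener;
  private final AtomicReference<T> current;

  SwappableResource(Supplier<T> opener) {
    this.opener = opener;
    this.current = new AtomicReference<>(opener.get());
  }

  /** The resource that new requests should use. */
  T get() {
    return current.get();
  }

  /** Opens the new resource first, then swaps it in and closes the old one. */
  void reopen() throws IOException {
    T fresh = opener.get();           // if this throws, the old one stays live
    T old = current.getAndSet(fresh); // atomic, so concurrent reopens are safe
    // Caveat: a request still using 'old' may fail after this close;
    // a production version would reference-count before closing.
    old.close();
  }

  public static void main(String[] args) throws IOException {
    int[] version = {0};
    SwappableResource<StringReader> reader = new SwappableResource<>(
        () -> new StringReader("index v" + (++version[0])));
    reader.reopen();
    System.out.println(version[0]); // 2
  }
}
```

The remaining caveat in the comment is the hard part in practice; later Lucene releases added a SearcherManager class that handles this reference counting.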

The SearchResultBean is a POJO. I have removed the getters and setters to keep the code short. Use your IDE to generate them. Note that if you want to deserialize the XML back into this bean on the client side, the bean must exist on the client's CLASSPATH as well.

// Source: src/main/java/com/mycompany/searchservice/SearchResultBean.java
package com.mycompany.searchservice;

import java.io.Serializable;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.lucene.document.Document;

/**
 * Simple POJO to hold the contents of a search result. This should 
 * be available on the client side as well, in order for XStream to 
 * be able to deserialize this into a SearchResultBean.
 */
public class SearchResultBean implements Serializable {
  
  private static final long serialVersionUID = -2701792004759978895L;
  
  private String id;
  private String title;
  private String summary;
  private String[] tags;
  private String url;
  private float score;
  
  public SearchResultBean(Document doc, float score) {
    this.id = doc.get("id");
    this.title = doc.get("title");
    this.summary = doc.get("summary");
    this.tags = doc.getValues("tags");
    this.url = doc.get("url");
    this.score = score;
  }

  // ... getters and setters removed for brevity

  @Override
  public String toString() {
    return ReflectionToStringBuilder.toString(this);
  }
}

The Main class is not needed if you are using a Mule installation. In my case, it allows me to start up the Mule service from within my IDE; it is adapted from the template in the archetype.

// Source: src/main/java/com/mycompany/searchservice/Main.java
package com.mycompany.searchservice;

import org.apache.log4j.BasicConfigurator;
import org.mule.api.MuleContext;
import org.mule.api.MuleException;
import org.mule.api.config.ConfigurationBuilder;
import org.mule.api.config.ConfigurationException;
import org.mule.api.context.MuleContextFactory;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.config.spring.SpringXmlConfigurationBuilder;
import org.mule.context.DefaultMuleContextFactory;

/**
 * Launcher for the Mule based search service.
 */
public class Main {

  public static void main(String[] args) {
    BasicConfigurator.configure();
    MuleContext context = null;
    String[] resources = {"mule-config-spring.xml"};
    try {
      MuleContextFactory factory = new DefaultMuleContextFactory();
      ConfigurationBuilder builder = 
        new SpringXmlConfigurationBuilder(resources);
      context = factory.createMuleContext(builder);
      context.start();
      System.out.println("Starting Mule Instance");
    } catch (ConfigurationException e) {
      e.printStackTrace();
    } catch (InitialisationException e) {
      e.printStackTrace();
    } catch (MuleException e) {
      e.printStackTrace();
    }
  }
}

The Obligatory Screenshot

...just to show you how it all works. Typing in the URL:

http://localhost:8888/search?query=maven&start=0&end=10

Returns a screenful of XML as shown below:

Client code could call this using a simple HTTP client, which would deserialize the XML (perhaps using XStream) back into a List of SearchResultBean objects for use in the application. In a more Mule-aware organization, the client would probably also be a Mule service.
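On the client side, the request URL has to carry the same parameter names the service reads (query, filter, sort, start, end). Here is a minimal JDK-only sketch, assuming the service runs at http://localhost:8888/search as configured above; SearchClient is a hypothetical name, and turning the returned XML back into SearchResultBean objects (for example with XStream) is left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

// Hypothetical client for the search service; returns the raw XML.
final class SearchClient {
  private final String baseUrl; // e.g. "http://localhost:8888/search"

  SearchClient(String baseUrl) {
    this.baseUrl = baseUrl;
  }

  /** Builds the request URL; filter and sort may be null. */
  String buildUrl(String query, String filter, String sort, int start, int end) {
    StringBuilder sb = new StringBuilder(baseUrl);
    sb.append("?query=").append(encode(query));
    if (filter != null) sb.append("&filter=").append(encode(filter));
    if (sort != null) sb.append("&sort=").append(encode(sort));
    sb.append("&start=").append(start).append("&end=").append(end);
    return sb.toString();
  }

  /** Performs a GET and returns the response body as a UTF-8 string. */
  String search(String query, String filter, String sort, int start, int end)
      throws IOException {
    URL url = new URL(buildUrl(query, filter, sort, start, end));
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (InputStream in = conn.getInputStream()) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return out.toString("UTF-8");
    } finally {
      conn.disconnect();
    }
  }

  private static String encode(String s) {
    try {
      return URLEncoder.encode(s, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 not supported", e);
    }
  }
}
```

For example, buildUrl("maven", null, null, 0, 10) produces the same URL typed into the browser above; a filter such as "java,search" is percent-encoded so the comma survives the trip to buildFilter.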

Conclusion

As you can see, Mule provides a lot of components and XML wiring features that make it easy for the application developer to concentrate on the business logic and leave the integration details to Mule. However, while I was building this application, I realized that it would be more pragmatic (and easier) to just build a simple web application wrapper.

Obviously, this is not a reflection on the quality of Mule software. In a shop that is using Mule more heavily, this would probably be an ideal approach. However, ingesting the Mule elephant (sorry, mixing metaphors here) to just take advantage of its HTTP connector and a couple of transformers seems like a bit of overkill. Developers here are very familiar with Spring and Lucene, so building a simple web application is far simpler than learning the Mule architecture and all its components.

Another thing I noticed is that there seems to be more emphasis on asynchronous messaging in Mule, and perhaps rightly so. In my case, I would have liked to be able to wire up a transformer after my component runs, perhaps in an outbound component, but since my service is synchronous, I can only configure my inbound endpoint, which does not allow transformers after the component is run. I ultimately ended up putting the post-transformation in the service code itself. Of course, Mule is a work in progress, so I am sure the functionality will show up in a later version if it doesn't exist already. If you know how to achieve this, please let me know.

References

I found the following sites helpful during development; if you want to try out something along similar lines, you will probably find them helpful too.

  • Maven Archetype for Mule Projects from the Morning Java blog. The archetype provides a simple example of a Mule service, which is very helpful. It is based on Mule 2.0.0-RC2, but a simple version change in the POM got me set up with the current Mule version (2.2.1 at the time of this writing). In addition, I had to add the dependencies on Lucene and the Mule HTTP Connector (see the POM snippets above).
  • Mule Instance Configuration from the Mule documentation.
  • This page provides some information about modeling synchronous request-response style messaging in Mule.
  • A very informative article from InfoQ, written by Jackie Wheeler.
  • This discussion thread provided me with insight into how to handle HTTP connectors in the application.
  • I've been meaning to look at Solr for a while now, and I finally did it before starting on this application, to see if I could use Solr. The Solr Getting Started Guide helped me set up a simple Solr instance to experiment with while going through the Solr code and documentation.

Wednesday, July 09, 2008

Yahoo WebSearch API Javascript client using Dojo

In my last post, I described a Javascript client to display results from Google's JSON search service. In that post, I used a PHP proxy to get around Javascript's Same Origin Policy. A cleaner remoting approach called JSONP or Padded JSON, proposed by Bob Ippolito and supported by many JSON web services, relies on the server being able to emit a JSON response wrapped in a client-specified callback function.

To request padded JSON, the client populates an optional query parameter with the name of a Javascript callback function, which the client code implements. The implementation would typically walk the response object (the browser has already evaluated the JSON by the time the callback runs) and build HTML to assign to the innerHTML property of a div element on the page displayed in the browser.

So when the query is sent to the server, the JSON response is wrapped inside the specified callback function name. For example, a query to the Yahoo WebSearchService API would look something like this:

http://search.yahooapis.com/WebSearchService/V1/webSearch?query=foo&\
  output=json&\
  callback=handleResponse&\
  appid=get-your-own-yahoo-id-and-stick-it-in-here

And the server will return a JSON response wrapped within the callback, which is executed as a Javascript function call.

  handleResponse(json_response_string);

So if we have defined a function called handleResponse on the page, it is called with the response object as its argument, and whatever is in the function gets executed.
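Both halves of this exchange fit in a few lines. The sketch below is not Yahoo's implementation: padJson shows what a server might do to wrap a payload in the requested callback, and runPadded simulates, outside a browser, what happens when the padded text is executed as a script. The function names and the callback-name regular expression are assumptions for illustration.

```javascript
// Assumed rule: callback names must be (optionally dotted) JavaScript
// identifiers, so a caller cannot inject arbitrary script via the parameter.
const CALLBACK_NAME = /^[A-Za-z_$][\w$]*(\.[A-Za-z_$][\w$]*)*$/;

// Server side: wrap ("pad") the JSON payload in the client-specified callback.
function padJson(callbackName, payload) {
  if (!CALLBACK_NAME.test(callbackName)) {
    throw new Error(`Invalid callback name: ${callbackName}`);
  }
  return `${callbackName}(${JSON.stringify(payload)});`;
}

// Client side, simulated: a browser runs the padded text as a script, which
// calls the global function of that name. Here the handler is bound to the
// callback name as a parameter instead (works for simple names only).
function runPadded(paddedText, callbackName, handler) {
  new Function(callbackName, paddedText)(handler);
}
```

For example, `padJson("handleResponse", { ResultSet: { totalResultsAvailable: 3 } })` yields `handleResponse({"ResultSet":{"totalResultsAvailable":3}});`, and running that text calls the handler with an already-parsed object, which is why the callback never needs to parse a string itself.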

I think this approach is quite beautiful (in the Beautiful Code sense): not only does it exploit, in a clever yet intuitive way, the fact that the browser evaluates whatever a script tag loads as code, it also enables true serverless operation (no proxy on your own server) by getting around Javascript's Same Origin Policy, which does not apply to script tags.

Setting up the client to make dynamic calls is a bit of a pain with this approach, though. Since we don't know the search term until it is entered, plain Javascript requires manipulating the DOM tree to insert a script element, whose src is the query URL, under html/head. However, there are a lot of Javascript frameworks around which make light work of this. One such framework is Dojo, which comes with both JSON and UI components.
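For comparison, the plain-Javascript approach described above can be sketched as follows. The function name jsonpRequest is hypothetical, and the document object is passed in as a parameter (normally you would pass window.document) so the function can be exercised outside a browser.

```javascript
// Issue a JSONP request by hand: build the query URL, including the callback
// name the server should pad with, and append a <script> element to <head>.
function jsonpRequest(doc, baseUrl, params, callbackName) {
  const query = new URLSearchParams({ ...params, callback: callbackName });
  const script = doc.createElement("script");
  script.type = "text/javascript";
  script.src = `${baseUrl}?${query}`;
  // Inserting the element makes the browser fetch the URL and execute the
  // padded response, which in turn calls the named callback function.
  doc.getElementsByTagName("head")[0].appendChild(script);
  return script;
}
```

In a page this might be called from a button handler as `jsonpRequest(document, "http://search.yahooapis.com/WebSearchService/V1/webSearch", { appid: "...", query: document.getElementById("q").value, output: "json" }, "handleResponse")`. Each search adds another script element, which is part of the bookkeeping a framework takes off your hands.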

In this post, I describe a client that I built using Dojo to run against Yahoo's WebSearch API to display search results from my blog. Dojo has a fairly steep learning curve, but it is very well-documented, and the resulting code is very easy to read and maintain. Here is the code (really an HTML page containing Javascript code):

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" 
  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
  <head>
    <title>My Blog Search Widget</title>
    <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
    <style type="text/css">
      @import "http://o.aolcdn.com/dojo/1.0.0/dojo/resources/dojo.css";
      @import "http://o.aolcdn.com/dojo/1.0.0/dijit/themes/tundra/tundra.css";
    </style>
    <script type="text/javascript" 
      src="http://o.aolcdn.com/dojo/1.0.0/dojo/dojo.xd.js" 
      djConfig="parseOnLoad: true"></script>
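    <script type="text/javascript">
      // parseOnLoad relies on the parser to turn dojoType markup into widgets
      dojo.require("dojo.parser");
      dojo.require("dijit.form.Button");
      dojo.require("dojo.io.script");
    </script>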
    <script type="text/javascript">
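// Called with the parsed JSON response; builds the results HTML.
function handleResponse(data, ioArgs) {
  var resultSet = data.ResultSet;
  // Last position shown = first position + number returned - 1
  var first = Number(resultSet.firstResultPosition);
  var last = first + Number(resultSet.totalResultsReturned) - 1;
  var html = '<b>Results ' +
    first +
    '-' +
    last +
    ' for term ' +
    dojo.byId('q').value +
    ' of about ' +
    resultSet.totalResultsAvailable +
    '</b><br/><br/>';
  // Add a title link, summary and display URL for each result
  dojo.forEach(resultSet.Result, function(result) {
    html += '<b><a href="' +
      result.Url +
      '">' +
      result.Title +
      '</a></b><br/>' +
      result.Summary +
      '<br/><b>' +
      result.DisplayUrl +
      '</b><br/><br/>';
  });
  dojo.byId("results").innerHTML = html;
}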
    </script>
  </head>
  <body class="tundra">
    <p>
    <b>Enter your query:</b>
    <input type="text" id="q" name="q"/>
    <button dojoType="dijit.form.Button" id="searchButton">Search!
      <script type="dojo/method" event="onClick">
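        // callbackParamName names the query parameter the service reads the
        // callback name from; Dojo generates the callback itself and hands
        // the response to the load function.
        dojo.io.script.get({
          url: 'http://search.yahooapis.com/WebSearchService/V1/webSearch',
          content: {
            appid: 'get-your-own-appid-and-stick-it-in-here',
            query: dojo.byId('q').value,
            site: 'sujitpal.blogspot.com',
            output: 'json'
          },
          callbackParamName: 'callback',
          load: handleResponse
        });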
      </script>
    </button>
    </p>
    <hr/>
    <div id="results"></div>
  </body>
</html>
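The HTML-building part of handleResponse does not depend on Dojo, so it could be pulled out into a plain function and tested outside the browser. The sketch below does that, under two assumptions not in the original: the function names escapeHtml and renderResults are hypothetical, and the result fields are treated as plain text and escaped before insertion into innerHTML (if a service returned markup in them, the tags would show up literally).

```javascript
// Escape text so it is displayed literally when assigned to innerHTML.
function escapeHtml(text) {
  return String(text)
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;")
    .replace(/'/g, "&#39;");
}

// Build the results HTML from a ResultSet with the field names used in
// handleResponse. Number() guards against positions arriving as strings.
function renderResults(resultSet, term) {
  const first = Number(resultSet.firstResultPosition);
  const last = first + Number(resultSet.totalResultsReturned) - 1;
  let html = `<b>Results ${first}-${last} for term ${escapeHtml(term)}` +
    ` of about ${escapeHtml(resultSet.totalResultsAvailable)}</b><br/><br/>`;
  for (const result of resultSet.Result) {
    html += `<b><a href="${escapeHtml(result.Url)}">${escapeHtml(result.Title)}</a></b><br/>` +
      `${escapeHtml(result.Summary)}<br/><b>${escapeHtml(result.DisplayUrl)}</b><br/><br/>`;
  }
  return html;
}
```

Inside the page, handleResponse would then reduce to `dojo.byId("results").innerHTML = renderResults(data.ResultSet, dojo.byId("q").value);`.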

And here is the obligatory screenshot: