Saturday, September 10, 2011

Multithreaded Proxy SOLR Search Handler

We are a Lucene shop trying to migrate our somewhat unique search algorithms to SOLR, so most of my experience has been in building custom handlers that wrap our algorithms so they can be served by SOLR. However, there are times when you can use SOLR's default handler with minor customizations.

One such opportunity presented itself when building the SOLR backend to a topic page. A topic page is basically a link farm whose links are contextually relevant to the topic (a disease or drug) and are classified into multiple facets. For example, a disease topic page could have links relevant to treatment, medication, symptoms, etc.

To power such a page, a set of queries is fired off against the SOLR server, with each group (treatment, medication, and symptoms in the above example) corresponding to a single query. Generally the application would use some AJAX/Javascript magic to fire these queries off in parallel and have the individual parts of the page populate as the results become available.

In my case, I needed to provide a single call to the application that would get all the results at once (I guess they had enough AJAX/Javascript magic to deal with :-)). I figured that it would be just as simple to move the multi-threading into the server. So I built a proxy search handler that takes the topic name as its query, creates a different subquery out of it for each group, and fires them off in parallel against the same SOLR server using SolrJ. Once all the subqueries have returned results, the response is populated with a map of SolrDocumentList objects keyed by subquery name.
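The fan-out and gather step described above can be sketched independently of SOLR. This is only a minimal illustration, not the handler itself: the class name FanOutSketch, the fanOut method, the group names, and the search function (which stands in for the SolrJ call) are all hypothetical, and it assumes Java 8 or later for the lambdas. Unlike the handler, it waits with a bounded timeout and leaves failed or timed-out groups out of the result map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class FanOutSketch {

  /**
   * Runs one search per group in parallel and gathers the results into a
   * map keyed by group name. The search function is a hypothetical
   * stand-in for the real call to the search server.
   */
  public static <R> Map<String, R> fanOut(Map<String, String> subqueries,
      Function<String, R> search, long timeoutMillis)
      throws InterruptedException {
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.max(1, subqueries.size()));
    try {
      List<String> names = new ArrayList<>(subqueries.keySet());
      List<Callable<R>> tasks = new ArrayList<>();
      for (String name : names) {
        final String query = subqueries.get(name);
        tasks.add(() -> search.apply(query));
      }
      // invokeAll blocks until all tasks finish or the timeout expires;
      // tasks that have not completed by then are cancelled. Futures come
      // back in the same order as the task list.
      List<Future<R>> futures =
          pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
      Map<String, R> results = new LinkedHashMap<>();
      for (int i = 0; i < names.size(); i++) {
        Future<R> future = futures.get(i);
        if (future.isCancelled()) {
          continue; // timed out: leave this group out of the response
        }
        try {
          results.put(names.get(i), future.get());
        } catch (ExecutionException e) {
          // the search for this group failed: leave it out as well
        }
      }
      return results;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> subqueries = new LinkedHashMap<>();
    subqueries.put("treatment", "f1:diabetes");
    subqueries.put("medication", "f2:diabetes");
    Map<String, List<String>> results = fanOut(subqueries,
        q -> Collections.singletonList("doc for " + q), 5000);
    System.out.println(results);
  }
}
```

A bounded wait is a design choice here rather than something the handler does: it trades completeness for a predictable worst-case response time, which may or may not suit a page that expects every group to be present.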

It's probably not a hugely unique idea or anything, but I have never read about anybody doing something like this, so I decided to put it out there, just in case someone finds it useful. So anyway, here is how you configure it in solrconfig.xml:

  ...
  <requestHandler name="/mt" 
      class="com.mycompany.solr.handler.MTProxyRequestHandler">
    <lst name="defaults">
      <str name="echoParams">explicit</str>
      <str name="wt">xml</str>
    </lst>
  </requestHandler>
  ...

And here is what the code looks like:

// $Id: TopicRequestHandler.java,v 1.5 2011/09/06 21:49:01 spal Exp $
// $Source: /export/cvsrepository/mycompany/hl-solr/src/main/java/com/mycompany/solr/handler/TopicRequestHandler.java,v $
package com.mycompany.solr.handler;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.SearchHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.QParser;
import org.apache.solr.search.QParserPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MTProxyRequestHandler extends SearchHandler {

  public static final String SUBQUERY_RESULTS_COMPONENT_NAME = 
    "subquery-results";
  
  private final Logger logger = LoggerFactory.getLogger(getClass());
  
  private CommonsHttpSolrServer solrServer;
  
  @SuppressWarnings("unchecked")
  @Override
  public void init(NamedList args) {
    super.init(args);
    try {
      this.solrServer = new CommonsHttpSolrServer("http://localhost:8080/solr");
      this.solrServer.setRequestWriter(new BinaryRequestWriter());
      this.solrServer.setParser(new BinaryResponseParser());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void inform(SolrCore core) {
    super.inform(core);
  }

  @Override
  public void handleRequestBody(SolrQueryRequest req, 
      SolrQueryResponse rsp) throws Exception {
    // extract parameters
    final SolrParams params = req.getParams();
    final String q = params.get(CommonParams.Q);
    final Map<String,List<SolrDocument>> facetResults = 
      new ConcurrentHashMap<String,List<SolrDocument>>();
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (final SubQuery sq : SubQuery.values()) {
      final Query subquery = sq.qparserPlugin.createParser(
        q, null, params, req).getQuery();
      executorService.submit(new Runnable() {
        @Override
        public void run() {
          try {
            // copy parameters from incoming request
            Map<String,String> paramMap = new HashMap<String,String>();
            for (Iterator<String> it = params.getParameterNamesIterator();
                it.hasNext(); ) {
              String key = it.next();
              paramMap.put(key, params.get(key));
            }
            // override the query string
            paramMap.put(CommonParams.Q, subquery.toString());
            SolrParams sqParams = new MapSolrParams(paramMap);
            QueryResponse proxyRsp = solrServer.query(sqParams, METHOD.POST);
            SolrDocumentList sdl = proxyRsp.getResults();
            facetResults.put(sq.name(), sdl);
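          } catch (Exception e) {
            // catch everything, not just SolrServerException: an unchecked
            // exception thrown here would otherwise be captured by the
            // discarded Future and the group would go missing without a log
            logger.warn("Could not execute sub-query " + sq.name() + 
              " for :" + q, e);
          }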
        }
      });
    }
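    // stop accepting new tasks, then block until every sub-query has
    // finished (effectively no timeout) before building the response
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    // sub-queries that failed are simply absent from the map
    rsp.add(SUBQUERY_RESULTS_COMPONENT_NAME, facetResults);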
  }
  
  // Set up the different QParserPlugin implementations, one for
  // each sub-query that will be sent in parallel.
  
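  private enum SubQuery {
    // QParserPlugin1/2 are non-static inner classes, so they need an
    // enclosing handler instance to be instantiated from the enum
    sq1 (new MTProxyRequestHandler().new QParserPlugin1()),
    sq2 (new MTProxyRequestHandler().new QParserPlugin2());
    
    public final QParserPlugin qparserPlugin;
    
    SubQuery(QParserPlugin qparserPlugin) {
      this.qparserPlugin = qparserPlugin;
    }
  }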
  
  // each sub-query has a QParserPlugin and associated QParser
  // which generates the actual Query from the query string
  
  // Subquery 1
  private class QParserPlugin1 extends QParserPlugin {

    @Override
    public QParser createParser(String qstr, SolrParams localParams, 
        SolrParams params,
        SolrQueryRequest req) {
      return new QParser1(qstr, localParams, params, req);
    }

    @Override
    public void init(NamedList args) { /* NOOP */ }
  }

  private class QParser1 extends QParser {

    public QParser1(String qstr, SolrParams localParams, 
        SolrParams params, SolrQueryRequest req) {
      super(qstr, localParams, params, req);
    }

    @Override
    public Query parse() throws ParseException {
      return new TermQuery(new Term("f1", qstr));
    }
  }

  // Subquery 2
  private class QParserPlugin2 extends QParserPlugin {

    @Override
    public QParser createParser(String qstr, SolrParams localParams, 
        SolrParams params,
        SolrQueryRequest req) {
      return new QParser2(qstr, localParams, params, req);
    }

    @Override
    public void init(NamedList args) { /* NOOP */ }
  }
  
  private class QParser2 extends QParser {

    public QParser2(String qstr, SolrParams localParams, 
        SolrParams params, SolrQueryRequest req) {
      super(qstr, localParams, params, req);
    }

    @Override
    public Query parse() throws ParseException {
      return new TermQuery(new Term("f2", qstr));
    }
  }

  //////////////////////// SolrInfoMBeans methods //////////////////////

  @Override
  public String getDescription() {
    return "MT Proxy Handler";
  }

  @Override
  public String getSource() {
    return "$Source$";
  }

  @Override
  public String getSourceId() {
    return "$Id$";
  }

  @Override
  public String getVersion() {
    return "$Revision$";
  }
}

As you can see, it's fairly straightforward. The different types of subqueries which need to be executed are enumerated by name (along with the associated QParserPlugin implementation) in the SubQuery enum. The input query string is passed to the QParser created by each QParserPlugin, which builds the subquery. The subqueries are executed in parallel against the local SOLR server using SolrJ. Once all the calls return, the results are packaged up into the response. This structure (as opposed to a custom structure) is also easy for the client to parse, since it's basically just a map of SolrDocumentList objects keyed by the subquery (group) name.

The QParsers here just create TermQueries against different fields in the index - I have kept it simple for ease of understanding. In reality the code does pretty hairy things to the input query string :-).
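For the simplified case above, where every subquery is a single term against a different field, the enum could also carry the field name directly instead of a QParserPlugin. The following is a rough sketch under that assumption only: the class SubQuerySketch and the methods toQueryString and buildAll are hypothetical names, and the output is a plain field:term string of the kind that TermQuery's toString() produces for a single-word topic. Multi-word topics or topics containing query syntax characters would need quoting or escaping, which is not attempted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SubQuerySketch {

  /** One constant per group; f1 and f2 mirror the simplified parsers. */
  public enum SubQuery {
    sq1("f1"),
    sq2("f2");

    private final String field;

    SubQuery(String field) {
      this.field = field;
    }

    /** Builds a single-term query string such as f1:diabetes. */
    public String toQueryString(String topic) {
      return field + ":" + topic;
    }
  }

  /** Builds the query string for every group, keyed by group name. */
  public static Map<String, String> buildAll(String topic) {
    Map<String, String> queries = new LinkedHashMap<>();
    for (SubQuery sq : SubQuery.values()) {
      queries.put(sq.name(), sq.toQueryString(topic));
    }
    return queries;
  }

  public static void main(String[] args) {
    System.out.println(buildAll("diabetes"));
  }
}
```

This only works while the per-group logic is a one-liner; once the parsers start doing hairier things to the query string, a separate class per group, as in the handler, is probably easier to maintain.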

Regarding performance, my actual code gets results for 10 groups, and so far (on my dev indexes) performance is quite good. In any case, because the set of topics is finite, it is possible to autowarm these queries at startup.

In terms of portability and compliance, it would probably have been better if I had declared the QParserPlugin implementations in the solrconfig.xml file as shown in the SolrPlugins wiki page. However, these custom QParserPlugins are unlikely to be used outside this code, so I decided to just model them as inner classes.

4 comments:

  1. Hi Sujit,

    Nice blog! Is there an email address I can contact you in private?

  2. Thanks Ilias, I would rather not post my email address publicly. So if you send me a comment with your email address and a note not to publish it, I can send you an email on that address and delete that posting.

  3. If you're already inside the solr execution context why would you connect over HTTP? Call the search methods directly.

  4. Yes, it's probably better to do that, but I was after as much code reuse as possible, so I just used SolrJ to send queries to other (different) handlers and collect the results in the proxy handler described.
