This post is not about SolrCloud. SolrCloud is going to be available in the upcoming Solr 4.x release, and will make much of the work described in this post obsolete. However, I am working with the latest released Solr version (3.5), and I needed a way to have Nutch index its content onto a bank of Solr server shards, which I could then run distributed queries against.
Indexing
Distributed indexing can be achieved quite simply with Nutch by making some fairly minor changes to the SolrWriter and SolrIndexerReducer (in the NutchGora branch; I haven't looked at trunk, so I can't comment on it).
From the user-interface point of view, you specify a comma-separated list of Solr server URLs instead of a single one in the solrindex job. Under the covers, the job creates a Solr client and a document queue for each server in the list. A partitioner decides which server each document goes to, based on its key. Each time a queue reaches a specified size (the commit size), its documents are sent to the corresponding Solr server. Once all the documents have been consumed, any documents still queued are sent and a commit is called on every Solr server in the list.
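To make that flow concrete, here is a minimal Python sketch of the reducer-side logic: one pending queue per server, a partitioner that picks the queue, a flush when a queue reaches the commit size, and a final flush-plus-commit at the end. It mirrors the structure of the Java reducer later in this post but is not the patch itself. `FakeSolrServer` and `ShardedWriter` are hypothetical stand-ins (no real Solr client is involved), and the CRC32-modulo partitioner is an assumption chosen only to keep the example deterministic.

```python
import zlib
from typing import Callable, List


class FakeSolrServer:
    """Hypothetical stand-in for a Solr client: records adds and commits."""

    def __init__(self, url: str) -> None:
        self.url = url
        self.pending: List[dict] = []    # added but not yet committed
        self.committed: List[dict] = []  # visible after commit()
        self.add_calls = 0

    def add(self, docs: List[dict]) -> None:
        self.add_calls += 1
        self.pending.extend(docs)

    def commit(self) -> None:
        self.committed.extend(self.pending)
        self.pending = []


def crc32_partition(key: str, num_partitions: int) -> int:
    """Assumed partitioner: CRC32 of the key, modulo the number of servers."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


class ShardedWriter:
    """One queue per server; flush a queue when it reaches commit_size."""

    def __init__(self, urls: List[str], commit_size: int,
                 partition: Callable[[str, int], int] = crc32_partition) -> None:
        if not urls:
            raise ValueError("no Solr URLs configured")
        self.servers = [FakeSolrServer(u) for u in urls]
        self.queues: List[List[dict]] = [[] for _ in urls]
        self.commit_size = commit_size
        self.partition = partition

    def write(self, key: str, doc: dict) -> None:
        p = self.partition(key, len(self.servers))
        self.queues[p].append(doc)
        # send this server's batch once it is full (add only, no commit yet)
        if len(self.queues[p]) >= self.commit_size:
            self.servers[p].add(self.queues[p])
            self.queues[p] = []

    def close(self) -> None:
        # send whatever is left, then commit every server
        for server, queue in zip(self.servers, self.queues):
            if queue:
                server.add(queue)
            server.commit()
        self.queues = [[] for _ in self.servers]


if __name__ == "__main__":
    writer = ShardedWriter(
        ["http://localhost:8984/solr", "http://localhost:8985/solr"], commit_size=2)
    for i in range(5):
        url = f"http://example.com/page{i}"
        writer.write(url, {"id": url})
    writer.close()
    print([len(s.committed) for s in writer.servers])
```

Keeping the add and the commit separate means each batch is shipped as soon as it is full, while the (relatively expensive) commit happens once per server at the end.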
You can find my patch (for the NutchGora branch only) in NUTCH-945. The discussion that led to this change can be found here.
I also made the same change to my custom sub-page indexer (originally described here). The changes are only in the reducer, so I have removed the mapper code for brevity; you can get it from the previous post linked in this paragraph.
// Source: src/java/com/mycompany/nutch/subpageindexer/SolrSubpageIndexerJob.java
package com.mycompany.nutch.subpageindexer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.avro.util.Utf8;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.indexer.IndexerJob;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.solr.NonPartitioningPartitioner;
import org.apache.nutch.indexer.solr.SolrConstants;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.Bytes;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.ToolUtil;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.DateUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
public class SolrSubpageIndexerJob extends IndexerJob {
  private static Log LOG = LogFactory.getLog(SolrSubpageIndexerJob.class);
  private static final Collection<WebPage.Field> FIELDS =
    new HashSet<WebPage.Field>();
  static {
    FIELDS.addAll(Arrays.asList(WebPage.Field.values()));
  }
  public static class SolrSubpageIndexerJobMapper
      extends GoraMapper<String,WebPage,Text,NutchDocument> {
    // ... no changes here ...
  }
  public static class SolrSubpageIndexerJobReducer
      extends Reducer<Text,NutchDocument,Text,NutchDocument> {
    private int commitSize;
    private SolrServer[] servers;
    private Partitioner<String,NutchDocument> partitioner;
    private List<SolrInputDocument>[] sdocs = null;
    @SuppressWarnings("unchecked")
    @Override
    public void setup(Context ctx) throws IOException {
      Configuration conf = ctx.getConfiguration();
      // SERVER_URL may hold a comma-separated list of Solr server URLs
      String[] urls = conf.getStrings(SolrConstants.SERVER_URL);
      if (urls == null || urls.length == 0) {
        throw new IOException(SolrConstants.SERVER_URL + " not configured");
      }
      // one SolrJ client and one pending-document queue per Solr server
      this.servers = new SolrServer[urls.length];
      this.sdocs = (ArrayList<SolrInputDocument>[])
        new ArrayList[urls.length];
      for (int i = 0; i < urls.length; i++) {
        servers[i] = new CommonsHttpSolrServer(urls[i]);
        sdocs[i] = new ArrayList<SolrInputDocument>();
      }
      commitSize = conf.getInt(SolrConstants.COMMIT_SIZE, 1000);
      if (urls.length == 1) {
        // single server, so no partitioning is needed
        partitioner = new NonPartitioningPartitioner();
      } else {
        try {
          String partitionerClass = conf.get(SolrConstants.PARTITIONER_CLASS);
          partitioner = (Partitioner<String,NutchDocument>)
            Class.forName(partitionerClass).newInstance();
          LOG.info("Partitioning using: " + partitionerClass);
        } catch (Exception e) {
          // no custom partitioner configured (or it failed to load)
          partitioner = new HashPartitioner<String,NutchDocument>();
          LOG.info("Partitioning using default HashMod partitioner");
        }
      }
    }
    @Override
    public void reduce(Text key, Iterable<NutchDocument> values,
        Context ctx) throws IOException, InterruptedException {
      for (NutchDocument doc : values) {
        // copy the NutchDocument fields into a SolrJ input document
        SolrInputDocument sdoc = new SolrInputDocument();
        for (String fieldname : doc.getFieldNames()) {
          sdoc.addField(fieldname, doc.getFieldValue(fieldname));
        }
        // choose the target server from the document key
        int partition = partitioner.getPartition(
          key.toString(), doc, sdocs.length);
        sdocs[partition].add(sdoc);
        // send this server's queue once it reaches commitSize
        if (sdocs[partition].size() >= commitSize) {
          try {
            servers[partition].add(sdocs[partition]);
          } catch (SolrServerException e) {
            throw new IOException(e);
          }
          sdocs[partition].clear();
        }
      }
    }
    @Override
    public void cleanup(Context ctx) throws IOException {
      // send any remaining queued documents, then commit every server
      for (int i = 0; i < sdocs.length; i++) {
        try {
          if (sdocs[i].size() > 0) {
            servers[i].add(sdocs[i]);
          }
          sdocs[i].clear();
          servers[i].commit();
        } catch (SolrServerException e) {
          throw new IOException(e);
        }
      }
    }
  }
  @Override
  public Map<String,Object> run(Map<String,Object> args) throws Exception {
    String solrUrl = (String) args.get(SolrConstants.SERVER_URL);
    if (StringUtils.isNotEmpty(solrUrl)) {
      getConf().set(SolrConstants.SERVER_URL, solrUrl);
    }
    String batchId = (String) args.get(Nutch.ARG_BATCH);
    if (StringUtils.isNotEmpty(batchId)) {
      getConf().set(Nutch.ARG_BATCH, batchId);
    }
    currentJob = new NutchJob(getConf(), "solr-subpage-index");
    StorageUtils.initMapperJob(currentJob, FIELDS, Text.class,
      NutchDocument.class, SolrSubpageIndexerJobMapper.class);
    currentJob.setMapOutputKeyClass(Text.class);
    currentJob.setMapOutputValueClass(NutchDocument.class);
    currentJob.setReducerClass(SolrSubpageIndexerJobReducer.class);
    currentJob.setNumReduceTasks(5);
    currentJob.waitForCompletion(true);
    ToolUtil.recordJobStatus(null, currentJob, results);
    return results;
  }
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: SolrSubpageIndexerJob <solr url>[,<solr url>...] (<batch_id> | -all)");
      return -1;
    }
    LOG.info("SolrSubpageIndexerJob: starting");
    run(ToolUtil.toArgMap(
      SolrConstants.SERVER_URL, args[0],
      Nutch.ARG_BATCH, args[1]));
    LOG.info("SolrSubpageIndexerJob: success");
    return 0;
  }
  public static void main(String[] args) throws Exception {
    final int res = ToolRunner.run(NutchConfiguration.create(),
      new SolrSubpageIndexerJob(), args);
    System.exit(res);
  }
}
If you want to use your own custom partitioner, you will need to declare it in your nutch-site.xml file. Here is the entry from mine:
<property>
  <name>solr.partitioner.class</name>
  <value>com.mycompany.nutch.indexer.solr.MurmurHashPartitioner</value>
  <description>Custom partitioner for distributed Solr index</description>
</property>
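When no custom partitioner is configured (or the class cannot be loaded), the reducer falls back to Hadoop's HashPartitioner, which computes `(key.hashCode() & Integer.MAX_VALUE) % numPartitions`. Since the key here is the page URL as a String, the target shard depends on Java's `String.hashCode()`. The Python sketch below reproduces that arithmetic so you can predict which shard a URL lands on. The function names are hypothetical, and it assumes URLs contain only Basic Multilingual Plane characters (Java hashes UTF-16 code units, while Python iterates over code points).

```python
def java_string_hash(s: str) -> int:
    """Java String.hashCode(): h = 31*h + c for each char, with 32-bit overflow."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # keep 32 bits, like Java int math
    # reinterpret the 32-bit pattern as a signed Java int
    return h - 0x100000000 if h >= 0x80000000 else h


def hash_partition(key: str, num_partitions: int) -> int:
    """Hadoop HashPartitioner: (hashCode & Integer.MAX_VALUE) % numPartitions."""
    return (java_string_hash(key) & 0x7FFFFFFF) % num_partitions


if __name__ == "__main__":
    for url in ["http://localhost/a.html", "http://localhost/b.html"]:
        print(url, "-> shard", hash_partition(url, 2))
```

Because the shard is a pure function of the URL, re-indexing a page sends it to the same shard as before (as long as the number of shards does not change), so the new version replaces the old document instead of leaving a duplicate on another shard.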
I set up clones of Solr by copying my non-distributed Solr server bin distribution directory (with the Nutch version of schema.xml, updated as described in previous posts), deleting the contents of the data directory, and running each clone on its own port, like so:
sujit@cyclone:NutchGora$ # On one terminal
sujit@cyclone:NutchGora$ cd solr1/example
sujit@cyclone:example$ java -Djetty.port=8984 -jar start.jar
sujit@cyclone:example$
sujit@cyclone:NutchGora$ # On another terminal
sujit@cyclone:NutchGora$ cd solr2/example
sujit@cyclone:example$ java -Djetty.port=8985 -jar start.jar
Once you apply the patch and build the runtime, you can run the solrindex and SolrSubpageIndexerJob jobs from the command line like so:
sujit@cyclone:local$ # indexing single-solr mode to port 8983
sujit@cyclone:local$ bin/nutch solrindex http://localhost:8983/solr -all
sujit@cyclone:local$
sujit@cyclone:local$ # indexing distributed-solr mode to ports 8984/8985
sujit@cyclone:local$ bin/nutch solrindex \
http://localhost:8984/solr,http://localhost:8985/solr -all
sujit@cyclone:local$
sujit@cyclone:local$ # indexing subpages single-solr mode to port 8983
sujit@cyclone:local$ bin/nutch \
com.mycompany.nutch.subpageindexer.SolrSubpageIndexerJob \
http://localhost:8983/solr -all
sujit@cyclone:local$
sujit@cyclone:local$ # indexing subpages distrib-solr mode to ports 8984/8985
sujit@cyclone:local$ bin/nutch \
com.mycompany.nutch.subpageindexer.SolrSubpageIndexerJob \
http://localhost:8984/solr,http://localhost:8985/solr -all
I have intentionally shown the commands for the single-solr indexing version to illustrate that the change is fully backward compatible, and also because I wanted to compare the search results between the non-distributed (port 8983) and distributed (ports 8984 and 8985) environments.
Search
Solr 3.5, which I am using, supports distributed search via sharding out of the box; the Solr Distributed Search wiki page has more information about it. Enabling distributed search on a query (provided its handler does not use any of the unsupported components) is as easy as adding a shards parameter to the URL, containing a comma-separated list of Solr servers. In my setup, the shards parameter looks like shards=localhost:8984/solr,localhost:8985/solr.
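As an illustration, a sharded query is just an ordinary /select request with one extra parameter. The sketch below builds such a URL with the Python standard library. The server and shard addresses are the ones from the setup above; the helper name `sharded_select_url` and the example query `content:nutch` are hypothetical. The shard entries omit the http:// scheme, matching the shards value shown above.

```python
from typing import List, Sequence, Tuple
from urllib.parse import parse_qs, urlencode, urlparse


def sharded_select_url(server: str, query: str, shards: Sequence[str],
                       rows: int = 10) -> str:
    """Build a Solr /select URL that fans the query out to the given shards."""
    params: List[Tuple[str, str]] = [
        ("q", query),
        ("rows", str(rows)),
        # comma-separated host:port/path entries, without the http:// scheme
        ("shards", ",".join(shards)),
    ]
    return server + "?" + urlencode(params)


url = sharded_select_url(
    "http://localhost:8984/solr/select",
    "content:nutch",
    ["localhost:8984/solr", "localhost:8985/solr"],
)
print(url)
# the shards value survives URL encoding and decoding unchanged
print(parse_qs(urlparse(url).query)["shards"])
```

`urlencode` percent-encodes the colons, slashes and commas in the shards value; Solr decodes them before splitting the list, so no manual escaping is needed.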
To support sharding in my Python client (described here), all I needed to do was declare a SOLR_SHARDS value and add the shards to my solrparams tuple list (around line 141), so that it is passed on to Solr. Also, since I am now pointing at the servers on ports 8984 and 8985, my query has to hit one of them instead of 8983 (hardcoded in SOLR_SERVER), so that needs to change too.
SOLR_SERVER = "http://localhost:8984/solr/select"
SOLR_SHARDS = ["localhost:8984/solr", "localhost:8985/solr"]
...
# finally, add the shards parameter
solrparams.append(("shards", ",".join(SOLR_SHARDS)))
I set up two copies of my CherryPy-based client application: one running against the single Solr instance on port 8983 and listening on port 8081, and another running against the distributed Solr instances on ports 8984 and 8985 and listening on port 8082. I then sent the same query to both applications and compared the results. Below are some screenshots; as you can see, the results are identical (as expected).
From what I see in the logs, the request handler that receives the sharded query (/select on port 8984 in our case) intercepts the shards parameter and forwards the query to each shard, replacing the shards parameter with isShard=true. Once it has received all the responses, it merges them and returns the combined result to the caller.
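The merge step can be pictured with a small Python sketch: each shard returns its own top hits already sorted by descending score, and the aggregator does a k-way merge and keeps the overall top rows. This is a simplified model only; the per-shard result format (dicts with id and score) is an assumption, and Solr's real implementation does more work (for example, it fetches the stored fields of the winning documents from the shards in a separate request).

```python
import heapq
from itertools import islice
from typing import Dict, Iterable, List


def merge_shard_results(shard_results: Iterable[List[Dict]], rows: int) -> List[Dict]:
    """Merge per-shard hit lists (each sorted by descending score) into the global top rows."""
    merged = heapq.merge(*shard_results, key=lambda d: d["score"], reverse=True)
    return list(islice(merged, rows))


shard1 = [{"id": "a", "score": 2.5}, {"id": "c", "score": 1.0}]
shard2 = [{"id": "b", "score": 2.0}, {"id": "d", "score": 0.5}]
top = merge_shard_results([shard1, shard2], rows=3)
print([d["id"] for d in top])  # ['a', 'b', 'c']
```

Because every shard already returns its hits in score order, the aggregator never needs to sort the full union; it only walks the heads of the per-shard lists until it has collected enough rows.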
Is this search interface provided by the Apache Velocity project?
I'm working on an application and I need something like this.
Hi Adriano, do you mean the screenshots of the search results in this post? If so, no, I wrote my own python/cherrypy code to do this.
Hi Sujit, distributed search using the shard handler is not working with HTTPS. Please let me know if I am missing anything in this configuration:
explicit
xml
true
*:*
id, solr.title, content, category, link, pubdateiso
localhost:7443/solr/ProfilesJava/|localhost:7443/solr/C3Files/|localhost:7443/solr/Blogs/|localhost:7443/solr/Communities/|localhost:7443/solr/Wikis/|localhost:7443/solr/Bedeworks/|localhost:7443/solr/Forums/|localhost:7443/solr/Web/|localhost:7443/solr/Bookmarks/
https://
1000
5000
With this configuration I am getting a "no live solr servers" error.
When I replace "|" with "," between the shard URLs, I get org.apache.solr.client.solrj.SolrServerException: IOException occured when talking to server at: https://localhost:7443/solr/ProfilesJava
In the code, you are connecting through SolrJ's CommonsHttpSolrServer, which assumes that Solr is running in a container (Tomcat, Jetty, etc.) that serves HTTP. Since your Solr requires HTTPS access, you will probably need to subclass CommonsHttpSolrServer to work with HTTPS; I am not sure how hard that is going to be, though. Unless there is an overriding business reason to put Solr behind HTTPS, it may be easier to install Solr in a container that allows HTTP access.
Thanks Sujit. I found the solution: my Java certificate store didn't have the root CA information. Once I imported it, everything works fine now.