Background
Delta, or incremental, indexing is pretty much the main reason I am so excited about Nutch/GORA. Replacing segment files with a database opens up the possibility of external programs modifying metadata in the database, and thereby changing the pipeline's behavior to support delta indexing.
For web crawling (Nutch's default use case), the Adaptive Fetch Schedule is an excellent choice. This component (set by db.fetch.schedule.class) decreases or increases the fetch interval depending on whether the page has changed. However, if you want to use Nutch as an indexing pipeline for, say, a Content Management System (CMS), then you need to offer the CMS folks a slightly more deterministic way to control what appears in the index.
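(For the plain web crawling case, enabling the adaptive schedule is just a property override in nutch-site.xml, along these lines; a minimal sketch, and the db.fetch.schedule.adaptive.* properties in nutch-default.xml control how quickly the interval shrinks or grows:

<property>
  <name>db.fetch.schedule.class</name>
  <value>org.apache.nutch.crawl.AdaptiveFetchSchedule</value>
  <description>Shrink the fetch interval for pages that change often,
  and grow it for pages that do not.</description>
</property>
)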
When a CMS publishes a piece of content, the content should (within some predictable time interval) make it into the index (so it starts appearing in search results). On the other hand, when a CMS unpublishes a piece of content, it should disappear from the index. In this post, I propose an approach that does this, using Nutch/GORA (with custom plugins) and some supporting infrastructure.
Infrastructure
I have previously described using an HTTP server to stage content in a local filesystem to Nutch. I use a similar strategy here, except that my seed file for a given provider (or CMS) now points to a dynamically generated "index" file, also served by the HTTP server, that lists all the content available from the provider. Something like this:
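For a provider with a couple of content files, the generated index page would look something like the snippet below (the markup follows the provider_index handler shown further down; the second file name is invented for illustration):

<html><head></head><body><ul>
  <li><a href="http://localhost:8080/provider/prov1__1__000022.xml">http://localhost:8080/provider/prov1__1__000022.xml</a></li>
  <li><a href="http://localhost:8080/provider/prov1__1__000023.xml">http://localhost:8080/provider/prov1__1__000023.xml</a></li>
</ul></body></html>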
In line with this, my script that launches a CherryPy HTTP server now has an additional handler that dynamically generates the index page by recursively scanning the directory (/provider_index), as well as one that serves a named file from disk (/provider). Here's the code:
#!/usr/bin/python
import cherrypy
import os
import os.path
import urllib

from cherrypy.lib.static import serve_file

SITES_DIR = "/path/to/your/sites"
SERVER_HOST = "localhost"
SERVER_PORT = 8080

def _accumulate_files(files, dirname, fnames):
    """
    This function gets called for every file (and directory) that is walked
    by os.path.walk. It accumulates the file names found into a flat array.
    The file names accumulated are relative to the providers directory.
    """
    for fname in fnames:
        abspath = os.path.join(dirname, fname)
        if os.path.isfile(abspath):
            abspath = abspath.replace(os.path.join(SITES_DIR, "providers"), "")[1:]
            files.append(abspath)

class Root:

    @cherrypy.expose
    def test(self, name):
        """
        Expose the mock site for testing.
        """
        return serve_file(os.path.join(SITES_DIR, "test", "%s.html" % (name)), \
            content_type="text/html")

    @cherrypy.expose
    def provider_index(self, name):
        """
        Builds an index page of links to all the files for the specified
        provider. The files are stored under sites/providers/$name. The
        function will recursively walk the filesystem under this directory
        and dynamically generate a flat list of links. Path separators in
        the filename are converted to "__" in the URL. The index page can
        be used as the seed URL for this content.
        """
        files = []
        os.path.walk(os.path.join(SITES_DIR, "providers", name), \
            _accumulate_files, files)
        index = "<html><head></head><body><ul>"
        for file in files:
            url = "http://%s:%s/provider/%s" % (SERVER_HOST, SERVER_PORT, \
                urllib.quote_plus(file.replace(os.path.sep, "__")))
            index += """
                <li><a href="%s">%s</a></li>
            """ % (url, url)
        index += "</ul></body></html>"
        return [index]

    @cherrypy.expose
    def provider(self, name):
        """
        Returns the contents of the XML file stored at the location
        corresponding to the URL provided. The "__" in the URL are converted
        back to file path separators.
        """
        ct = None
        if name.endswith(".xml"):
            ct = "application/xml"
        elif name.endswith(".json"):
            ct = "application/json"
        if ct is None:
            return serve_file(os.path.join(SITES_DIR, "providers", \
                "%s" % name.replace("__", os.path.sep)), \
                content_type="text/html")
        else:
            return serve_file(os.path.join(SITES_DIR, "providers", \
                "%s" % (urllib.unquote_plus(name).replace("__", os.path.sep))), \
                content_type=ct)

if __name__ == '__main__':
    current_dir = os.path.dirname(os.path.abspath(__file__))
    # Set up site-wide config first so we get a log if errors occur.
    cherrypy.config.update({'environment': 'production',
                            'log.access_file': 'site.log',
                            'log.screen': True,
                            "server.socket_host": SERVER_HOST,
                            "server.socket_port": SERVER_PORT})
    cherrypy.quickstart(Root(), '/')
My seed file now lists only the URL to the index file, as shown below:
http://localhost:8080/provider_index/prov1 u_idx=prov1
Delta Indexing Overview
As I mentioned earlier, the CMS can send either a publish or an unpublish request. A publish request can translate to an Add (if it's a new page that doesn't exist in the database) or an Update. An unpublish request translates to a Delete (or Hide) request at the database level.
For an Add request, we reset the fetch time of the provider index page to the current time minus the fetch interval, making it eligible for a fetch in the next crawl run. Since the index page is generated dynamically off the filesystem, the newly added file is guaranteed to show up in it. The first iteration of the next recrawl refetches and reparses the index page, and the second iteration moves the newly added page(s), discovered via the index page's outlinks, onto the fetch list and from there into the index.
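Concretely, a recrawl along the following lines would pick up an Add in two passes. This is only a rough sketch; the exact sub-command flags, batch id handling and topN values depend on your NutchGora version and on your own crawl script.

# iteration 1: refetch the provider index page (its fetch time was reset)
bin/nutch generate -topN 1000
bin/nutch fetch -all
bin/nutch parse -all
bin/nutch updatedb

# iteration 2: fetch the newly added page(s) discovered via the index page outlinks
bin/nutch generate -topN 1000
bin/nutch fetch -all
bin/nutch parse -all
bin/nutch updatedb

# push the new and updated documents to Solr
bin/nutch solrindex http://localhost:8983/solr -reindex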
Of course, the index page itself should not make it into the index, so we have a custom Index Filter (described later) to filter these out by pattern.
There is an alternative approach which relies on accurate Last-Modified headers and a more aggressive crawl schedule. With that approach you don't need an index file or any custom handling for Adds and Updates, but you do need to crawl more frequently. I prefer an event-driven approach, which is what I describe here.
For an Update request, we reset the fetch time of the page specified by the URL, just as we do for the index page. The first iteration of the next recrawl picks up the changes and pushes them into the index.
For a Delete, we do two things: first, we delete the record from Solr; then we mark the record deleted in Cassandra by setting its status to STATUS_GONE. We also add logic to an indexing filter to keep pages with this status from making it back into the index.
Delta Indexing Tool
To do the publish and unpublish against the index, I built a tool (callable using its full class name from bin/nutch). For this I put my package under Nutch's src/java tree. I had to modify the Nutch build.xml slightly to get my code to compile, specifically adding the com/mycompany/nutch/**/*.java path to the javac includes in the "compile-core" target, as sketched below.
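The relevant change is small; the modified target would look something along these lines (a sketch, with the existing javac attributes elided; the stock target may specify its includes slightly differently in your checkout):

<!-- build.xml: "compile-core" target -->
<javac ...>
  <include name="org/apache/nutch/**/*.java"/>
  <include name="com/mycompany/nutch/**/*.java"/>
</javac>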
I had initially planned on building it with GORA so it could be easily adapted to other backends as well. The GORA tutorial has examples of retrieving and updating records by key. However, it turns out that the CassandraStore (the gora-cassandra implementation of DataStore) has an empty implementation of get(String) that simply returns null.
So I finally settled on using the Hector API (which GORA also uses) to talk directly to the Cassandra database. I am using Hector's template API, which feels similar to (and is probably inspired by) Spring's JdbcTemplate. Here is the code for this tool. It is not as nice as using GORA (i.e., not database agnostic), but it'll have to do for now.
// Source: src/java/com/mycompany/nutch/tools/DeltaHandler.java
package com.mycompany.nutch.tools;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.template.ColumnFamilyResult;
import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate;
import me.prettyprint.cassandra.service.template.ColumnFamilyUpdater;
import me.prettyprint.cassandra.service.template.ThriftColumnFamilyTemplate;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.util.TableUtil;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;

public class DeltaHandler {

  private static final Log LOG = LogFactory.getLog(DeltaHandler.class);

  private static final String PROVIDER_INDEX_URLPREFIX_KEY =
    "mycompany.provider.index.urlprefix";
  private static final String SOLR_URL = "mycompany.solr.url";
  private static final Utf8 U_IDX = new Utf8("u_idx");

  private String providerIndexUrlPrefix;
  private CommonsHttpSolrServer solrServer;
  private Keyspace keyspace;
  private ColumnFamilyTemplate<String,String> template;

  public DeltaHandler() {
    try {
      init();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private void init() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource("nutch-default.xml");
    conf.addResource("nutch-site.xml");
    this.providerIndexUrlPrefix = conf.get(PROVIDER_INDEX_URLPREFIX_KEY);
    this.solrServer = new CommonsHttpSolrServer(conf.get(SOLR_URL));
    Cluster cluster = HFactory.getOrCreateCluster("Test Cluster",
      new CassandraHostConfigurator("localhost:9160"));
    this.keyspace = HFactory.createKeyspace("webpage", cluster);
    this.template = new ThriftColumnFamilyTemplate<String,String>(
      keyspace, "f", StringSerializer.get(), StringSerializer.get());
  }

  private void destroy() {
  }

  public void publish(String url, String idx) throws Exception {
    String key = TableUtil.reverseUrl(url);
    ColumnFamilyResult<String,String> res =
      template.queryColumns(key);
    String bas = res.getString("bas");
    if (StringUtils.isEmpty(bas)) {
      // requested URL does not exist, should be an "add",
      // reset fetchtime for the index page
      key = TableUtil.reverseUrl(StringUtils.join(new String[] {
        providerIndexUrlPrefix, idx}, "/"));
      res = template.queryColumns(key);
      bas = res.getString("bas");
    }
    if (StringUtils.isEmpty(bas)) return;
    int fetchInterval = Integer.valueOf(res.getString("fi"));
    // update the fetch time to current - fetchInterval so
    // it is eligible for crawling immediately
    ColumnFamilyUpdater<String,String> updater =
      template.createUpdater(key);
    updater.setString("ts", String.valueOf(
      System.currentTimeMillis() - fetchInterval));
    template.update(updater);
  }

  public void unpublish(String url, String idx, boolean commit)
      throws Exception {
    String key = TableUtil.reverseUrl(url);
    ColumnFamilyResult<String,String> res = template.queryColumns(key);
    String bas = res.getString("bas");
    if (StringUtils.isNotEmpty(bas)) {
      System.out.println("found it!");
      ColumnFamilyUpdater<String,String> updater =
        template.createUpdater(key);
      updater.setString("st", String.valueOf(
        CrawlStatus.STATUS_GONE));
      template.update(updater);
      deleteFromSolr(key, commit);
    }
  }

  private void deleteFromSolr(String key, boolean commit)
      throws Exception {
    solrServer.deleteById(key);
    if (commit) {
      solrServer.commit();
    }
  }

  private static void usage() {
    System.out.println(
      "Usage: DeltaHandler publish|unpublish url idx [commit]");
    System.out.println("commit = true|false, only for unpublish");
    System.exit(-1);
  }

  public static void main(String[] args) {
    String command = null;
    String url = null;
    String idx = null;
    Boolean commit = null;
    if (args.length > 0) {
      command = args[0];
      if (!("publish".equals(command)) &&
          !("unpublish".equals(command))) {
        usage();
      }
    }
    if ("publish".equals(command)) {
      if (args.length > 2) {
        url = args[1];
        idx = args[2];
      } else {
        usage();
      }
    } else if ("unpublish".equals(command)) {
      if (args.length > 3) {
        url = args[1];
        idx = args[2];
        commit = Boolean.valueOf(args[3]);
      } else {
        usage();
      }
    }
    DeltaHandler handler = null;
    try {
      handler = new DeltaHandler();
      if ("publish".equals(command)) {
        handler.publish(url, idx);
      } else {
        handler.unpublish(url, idx, commit);
      }
    } catch (Exception e) {
      LOG.error(e, e);
    } finally {
      if (handler != null) {
        handler.destroy();
      }
    }
  }
}
This is really only for testing. For high volume use, it would be fairly simple to have this logic live behind a webservice which the CMS code could call (see the sketch further down). In any case, here are examples of command line usage for the tool above.
sujit@cyclone:local$ # publishing a page
sujit@cyclone:local$ bin/nutch com.mycompany.nutch.tools.DeltaHandler \
    publish http://localhost:8080/provider/prov1__1__000022.xml prov1
sujit@cyclone:local$ # unpublishing a page
sujit@cyclone:local$ bin/nutch com.mycompany.nutch.tools.DeltaHandler \
    unpublish http://localhost:8080/provider/prov1__1__000022.xml prov1 \
    true
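As for the webservice idea mentioned above, a thin servlet wrapper around DeltaHandler would probably do. Here is a rough, untested sketch; the servlet class, its URL parameters and the deployment details are all my own invention, not part of Nutch:

// Hypothetical sketch: expose DeltaHandler over HTTP, e.g.
// POST /delta?command=publish&url=...&u_idx=prov1[&commit=true]
package com.mycompany.nutch.tools;

import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class DeltaHandlerServlet extends HttpServlet {

  @Override
  protected void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    String command = req.getParameter("command"); // publish | unpublish
    String url = req.getParameter("url");
    String idx = req.getParameter("u_idx");
    boolean commit = Boolean.parseBoolean(req.getParameter("commit"));
    // a real service would reuse a single DeltaHandler instance rather
    // than building one (and its connections) per request
    DeltaHandler handler = new DeltaHandler();
    try {
      if ("publish".equals(command)) {
        handler.publish(url, idx);
      } else if ("unpublish".equals(command)) {
        handler.unpublish(url, idx, commit);
      } else {
        resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "bad command");
        return;
      }
      resp.setStatus(HttpServletResponse.SC_OK);
    } catch (Exception e) {
      throw new ServletException(e);
    }
  }
}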
I also added a few new properties for this tool (and for the plugin described below). Here are the additional properties from my nutch-site.xml file.
<property>
  <name>mycompany.provider.index.urlprefix</name>
  <value>http://localhost:8080/provider_index</value>
  <description>The URL to the content server hosting the provider content.</description>
</property>
<property>
  <name>mycompany.solr.url</name>
  <value>http://localhost:8983/solr</value>
  <description>The SOLR URL to publish/unpublish to.</description>
</property>
<property>
  <name>mycompany.provider.index.pattern</name>
  <value>provider_index</value>
  <description>Pattern for "meta" pages listing other local pages. This
  page is needed for delta indexing to manage on-demand add/delete/update
  of pages from collections which need this feature. But it should not
  be indexed, so it needs to be removed during the indexing step.</description>
</property>
Delta Indexing Filter Plugin
As mentioned above, you probably don't want your index pages to make it into the index, since they are basically link farms with no content that is useful to the search user, so you want to suppress them from ever being indexed.
The second class of pages are the ones marked GONE by the unpublish command. The unpublish command deletes the record from the Solr index, but you want to make sure the page doesn't slip back in on the next solrindex call. So we build an indexing filter which filters out these two categories of pages.
The functionality above is built into the DeltaIndexingFilter, a simple Nutch IndexingFilter implementation. Here is the declaration for this filter from my plugin.xml file.
<extension id="com.mycompany.nutch.indexer.provider"
           name="Delta Indexing related Page Index Suppressor"
           point="org.apache.nutch.indexer.IndexingFilter">
  <implementation id="mycompany-indexer-provider"
                  class="com.mycompany.nutch.indexer.delta.DeltaIndexingFilter"/>
</extension>
and here is the code for the DeltaIndexingFilter.
// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/indexer/delta/DeltaIndexingFilter.java
package com.mycompany.nutch.indexer.delta;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.indexer.IndexingException;
import org.apache.nutch.indexer.IndexingFilter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.storage.WebPage.Field;

public class DeltaIndexingFilter implements IndexingFilter {

  private static final String PROVIDER_INDEX_PATTERN_KEY =
    "mycompany.provider.index.pattern";

  private static final Set<WebPage.Field> FIELDS =
    new HashSet<WebPage.Field>();
  static {
    FIELDS.add(WebPage.Field.STATUS);
    FIELDS.add(WebPage.Field.METADATA);
  }

  private Configuration conf;
  private String providerIndexPattern;

  @Override
  public NutchDocument filter(NutchDocument doc, String url, WebPage page)
      throws IndexingException {
    if (StringUtils.isEmpty(providerIndexPattern)) {
      return doc;
    } else {
      if (url.contains(providerIndexPattern) ||
          CrawlStatus.STATUS_GONE == page.getStatus()) {
        // do not index this page
        return null;
      } else {
        return doc;
      }
    }
  }

  @Override
  public Collection<Field> getFields() {
    return FIELDS;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    this.providerIndexPattern = conf.get(PROVIDER_INDEX_PATTERN_KEY);
  }
}
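One more wiring step worth mentioning: like any Nutch plugin, this one is only loaded if it matches the plugin.includes property in nutch-site.xml. Something like the following, assuming the plugin directory under src/plugin is named mycompany (keep whatever plugins you already have and just extend the regex):

<property>
  <name>plugin.includes</name>
  <value>...|mycompany</value>
  <description>Append the new plugin's directory name (assumed here to be
  "mycompany") to the existing list of plugins.</description>
</property>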
And that's pretty much it! Delta indexing in just a few lines of code (relatively speaking, of course - this is Java we are talking about :-)), thanks to Nutch/GORA (which gives us the Cassandra database) and Hector (which gives us the API to write to it from outside Nutch).
Update - 2012-02-04: I managed to figure out how to implement the CassandraStore.get() method, so my DeltaHandler code no longer has any (direct) Hector calls. Here it is.
// Source: src/java/com/mycompany/nutch/tools/DeltaHandler.java
package com.mycompany.nutch.tools;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.gora.store.DataStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.Bytes;
import org.apache.nutch.util.TableUtil;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;

public class DeltaHandler {

  private static final Log LOG = LogFactory.getLog(DeltaHandler.class);

  private static final String PROVIDER_INDEX_URLPREFIX_KEY =
    "mycompany.provider.index.urlprefix";
  private static final String SOLR_URL = "mycompany.solr.url";
  private static final Utf8 U_IDX = new Utf8("u_idx");

  private String providerIndexUrlPrefix;
  private CommonsHttpSolrServer solrServer;
  private DataStore<String,WebPage> dataStore;

  public DeltaHandler() {
    try {
      init();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private void init() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource("nutch-default.xml");
    conf.addResource("nutch-site.xml");
    this.providerIndexUrlPrefix =
      conf.get(PROVIDER_INDEX_URLPREFIX_KEY);
    this.solrServer = new CommonsHttpSolrServer(
      conf.get(SOLR_URL));
    this.dataStore = StorageUtils.createDataStore(
      conf, String.class, WebPage.class);
  }

  private void destroy() {
    try {
      dataStore.close();
    } catch (Exception e) {
      LOG.error(e);
    }
  }

  /**
   * Could be an addition or update. If the URL is present
   * in the database, then it is considered to be an update
   * and the fetch time is reset to current time - fetch
   * interval. If the URL is not present in the database, it
   * is considered to be an add operation, and the meta index
   * page corresponding to the idx is reset, so it is recrawled.
   * @param url the URL to publish.
   * @param idx the value of the u_idx metadata.
   * @throws Exception if thrown.
   */
  public void publish(String url, String idx) throws Exception {
    LOG.info("Starting publish for url=" + url + ", u_idx=" + idx);
    String key = TableUtil.reverseUrl(url);
    LOG.info("key=" + key);
    WebPage page = dataStore.get(key);
    if (page == null) {
      // record does not exist, reset the index page
      String indexKey = TableUtil.reverseUrl(
        StringUtils.join(new String[] {
          providerIndexUrlPrefix, idx}, "/"));
      LOG.info("index key=" + indexKey);
      WebPage indexPage = dataStore.get(indexKey);
      LOG.info("is indexpage null?" + (indexPage == null));
      resetFetchTime(indexPage);
      dataStore.put(indexKey, indexPage);
      LOG.info("Completed publish for url=" + url +
        ", u_idx=" + idx + ", reset fetch time for index page");
    } else {
      // record exists, reset its fetch time
      resetFetchTime(page);
      dataStore.put(key, page);
      LOG.info("Completed publish for url=" + url +
        ", u_idx=" + idx + ", reset fetch time for page");
    }
  }

  private void resetFetchTime(WebPage page) {
    int fetchInterval = page.getFetchInterval();
    LOG.info("after get fetch interval=" + fetchInterval);
    page.setFetchTime(System.currentTimeMillis() - fetchInterval);
  }

  /**
   * Checks to see if the record exists in the database with
   * the specified u_idx metadata value. If so, deletes the
   * record from SOLR, then marks the record as GONE in database.
   * @param url the URL to unpublish.
   * @param idx the value of the u_idx parameter.
   * @throws Exception if thrown.
   */
  public void unpublish(String url, String idx, boolean commit)
      throws Exception {
    LOG.info("Starting unpublish for url=" + url +
      ", u_idx=" + idx + ", commit=" + commit);
    String key = TableUtil.reverseUrl(url);
    WebPage page = dataStore.get(key);
    if (page != null) {
      if (page.getMetadata().containsKey(U_IDX)) {
        String uIdx = Bytes.toString(Bytes.toBytes(
          page.getMetadata().get(U_IDX)));
        if (idx.equals(uIdx)) {
          page.setStatus(CrawlStatus.STATUS_GONE);
          dataStore.put(key, page);
          deleteFromSolr(key, commit);
          LOG.info("Completed unpublish for url=" + url +
            ", u_idx=" + idx);
        }
      }
    }
  }

  private void deleteFromSolr(String key, boolean commit)
      throws Exception {
    solrServer.deleteById(key);
    if (commit) {
      solrServer.commit();
    }
  }

  private static void usage() {
    System.out.println(
      "Usage: DeltaHandler publish|unpublish url idx [commit]");
    System.out.println("commit = true|false, only for unpublish");
    System.exit(-1);
  }

  public static void main(String[] args) {
    String command = null;
    String url = null;
    String idx = null;
    Boolean commit = null;
    if (args.length > 0) {
      command = args[0];
      if (!("publish".equals(command)) &&
          !("unpublish".equals(command))) {
        usage();
      }
    }
    if ("publish".equals(command)) {
      if (args.length > 2) {
        url = args[1];
        idx = args[2];
      } else {
        usage();
      }
    } else if ("unpublish".equals(command)) {
      if (args.length > 3) {
        url = args[1];
        idx = args[2];
        commit = Boolean.valueOf(args[3]);
      } else {
        usage();
      }
    }
    DeltaHandler handler = null;
    try {
      handler = new DeltaHandler();
      if ("publish".equals(command)) {
        handler.publish(url, idx);
      } else {
        handler.unpublish(url, idx, commit);
      }
    } catch (Exception e) {
      LOG.error(e, e);
    } finally {
      if (handler != null) {
        handler.destroy();
      }
    }
  }
}
The patch for CassandraStore.get() is available at GORA-93. Since it is built against GORA 0.2-incubating, you will also need to patch your NutchGora branch with the patch from NUTCH-1205. Note that the reason neither patch has been applied yet is some failing unit tests traceable to the gora-sql module. The patches worked fine for me, but I am using only gora-cassandra, so YMMV.