With libraries such as ROME, an application can parse RSS and Atom feeds without having to worry about the underlying feed format. The ROME Fetcher subproject provides a FeedFetcher class that can convert a remote URL into a ROME SyndFeed object. David Herron has built a (the only one so far?) ROME-based feed aggregator using Groovy, inspired by the PlanetPlanet feed reader.
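To make the ROME Fetcher part concrete, here is a minimal, self-contained sketch of the retrieveFeed() call described above. The class name and feed URL are placeholders of my own; the real aggregator code appears later in this post.

// Minimal ROME Fetcher sketch - class name and feed URL are placeholders
import java.net.URL;
import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.fetcher.FeedFetcher;
import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;

public class FetchOneFeed {
  public static void main(String[] args) throws Exception {
    // the cache lets the fetcher issue HTTP conditional GETs on repeat requests
    FeedFetcher fetcher = new HttpURLFeedFetcher(HashMapFeedInfoCache.getInstance());
    SyndFeed feed = fetcher.retrieveFeed(new URL("http://example.com/rss.xml"));
    System.out.println("Feed: " + feed.getTitle());
    for (Object o : feed.getEntries()) {
      SyndEntry entry = (SyndEntry) o;
      System.out.println(entry.getTitle() + " -> " + entry.getLink());
    }
  }
}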
My goals are much less ambitious: all I need is a way to aggregate news from various newspaper feeds and make that aggregate searchable. My aggregator is written in Java using ROME and ROME Fetcher. It reads the list of sites from an OPML file (an idea I borrowed from David Herron's aggregator), downloads and parses each feed, and writes the feed entry information (the link URL and its metadata such as title and description) into a flat file. This flat file is then injected into HBase using Nutch 2.0, and we go through a single cycle of generate, fetch and parse. At this point, we are ready to index - unfortunately, Nutch 2.0 only supports indexing to Solr, so I need a running Solr installation, which I don't have yet.
The list of sites comes from the NewsRack project - they are doing interesting things with a combined concept/rule-based approach to classification. They were kind enough to post a list of RSS feeds from various Indian newspapers that they use as part of their work. Here is what my sites.xml file looks like:
<?xml version="1.0"?>
<opml version="1.1">
<head>
<title>Feed Aggregator Test</title>
<ownerName>Sujit Pal</ownerName>
<description>India Newspapers Feed (Source:newsrack.in)</description>
</head>
<body>
<outline text="Business Today"
siteurl="http://businesstoday.digitaltoday.in"
xmlUrl="http://businesstoday.digitaltoday.in/rssforallsec.php?issueid=14"/>
<outline text="Business World"
siteurl="http://www.businessworld.in"
xmlUrl="http://www.businessworld.in/component/option,com_rss/feed,RSS2.0/no_html,1/"/>
<!-- more feed sites ... -->
</body>
</opml>
The code for the aggregator, as expected, is quite simple. It parses the configuration file above, hits each of the feed URLs in it, parses the returned feed to retrieve the set of (link, title, description, pubDate) tuples, and writes them out to a flat file in the structured format described below.
// Source: src/main/java/com/mycompany/myagg/fetcher/FeedAggregator.java
package com.mycompany.myagg.fetcher;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.commons.httpclient.ConnectTimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;
import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.fetcher.FetcherException;
import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
import com.sun.syndication.fetcher.impl.HttpClientFeedFetcher;
import com.sun.syndication.io.FeedException;
/**
* Standalone Feed crawler that can be launched as a standalone command
* (usually from cron). Configured using an OPML file that lists the feed
* URLs to be crawled. Links on each feed along with their metadata are
* written to a structured flat file to inject into Nutch.
*/
public class FeedAggregator {
private final Log log = LogFactory.getLog(getClass());
private static final int DEFAULT_CONNECT_TIMEOUT = 5000;
private static final int DEFAULT_READ_TIMEOUT = 5000;
private static final SimpleDateFormat ISO_DATE_FORMATTER =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
private String configFile;
private String seedFile;
private int connectTimeout;
private int readTimeout;
public void setConfigFile(String configFile) {
this.configFile = configFile;
}
public void setSeedFile(String seedFile) {
this.seedFile = seedFile;
}
public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}
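/**
 * Retrieves each configured feed, extracts the (link, title, description,
 * pubDate) tuple from every entry, and writes one line per entry to the
 * seed file. Feeds that fail to fetch or parse are logged and skipped.
 */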
@SuppressWarnings("unchecked")
public void aggregate() throws Exception {
AggregatorConfig config = parseConfig();
HttpClientFeedFetcher feedFetcher = buildFeedFetcher(config);
PrintWriter seedWriter = new PrintWriter(new FileWriter(seedFile), true);
for (SiteInfo site : config.sites) {
try {
SyndFeed feed = feedFetcher.retrieveFeed(new URL(site.feedUrl));
if (feed == null) {
continue;
}
List<SyndEntry> entries = feed.getEntries();
for (SyndEntry entry : entries) {
String title = entry.getTitle();
String link = entry.getLink();
String description = entry.getDescription() != null ?
entry.getDescription().getValue() : "";
Date pubDate = entry.getPublishedDate();
store(seedWriter, title, link, description, pubDate);
}
} catch (FetcherException e) {
log.info("Fetcher Exception for " + site.publisher, e);
continue;
} catch (FeedException e) {
log.info("Feed Exception for " + site.publisher, e);
continue;
} catch (ConnectTimeoutException e) {
log.info("Connect Timeout Exception for " + site.publisher, e);
continue;
}
}
seedWriter.flush();
seedWriter.close();
}
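/**
 * Builds an HttpClient-backed fetcher with the configured (or default)
 * timeouts and an in-memory feed info cache, which lets the fetcher use
 * HTTP conditional GETs for feeds it has already seen.
 */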
private HttpClientFeedFetcher buildFeedFetcher(AggregatorConfig config) {
HttpClientFeedFetcher feedFetcher = new HttpClientFeedFetcher();
feedFetcher.setUserAgent(config.title + "(" + config.owner + ")");
feedFetcher.setConnectTimeout(connectTimeout == 0 ?
DEFAULT_CONNECT_TIMEOUT : connectTimeout);
feedFetcher.setReadTimeout(readTimeout == 0 ?
DEFAULT_READ_TIMEOUT : readTimeout);
feedFetcher.setFeedInfoCache(HashMapFeedInfoCache.getInstance());
return feedFetcher;
}
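/**
 * Writes a single tab-separated seed line: the link URL followed by
 * key=value metadata pairs (u_idx, u_title, u_summary, u_pubdate).
 */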
private void store(PrintWriter seedWriter, String title, String link,
String description, Date pubDate) throws Exception {
if (pubDate == null) {
pubDate = new Date();
}
seedWriter.println(StringUtils.join(new String[] {
link,
StringUtils.join(new String[] {"u_idx", "news"}, "="),
StringUtils.join(new String[] {
"u_title",
clean(title)
}, "="),
StringUtils.join(new String[] {
"u_summary",
clean(description)
}, "="),
StringUtils.join(new String[] {
"u_pubdate",
ISO_DATE_FORMATTER.format(pubDate)
}, "=")
}, "\t"));
}
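/** Collapses runs of whitespace (including newlines) into single spaces. */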
private String clean(String str) {
if (StringUtils.isEmpty(str)) {
return "";
}
return str.replaceAll("\\s+", " ");
}
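/**
 * Parses the OPML configuration file into an AggregatorConfig: the head
 * elements plus one SiteInfo per outline element in the body.
 */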
@SuppressWarnings("unchecked")
protected AggregatorConfig parseConfig() throws Exception {
AggregatorConfig config = new AggregatorConfig();
SAXBuilder builder = new SAXBuilder();
Document doc = builder.build(new File(configFile));
Element opml = doc.getRootElement();
config.title = opml.getChild("head").getChildTextTrim("title");
config.owner = opml.getChild("head").getChildTextTrim("ownerName");
config.description = opml.getChild("head").getChildTextTrim("description");
List<Element> outlines = opml.getChild("body").getChildren("outline");
for (Element outline : outlines) {
SiteInfo siteInfo = new SiteInfo();
siteInfo.publisher = outline.getAttributeValue("text");
siteInfo.siteUrl = outline.getAttributeValue("siteurl");
siteInfo.feedUrl = outline.getAttributeValue("xmlUrl");
config.sites.add(siteInfo);
}
return config;
}
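/** Holder for the OPML head metadata and the list of feed sites. */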
protected class AggregatorConfig {
public String title;
public String owner;
public String description;
public List<SiteInfo> sites = new ArrayList<SiteInfo>();
}
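/** Holder for one outline element: publisher name, site URL and feed URL. */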
protected class SiteInfo {
public String publisher;
public String siteUrl;
public String feedUrl;
}
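/** Command-line entry point: config file, connect timeout (ms), read timeout (ms), seed file. */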
public static void main(String[] args) {
if (args.length != 4) {
System.out.println(
"Usage: FeedAggregator config_file.xml connect_timeout " +
"read_timeout seed_file.txt");
System.exit(-1);
}
try {
FeedAggregator aggregator = new FeedAggregator();
aggregator.setConfigFile(args[0]);
aggregator.setConnectTimeout(Integer.valueOf(args[1]));
aggregator.setReadTimeout(Integer.valueOf(args[2]));
aggregator.setSeedFile(args[3]);
aggregator.aggregate();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Running this with the appropriate parameters produces a flat file, a line of which is shown below. This is a tab-separated list of name-value pairs, broken into multiple lines below for readability. The first part is the familiar seed URL. The rest of the fields are seed URL metadata name-value pairs. Here we choose to add the index name (assuming we will be storing multiple content types in our HBase table), the title, summary and publication date, which should be used to populate the corresponding index columns directly instead of trying to parse these values out of the HTML.
http://www.indiatogether.org/2011/jan/env-brahm.htm\
u_idx=news\
u_title=Mine-ing the Brahmaputra's waters\
u_summary=India and China make competing plans for the river's \
precious waters, ignoring the functions it already performs - in sustaining \
rich ecosystems, flora and fauna, cultures and a wide range of livelihoods. \
Shripad Dharmadhikary reports.\
u_pubdate=2011-01-06T16:42:56.623
...
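To make the format explicit, here is a small hypothetical sketch (not part of the aggregator or of Nutch) that splits such a line back into the seed URL and its metadata pairs; the URL and values in it are made up for illustration.

// Hypothetical sketch showing how a seed line decomposes into URL + metadata
import java.util.HashMap;
import java.util.Map;

public class SeedLineParser {
  public static void main(String[] args) {
    String line = "http://example.com/some-article.htm\tu_idx=news" +
        "\tu_title=Some title\tu_summary=Some summary" +
        "\tu_pubdate=2011-01-06T16:42:56.623";
    String[] fields = line.split("\t");
    String url = fields[0];                   // first field is the seed URL
    Map<String,String> meta = new HashMap<String,String>();
    for (int i = 1; i < fields.length; i++) { // rest are key=value metadata
      String[] kv = fields[i].split("=", 2);
      meta.put(kv[0], kv[1]);
    }
    System.out.println(url + " -> " + meta);
  }
}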
Note that this is new-ish functionality - the Nutch Injector has supported adding custom metadata in this way since version 1.1, so this feature is usable with a file-based crawling setup as well. In our HBase-backed Nutch, the metadata lives inside the "mtdt" column family after injection.
We then go through one cycle of generate, fetch and parse, since we want to crawl only the pages referenced in the feed XMLs (i.e., depth 0). The end product of this exercise will, of course, be an index with the metadata in it. Nutch 2.0 has made the decision to go with Solr as its search front end, so the only implementation of an indexer job is the SolrIndexerJob, which needs to be pointed at a running Solr installation. I hope to set up Solr and try this part out (and write about it) next week.
A reader asked: dumb question... how do you avoid writing the same items each time you run the program?
My reply: since it's going to Nutch, we depend on Nutch's mechanism to update in place records that are already present in its db backend.