This post describes a possible implementation for an automated Newspaper Clipping Service. The end-user is a researcher (or team of researchers) in a particular discipline who registers an interest in a set of topics (or web pages). An assistant (or team of assistants) then scours information sources to find more documents of interest to the researcher, based on the topics identified. In this particular case, the information sources were limited to a set of "approved" newspapers, hence the name "Newspaper Clipping Service". The goal is to replace the assistants with an automated system.
The solution I came up with was to analyze the original web pages and treat keywords extracted from these pages as topics, then, for each keyword, query a popular search engine and gather the top 10 results. The search engine can be customized so that the sites it looks at are restricted to the list of approved newspapers. Finally, the URLs of the results are aggregated, and only URLs that were returned by more than one keyword topic are given back to the user.
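Stripped of all the Hadoop and Cascading machinery, the aggregation amounts to two count-and-threshold passes. Here is a minimal plain-Java sketch of that idea; extractKeywords() and searchTopTen() are hypothetical stand-ins for the page analysis and the site-restricted search, not methods from the actual project.

// Conceptual sketch only; extractKeywords() and searchTopTen() are hypothetical
// stand-ins for the page analysis and the site-restricted search described above.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class ClippingSketch {

  protected abstract List<String> extractKeywords(String url);   // keywords for one seed page
  protected abstract List<String> searchTopTen(String keyword);  // top 10 result URLs for a keyword

  public List<String> newUrls(List<String> seedUrls) {
    // pass 1: count keyword occurrences across all seed pages
    Map<String, Integer> keywordCounts = new HashMap<String, Integer>();
    for (String url : seedUrls) {
      for (String keyword : extractKeywords(url)) {
        increment(keywordCounts, keyword);
      }
    }
    // pass 2: search each keyword seen more than once, count the result URLs
    Map<String, Integer> urlCounts = new HashMap<String, Integer>();
    for (Map.Entry<String, Integer> e : keywordCounts.entrySet()) {
      if (e.getValue() <= 1) continue;
      for (String resultUrl : searchTopTen(e.getKey())) {
        increment(urlCounts, resultUrl);
      }
    }
    // keep only URLs returned for more than one keyword
    List<String> results = new ArrayList<String>();
    for (Map.Entry<String, Integer> e : urlCounts.entrySet()) {
      if (e.getValue() > 1) results.add(e.getKey());
    }
    return results;
  }

  private void increment(Map<String, Integer> counts, String key) {
    counts.put(key, counts.containsKey(key) ? counts.get(key) + 1 : 1);
  }
}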
The entire flow can be thought of as a series of Hadoop Map-Reduce jobs, to first download, extract and count keywords from (web pages corresponding to) URLs, and then to extract and count search result URLs from the keywords. I've been wanting to play with
Cascading for a while, and this seemed like a good candidate, so the solution is implemented with Cascading.
I have used
Scalding in the past, but it seems to me that while Scalding's collection-like interface is easier to work with, it's harder to extend. So even though I think I could have done this in Scalding without any problems, the objective was to learn Cascading, so I used that instead. I initially started using Cascading with Scala (I write enough Java code at work :-)), but Cascading's use of generics (at least some of it) is too complicated for Scala's type inference system, so I fell back to using Java instead*.
One can write Cascading code in local mode, which uses in-memory data structures and the local filesystem, or in Hadoop mode, which uses Hadoop and HDFS. Since this was a learning exercise, I decided to use local mode. To move it to Hadoop, one would have to use Hadoop-specific FlowConnectors and Taps instead (a sketch of that wiring follows the Main class below). Here is the code for the Main (callable) class. The entire
Maven project is available on my GitHub page.
// Source: src/main/java/com/mycompany/newsclip/Main.java
package com.mycompany.newsclip;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Aggregator;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
public class Main {
@SuppressWarnings("rawtypes")
public static void main(String[] args) {
// handle input parameters
String input = args[0];
String output = args[1];
Fields urlFields = new Fields("num", "line");
Tap iTap = new FileTap(new TextLine(urlFields), input);
Fields kFields = new Fields("kword");
Tap oTap = new FileTap(new TextLine(kFields), output);
Pipe pipe = new Pipe("keyword");
// read urls, download, clean and extract keywords (1:n)
Function kFun = new KeywordExtractFunction(kFields);
pipe = new Each(pipe, urlFields, kFun);
// group by word and count it
pipe = new GroupBy(pipe, kFields);
Aggregator kCount = new Count(new Fields("count"));
pipe = new Every(pipe, kCount);
// filter out keywords that occur only once (the filter removes tuples where count <= 1)
Filter kCountFilter = new ExpressionFilter("$1 <= 1", Integer.class);
pipe = new Each(pipe, kCountFilter);
// pass the keywords to our custom google search
Fields kcFields = new Fields("kword", "count");
Fields uFields = new Fields("url");
Function uFun = new UrlExtractFunction(uFields);
pipe = new Each(pipe, kcFields, uFun);
// group by url and count it
pipe = new GroupBy(pipe, uFields);
Aggregator uCount = new Count(new Fields("count"));
pipe = new Every(pipe, uCount);
// filter out urls that occur once
Filter uCountFilter = new ExpressionFilter("$1 <= 1", Integer.class);
pipe = new Each(pipe, uCountFilter);
// remove the count value
pipe = new Each(pipe, Fields.ALL, new Identity(), Fields.FIRST);
FlowDef flowDef = FlowDef.flowDef().
setName("newsclip").
addSource(pipe, iTap).
addTailSink(pipe, oTap);
Properties props = new Properties();
AppProps.setApplicationJarClass(props, Main.class);
FlowConnector flowConnector = new LocalFlowConnector(props);
Flow flow = flowConnector.connect(flowDef);
flow.writeDOT("data/newsclip.dot");
flow.complete();
}
}
The corresponding Graphviz DOT file for the assembly (generated by flow.writeDOT in the code above) is shown at left. I converted it to a web-displayable PNG file using the command "dot -Tpng newsclip.dot -o newsclip.png".
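As mentioned earlier, moving this flow from local mode to Hadoop would mostly be a matter of swapping the connector and taps. Here is a minimal sketch of that wiring, assuming the Cascading 2.x Hadoop classes (HadoopFlowConnector, Hfs and the hadoop TextLine scheme); the buildAssembly() helper here is just a stand-in for the pipe assembly built in Main above.

// Hypothetical Hadoop-mode variant of Main; only the connector and taps change.
package com.mycompany.newsclip;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
public class HadoopMain {
  @SuppressWarnings("rawtypes")
  public static void main(String[] args) {
    String input = args[0];   // HDFS path to the urls file
    String output = args[1];  // HDFS path for the new urls
    // Hfs taps and the hadoop TextLine scheme replace FileTap and the local TextLine
    Tap iTap = new Hfs(new TextLine(new Fields("num", "line")), input);
    Tap oTap = new Hfs(new TextLine(new Fields("kword")), output, SinkMode.REPLACE);
    Pipe pipe = buildAssembly();
    FlowDef flowDef = FlowDef.flowDef().
      setName("newsclip").
      addSource(pipe, iTap).
      addTailSink(pipe, oTap);
    Properties props = new Properties();
    AppProps.setApplicationJarClass(props, HadoopMain.class);
    FlowConnector flowConnector = new HadoopFlowConnector(props);
    Flow flow = flowConnector.connect(flowDef);
    flow.complete();
  }
  // stand-in for the full keyword/url assembly constructed in Main above
  private static Pipe buildAssembly() {
    return new Pipe("keyword");
  }
}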
The Main class above uses built-in functions and filters where available, but the core operations are done by custom functions. The KeywordExtractFunction extracts a set of keywords from a web page given its URL. It uses Boilerpipe to extract the relevant plain text from a web page, and my implementation of the RAKE algorithm to extract keywords from the plain text.
// Source: src/main/java/com/mycompany/newsclip/KeywordExtractFunction.java
package com.mycompany.newsclip;
import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import de.l3s.boilerpipe.BoilerpipeProcessingException;
import de.l3s.boilerpipe.extractors.DefaultExtractor;
@SuppressWarnings("rawtypes")
public class KeywordExtractFunction extends BaseOperation
implements Function {
private static final long serialVersionUID = -7122434545764806604L;
private static final Logger LOGGER =
LoggerFactory.getLogger(KeywordExtractFunction.class);
public KeywordExtractFunction(Fields fields) {
super(2, fields);
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall funCall) {
TupleEntry args = funCall.getArguments();
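// the incoming tuple is (num, line) from the TextLine source; position 1 holds the URL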
String url = args.getString(1);
String rawText = download(url);
String plainText = parse(rawText);
List<String> keywords = extractKeywords(plainText);
for (String keyword : keywords) {
Tuple t = new Tuple();
t.add(keyword);
funCall.getOutputCollector().add(t);
}
}
protected String download(String url) {
try {
URL u = new URL(url);
u.openConnection();
InputStream istream = u.openStream();
StringBuilder buf = new StringBuilder();
byte[] b = new byte[1024];
int bytesRead = 0;
while ((bytesRead = istream.read(b)) > 0) {
buf.append(new String(b, 0, bytesRead));
b = new byte[1024];
}
istream.close();
return buf.toString();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return null;
}
}
protected String parse(String rawText) {
if (StringUtils.isEmpty(rawText)) return null;
else {
try {
return DefaultExtractor.INSTANCE.getText(rawText);
} catch (BoilerpipeProcessingException e) {
LOGGER.error(e.getMessage(), e);
return null;
}
}
}
protected List<String> extractKeywords(String plainText) {
try {
return RakeExtractor.INSTANCE.extract(plainText);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return Collections.emptyList();
}
}
}
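The RakeExtractor used above is my RAKE implementation and lives in the same package in the Maven project, so it is not reproduced here. For readers unfamiliar with RAKE, here is a simplified sketch of the idea (not the actual RakeExtractor code, and with a tiny placeholder stopword list): candidate phrases are split at stopwords and punctuation, each word is scored by degree/frequency, and each phrase is scored as the sum of its word scores.

// Simplified illustration of the RAKE idea -- NOT the actual RakeExtractor used above.
import java.util.*;

public class SimpleRakeSketch {

  private static final Set<String> STOPWORDS = new HashSet<String>(Arrays.asList(
    "a", "an", "and", "are", "as", "at", "be", "by", "for", "from", "in",
    "is", "it", "of", "on", "or", "that", "the", "to", "was", "with"));

  public List<String> extract(String text) {
    // 1. split text into candidate phrases at stopwords and punctuation
    List<List<String>> phrases = new ArrayList<List<String>>();
    List<String> current = new ArrayList<String>();
    for (String token : text.toLowerCase().split("[^a-z0-9]+")) {
      if (token.length() == 0 || STOPWORDS.contains(token)) {
        if (!current.isEmpty()) phrases.add(current);
        current = new ArrayList<String>();
      } else {
        current.add(token);
      }
    }
    if (!current.isEmpty()) phrases.add(current);
    // 2. score each word as deg(w)/freq(w), where deg(w) adds co-occurring words
    Map<String,Integer> freq = new HashMap<String,Integer>();
    Map<String,Integer> cooc = new HashMap<String,Integer>();
    for (List<String> phrase : phrases) {
      for (String word : phrase) {
        freq.put(word, freq.containsKey(word) ? freq.get(word) + 1 : 1);
        int prev = cooc.containsKey(word) ? cooc.get(word) : 0;
        cooc.put(word, prev + phrase.size() - 1);
      }
    }
    // 3. score each phrase as the sum of its word scores, sort descending
    final Map<String,Double> scores = new HashMap<String,Double>();
    for (List<String> phrase : phrases) {
      double score = 0.0;
      for (String word : phrase) {
        score += (cooc.get(word) + freq.get(word)) / (double) freq.get(word);
      }
      scores.put(join(phrase), score);
    }
    List<String> keywords = new ArrayList<String>(scores.keySet());
    Collections.sort(keywords, new Comparator<String>() {
      public int compare(String k1, String k2) {
        return scores.get(k2).compareTo(scores.get(k1));
      }
    });
    return keywords;
  }

  private String join(List<String> words) {
    StringBuilder buf = new StringBuilder();
    for (String word : words) {
      if (buf.length() > 0) buf.append(' ');
      buf.append(word);
    }
    return buf.toString();
  }
}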
The other custom function is the UrlExtractFunction, which takes each keyword and hands it off to
Google's Custom Search API, and returns the URLs of the top 10 search results. The Custom Search instance you set up can be restricted to only allow results from a list of websites (or opened up to the entire web). The KEY and CX values are parameters that identify your client to the Google Search API, and you will need to populate a file with these values in src/main/resources/google.lic (the one in GitHub has placeholders).
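For reference, google.lic is a plain Java properties file; the property names below are the ones the constructor reads, and the values are placeholders you would replace with your own credentials.

# src/main/resources/google.lic (placeholder values)
key=YOUR_GOOGLE_API_KEY
cx=YOUR_CUSTOM_SEARCH_ENGINE_ID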
// Source: src/main/java/com/mycompany/newsclip/UrlExtractFunction.java
package com.mycompany.newsclip;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
/**
* Function to take a keyword and use Google's custom search
* service to retrieve the top 10 URLs.
*/
@SuppressWarnings("rawtypes")
public class UrlExtractFunction extends BaseOperation implements Function {
private static final long serialVersionUID = 1622228905563317614L;
private static final Logger LOGGER =
LoggerFactory.getLogger(UrlExtractFunction.class);
private static final String CUSTOM_SEARCH_URL_TEMPLATE =
"https://www.googleapis.com/customsearch/v1?key={KEY}&cx={CX}&q={Q}";
private String key;
private String cx;
private ObjectMapper objectMapper;
public UrlExtractFunction(Fields fields) {
super(2, fields);
Properties props = new Properties();
try {
props.load(new FileInputStream("src/main/resources/google.lic"));
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
key = props.getProperty("key");
cx = props.getProperty("cx");
objectMapper = new ObjectMapper();
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall funCall) {
TupleEntry args = funCall.getArguments();
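// the incoming tuple is (kword, count); position 0 holds the keyword to search for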
String keyword = args.getString(0);
List<String> urls = parseSearchResult(keyword);
for (String url : urls) {
Tuple t = new Tuple();
t.add(url);
funCall.getOutputCollector().add(t);
}
}
protected List<String> parseSearchResult(String keyword) {
try {
// use literal replace() here; replaceAll() would treat "{KEY}" as a regex
// and fail with an "Illegal repetition" PatternSyntaxException
String url = CUSTOM_SEARCH_URL_TEMPLATE.
replace("{KEY}", key).
replace("{CX}", cx).
replace("{Q}", URLEncoder.encode(keyword, "UTF-8"));
URL u = new URL(url);
u.openConnection();
InputStream istream = u.openStream();
StringBuilder buf = new StringBuilder();
byte[] b = new byte[1024];
int bytesRead = 0;
while ((bytesRead = istream.read(b)) > 0) {
buf.append(new String(b, 0, bytesRead));
b = new byte[1024];
}
istream.close();
return parseJson(buf.toString());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return Collections.emptyList();
}
}
protected List<String> parseJson(String json) throws Exception {
List<String> urls = new ArrayList<String>();
JsonParser parser = objectMapper.getJsonFactory().
createJsonParser(json);
JsonNode root = objectMapper.readTree(parser);
ArrayNode items = (ArrayNode) root.get("items");
for (JsonNode item : items) {
urls.add(item.get("link").getTextValue());
}
return urls;
}
}
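For context, parseJson() only reads the top-level items array and each item's link field. An abbreviated (and hypothetical) response from the Custom Search API looks roughly like this:

{
  "kind": "customsearch#search",
  "items": [
    {"title": "...", "link": "http://www.approved-newspaper-1.com/some/story.html", "snippet": "..."},
    {"title": "...", "link": "http://www.approved-newspaper-2.com/another/story.html", "snippet": "..."}
  ]
}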
And that's pretty much it. Put the list of your "interesting pages", one per line, into data/urls.txt, and run the Cascading job locally using the mvn exec:java command, as shown below. The output of the job is written to data/new_urls.txt. The new data can be used to feed back URLs into the original list (perhaps after some sort of manual vetting by the researcher).
sujit@cyclone:cascading-newsclip$ mvn exec:java \
-Dexec.mainClass="com.mycompany.newsclip.Main" \
-Dexec.args="data/urls.txt data/new_urls.txt"
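For concreteness, data/urls.txt is just a flat list of seed page URLs, one per line; the URLs below are hypothetical placeholders.

http://www.approved-newspaper-1.com/2013/03/interesting-story.html
http://www.approved-newspaper-2.com/science/another-interesting-article.html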
As you can see from the diagram, the Cascading code runs 11 Hadoop Map-Reduce jobs in sequence. Written by hand, this would translate to a lot of Hadoop code, so Cascading, like Pig, is a huge time saver. Pig does allow Java UDFs, but I think Cascading's all-Java approach is easier to work with.
[*] Update 2013-04-16: I came across Tommy Chheng's post where he shows how to write Cascading code in Scala, on this GitHub page. So, great news: it appears that it may be possible to do this after all :-).