Showing posts with label unix. Show all posts

Thursday, December 23, 2010

Quick and Dirty Reporting with Awk

A few years ago, I wrote about starting to use awk after a long time. Over the last couple of years, I've used awk on and off, although not very frequently, and never beyond the basic pattern described in that post.

Last week, I described an AspectJ aspect that wrote out timing information into the server logs. I initially thought of using OpenOffice Calc, but then stumbled upon Jadu Saikia's post on implementing group-by with awk, and I realized that awk could do the job just as easily, and would be easier to use (for multiple invocations).

Here is the script I came up with.

#!/usr/bin/awk -f
# Builds a report out of aggregate elapsed time data similar to the Spring 
# StopWatch prettyPrint report.
# Usage:
# my_profile_report.awk [-v qt=query_term] filename
#
BEGIN {
  FS="|";
}
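# Process every record, or only those whose query (field 3) equals qt.
(qt == "" || qt == $3) {
  counts[$4]++;
  times[$4] += $5;
  # The first record for a controller seeds its min and max. Testing the
  # count (rather than a zero value) keeps a real 0 ms minimum from being
  # overwritten by a later, larger value.
  if (counts[$4] == 1 || $5 + 0 < mins[$4]) {
    mins[$4] = $5 + 0;
  }
  if (counts[$4] == 1 || $5 + 0 > maxs[$4]) {
    maxs[$4] = $5 + 0;
  }
}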
END {
  totavg = 0;
  for (t in times) {
    totavg += times[t]/counts[t];
  }
  printf("---------------------------------------------------------------------\n");
  printf("%-32s %8s %8s %8s %8s\n", "Controller", "Avg(ms)", "%", "Min(ms)", "Max(ms)");
  printf("---------------------------------------------------------------------\n");
  for (t in times) {
    avg = times[t]/counts[t];
    perc = avg * 100 / totavg;
    printf("%-32s %8.2f %8.2f %8.2f %8.2f\n", t, avg, perc, mins[t], maxs[t]);
  }
  printf("---------------------------------------------------------------------\n");
  printf("%-32s %8.2f\n", "Average Elapsed (ms):", totavg);
  printf("---------------------------------------------------------------------\n");
}

As shown in my previous post, the input file looks something like this (without the header, I just grep the server log file with the aspect name).

# aspect_name|request_uuid|query|tile_name|elapsed_time_millis
MyAspect2|ed9ce777-263e-4fe5-a8af-4ea84d78add3|some query|TileAController|132
MyAspect2|ed9ce777-263e-4fe5-a8af-4ea84d78add3|some query|TileBController|49
MyAspect2|ed9ce777-263e-4fe5-a8af-4ea84d78add3|some query|TileCController|2
MyAspect2|ed9ce777-263e-4fe5-a8af-4ea84d78add3|some query|TileDController|3
...

The script can be invoked with or without a qt parameter. Without parameters, the script computes the average, minimum and maximum elapsed times across all the queries that were given to the application during the profiling run. The report looks like this:

sujit@cyclone:aspects$ ./my_profile_report.awk input_file
---------------------------------------------------------------------
Controller                        Avg(ms)        %  Min(ms)  Max(ms)
---------------------------------------------------------------------
TileFController                      1.42     0.02     0.00    42.00
TileAController                    187.67     2.83    27.00   685.00
TileJController                    169.97     2.56     7.00  3140.00
TileEController                      9.14     0.14     2.00    44.00
TileNController                     45.91     0.69     4.00   234.00
TileIController                    444.91     6.71     0.00  3427.00
TileDController                   1506.30    22.72   792.00 12140.00
TileMController                    184.88     2.79     0.00  3078.00
TileHController                     13.67     0.21     7.00    50.00
TileCController                     34.06     0.51    14.00   108.00
TileLController                    759.73    11.46     3.00  9921.00
TileGController                   2473.24    37.31   579.00 10119.00
TileBController                     24.48     0.37     7.00   132.00
TileKController                    773.97    11.67     0.00  8554.00
---------------------------------------------------------------------
Average Elapsed (ms):             6629.35
---------------------------------------------------------------------

If we want only the times for a specific query term, then we can specify it on the command line as shown below. If the query has been invoked multiple times, then the report will show the average of the calls for the query. In this case, there is only a single invocation for the query, so the minimum and maximum times are redundant.

sujit@cyclone:aspects$ ./my_profile_report.awk -v qt=some_query input_file
---------------------------------------------------------------------
Controller                        Avg(ms)        %  Min(ms)  Max(ms)
---------------------------------------------------------------------
TileFController                      0.00     0.00     0.00     0.00
TileAController                    106.00     4.57   106.00   106.00
TileJController                      7.00     0.30     7.00     7.00
TileEController                     10.00     0.43    10.00    10.00
TileNController                      8.00     0.34     8.00     8.00
TileIController                      0.00     0.00     0.00     0.00
TileDController                   1309.00    56.42  1309.00  1309.00
TileMController                      0.00     0.00     0.00     0.00
TileHController                     12.00     0.52    12.00    12.00
TileCController                     25.00     1.08    25.00    25.00
TileLController                     64.00     2.76    64.00    64.00
TileGController                    767.00    33.06   767.00   767.00
TileBController                     12.00     0.52    12.00    12.00
TileKController                      0.00     0.00     0.00     0.00
---------------------------------------------------------------------
Average Elapsed (ms):             2320.00
---------------------------------------------------------------------

The script uses awk's associative arrays and the END (post-processing) block. Before this script, I had read about awk's BEGIN/END blocks but didn't know why I would want to use them, and I didn't know about awk's associative arrays at all. Thanks to Daniel Robbins's three-part series, Common threads: Awk by example (Parts 1, 2 and 3), I now have a greater understanding and appreciation of awk.
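
For readers more at home in Python, here is a rough sketch of the same group-by idea, with a dictionary standing in for awk's associative arrays. It is not a port of the script above: the function name summarize and the tuple it returns are my own choices for illustration, and it assumes the same five-field pipe-delimited input.

```python
def summarize(lines, query_term=None):
    """Group timing records by controller; return {name: (avg, min, max)}."""
    stats = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and the optional header comment
        fields = line.split("|")
        if len(fields) != 5:
            continue  # skip anything that is not a complete record
        query, controller, elapsed = fields[2], fields[3], float(fields[4])
        if query_term is not None and query != query_term:
            continue
        # The first record for a controller seeds min and max with its own value.
        entry = stats.setdefault(
            controller, {"count": 0, "total": 0.0, "min": elapsed, "max": elapsed})
        entry["count"] += 1
        entry["total"] += elapsed
        entry["min"] = min(entry["min"], elapsed)
        entry["max"] = max(entry["max"], elapsed)
    # This dictionary comprehension plays the role of awk's END block.
    return {name: (e["total"] / e["count"], e["min"], e["max"])
            for name, e in stats.items()}
```

Calling summarize(open("input_file")) would cover every query, and passing query_term restricts the report to one query, much like the qt variable does for the awk script.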

Saturday, November 13, 2010

Using SSH in Python with Paramiko

I recently had to modify a shell script that downloaded a bunch of files from a remote machine using scp, verified that each downloaded file was the "same as" the remote file (using an MD5 checksum match on the remote and local files), and did a few other things. I generally prefer using Python for my own scripting needs, but sadly Python hasn't caught on with our operations team, and since they were responsible for maintaining this particular script, it had to be a shell script. My shell scripting is a bit rusty, so the process was a bit rough ... and I kept asking myself how easy/hard it would be to do this with Python.

The shell script is run via a cron job, so it depends on passwordless ssh being set up between the two machines. During local testing, I did not have this set up, so another annoying thing was that I had to type in the password each time an ssh/scp call was made within the script. Of course, passwordless ssh is fairly easy to set up, but if you do this sort of thing infrequently (true in my case), it's a bit of a pain to figure out each time.

I had never used SSH from within Python before - but I found this article an excellent tutorial on how to get started. You need to install paramiko (which needs PyCrypto). Once downloaded, installation is a matter of simply exploding the bundles and running "sudo python setup.py install" - the process was the same for both my CentOS 5.x desktop and my Mac OS X laptop.

So in any case, to answer my own question, I ended up building a Python version of the same script. The resulting Python version is more verbose than the shell script, but it is more structured, has more optional features (so developers can specify command line options rather than have to hack the script to make it run in their local environment) and is much easier (for me anyway) to read.

There are some caveats though. First, paramiko supports the SFTP protocol but not the SCP protocol, so scp's have to be "faked" using the sftp client's get and put calls. This is not really a big deal, just something to be aware of: sftp is a subsystem of ssh, so if you have sshd running on your remote machine, you normally get sftp as well. The second caveat is that the sftp client does not support Unix wildcards, so you have to write application code to get each filename individually, which makes your application code more verbose.
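
One way to work around the missing wildcard support is to list the remote directory and filter the names on the client side. The sketch below is only an illustration, not part of my script: the helper name matching_remote_files is made up, and it assumes the sftp argument behaves like paramiko's SFTPClient, whose listdir(path) returns the bare file names in a directory.

```python
import fnmatch
import posixpath

def matching_remote_files(sftp, remote_dir, pattern):
    """Return full remote paths in remote_dir whose names match a shell pattern.

    sftp is assumed to expose listdir(path) the way paramiko's SFTPClient does.
    """
    names = sftp.listdir(remote_dir)
    # fnmatchcase applies Unix-style, case-sensitive matching on every platform.
    return [posixpath.join(remote_dir, name)
            for name in sorted(names)
            if fnmatch.fnmatchcase(name, pattern)]
```

Each returned path can then be handed to the sftp client's get call, one file at a time.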

Since the script is somewhat application specific, I decided to take the relevant parts and build them into a Python version of scp that optionally allows a password to be specified on the command line, and does an md5 checksum comparison between each remote and local file to verify the copy.
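
The verification idea is simple enough to sketch on its own. The version below is a hedged illustration rather than the code in the script that follows: it computes the local checksum with hashlib instead of shelling out, and it assumes the remote side ran md5sum, whose output line starts with the hex digest. The function names are hypothetical.

```python
import hashlib

def local_md5_hex(path, chunk_size=8192):
    """Compute the MD5 hex digest of a local file, reading it in chunks."""
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            md5.update(chunk)
    return md5.hexdigest()

def md5_from_md5sum_output(output):
    """Extract the digest from md5sum-style output: '<hex digest>  <file name>'."""
    tokens = output.split()
    return tokens[0] if tokens else None

def copies_match(local_path, remote_md5sum_output):
    """True if the local file's digest equals the digest reported remotely."""
    remote_md5 = md5_from_md5sum_output(remote_md5sum_output)
    return remote_md5 is not None and remote_md5 == local_md5_hex(local_path)
```

Treating empty remote output as a mismatch is deliberate, so that a failed remote command is never reported as a verified copy.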

sujit@cyclone:unix$ ./ssh_copy.py --help
Usage: ssh_copy.py source_path target_path

Options:
  -h, --help            show this help message and exit
  -p PASSWORD, --password=PASSWORD
                        SSH password
  -v, --verify          Verify copy

Remote path should be specified as user@host:/full/path

The code for the script is shown below. The switch on the shebang line is to suppress deprecation warnings from PyCrypto saying that the random number generator in the current release was broken. The rest of the code is fairly self-explanatory: the paramiko SSH client setup and teardown are in the open_sshclient and close_sshclient methods, and the meat of the code is in copy_files.

#! /usr/bin/python -W ignore::DeprecationWarning
# Copy a directory of files from a remote to local machine

import commands
import os
import os.path
import paramiko
import re
import sys
from optparse import OptionParser

def is_remote_path(path):
  return path.find("@") > -1 and path.find(":") > -1

def parse_remote_path(path):
  return re.split("[@:]", path)

def validate_command(argv):
  usage = "Usage: %prog source_path target_path"
  epilog = "Remote path should be specified as user@host:/full/path"
  parser = OptionParser(usage=usage, epilog=epilog)
  parser.add_option("-p", "--password", dest="password",
    default="", help="SSH password")
  parser.add_option("-v", "--verify", action="store_true",
    dest="verify", default=False, help="Verify copy")
  parser.set_defaults()
  (opts, args) = parser.parse_args(argv)
  # optparse handles -h/--help by itself (it prints the help text and
  # exits), so no explicit check for a help option is needed here
  if len(args) != 3:
    print "Error: Too many or too few arguments supplied"
    parser.print_help()
    sys.exit(-1)
  if (is_remote_path(args[1]) and is_remote_path(args[2])) or \
    (not is_remote_path(args[1]) and not is_remote_path(args[2])):
    print "Error: One path should be remote and one local"
    parser.print_help()
    sys.exit(-1)
  cmd_args = {}
  cmd_args["password"] = opts.password
  cmd_args["verify"] = opts.verify
  if is_remote_path(args[1]):
    (user, host, source) = parse_remote_path(args[1])
    target = args[2]
    mode = "download"
  else:
    source = args[1]
    (user, host, target) = parse_remote_path(args[2])
    mode = "upload"
  cmd_args["user"] = user
  cmd_args["host"] = host
  cmd_args["source"] = source
  cmd_args["target"] = target
  cmd_args["mode"] = mode
  return cmd_args

def is_download_mode(cmd_args):
  return cmd_args["mode"] == "download"

def open_sshclient(cmd_args):
  ssh_client = paramiko.SSHClient()
  ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  ssh_client.load_system_host_keys()
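  if len(cmd_args["password"]) == 0:
    # no password given: rely on key-based (passwordless) ssh, but still
    # log in as the user named in the remote path
    ssh_client.connect(cmd_args["host"], username=cmd_args["user"])
  else:
    ssh_client.connect(cmd_args["host"], \
      username=cmd_args["user"], password=cmd_args["password"])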
  return ssh_client

def find_remote_files(cmd_args, type, ssh):
  (ssh_in, ssh_out, ssh_err) = ssh.exec_command(
    "find %s -name \"*\" -type %s" % (cmd_args["source"], type))
  files = []
  for file in ssh_out.readlines():
    files.append(file.rstrip())
  return files

def remote_mkdir(dir, ssh):
  ssh.exec_command("mkdir %s" % dir)

def find_local_files(cmd_args, type):
  local_out = commands.getoutput(
    "find %s -name \"*\" -type %s" % (cmd_args["source"], type))
  files = []
  for file in local_out.split("\n"):
    files.append(file)
  return files

def get_remote_md5(file, ssh):
  # md5sum was not being found via SSH, so had to add full path
  (ssh_in, ssh_out, ssh_err) = ssh.exec_command(
    "/usr/local/bin/md5sum %s" % file)
  for line in ssh_out.readlines():
    md5sum = line.split(" ")[0]
    return md5sum

def get_local_md5(file):
  local_out = commands.getoutput("md5sum %s" % file)
  return local_out.split(" ")[0]

def verify_files(source_file, target_file, cmd_args, ssh):
  if is_download_mode(cmd_args):
    local_md5 = get_local_md5(target_file)
    remote_md5 = get_remote_md5(source_file, ssh)
  else:
    local_md5 = get_local_md5(source_file)
    remote_md5 = get_remote_md5(target_file, ssh)
  if local_md5 == remote_md5:
    if is_download_mode(cmd_args):
      print "Download %s (%s)" % (target_file, "OK")
    else:
      print "Upload %s (%s)" % (source_file, "OK")
  else:
    if is_download_mode(cmd_args):
      print "Download %s (%s)" % (target_file, "Failed")
    else:
      print "Upload %s (%s)" % (source_file, "Failed")

def copy_files(cmd_args, ssh):
  if is_download_mode(cmd_args):
    source_dirs = find_remote_files(cmd_args, "d", ssh)
    source_files = find_remote_files(cmd_args, "f", ssh)
  else:
    source_dirs = find_local_files(cmd_args, "d")
    source_files = find_local_files(cmd_args, "f")
  for source_dir in source_dirs:
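    # strip the source prefix as a plain string (re.sub would treat the
    # path as a regular expression)
    rel_path = source_dir.replace(cmd_args["source"], "", 1)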
    if is_download_mode(cmd_args):
      os.mkdir("".join([cmd_args["target"], rel_path]))
    else:
      remote_mkdir("".join([cmd_args["target"], rel_path]), ssh)
  ftp = ssh.open_sftp()
  for source_file in source_files:
    # strip the source prefix as a plain string, not as a regex
    rel_path = source_file.replace(cmd_args["source"], "", 1)
    target_file = "".join([cmd_args["target"], rel_path])
    if is_download_mode(cmd_args):
      ftp.get(source_file, target_file)
    else:
      ftp.put(source_file, target_file)
    if cmd_args["verify"]:
      verify_files(source_file, target_file, cmd_args, ssh)
  ftp.close()
  
def close_sshclient(ssh):
  ssh.close()
  
def main():
  cmd_args = validate_command(sys.argv)
  ssh = open_sshclient(cmd_args)
  copy_files(cmd_args, ssh)
  close_sshclient(ssh)

if __name__ == "__main__":
  main()

Here are two examples of using this script, the first to download files from a remote server, and the second to upload some files to the remote server. In both cases, we supply the password, and ask that the file transfer be verified by comparing the source and target file MD5 checksums.

sujit@cyclone:unix$ ./ssh_copy.py \
  sujit@avalanche:/Users/sujit/Projects/python-scripts \
  /tmp/python-scripts -p secret -v
...
Download /tmp/python-scripts/src/unix/rcstool.py (OK)
Download /tmp/python-scripts/src/unix/ssh_copy.py (OK)
sujit@cyclone:unix$
sujit@cyclone:unix$ ./ssh_copy.py \
  /Users/sujit/Projects/python-scripts \
  sujit@avalanche:/tmp/python-scripts -p secret -v
...
Upload /Users/sujit/Projects/python-scripts/src/unix/rcstool.py (OK)
Upload /Users/sujit/Projects/python-scripts/src/unix/ssh_copy.py (OK)
sujit@cyclone:unix$

Obviously, for one-off command line usage, this is not a huge deal... you could simply use the built-in scp command to do the same thing (with much less effort). The fun starts when you want to embed one or more scp commands inside your script. Then the convenience of being able to keep a single ssh connection open and run multiple commands over it, not having to deal with backtick hell, and having a full-blown language to parse returned values really starts to make a difference.

Friday, December 04, 2009

A Unison replacement with rsync

Before Unison, I used a simple rsync script to synchronize code between my laptop and desktop. If you are interested, it is described here. The script was a simple Python wrapper over the Unix rsync command, just so I didn't have to remember all the switches.

However, the script was overly simplistic, and required some discipline to ensure that files did not get clobbered during syncing. For one, you had to start with a known "clean" state, so anytime you wanted to make a change on your laptop, you would have to download the latest code from the desktop first. Once your changes were done, you would have to remember to upload them.

Having used Unison for a while now, I have gotten used to it telling me that I am about to shoot myself in the foot, rather than having to figure it out for myself. So it was something of a setback when I could not get Unison to work on my Macbook Pro (syncing against a CentOS 5.3 based desktop), but I could not go back to using the old script anymore. I decided to add some smarts to the old program so it behaved similarly to Unison.

Challenges

Unison does a bidirectional sync each time it is called. One can simulate this (sort of) using a pair of rsync calls (an upsync and a downsync) using the --update switch so newer files from each side are propagated across to the other.
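
As a rough sketch of that idea (not the script shown later in this post), the two rsync invocations can be built as argument lists. The function names and the defaults here are assumptions made for illustration; the rsync switches themselves (-a, -v, --update, --dry-run, -e) are standard ones.

```python
def rsync_command(src, dest, dry_run=True, rsh=None):
    """Build one rsync invocation that skips files that are newer at dest."""
    cmd = ["rsync", "-av", "--update"]
    if dry_run:
        cmd.append("--dry-run")  # only report what would be transferred
    if rsh:
        cmd.extend(["-e", rsh])  # remote shell to use, e.g. "ssh -p 9922"
    # A trailing slash on the source copies its contents, not the directory itself.
    cmd.extend([src.rstrip("/") + "/", dest])
    return cmd

def bidirectional_commands(local, remote, dry_run=True, rsh=None):
    """Return the (upsync, downsync) pair that approximates a two-way sync."""
    upsync = rsync_command(local, remote, dry_run, rsh)
    downsync = rsync_command(remote, local, dry_run, rsh)
    return upsync, downsync
```

Either list can be passed to subprocess.run. Defaulting to dry-run mode means that nothing is transferred until the caller explicitly asks for it.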

Relying on the file timestamps has a few problems, though. First we assume that the clocks on both machines are close enough, an assumption which is probably mostly true since most modern machines run ntpd.

Second (and perhaps more importantly), there is a chance of one of your local changes being clobbered if there is a newer version of the same file on the remote machine. This can happen in my case because the files on my remote machine (my desktop) are under CVS control. If someone checks in a change to a file that I synced earlier and then changed on my laptop, a "cvs update" on the remote machine before the next sync gives the remote copy a newer timestamp, and the sync will then overwrite the changes in the version of the file on my laptop.

There is also the reverse case where your local changes can propagate over a remote change that was previously committed, but doing a "cvs update" before a "cvs commit" should detect that, so I am not worried so much about handling that case.

Script

To handle the local file clobbering problem, in addition to simulating the bidirectional sync with a pair of rsync calls, I also build a snapshot of the files after each sync - the snapshot is really a pickled dictionary (serialized Map for you Java guys) of the MD5 checksums for each file after the sync. On the next sync call, I use the snapshot to find which files have changed locally. Then I do a downsync in --dry-run mode and remove from the downsync file list the files that have changed locally. This prevents files that have changed locally from being overwritten by any remote changes. I then do an upsync in --dry-run mode, and remove from the changed list those files that appear in the upsync list. The remaining files are essentially "conflicts" which the program does not know what to do with, and should defer to my decision (whether to upsync, downsync or ignore).
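
Stripped of the rsync and pickle plumbing, the bookkeeping described above comes down to a few set operations. The sketch below uses hypothetical function names and plain dictionaries and lists as stand-ins for the snapshot and the dry-run outputs. It illustrates the idea and is not the script itself.

```python
def changed_since_snapshot(snapshot, current):
    """Return the paths whose checksum differs from the one in the snapshot.

    Both arguments map path -> checksum. Paths missing from the snapshot
    (files created since the last sync) are not reported as changed.
    """
    return {path for path, digest in current.items()
            if path in snapshot and snapshot[path] != digest}

def partition_changes(downsync_files, upsync_files, changed_locally):
    """Split the dry-run downsync list into safe downloads and conflicts."""
    # Files that were not touched locally can be downloaded safely.
    safe_downsync = [f for f in downsync_files if f not in changed_locally]
    # Locally changed files that the upsync would propagate anyway are not
    # conflicts; whatever remains needs a decision from the user.
    conflicts = [f for f in downsync_files
                 if f in changed_locally and f not in upsync_files]
    return safe_downsync, conflicts
```

Keeping this logic free of I/O makes it easy to check by hand with small lists of file names before trusting it with real files.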

The user interface (i.e., the configuration files and console output) is influenced heavily by Unison's, since I wanted to reuse my profiles as much as possible. The configuration files are stored in a .sync_conf directory under the home directory, as named files with key-value pair properties.

A sample profile is shown below. It identifies the local and remote root directories for this profile, and specifies the file patterns that should be excluded from the sync.

local=/Users/sujit/test
remote=spal@localhost:/home/spal/test
excludes=.*,target/*,bin/*
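
To make the format concrete, here is a small, hedged sketch of how such a profile could be read and turned into rsync exclude switches. The helper names are hypothetical, and skipping blank lines and lines starting with # is my own assumption; the profile format above does not define comments.

```python
def parse_profile(text):
    """Parse key=value lines into a dictionary."""
    conf = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # assumption: blank lines and # comments are ignored
        # partition splits on the first "=" only, so values may contain "=".
        key, _, value = line.partition("=")
        conf[key.strip()] = value.strip()
    return conf

def exclude_flags(conf):
    """Turn the comma-separated excludes value into rsync --exclude switches."""
    patterns = conf.get("excludes", "").split(",")
    return ["--exclude=" + pattern for pattern in patterns if pattern]
```

For the sample profile, exclude_flags yields one switch for each of the three patterns.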

If you look at the ~/.sync_conf directory, you will also find a .dat file for each profile after the first sync is done - this is the snapshot. If you delete the snapshot, then you should make sure that you don't have any outstanding local changes (make copies) and rerun the sync.

As you can figure out from the spal@localhost prefix on the remote key value, I use a local tunnel on my laptop to connect to my desktop over ssh. Since I have to do multiple rsync calls per sync, I needed to set up passwordless ssh to avoid having to type the password in multiple times.

Here is the code - like its previous incarnation, it is written in Python. The script is heavily documented, and I have already briefly described the algorithm above, so it should not be too hard to understand.

#!/usr/bin/python
# Does a somewhat brittle bi-directional sync. By brittle, I mean that
# this is tailored to my particular use-case rather than a general one.
# My use case is a laptop being sync'd to a desktop at work. The code
# on the desktop is under CVS control (so I can potentially recover
# clobbered files). The script tries to minimize the chance of clobbering
# files on the laptop.
#
import os
import sys
import getopt
import os.path
import cPickle
import hashlib

# CONFIG
CONF_DIR = "/Users/sujit/.sync_conf"
RSYNC_RSH = "ssh -p 9922"
# CONFIG

def usage():
  """
  Print usage information to the console and exits.
  """
  print "Usage: sync.py profile"
  print "       sync.py --list|--help"
  print "--list|-l: list available profiles"
  print "--help|-h: print this message"
  print "profile: name of profile.prf file to use"
  sys.exit(-1)

def list_profiles():
  """ 
  Print list of available profiles to the console and exits to
  the OS. Profiles are stored as .prf files in CONF_DIR.
  """
  print "Available Profiles:"
  for file in os.listdir(CONF_DIR):
    if (file.endswith(".prf")):
      print " ", file
  sys.exit(-1)

def abs_path(dirname, filename):
  """
  Convenience method to construct an absolute path for the given
  directory and file. Similar to the Java File constructor, except
  that this will not resolve properly in Windows systems (I think).
  @param dirname - the name of the directory
  @param filename - the name of the file.
  @return the absolute pathname for the file.
  """
  return os.sep.join([dirname, filename])

def get_configuration(profile):
  """
  Read the configuration off the .prf file into a dictionary
  for programmatic access.
  @param profile - the name of the sync profile.
  @return the dictionary containing configuration key-value pairs.
  """
  conf = {}
  profile_file = open(abs_path(CONF_DIR, profile + ".prf"), 'rb')
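  for line in profile_file.readlines():
    line = line.strip()
    if (len(line) == 0):
      # skip blank lines
      continue
    # split on the first "=" only, so values may themselves contain "="
    (key, value) = line.split("=", 1)
    conf[key] = value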
  profile_file.close()
  return conf

def compute_file_md5_hash(file):
  """
  Computes the MD5 Hash of the named file. To avoid out of memory
  for large files, we read in the file in chunks of 1024 bytes each
  (any multiple of 64 bytes should work fine, since that is MD5's
  internal block size) and build up the md5 object.
  @param file - the name of the file to compute MD5 hash of.
  @return the MD5 digest of the file.
  """
  md5 = hashlib.md5()
  f = open(file, 'rb')
  while (True):
    chunk = f.read(1024)
    if (chunk == ""):
      break
    md5.update(chunk)
  f.close()
  return md5.digest()

def compute_md5_hashes(snapshot, dirname, fnames):
  """
  Visit named file and compute its MD5 hash, and store it into the
  snapshot dictionary.
  @param snapshot - a reference to the dictionary.
  @param dirname - the name of the current directory.
  @param fnames - the names of the files in the directory.
  """
  for fname in fnames:
    absname = abs_path(dirname, fname)
    if os.path.isfile(absname):
      snapshot[absname] = compute_file_md5_hash(absname)

def save_snapshot(profile, src):
  """
  Recursively traverse the directory tree rooted in src and compute
  the MD5 hash for each file. Write out the dictionary in pickled
  form to the snapshot (.dat) file.
  @param profile - the name of the sync profile.
  @param src - the local directory root.
  """
  snapshot = {}
  os.path.walk(src, compute_md5_hashes, snapshot)
  snapshot_file = open(abs_path(CONF_DIR, profile + ".dat"), 'wb')
  cPickle.dump(snapshot, snapshot_file, protocol=1)
  snapshot_file.close()

def load_snapshot(profile):
  """
  Loads the snapshot dictionary containing full path names of source
  files with their MD5 hash values from the pickled file.
  @param profile - the name of the sync profile.
  @return the dictionary of path name to MD5 hashes.
  """
  snapshot_file = open(abs_path(CONF_DIR, profile + ".dat"), 'rb')
  snapshot = cPickle.load(snapshot_file)
  snapshot_file.close()
  return snapshot

def check_if_changed(args, dirname, fnames):
  """
  Visits each file and computes the MD5 checksum, then compares it
  with the checksum available in the snapshot. If no checksum exists
  in the snapshot, it is considered to be a new file (ie, created
  since the last sync was done).
  @param args - a tuple containing the snapshot dictionary and the
             set of changed files so far.
  @param dirname - the name of the current directory.
  @param fnames - the names of the files in the directory.
  """
  (snapshot, changed_files) = args
  for fname in fnames:
    absname = abs_path(dirname, fname)
    try:
      orig_md5 = snapshot[absname]
      new_md5 = compute_file_md5_hash(absname)
      if (orig_md5 != new_md5):
        changed_files.add(absname)
    except KeyError:
      continue
    except TypeError:
      continue

def get_changed_since_sync(profile, src):
  """
  Computes a set of local file names which changed since the last time
  the sync was run. This is to prevent clobbering of local files by
  remote files containing a newer timestamp. The method walks the
  directory tree rooted in src and computes the checksum of each file
  in it, comparing it to the checksum from the snapshot. If the checksum
  differs, then it is written to the changed_files set.
  @param profile - the name of the sync profile.
  @param src - the local directory root.
  @return - a (possibly empty) set of changed file names, relative to
            the src directory.
  """
  snapshot = load_snapshot(profile)
  changed_files = set()
  os.path.walk(src, check_if_changed, (snapshot, changed_files))
  return map(lambda x: x.replace(src + os.sep, ""), changed_files)

def run_rsync_command(profile, src, dest, conf, force, files=[]):
  """ 
  Generate the rsync command for the OS to run based on input parameters.
  The output of the OS command is filtered to extract the files that
  are affected and a list of file names is returned.
  @param profile - the name of the sync profile.
  @param src - the source root (local for an upsync, remote for a downsync).
  @param dest - the destination root (remote for an upsync, local for a
                downsync).
  @param conf - a reference to the configuration dictionary.
  @param force - if set to false, rsync will be run in --dry-run mode,
                 ie, no files will be transferred.
  @param files - if provided, only the files in the list will be synced.
  @return a list of files affected.
  """
  # set up the basic command (we just add things to it for different
  # cases)
  command = " ".join(["rsync",
      "" if force else "--dry-run",
      "--cvs-exclude",
      " ".join(map(lambda x: "--exclude=" + x, conf["excludes"].split(","))),
      "--delete",
      "--update",
      "--compress",
      "-rave",
      "'" + RSYNC_RSH + "'"
  ])
  from_file_name = ""
  if (len(files) > 0):
    # create a text file and use --files-from parameter to only
    # sync files in the files-from file
    from_file_name = abs_path(CONF_DIR, profile + ".list")
    filelist = open(from_file_name, 'wb')
    for file in files:
      filelist.write(file.replace(conf["local"] + os.sep, "") + os.linesep)
    filelist.flush()
    filelist.close()
    command = " ".join([command,
      "--files-from=" + from_file_name, src + "/", dest])
  else:
    command = " ".join([command, src + "/", dest])
  # run the command
  result = []
  for line in os.popen(command):
    if (len(line.strip()) == 0 or
        line.find("file list") > -1 or
        line.find("total size") > -1 or
        (line.find("sent") > -1 and line.find("received") > -1)):
       continue
    result.append(line[:-1])
  if (len(from_file_name) > 0 and os.path.exists(from_file_name)):
    os.remove(from_file_name)
  return result

def bidirectional_sync(profile, src, dest, conf):
  """
  The algorithm consists of multiple rsync commands. Inline comments
  describe this in more detail. These checks are meant to prevent
  clobbering of local changes. The set of files that do not have a
  conflict (in either direction) are presented to the user for
  approval and two rsyncs are done. Then the conflicts are presented
  one by one. In most cases, the user should choose [u]psync.
  @param profile - the name of the sync profile. At the end of the
  sync operation, a snapshot of the current sync is stored.
  @param src - the local root.
  @param dest - the remote root.
  @param conf - the sync configuration.
  """
  # first find the local changes since the last sync. If
  # there is no .dat file, then ignore this step
  changed_since_sync = set()
  if (os.path.exists(abs_path(CONF_DIR, profile + ".dat"))):
    changed_since_sync = get_changed_since_sync(profile, src)
  # then do a dry-run of a downsync to get remote files to sync
  remote_changes = run_rsync_command(profile, dest, src, conf, False)
  # downsync only the files which are NOT in the changed_since_sync list.
  # To do this, we partition the remote_changes list into two sets
  non_conflicts, conflicts = [], []
  for remote_change in remote_changes:
    if (remote_change in changed_since_sync):
      conflicts.append(remote_change)
    else:
      non_conflicts.append(remote_change)
  remote_changes = []
  remote_changes.extend(non_conflicts)
  # do a dry-run of the upsync to get local files to upload
  local_changes = run_rsync_command(profile, src, dest, conf, False)
  # remove from conflicts the files that also show up in the upsync dry-run
  for local_change in local_changes:
    if (local_change in conflicts):
      conflicts.remove(local_change)
  # show the remote and local changes, each with its direction marker
  for remote_change in remote_changes:
    print "L<--R", remote_change
  for local_change in local_changes:
    print "L-->R", local_change
  if (len(remote_changes) + len(local_changes) > 0):
    yorn = raw_input("Is this OK [y/n/q]? ")
    if (yorn == 'y' or yorn == 'Y'):
      # do the rsync
      run_rsync_command(profile, src, dest, conf, True, local_changes)
      run_rsync_command(profile, dest, src, conf, True, remote_changes)
    elif (yorn == "q" or yorn == "Q" or yorn == "n" or yorn == "N"):
      return
  # lastly, take care of the conflicts on a per-file basis
  for conflict in conflicts:
    conflict_list = []
    action = raw_input("L<X>R " + conflict + " [u/d/n/q]? ")
    if (action == "u" or action == "U"):
      conflict_list.append(conflict)
      run_rsync_command(profile, src, dest, conf, True, conflict_list)
    elif (action == 'd' or action == 'D'):
      conflict_list.append(conflict)
      run_rsync_command(profile, dest, src, conf, True, conflict_list)
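    elif (action == 'q' or action == 'Q'):
      # quit, as at the earlier prompt, without saving a snapshot
      return
    else:
      # [n] or anything else: leave this file alone
      continue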
  save_snapshot(profile, src)

def main():
  """
  This is how we are called. See usage() or call the script with the
  --help option for more information.
  """
  if (len(sys.argv) == 1):
    usage()
  (opts, args) = getopt.getopt(sys.argv[1:], "lh", ["list", "help"])
  for option, argval in opts:
    if (option in ("-h", "--help")):
      usage()
    elif (option in ("-l", "--list")):
      list_profiles()
  profile = sys.argv[1]
  # read the profile file
  conf = get_configuration(profile)
  # do the bidirectional sync
  bidirectional_sync(profile, conf["local"], conf["remote"], conf)

if (__name__ == "__main__"):
  main()

Usage

To get the list of profiles already available, type sync.py --list. To add or edit a profile, go to the ~/.sync_conf directory and create or edit the profile's .prf file. This is actually simpler (copy an existing .prf and modify it) than doing it via a GUI.

A sample run is shown below. As you can see, it correctly detects changes on both systems. I have also tested the situation where a remote change is newer than a corresponding local change, and it successfully detects the conflict and allows me to upsync or downsync as I see fit.

sujit@cyclone:~$ sync.py test
L<--R ./
L<--R tunnel-indexer.prf
L-->R ./
L-->R tunnel-util.prf
Is this OK [y/n/q]? y
sujit@cyclone:~$
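
The conflict detection that this run exercises comes down to a little set arithmetic over three file lists. The sketch below restates it as a standalone function. The name partition_changes and its return shape are illustrative assumptions, not part of sync.py.

```python
def partition_changes(changed_since_sync, remote_changes, local_changes):
    """Split dry-run results into (downsync, upsync, conflicts).

    Hypothetical helper mirroring the partitioning in bidirectional_sync.
    """
    changed = set(changed_since_sync)
    local = set(local_changes)
    # a remote change that was not touched locally can be downsynced safely
    downsync = [f for f in remote_changes if f not in changed]
    # a remote change that was also touched locally is a conflict, unless
    # the upsync dry-run lists it too, in which case it is simply upsynced
    conflicts = [f for f in remote_changes if f in changed and f not in local]
    return downsync, list(local_changes), conflicts
```

Only the files in the third list need a per-file [u/d/n/q] decision; the first two can be approved in one go.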

The script is obviously not a Unison replacement, but it works for me. I would probably start using Unison again if it became available, but until it does, this script should suffice.

Friday, July 10, 2009

Running a Hadoop Job on Amazon EC2

Sometime early last year, a colleague went to the WWW2008 conference in Beijing. One of the ideas he brought back was that of identifying common phrases in use in your vertical by extracting them from the documents in your corpus. The paper it came out of was not even one of the major ones, but it stuck with me because of its simplicity.

I didn't know anything about Hadoop at the time, so while I had an implementation figured out shortly after the talk, I did not write any code, since I did not have a way to run it on a sufficiently large volume of text. Lately, however, I've been looking at Hadoop again, with a view to running jobs on Amazon's Elastic Compute Cloud (EC2) service, so I figured that it may be a good thing to try out.

The way I planned to do it was to generate 2 to 5 word grams from the documents, then aggregate them. As an example, the text:

First, she tried to look down and make out what she was coming to,...

is decomposed into the following subsequences, which are then passed into a Hadoop MapReduce job to find how many times each phrase occurred. Downstream code will presumably treat the highest occurring phrases as "common" somehow.

(first she)
(first she tried)
(first she tried to)
(first she tried to look)
(she tried)
(she tried to)
(she tried to look)
... etc.
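
As a quick way to experiment with this decomposition outside Hadoop, here is a minimal Python sketch. It assumes a simple regex tokenizer (runs of letters, digits and apostrophes), which is not the tokenization the Java code later in this post uses, and the function name word_ngrams is made up for illustration.

```python
import re

def word_ngrams(sentence, min_gram=2, max_gram=5):
    """Return every word n-gram of size min_gram..max_gram, in reading order."""
    # assumption: a word is a run of letters, digits or apostrophes
    words = re.findall(r"[a-z0-9']+", sentence.lower())
    grams = []
    for start in range(len(words)):
        for size in range(min_gram, max_gram + 1):
            if start + size > len(words):
                break  # no room left for a gram of this size
            grams.append(" ".join(words[start:start + size]))
    return grams
```

Calling word_ngrams("First, she tried to look down") yields "first she", "first she tried", and so on, in the same order as the list above.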

The books I used as my "corpus" for this test are Alice in Wonderland, Moby Dick and The Adventures of Sherlock Holmes, all from Project Gutenberg's collection of e-books.

Amazon EC2 Setup

Setting up to work with Amazon's EC2 service is easy if you know how. There are lots of Internet resources, including Amazon's own EC2 documentation pages, that provide information about this. Chuck Lam's Hadoop in Action (Early Access) book has an entire chapter devoted to this, and I basically followed it step by step, and was successful. In a nutshell, here is what I needed to do.

  1. From Amazon's site, create and download the private key file (pk-*) and certificate (cert-*) and copy them to your ~/.ec2 directory.
  2. Download and install Hadoop (if not installed already).
  3. Download and install Amazon's EC2 API Tools.
  4. From Amazon's site, get your account number, the AWS Access key, and the AWS Secret Access Key, and put them in the appropriate places in your $HADOOP_HOME/src/contrib/ec2/hadoop-ec2-env.sh file.
  5. Figure out what instance type you want (I chose m1.medium), and update the hadoop-ec2-env.sh file.
  6. Add this information into your .bash_profile. The snippet from my .bash_profile is shown below. This puts the ec2 api tools and the hadoop-ec2 tools in your PATH, and also provides the tools with information about your private key and certificate.
  7. Source your .bash_profile.
  8. Generate your keypair (ec2-add-keypair gsg-keypair) and store the private part of the generated RSA key to ~/.ec2/id_rsa-gsg-keypair with permissions 600. The tool will put the public part of this keypair in Amazon's repository so you can have passphraseless ssh connectivity.

# Snippet from my .bash_profile file (the EC2_PRIVATE_KEY and
# EC2_CERT values are set to dummy values).
...
# Hadoop
export HADOOP_HOME=/opt/hadoop-0.18.1

# EC2 Access
export EC2_HOME=/opt/ec2-api-tools-1.3-36506
export PATH=$PATH:$EC2_HOME/bin:$HADOOP_HOME/src/contrib/ec2/bin
export EC2_PRIVATE_KEY=$HOME/.ec2/pk-ABCD1234EFGH5678.pem
export EC2_CERT=$HOME/.ec2/cert-ABCD1234EFGH5678.pem
...
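
Before launching anything that costs money, it can be worth confirming that the variables from this snippet are actually visible to the tools. The following is a small sketch, not part of the Hadoop or EC2 tooling; the function name and the list of required variables are assumptions based on the snippet above.

```python
import os

# assumption: these are the variables the setup steps above rely on
REQUIRED_VARS = ("HADOOP_HOME", "EC2_HOME", "EC2_PRIVATE_KEY", "EC2_CERT")

def missing_ec2_settings(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

An empty list means every variable is set; anything else is the list of names still to be exported.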

The code

The book text is first broken up into sentences and written to one large file, one sentence per line. This runs offline, as a data preparation step. There is no Hadoop code here; all it does is read each of the files downloaded from the Gutenberg site, tokenize the content into sentences, and write them out to the output file.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/OfflineSentenceWriter.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.text.BreakIterator;

import org.apache.commons.io.FileUtils;

/**
 * Preprocesses the Gutenberg books into sentences, one sentence
 * per line.
 */
public class OfflineSentenceWriter {

  private String inputDirectoryPath;
  private String outputFilePath;
  
  public void setInputDirectory(String inputDirectoryPath) {
    this.inputDirectoryPath = inputDirectoryPath;
  }
  
  public void setOutputFile(String outputFilePath) {
    this.outputFilePath = outputFilePath;
  }
  
  public void convertToSentencePerLineFormat() throws Exception {
    File[] inputs = new File(inputDirectoryPath).listFiles();
    PrintWriter output = new PrintWriter(
      new FileWriter(outputFilePath), true);
    for (File input : inputs) {
      BreakIterator sentenceIterator = BreakIterator.getSentenceInstance();
      String text = FileUtils.readFileToString(input, "UTF-8");
      text = text.replaceAll("\n", " ");
      sentenceIterator.setText(text);
      int current = 0;
      for (;;) {
        int end = sentenceIterator.next();
        if (end == BreakIterator.DONE) {
          break;
        }
        String sentence = text.substring(current, end);
        output.println(sentence);
        current = end;
      }
    }
    output.flush();
    output.close();
  }
}
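
For small experiments, the same sentence-per-line preparation can be approximated in a few lines of Python. This sketch swaps in a naive regex split (whitespace following ".", "!" or "?") in place of BreakIterator, so it will mis-split abbreviations such as "Mr. Holmes"; the function name is hypothetical.

```python
import re

def to_sentences(text):
    """Flatten whitespace, then split after sentence-ending punctuation."""
    # collapse newlines and repeated spaces into single spaces
    flat = " ".join(text.split())
    # split on whitespace that follows ., ! or ? (a rough heuristic)
    return [s for s in re.split(r"(?<=[.!?])\s+", flat) if s]
```

Writing "\n".join(to_sentences(text)) to a file gives the one-sentence-per-line layout that the job expects as input.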

The conversion of a sentence into a series of word grams is done by the WordNGramGenerator.java shown below. It takes an input string (a sentence in our case), and the minimum and maximum size of the word grams to be produced. I find it helpful to pull out the complex parts into their own classes and just use them inside the MapReduce job, rather than building them into the MapReduce code directly, because that way it's easier to test.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/WordNGramGenerator.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.IOException;
import java.text.BreakIterator;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Given a sentence, generates the specified word N-grams from it and
 * returns it as a List of String.
 */
public class WordNGramGenerator {

  private final Log log = LogFactory.getLog(getClass());
  
  public List<String> generate(String input, int minGram, int maxGram) 
      throws IOException {
    List<String> wordgrams = new ArrayList<String>();
    List<String> tokens = new LinkedList<String>();
    BreakIterator wordBreakIterator = 
      BreakIterator.getWordInstance(Locale.getDefault());
    wordBreakIterator.setText(input);
    int current = 0;
    int gindex = 0;
    for (;;) {
      int end = wordBreakIterator.next();
      if (end == BreakIterator.DONE) {
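        // take care of the remaining word grams: for each remaining start
        // position, emit every gram of size minGram..tokens.size()
        while (tokens.size() >= minGram) {
          for (int i = minGram; i <= tokens.size(); i++) {
            wordgrams.add(StringUtils.join(
              tokens.subList(0, i).iterator(), " "));
          }
          tokens.remove(0);
        }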
        break;
      }
      String nextWord = input.substring(current, end);
      current = end;
      if ((StringUtils.isBlank(nextWord)) ||
          (nextWord.length() == 1 && nextWord.matches("\\p{Punct}"))) {
        continue;
      }
      gindex++;
      tokens.add(StringUtils.lowerCase(nextWord));
      if (gindex == maxGram) {
        for (int i = minGram; i <= maxGram; i++) {
          wordgrams.add(StringUtils.join(
            tokens.subList(0, i).iterator(), " "));
        }
        gindex--;
        tokens.remove(0);
      }
    }
    return wordgrams;
  }
}

And finally, the MapReduce job to do the phrase extraction and aggregation. The Map class reads a sentence at a time, then calls the WordNGramGenerator to produce the word n-grams, and writes them out. On the Reduce side, Hadoop already has a convenience Reducer (the LongSumReducer) for what I am doing, so I use that.

// Source: src/main/java/net/sf/jtmt/concurrent/hadoop/phraseextractor/PhraseExtractor.java
package net.sf.jtmt.concurrent.hadoop.phraseextractor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.LongSumReducer;

/**
 * Reads input text one sentence per line, generates 2-5 word grams
 * from each sentence, and counts how often each gram occurs.
 */
public class PhraseExtractor {

  private static class MapClass extends MapReduceBase 
      implements Mapper<WritableComparable<Text>,Writable,
                 WritableComparable<Text>,Writable> {

    private static final LongWritable ONE = new LongWritable(1);
    
    public void map(WritableComparable<Text> key, Writable value,
        OutputCollector<WritableComparable<Text>,Writable> output,
        Reporter reporter) throws IOException {
      String sentence = ((Text) value).toString();
      WordNGramGenerator ngramGenerator = new WordNGramGenerator();
      List<String> grams = ngramGenerator.generate(sentence, 2, 5);
      for (String gram : grams) {
        output.collect(new Text(gram), ONE);
      }
    }
  }

  public static void main(String[] argv) throws IOException {
    if (argv.length != 2) {
      System.err.println("Usage: calc input_path output_path");
      System.exit(-1);
    }
    JobConf conf = new JobConf(PhraseExtractor.class);
    
    FileInputFormat.addInputPath(conf, new Path(argv[0]));
    FileOutputFormat.setOutputPath(conf, new Path(argv[1]));
    
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);
    
    conf.setMapperClass(MapClass.class);
    conf.setCombinerClass(LongSumReducer.class);
    conf.setReducerClass(LongSumReducer.class);
    conf.setNumReduceTasks(2);
    
    JobClient.runJob(conf);
  }
}
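
To see what the job computes without a cluster, the map and sum-reduce steps can be imitated in memory. This is only a sketch of the data flow, assuming whitespace tokenization and made-up function names; it does not use or model the Hadoop API.

```python
from collections import Counter

def map_sentence(sentence, min_gram=2, max_gram=5):
    """Map step: emit a (gram, 1) pair for every word gram in the sentence."""
    words = sentence.lower().split()  # assumption: whitespace tokenization
    for start in range(len(words)):
        for size in range(min_gram, max_gram + 1):
            if start + size <= len(words):
                yield " ".join(words[start:start + size]), 1

def reduce_counts(pairs):
    """Reduce step: sum the values per key, as a long-sum reducer would."""
    totals = Counter()
    for key, value in pairs:
        totals[key] += value
    return totals

def run_job(sentences):
    """Feed every sentence through the mapper, then sum the emitted pairs."""
    pairs = (pair for s in sentences for pair in map_sentence(s))
    return reduce_counts(pairs)
```

Because summing is associative, the same reducer can run early on partial map output, which is why the Java job can register one class as both combiner and reducer.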

The above code needs to be packaged appropriately into a JAR file. Here is the snippet of Ant code that does this.

  <target name="build-hadoop-phrase-extractor" 
      depends="_init" description="Build Phrase Extractor job on Hadoop">
    <!-- create new directory jars/lib under the build directory and copy
         required runtime dependencies for the hadoop job into it -->
    <delete dir="${maven.build.directory}/jars"/>
    <mkdir dir="${maven.build.directory}/jars/lib"/>
    <copy todir="${maven.build.directory}/jars/lib" flatten="true">
      <fileset dir="${maven.repo.local}">
        <include name="commons-lang/commons-lang/2.1/commons-lang-2.1.jar"/>
        <include name="commons-io/commons-io/1.2/commons-io-1.2.jar"/>
      </fileset>
    </copy>
    <!-- create jar file for phrase-extractor -->
    <jar jarfile="${maven.build.directory}/phrase-extractor.jar">
      <fileset dir="${maven.build.directory}/classes"/>
      <fileset dir="${maven.build.directory}/jars"/>
      <manifest>
        <attribute name="Main-Class"
          value="net/sf/jtmt/concurrent/hadoop/phraseextractor/PhraseExtractor"/>
      </manifest>
    </jar>
  </target>

Running the Code in EC2

Caution: You are now entering the paid area! If you are playing along, at this point Amazon is going to charge your credit card for machine time spent. First we launch our EC2 cluster with the following command:

sujit@sirocco:~/src/jtmt$ hadoop-ec2 launch-cluster sujit 4
Testing for existing master in group: sujit
...
Adding sujit node(s) to cluster group sujit with AMI ami-fe37d397
i-21ebda48
i-23ebda4a
i-25ebda4c
i-27ebda4e

Next we log in to our master node. We will run our jobs from the command line on the EC2 master node.

sujit@sirocco:~/src/jtmt$ hadoop-ec2 login sujit
...
[root@domU ~]# 

We then copy over our jar file and input file to the EC2 master node. Our input file is the output of OfflineSentenceWriter, and contains one sentence per line. If you source the hadoop-ec2-env.sh file, you will get access to the environment variable SSH_OPTS, which is convenient. So...

sujit@sirocco:~/src/jtmt$ . $HADOOP_HOME/src/contrib/ec2/bin/hadoop-ec2-env.sh 
sujit@sirocco:~/src/jtmt$ scp $SSH_OPTS target/phrase-extractor.jar \
  root@ec2-123-456-789-01.compute-1.amazonaws.com:/root
sujit@sirocco:~/src/jtmt$ scp $SSH_OPTS books.txt \
  root@ec2-123-456-789-01.compute-1.amazonaws.com:/root

On the EC2 master node, we create an HDFS directory and put the input file into it. You can verify that the file got written using bin/hadoop dfs -lsr /.

[root@domU ~]# cd /usr/local/hadoop-0.18.1/
[root@domU hadoop-0.18.1]# bin/hadoop fs -mkdir /usr/root/inputs
[root@domU hadoop-0.18.1]# bin/hadoop dfs -put ~/books.txt /usr/root/inputs

When I first ran the job, I got back compressed files as the output of my reduce step. Because I didn't want to do the extra step that is mentioned here, I changed the configuration (in conf/hadoop-site.xml) to output without compression, and reran my job.

<property>
  <name>mapred.output.compress</name>
  <value>false</value> <!-- was "true" -->
</property>

Here is the command to run the Hadoop job.

[root@domU hadoop-0.18.1]# bin/hadoop jar /root/phrase-extractor.jar \
  hdfs://domU.compute-1.internal:50001/usr/root/inputs/books.txt \
  hdfs://domU.compute-1.internal:50001/usr/root/outputs

While the code runs, you can also monitor the job through a web interface on port 50030 on the master node. Here are some screenshots.

The job dropped two part-nnnnn files in HDFS in the output subdirectory. I first copied these back to the regular file system on the master node.

[root@domU ~]# bin/hadoop dfs -get /usr/root/outputs/part-00000 ~/part-00000
...

I then copied them back to my local box using scp.

sujit@sirocco:~/src/jtmt$ scp $SSH_OPTS root@ec2-123-456-789-01.compute-1.amazonaws.com:/root/part-* .

Once done, the cluster can be terminated with this command. At that point, you will exit the Amazon EC2 paid area.

sujit@sirocco:~/src/jtmt$ hadoop-ec2 terminate-cluster sujit
...
Terminate all instances? [yes or no]: yes
INSTANCE i-d7e8d9be running shutting-down
INSTANCE i-21ebda48 running shutting-down
INSTANCE i-23ebda4a running shutting-down
INSTANCE i-25ebda4c running shutting-down
INSTANCE i-27ebda4e running shutting-down

The part-nnnnn files are not sorted by aggregated count, and contain more information than I need. The correct approach is probably to run another MapReduce job to filter and sort the data, but since the files are not too large, a few Unix command line tools can do this:

sujit@sirocco:~/src/jtmt$ cat part-00000 part-00001 | \
awk -F"\t" '{if ($2 != 1) print $0}' | \
sed -e 's/\t/:/' | \
sort -n -r -t ':' -k2 - > sorted

This returns the expected results (sort of). I realize now that perhaps doing 2-grams to find phrases was a bit ambitious and I should have considered 3 to 5 grams only. If I look only at 3-grams, I find quite a few good phrases such as "as much as", etc.

of the:2201
in the:1426
to the:892
it was:587
and the:571
it is:562
at the:532
to be:482
from the:478
on the:452
...
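
The awk, sed and sort pipeline above could also be written as a short Python function, which makes it easy to add a filter on gram size (for example, 3-grams only). This is a sketch with assumed names; it expects lines in the tab-separated "phrase, count" layout of the part files.

```python
def top_phrases(lines, min_count=2, gram_size=None):
    """Drop rare phrases, optionally keep one gram size, sort by count."""
    rows = []
    for line in lines:
        # the count is whatever follows the last tab on the line
        phrase, _, count = line.rstrip("\n").rpartition("\t")
        if not phrase or int(count) < min_count:
            continue  # skip blank lines and phrases seen fewer times
        if gram_size is not None and len(phrase.split()) != gram_size:
            continue  # keep only grams with the requested word count
        rows.append((phrase, int(count)))
    # highest count first; ties keep their input order
    rows.sort(key=lambda row: row[1], reverse=True)
    return ["%s:%d" % row for row in rows]
```

With the default min_count of 2, this drops the same phrases as the awk filter, and gram_size=3 restricts the output to 3-grams.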

The code runs pretty quickly on my local machine and even more quickly on the EC2 cluster, so I probably did not need to run this on EC2, and in that sense it was a waste of money. However, my main aim with this exercise was to set myself up on Amazon EC2 for future processing, so in that sense the expense was justified. I hope you found it useful.

Update 2009-07-26: I fixed the bug in the n-gram generation that Yuval pointed out in the comments below, and reran the job with 3-5 grams this time. I get slightly better results, as shown below:

one of the:121
the sperm whale:83
out of the:82
it was a:79
it is a:77
the white whale:73
of the whale:68
there was a:64

These look a bit more like common phrases that can occur in the body of text selected. The whale references are from Moby Dick, which probably outweighs the other two books in volume.