Saturday, March 21, 2009

Cartesian Joining Flat Files with Python Generators

I've been noticing that whenever I build something that does an interesting transformation (can't go into more detail than that, sorry) to some peice(s) of data, I inevitably get requests to do a similar transformation on data from a completely different source. The inputs are provided in the form of large CSV text dumps from database tables, and almost always is in formats other than what my original program expects, since it contains key identification data to allow the output to be uploaded back to the database.

While I am not complaining (it's definitely good to have one's work be considered useful by one's colleagues), it becomes tedious to write wrappers that take these different formats, extract the information I need to process, and writes back the output, preserving the key identification data to write back into the output. So for quite a while now, I have been toying with the idea of building a wrapper that allows me to do SQL like transformations on flat files.

In this post, I show some Python code to do cartesian joins to denormalize an arbitary number of flat files into a single one. Conceptually, what I am after is to do something along these lines:

1
2
3
4
select *
from table_a a, table_b b, table_c c, ...
where a.col1 = 'foo'
and b.col2 = 'bar'

The idea is to denormalize the tables into one flat file. I could do it fairly simply by nesting for loops, something like this (pseudo-code):

1
2
3
4
for row_a in table_a:
  for row_b in table_b:
    for row_c in table_c:
      sepchar.join([row_a, row_b, row_c])

However, I don't know the number of flat files I will be denormalizing until runtime, so the above approach won't work.

Another issue is the size of the files. They are expected to be large enough so sucking then into in-memory lists is not practical. Python supports Generators, which allows you to iterate through a file and return a line to a caller, without the caller having to maintain the state and restoring it on the next call (the yield keyword). David Beazly has a nice presentation (PDF) along with some code samples that explains this in more detail. So I built my own FileIterator object that produces an infinite generator on the flat files (ie, if it reaches EOF, it reopens the file and goes back to returning lines).

Here is the code. This Python recipe has some code which looks like it does something similar, but I ended up rolling my own because my needs were slightly different.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#!/usr/bin/python
# Computes a projection over an arbitary number of comma-separated
# text files.

class FileIterator():
  """
  A generator which recurses infinitely through a flat file. If EOF
  is reached, then it reopens the file and starts again from the
  beginning. All generators but the first one will be closed and
  reopened. The first one will terminate on EOF because otherwise
  this program will run in an infinite loop.
  """
  def __init__(self, filename):
    self.filename = filename
    self.infile = open(filename, 'rb')

  def nextLine(self):
    while (True):
      try:
        line = self.infile.readline()[:-1]
      except ValueError:
        self.infile = open(self.filename, 'rb')
        continue
      if (line == ''):
        break
      yield line
    self.infile.close()


def project(filenames):
  """
  Delegates to the recursive _project method (see last line)
  """
  def _project(fileIterators, sepchar="|", fline=None):
    """
    Private recursive method to iterate through the generator outputs
    in reverse and build up the final results.
    """
    last = fileIterators[-1]
    for line in last.nextLine():
      if (len(fileIterators) == 1):
        print sepchar.join([line, fline])
      else:
        rest = fileIterators[:-1]
        if (fline is None):
          _project(rest, sepchar, line)
        else:
          _project(rest, sepchar, sepchar.join([line, fline]))
  _project(map(lambda f: FileIterator(f), filenames))

def main():
  """
  Test code
  """
  filenames = ["/tmp/aa1", "/tmp/aa2", "/tmp/aa3", "/tmp/aa4"]
  project(filenames)

if __name__ == "__main__":
  main()

The main workhorse in here is the project function which delegates to a private _project function that does the actual recursion.

Here are my input files that I used for testing, and some lines from the output file, that will help you to figure out how the program works.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
file: /tmp/aa1
a1
a2
a3
file: /tmp/aa2
b1
b2
b3
file: /tmp/aa3
c1
c2
c3
file: /tmp/aa4
d1
d2

Which produces the following output:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
a1|b1|c1|d1
a2|b1|c1|d1
a3|b1|c1|d1
a1|b2|c1|d1
a2|b2|c1|d1
a3|b2|c1|d1
a1|b3|c1|d1
a2|b3|c1|d1
a3|b3|c1|d1
...

As you can see, the last file in the array is opened first, and used as a pivot for the previous file, and so on, until there are no more files left, at which point, the joined line is printed out.

No comments:

Post a Comment

Comments are moderated to prevent spam.