Saturday, May 16, 2009

Parallel Application with Blitz JavaSpaces

Introduction

Late last year, I started looking at the Actor Framework (you can read about my experiences here, here, here and here), to figure out how to write clean multi-threaded applications without too much effort. During that time, I also read about JavaSpaces, a Java™ technology based on the LINDA programming model, which allows a developer to distribute tasks across multiple JVMs in a cluster.

I had been meaning to look at JavaSpaces for a while, but kept procrastinating because of its unfamiliarity. Recently, however, I came across an embarrassingly parallel problem for which it seemed to be a perfect fit (based on my limited knowledge), so I decided to try it out with my toy example from the Actor postings.

My Toy Example on JavaSpaces

To quickly recap, my toy example consists of a pipeline of three tasks: the first downloads a page from a remote site given its URL, the second indexes the page into an in-memory data structure, and the third writes the index data structure out to disk. Of these, the download task is (expected to be) network intensive and the index task is (expected to be) CPU and memory intensive, and both take much longer to finish than the write task. So the idea is to parallelize the first two tasks across multiple downloaders and indexers, and then gather the results back into a single writer. With JavaSpaces, the flow looks like this: a Feeder writes New Document entries into the space, Downloaders and Indexers take them, transform them and write them back with updated statuses, and a single Writer takes the fully processed entries and writes them to disk.

Which Implementation?

The 800-pound gorilla in this space (no pun intended) is GigaSpaces. Their product is XAP (eXtreme Application Platform), and it provides a simplified API using Spring and annotations. It also comes with a set of custom Maven2 Mojos and archetypes to get you developing quickly, and enormous amounts of publicly available documentation. However, their (free) community version is really a crippled (single-node) eval good for about 45 days. Nothing wrong with that, of course; it's their product, they can do what they want with it.

The other implementation I looked at was Blitz, an open source project (mostly) written and maintained by Dan Creswell. The download includes a few examples, which were enough for me since the JavaSpaces API is not that hard. I was also quite impressed with the support: I had a few problems during installation, and after a few hours of trying to figure them out myself, I asked on the Blitz users newsgroup, and Dan emailed me back with the solution (I had mistyped something) within the hour.

There is also Outrigger, Sun's reference implementation for JavaSpaces that comes bundled with the Jini starter kit, but I didn't try it, so I don't know enough to say anything about it.

I ended up using Blitz.

Blitz Installation

One of the things that caused me to put this stuff off was the multi-step installation and startup process for the JavaSpaces server components, described in various articles I read earlier. However, Blitz 2.1 has an all-in-one SWT-based installer that installs Blitz and its Jini dependencies, and configures the installation based on your JDK version.

After the installation, I tried running the helloworld example (instructions available on the site here) to test the installation, and it turned out that I needed to enable multicast on my loopback interface for the JavaSpaces server to be found. After that, the example code ran fine, and I was ready to build my application.

Code

The code I show here is based heavily on the example code in the Blitz distribution, some postings on the Blitz mailing list, and some other resources (listed below under References). However, it has some extra features that are not in the examples, because I felt they were essential:

  • Supports multiple stages - the Hello World example is a single-stage application, i.e., one feeder and one processing unit. My example has multiple stages; this does not add much complexity, but is probably closer to real-world scenarios.
  • Supports transactions - in production systems exceptions are common and must be dealt with. JavaSpaces provides transactions that an application can use to put an entry back into the space if an exception occurs while processing it.
  • Supports high/low watermarking - generally, the whole point of parallelizing a process is that the producer can submit jobs faster than the consumers can handle them, which is why we add more consumers. Depending on the size of your Entry objects and the heap size of your JavaSpace JVM, the space can run out of memory if the producer gets too far ahead of the consumers. I use the high/low watermarking technique described here to deal with that.

Additions to the POM

JavaSpaces servers such as Blitz implement the Jini API, so the only two JAR files that need to be added are jsk-lib.jar and jsk-platform.jar. I installed them manually into my local Maven repository from the jini2_1/lib directory of the Blitz distribution.

<project ...>
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>com.sun.jini</groupId>
      <artifactId>jsk-lib</artifactId>
      <version>2.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>com.sun.jini</groupId>
      <artifactId>jsk-platform</artifactId>
      <version>2.1</version>
      <scope>compile</scope>
    </dependency>
    ...
  </dependencies>
  ...
</project>

AbstractProcessor.java

In keeping with the advice on JavaSpaces patterns here, most of the JavaSpaces heavy lifting is done using Lookup.java, which I copied from the examples/helloworld directory of the Blitz distribution. I am not going to show the code for Lookup.java; either download the distribution or find it here.
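
For readers who don't want to dig up the distribution, here is a minimal sketch (not the actual Blitz Lookup.java) of how a JavaSpace proxy can be obtained with a plain unicast Jini lookup, assuming the lookup service is reachable on localhost; the class and method names are made up for illustration.

// Hypothetical sketch only -- not the Blitz Lookup.java. Assumes the Jini
// lookup service (reggie) is running on localhost.
import net.jini.core.discovery.LookupLocator;
import net.jini.core.lookup.ServiceRegistrar;
import net.jini.core.lookup.ServiceTemplate;
import net.jini.space.JavaSpace;

public class SimpleSpaceLookup {

  public static JavaSpace findSpace() throws Exception {
    // unicast discovery of the lookup service on the local machine
    LookupLocator locator = new LookupLocator("jini://localhost");
    ServiceRegistrar registrar = locator.getRegistrar();
    // ask the registrar for any service implementing JavaSpace
    ServiceTemplate template =
      new ServiceTemplate(null, new Class[] {JavaSpace.class}, null);
    return (JavaSpace) registrar.lookup(template);
  }
}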

In addition, I moved the common functionality of the processing units, such as getting the JavaSpace and Transaction objects, up into the abstract class shown below. It has a single abstract process() method that subclasses must implement.

// Source: src/main/java/com/mycompany/myapp/concurrent/blitz/AbstractProcessor.java
package com.mycompany.myapp.concurrent.blitz;

import java.util.logging.Logger;

import net.jini.core.transaction.Transaction;
import net.jini.core.transaction.TransactionFactory;
import net.jini.core.transaction.server.TransactionManager;
import net.jini.space.JavaSpace;

/**
 * Abstract superclass for the processing unit. The class contains
 * common application level and JavaSpace related functionality,
 * so the subclasses can focus on the business logic. 
 */
public abstract class AbstractProcessor {

  protected Logger logger = 
    Logger.getLogger(this.getClass().getName());
  
  protected final static long DEFAULT_LEASE_TIMEOUT = 120000L; // 2 mins

  /**
   * Simple lookup, returns the default JavaSpace found. More 
   * complex lookups across multiple machines may be possible
   * using LookupLocators. This method could be modified in the
   * future to return an array of JavaSpaces.
   * @return the default JavaSpace.
   */
  protected JavaSpace getSpace() throws Exception {
    Lookup lookup = new Lookup(JavaSpace.class);
    JavaSpace space = (JavaSpace) lookup.getService();
    return space;
  }

  /**
   * Returns a Transaction object to the caller by looking it
   * up from the JavaSpaces server.
   * @return a Transaction object.
   * @throws Exception if one is thrown.
   */
  protected Transaction createTransaction() throws Exception {
    Lookup lookup = new Lookup(TransactionManager.class);
    TransactionManager txManager = 
      (TransactionManager) lookup.getService();
    Transaction.Created trc = TransactionFactory.create(
      txManager, DEFAULT_LEASE_TIMEOUT);
    return trc.transaction;
  }
  
  /**
   * Convenience method to build a Document template based on the
   * desired status.
   * @param status the DocumentStatus of the template.
   * @return the Document template for matching.
   */
  public Document getTemplate(Document.Status status) {
    Document doc = new Document();
    if (status != null) {
      doc.status = status;
    }
    return doc;
  }

  /**
   * This is the method that subclasses will override, and will contain
   * the business logic that needs to be applied to the entry.
   * @throws Exception if one is thrown.
   */
  public abstract void process() throws Exception;
}

Document.java

A JavaSpace contains implementations of the Jini Entry interface. In our case, this is a simple Document bean, shown below. Note that the member variables must be public - this is a JavaSpaces requirement.

// Source: src/main/java/com/mycompany/myapp/concurrent/blitz/Document.java
package com.mycompany.myapp.concurrent.blitz;

import net.jini.core.entry.Entry;

/**
 * Represents the Document that will be transformed as it goes
 * through different processors.
 */
public class Document implements Entry {

  private static final long serialVersionUID = 54056132871976348L;

  public static enum Status {New, Downloaded, Indexed, Written};
  
  public Status status;
  public String url;
  public String contents;
  public String indexData;
  
  public Document() {
    super();
  }
  
  public String toString() {
    String statusString = "unknown";
    // status can be null on a template, so guard against NPE
    if (status != null) {
      switch (status) {
        case New: { statusString = "new"; break; }
        case Downloaded: { statusString = "downloaded"; break; }
        case Indexed: { statusString = "indexed"; break; }
        case Written: { statusString = "written"; break; }
        default: {}
      }
    }
    return url + " (" + statusString + ")";
  }
}

Note that my Entry object is just a container of data. My application follows the "dumb-data smart-worker" approach. Another approach is the "smart-data dumb-worker" (aka the barn-raising model), where the Entry knows how to compute itself. In that case, the processors are generic, and just call the execute method on the Entry.
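
For comparison, here is a hypothetical sketch of what a smart-data Entry could look like; ComputeTask is a made-up name for illustration and is not part of my application.

// Hypothetical sketch of the "smart-data dumb-worker" style. ComputeTask is
// a made-up name; a generic worker takes a matching entry and calls execute().
import net.jini.core.entry.Entry;

public class ComputeTask implements Entry {

  private static final long serialVersionUID = 1L;

  public String url;    // entry fields must be public
  public Boolean done;  // wrapper type, so a null field acts as a wildcard

  public ComputeTask() {
  }

  // The entry carries its own behavior; a generic worker simply does:
  //   ComputeTask task = (ComputeTask) space.take(template, tx, timeout);
  //   task.execute();
  //   space.write(task, tx, leaseMillis);
  public void execute() {
    // download / index / whatever this task represents
    this.done = Boolean.TRUE;
  }
}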

Master.java

Each stage of the multi-stage process is represented by a subclass of AbstractProcessor. My initial implementation was four separate classes (one each for Feeder, Downloader, Indexer and Writer) doing the necessary take/write operations on the common JavaSpace. Later, I coalesced the Feeder and Writer into inner classes inside a single Master. This allows the two classes to communicate with each other for the high/low watermarking feature, and makes it possible to manage them from a single terminal. The code for the Master is shown below. There are quite a few try/catch blocks in here to provide some degree of fault tolerance.

// Source: src/main/java/com/mycompany/myapp/concurrent/blitz/Master.java
package com.mycompany.myapp.concurrent.blitz;

import java.io.FileWriter;
import java.io.PrintWriter;
import java.rmi.RMISecurityManager;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.jini.core.lease.Lease;
import net.jini.core.transaction.Transaction;
import net.jini.space.JavaSpace;

/**
 * Master process. There should be only one of these in the system.
 * This process contains two threads - a feeder thread and a writer
 * thread. The Feeder thread feeds work into the system, optionally 
 * (not implemented yet) scattering the work into multiple JavaSpaces 
 * across multiple machines. The writer thread will write the results
 * of the processing to local disk, optionally (not implemented yet) 
 * gathering all the results from multiple JavaSpaces across multiple 
 * machines. The two threads communicate with each other in order to
 * avoid overloading the JavaSpace with too many jobs.
 */
public class Master {

  private final Logger logger = Logger.getLogger(this.getClass().getName());
  
  private final static long MAX_ENTRIES_TO_FEED = 25;
  
  private final static long ENTRIES_IN_SPACE_HWM = 5; // high water mark
  private final static long ENTRIES_IN_SPACE_LWM = 2; // low water mark
  private final static long RETRY_INTERVAL_MILLIS = 5000L; // 5 seconds
  
  // shared between the feeder and writer threads, hence volatile
  private volatile boolean shouldTerminate = false;
  private volatile long numEntriesSent = 0L;
  private volatile long numEntriesReceived = 0L;
  
  public void scatterGather() throws Exception {
    Thread feederThread = new Thread(new Runnable() {
      public void run() {
        try {
          Feeder feeder = new Feeder();
          for (;;) {
            if (shouldTerminate) {
              break;
            }
            if (numEntriesSent > MAX_ENTRIES_TO_FEED) {
              break;
            }
            if (isSpaceAboveHighWaterMark()) {
              for (;;) {
                logger.info("Space full, pausing");
                pause(RETRY_INTERVAL_MILLIS);
                if (isSpaceBelowLowWaterMark()) {
                  break;
                }
              }
            }
            try { 
              feeder.process(); 
            } catch (Exception e) {
              logger.log(Level.WARNING, 
                "Feeder process error, retrying...", e);
              pause(RETRY_INTERVAL_MILLIS);
              continue;
            }
            numEntriesSent++;
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
    Thread writerThread = new Thread(new Runnable() {
      public void run() {
        try {
          WriteProcessor writer = new WriteProcessor();
          for (;;) {
            if (shouldTerminate) {
              break;
            }
            try {
              writer.process();
            } catch (Exception e) {
              logger.log(Level.WARNING, 
                "Error processing write, retrying...", e);
              pause(RETRY_INTERVAL_MILLIS);
              continue;
            }
            numEntriesReceived++;
          }
          writer.destroy();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      public void run() {
        shouldTerminate = true;
      }
    }));
    feederThread.start();
    writerThread.start();
    feederThread.join();
    writerThread.join();
  }

  private boolean isSpaceAboveHighWaterMark() {
    return (numEntriesSent - numEntriesReceived > ENTRIES_IN_SPACE_HWM);
  }

  private boolean isSpaceBelowLowWaterMark() {
    return (numEntriesSent - numEntriesReceived < ENTRIES_IN_SPACE_LWM);
  }

  public void pause(long intervalMillis) {
    try { Thread.sleep(intervalMillis); }
    catch (InterruptedException e) {
      logger.info("Pause Interrupted");
    }
  }

  /**
   * Models the feeder processing unit.
   */
  private class Feeder extends AbstractProcessor {
    
    private JavaSpace space;
    private int currentIndex = 0;
    
    public Feeder() throws Exception {
      super();
      this.space = getSpace();
    }
    
    @Override
    public void process() throws Exception {
      Document doc = new Document();
      doc.url = currentIndex + ".html";
      doc.status = Document.Status.New; // new, ready to process
      // TODO: scatter using mod(hash(url),number_of_spaces)
      logger.info("Feeding " + doc);
      space.write(doc, null, Lease.FOREVER);
      currentIndex++;
    }
  }

  /**
   * Models the Writer processing unit.
   */
  private class WriteProcessor extends AbstractProcessor {
    
    private JavaSpace space;
    private Document template;
    private PrintWriter writer;
    
    public WriteProcessor() throws Exception {
      super();
      this.space = getSpace();
      this.template = getTemplate(Document.Status.Indexed);
      this.writer = new PrintWriter(new FileWriter("/tmp/docs.txt"), true);
    }
    
    @Override
    public void process() throws Exception {
      Transaction tx = null;
      try {
        tx = createTransaction();
        Document doc = (Document) space.take(template, tx, Lease.FOREVER);
        // other processing here, if applicable, before writing out...
        logger.info("Writing " + doc);
        doc.status = Document.Status.Written;
        writeToFile(writer, doc);
        // ... we don't write this back into the space, because
        // its the end of the line
        tx.commit();
      } catch (Exception e) {
        if (tx != null) {
          tx.abort();
        }
        throw e;
      }
    }

    public void destroy() {
      if (writer != null) {
        writer.flush();
        writer.close();
      }
    }
    
    private void writeToFile(PrintWriter writer, Document doc) {
      writer.println(doc.toString());
      writer.flush();
    }
  }

  /**
   * This is how we are called.
   */
  public static void main(String[] args) {
    if (System.getSecurityManager() == null) {
      System.setSecurityManager(new RMISecurityManager());
    }
    try {
      Master master = new Master();
      master.scatterGather();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

An improvement to this would be to have the Feeder distribute requests to multiple JavaSpaces on multiple machines, each with a local Slave, and have the Writer gather the results back into a single location on disk. This would make the code a bit more complicated, though, and I would probably need to change the Writer to register for events raised by the JavaSpace when a Document with Status.Indexed appears.
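
As a rough idea of what that event registration might look like, here is a hypothetical sketch using JavaSpace.notify() with a RemoteEventListener. It glosses over the remote-object plumbing by simply extending UnicastRemoteObject (a Jini Exporter would be more typical), and IndexedDocListener is a made-up name.

// Hypothetical sketch: event-driven collection of Indexed documents using
// JavaSpace.notify() instead of a blocking take() loop. The listener must be
// a remote object; extending UnicastRemoteObject is the simplest way to
// export it.
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

import net.jini.core.event.RemoteEvent;
import net.jini.core.event.RemoteEventListener;
import net.jini.core.event.UnknownEventException;
import net.jini.space.JavaSpace;

public class IndexedDocListener extends UnicastRemoteObject
    implements RemoteEventListener {

  private final JavaSpace space;

  public IndexedDocListener(JavaSpace space) throws RemoteException {
    super();
    this.space = space;
  }

  public void notify(RemoteEvent event)
      throws UnknownEventException, RemoteException {
    try {
      Document template = new Document();
      template.status = Document.Status.Indexed;
      // grab the matching entry if it is still there, and write it out
      Document doc = (Document) space.takeIfExists(template, null, 1000L);
      if (doc != null) {
        // write doc to disk here
      }
    } catch (Exception e) {
      throw new RemoteException("Error handling space event", e);
    }
  }

  // Registration, e.g. from the Writer's constructor:
  //   EventRegistration reg = space.notify(
  //       getTemplate(Document.Status.Indexed), null,
  //       new IndexedDocListener(space), Lease.FOREVER, null);
}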

Slave.java

As with the Master, I coalesced the Downloader and Indexer into a single class to make them easier to manage from a single script. The Slave is designed to be colocated with the JavaSpace on the same machine. The code is shown below.

// Source: src/main/java/com/mycompany/myapp/concurrent/blitz/Slave.java
package com.mycompany.myapp.concurrent.blitz;

import java.rmi.RMISecurityManager;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.jini.core.lease.Lease;
import net.jini.core.transaction.Transaction;
import net.jini.space.JavaSpace;

/**
 * Models a slave process. There can be many slave processes in
 * the system, each attached to a single JavaSpace. In the current
 * setup, a slave consists of 1 instance each of the Download and
 * the Index thread, but optionally (not implemented yet) can 
 * consist of a set of m Downloaders and n Indexers where (m,n)
 * are controlled by configuration, and depend on the machine on
 * which it will run. The processing units are coalesced into threads
 * to make it easier to manage - there is no direct communication 
 * between the threads.
 */
public class Slave {

  private final Logger logger = Logger.getLogger(this.getClass().getName());
  private static final long RETRY_INTERVAL_MILLIS = 5000L; // 5 seconds
  
  // shared between the download and index threads, hence volatile
  private volatile boolean shouldTerminate = false;
  
  public void doWork() throws Exception {
    Thread downloadThread = new Thread(new Runnable() {
      public void run() {
        try {
          DownloadProcessor downloader = new DownloadProcessor();
          for (;;) {
            if (shouldTerminate) {
              break;
            }
            try {
              downloader.process();
            } catch (Exception e) {
              logger.log(Level.WARNING, 
                "Download Process failed, retrying...", e);
              pause(RETRY_INTERVAL_MILLIS);
              continue;
            }
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
    Thread indexThread = new Thread(new Runnable() {
      public void run() {
        try {
          IndexProcessor indexer = new IndexProcessor();
          for (;;) {
            if (shouldTerminate) {
              break;
            }
            try {
              indexer.process();
            } catch (Exception e) {
              logger.log(Level.WARNING, 
                "Index process failed, retrying...", e);
              pause(RETRY_INTERVAL_MILLIS);
              continue;
            }
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      public void run() {
        shouldTerminate = true;
      }
    }));
    downloadThread.start();
    indexThread.start();
    downloadThread.join();
    indexThread.join();
  }

  private void pause(long retryIntervalMillis) {
    try { Thread.sleep(retryIntervalMillis); }
    catch (InterruptedException e) {
      logger.info("Pause interrupted");
    }
  }

  /**
   * Models the Download Processing Unit.
   */
  private class DownloadProcessor extends AbstractProcessor {
    
    private JavaSpace space;
    private Document template;
    
    public DownloadProcessor() throws Exception {
      super();
      this.space = getSpace();
      this.template = getTemplate(Document.Status.New);
    }
    
    @Override
    public void process() throws Exception {
      Transaction tx = null;
      try {
        tx = createTransaction();
        Document doc = (Document) space.take(template, tx, Lease.FOREVER);
        // more processing here...
        logger.info("Downloading " + doc);
        doc.status = Document.Status.Downloaded;
        space.write(doc, tx, DEFAULT_LEASE_TIMEOUT);
        tx.commit();
      } catch (Exception e) {
        if (tx != null) {
          tx.abort();
        }
        throw e;
      }
    }
  }
  
  /**
   * Models the Indexing Processing Unit.
   */
  private class IndexProcessor extends AbstractProcessor {

    private JavaSpace space;
    private Document template;
    
    public IndexProcessor() throws Exception {
      super();
      this.space = getSpace();
      this.template = getTemplate(Document.Status.Downloaded);
    }
    
    @Override
    public void process() throws Exception {
      Transaction tx = null;
      try {
        tx = createTransaction();
        Document doc = (Document) space.take(template, tx, Lease.FOREVER);
        logger.info("Indexing " + doc);
        doc.status = Document.Status.Indexed;
        space.write(doc, tx, DEFAULT_LEASE_TIMEOUT);
        tx.commit();
      } catch (Exception e) {
        if (tx != null) {
          tx.abort();
        }
        throw e;
      }
    }
  }

  /**
   * This is how we are called.
   */
  public static void main(String[] args) {
    if (System.getSecurityManager() == null) {
      System.setSecurityManager(new RMISecurityManager());
    }
    try {
      Slave slave = new Slave();
      slave.doWork();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

The example shows a single instance of the Downloader and Indexer, but one possible improvement to take advantage of multiple cores would be to have multiple Download and Index threads in a single Slave instance.
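
Here is a minimal sketch of what that could look like, assuming the method is added to Slave (so it can use the processor inner classes, shouldTerminate, pause() and RETRY_INTERVAL_MILLIS), and that numDownloaders and numIndexers are hypothetical configuration values.

// Hypothetical sketch -- a thread pool of m downloaders and n indexers
// inside Slave, instead of one thread each. Assumes this method is added
// to the Slave class; numDownloaders and numIndexers would come from
// configuration.
private void startWorkers(int numDownloaders, int numIndexers)
    throws Exception {
  java.util.concurrent.ExecutorService pool =
    java.util.concurrent.Executors.newFixedThreadPool(
      numDownloaders + numIndexers);
  for (int i = 0; i < numDownloaders; i++) {
    final DownloadProcessor downloader = new DownloadProcessor();
    pool.submit(new Runnable() {
      public void run() {
        while (! shouldTerminate) {
          try { downloader.process(); }
          catch (Exception e) { pause(RETRY_INTERVAL_MILLIS); }
        }
      }
    });
  }
  for (int i = 0; i < numIndexers; i++) {
    final IndexProcessor indexer = new IndexProcessor();
    pool.submit(new Runnable() {
      public void run() {
        while (! shouldTerminate) {
          try { indexer.process(); }
          catch (Exception e) { pause(RETRY_INTERVAL_MILLIS); }
        }
      }
    });
  }
  pool.shutdown(); // workers keep running until shouldTerminate is set
}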

runpu.sh

Finally, I call both the master and the slave using the following simple shell script.

#!/bin/bash
M2_REPO=$HOME/.m2/repository
BLITZ_HOME=/opt/blitz-2.1
CLASSPATH=\
$M2_REPO/com/sun/jini/jsk-lib/2.1/jsk-lib-2.1.jar:\
$M2_REPO/com/sun/jini/jsk-platform/2.1/jsk-platform-2.1.jar:\
target/multicore-1.0-SNAPSHOT.jar

case "$1" in
  master) java -Djava.security.policy=$BLITZ_HOME/config/policy.all \
          -classpath $CLASSPATH \
          com.mycompany.myapp.concurrent.blitz.Master 2>&1 | tee $1.log
     ;;
  slave) java -Djava.security.policy=$BLITZ_HOME/config/policy.all \
          -classpath $CLASSPATH \
          com.mycompany.myapp.concurrent.blitz.Slave 2>&1 | tee $1.log
     ;;
  *) echo "Usage: $0 {master|slave}"
     ;;
esac

Running the example

I run all the components locally on my laptop; four terminal windows are required.

Start Blitz Server

To do this, navigate to your Blitz installation directory (mine is /opt/blitz-2.1) and start Blitz using the supplied shell script. If you are testing your code, it is probably better to start it in transient mode, which requires you to replace PersistentMode (the default) with Transient in config/blitz.config.

sujit@sirocco:~$ cd /opt/blitz-2.1
sujit@sirocco:~$ ./blitz.sh
...you will see blitz starting up here

Start Blitz Dashboard

The Blitz dashboard is a simple GUI tool that tells you the server's memory footprint, number of active entries, operations, transactions, etc. It is useful to monitor the progress of your job.

sujit@sirocco:~$ cd /opt/blitz-2.1
sujit@sirocco:~$ ./dashboard.sh
...you will see logging to console and the GUI come up

Start Slave

We want to start the slave first because it will block until the master writes work into the space. To do this:

sujit@sirocco:~$ cd /apps/multicore
sujit@sirocco:~$ ./runpu.sh slave
...will output to console once master submits jobs to the space
...terminate with ^C

Start Master

Starting the master is similar to starting the slave, just with a different parameter.

sujit@sirocco:~$ cd /apps/multicore
sujit@sirocco:~$ ./runpu.sh master
...you will see the master feeding and collecting Document objects.
...terminate with ^C

A partial master log is shown below. As you can see, it pauses when the space gets too full.

May 10, 2009 5:46:13 PM Master$Feeder process INFO: Feeding 0.html (new)
May 10, 2009 5:46:13 PM Master$Feeder process INFO: Feeding 1.html (new)
May 10, 2009 5:46:14 PM Master$Feeder process INFO: Feeding 2.html (new)
May 10, 2009 5:46:14 PM Master$Feeder process INFO: Feeding 3.html (new)
May 10, 2009 5:46:14 PM Master$Feeder process INFO: Feeding 4.html (new)
May 10, 2009 5:46:14 PM Master$Feeder process INFO: Feeding 5.html (new)
May 10, 2009 5:46:14 PM Master$1 run INFO: Space full, pausing
May 10, 2009 5:46:14 PM Master$WriteProcessor process INFO: Writing 0.html (indexed)
May 10, 2009 5:46:14 PM Master$WriteProcessor process INFO: Writing 5.html (indexed)
...

Here is a partial snippet of the slave log. This is probably less interesting; it simply reports the progress of each Document as it moves through the processors. One important thing to note here is that ordering may not be preserved, which is okay in my case.

May 10, 2009 5:46:14 PM Slave$DownloadProcessor process INFO: Downloading
0.html (new)
May 10, 2009 5:46:14 PM Slave$IndexProcessor process INFO: Indexing 0.html
(downloaded)
May 10, 2009 5:46:14 PM Slave$DownloadProcessor process INFO: Downloading
5.html (new)
May 10, 2009 5:46:14 PM Slave$IndexProcessor process INFO: Indexing 5.html
(downloaded)
...

Open Issues

I noticed that I was getting an Out of Memory error when I increased the number of tasks fed in by the Feeder. It looks like it could be a problem with the threads-per-process setting on my Linux distribution, and the fix involves recompiling glibc and the kernel. The last time I saw this was in the Java 1.2 days, probably because that was the last time I used native threads from within my Java code. Jini uses native threads too, so that may be the reason. I will post updates as I learn more about this problem.

Conclusion

I think JavaSpaces is a good strategy to consider if you have to solve problems that can be broken up into relatively independent stages, some of which require lots of resources and some of which don't. Unlike MapReduce, where the hardest part is figuring out how to fit your application into the framework, which usually requires algorithm changes and application redesign, partitioning an application with JavaSpaces is much easier and less invasive.

The other thing that held me back was my perception of the difficulty of setting up a JavaSpaces server, but that turned out to be a non-issue. Of course, the example I posted is fairly simple, and it's possible that I will hit issues with a "real" application, but I guess I will cross that bridge when I get to it.

References

JavaSpaces is relatively less popular than, say, Servlets or JDBC, so information on the Internet is a bit harder to find. I found a few sites that helped me quite a bit, so I am including them here:

2 comments (moderated to prevent spam):

PetrolHead said...

Good article and thank you for trying out Blitz.

Wouldn't mind hearing more about the OOM problem's you've been encountering....

Best,

Dan.

Sujit Pal said...

Thanks Dan. I have posted more details about the OOME I was seeing on the blitz user list, as well as some more info that I found. I've been dragging my feet on building a new kernel and recompiling glibc, hoping that the fix would be something less drastic :-).