Saturday, August 14, 2010

A Recipe for Parallelization with Actors and JMS

Some time back, I spent some time researching various Actor Frameworks (you can find the posts here, here, here and here). While it was interesting, it was mostly academic for me, since they all implemented the Actor pattern in concurrent environments, ie, using threads in a single JVM, and I don't have access to large multi-core machines that these frameworks seemed to be aimed at.

Almost a year and a half later, a comment on one of these posts caused me to reread what I had written, and I realized that it may be possible to implement a parallel solution (distributing a large job across multiple low/medium powered machines) using the Actor pattern and JMS. As proof of concept, I decided to implement my little example of the 3-task pipeline (Download, Index and Write) that I had used for the other Actor examples.

Of course, since there does not exist a framework (none that I know of anyway) that does this, I built my own. Its called Kabuki, a Japanese theaterical art-form. Code wise, it is basically 1 abstract class with 2 inner classes that takes care of the JMS aspects, and exposes hooks (in the form of abstract methods) that an Actor subclass must implement.

For those of you who are not too familiar with what Actors (as defined in the Actor model) are, here is a nice definition from Concurrency in Erlang & Scala: The Actor Model by Ruben Vermeersch.

In the actor model, each object is an actor. This is an entity that has a mailbox and a behaviour. Messages can be exchanged between actors, which will be buffered in the mailbox. Upon receiving a message, the behaviour of the actor is executed, upon which the actor can: send a number of messages to other actors, create a number of actors and assume new behaviour for the next message to be received.

Like the actor defined above, Kabuki Actor subclasses define their own behavior (ie, the transformation on the incoming message to produce an outgoing message), and specify the location of a JMS queue that is to serve as its Inbox. Unlike it, however, the Actor does not decide (at runtime) where to send the outgoing message. Kabuki Actors have to specify at startup the location of a JMS queue to serve as its Outbox (if needed). The JMS queues are exposed by a JMS broker, Apache ActiveMQ in my case.

Kabuki Actor API

The Kabuki Actor superclass has 4 abstract methods which a subclass needs to implement. Sometimes these can be empty implementations. The table below lists these methods and a brief description.

Method-Name Description
type()::Actor.Type Kabuki Actors can be WRITE_ONLY, READ_WRITE or READ_ONLY. Determines whether the Actor only writes to the Outbox, reads from Inbox and writes to Outbox, and reads from Inbox respectively. All Actors must have an Inbox reference, but READ_ONLY Actors don't need an Outbox reference.
init()::void Any application specific initialization, such as grabbing handles to external resources such as databases, files, etc. This is called once during the lifetime of the Kabuki Actor, when it is starting up.
perform(I input)::O Implements the Kabuki Actor's behavior. Consumes an object of type I and transforms it into an object of type O. Called once for each message placed in the Actor's Inbox.
destroy()::void Any application specific cleanup, such as flushing or releasing handles to external resources. Called once during the lifetime of the Kabuki Actor, when it is shutting down.

The pipeline of Actors for my example looks something like this:

In the example above, the Download Actor is the initiator (a WRITE_ONLY Actor). Although it has an Inbox, the only time it is ever used is when it calls the shutdown() method on itself. The Download Actor (a READ_WRITE Actor) writes to the Inbox of the Index Actor, and the Index Actor writes to the Inbox of the Write Actor (a READ_ONLY Actor). The Write Actor produces the output of the parallel job, and doesn't have an Outbox.

In our example, the Index Actor is the bottleneck, so we can alleviate the problem by starting up multiple instances of it. Since both Index Actors share the Inbox (a Queue exposed by the JMS broker), messages written by the Download Actor can be consumed by one of the Index Actors in the tier.

To illustrate how simple it is to actually implement Kabuki Actors, I show below the code for the three Actors in the chain. All they do is modify a String generated by the DownloadActor as it passes through the chain, but obviously you can make them do whatever you want them to.

DownloadActor.java

The DownloadActor generates the data. Obviously, you can dispense with this Actor and use some kind of Master process to pump data into the Actor pipeline. However, in this case, the JMS abstraction leaks a bit, since you (the programmer) is now forced to deal with the JMS message format. The DownloadActor demonstrates how to use a Kabuki Actor to generate the data - as you can see, the perform() method does not use its input parameter - it simply generates the data in a for loop. In a real world scenario, it would perhaps connect to an external data source and run some kind of filter on it periodically and pump the filtered data through.

The Download Actor (or any WRITE_ONLY Actor) is the only one in the pipeline that is allowed to call send(). Other Actors will do this function internally (in the superclass).

In lots of situations, Actors will run forever, but in our case, we want to terminate all the Actors in the pipeline once it pumps the data through. Once again, in something unique to WRITE_ONLY Actors, it calls shutdown(), which pumps a command through the pipeline that causes the other Actors to terminate as well.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Source: src/main/java/com/mycompany/kabuki/actors/DownloadActor.java
package com.mycompany.kabuki.actors;

import com.mycompany.kabuki.core.Actor;

public class DownloadActor extends Actor<String,String> {

  @Override public Type type() {
    return Type.WRITE_ONLY;
  }

  @Override public void init() throws Exception {
    perform(null);
  }

  @Override public void destroy() throws Exception { /* NOOP */ }

  @Override public String perform(String input) throws Exception {
    for (int i = 0; i < 10; i++) {
      input = "Download Document-#:" + i;
      logger.info(input);
      send(input);
    }
    shutdown();
    return null;
  }
}

IndexActor.java

The IndexActor is a READ_WRITE Actor. It reads input messages off its Inbox, processes it using its perform() method, then places output messages on the Inbox of the next Actor in the pipeline.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Source: src/main/java/com/mycompany/kabuki/actors/IndexActor.java
package com.mycompany.kabuki.actors;

import org.apache.commons.lang.StringUtils;

import com.mycompany.kabuki.core.Actor;

public class IndexActor extends Actor<String,String> {

  @Override public Type type() {
    return Type.READ_WRITE;
  }

  @Override public void init() throws Exception { /* NOOP */ }

  @Override public void destroy() throws Exception { /* NOOP */ }

  @Override public String perform(String input) {
    String output = StringUtils.replace(input, "Download", "Index");
    logger.info(output);
    return output;
  }
}

WriteActor.java

The WriteActor is the final step in my pipeline. Since its function is to write out the text file containing the value of the converted String, it needs to open and close the handle to the file - this is done in the init() and destroy() methods. The other Actors in my chain did not need to do this, so their init() and destroy() methods are empty.

The WriteActor also does not forward the converted data to the next Actor in the pipeline, because there is no other Actor.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Source: src/main/java/com/mycompany/kabuki/actors/WriteActor.java
package com.mycompany.kabuki.actors;

import java.io.FileWriter;
import java.io.PrintWriter;

import org.apache.commons.lang.StringUtils;

import com.mycompany.kabuki.core.Actor;

public class WriteActor extends Actor<String,String> {

  PrintWriter printWriter;
  
  @Override public Type type() {
    return Type.READ_ONLY;
  }

  @Override public void init() throws Exception {
    printWriter = new PrintWriter(new FileWriter("/tmp/demo.txt"), true);
  }

  @Override public void destroy() throws Exception {
    if (printWriter != null) {
      printWriter.flush();
      printWriter.close();
    }
  }

  @Override public String perform(String input) {
    String output = StringUtils.replace(input, "Index", "Write");
    logger.info(output);
    printWriter.println(output);
    return null;
  }
}

Command line syntax

The pipeline can be set up as a shell script. Currently my script runs all the Actors on the same box, but we can use passwordless SSH to start up Actors on different machines. Here is my script:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/bin/bash
PROJECT_HOME=/Users/sujit/Projects/kabuki
CLASSPATH=\
  $PROJECT_HOME/lib/commons-lang-2.5.jar:\
  $PROJECT_HOME/lib/commons-cli-1.0.jar:\
  $PROJECT_HOME/lib/activemq-all-5.3.2.jar:\
  $PROJECT_HOME/lib/log4j-1.2.14.jar:\
  $PROJECT_HOME/target/kabuki.jar
BROKER_URL=tcp://localhost:61616
LOG4J_CONFIG=file:$PROJECT_HOME/src/main/resources/log4j.properties
# single instance of DownloadActor
java -cp $CLASSPATH -Dlog4j.configuration=$LOG4J_CONFIG \
  com.mycompany.kabuki.core.Actor \
  -a com.mycompany.kabuki.actors.DownloadActor \
  -u $BROKER_URL -i download -o index &
# multiple parallel instances of IndexActor
java -cp $CLASSPATH -Dlog4j.configuration=$LOG4J_CONFIG \
  com.mycompany.kabuki.core.Actor \
  -a com.mycompany.kabuki.actors.IndexActor \
  -u $BROKER_URL -i index -o write &
java -cp $CLASSPATH -Dlog4j.configuration=$LOG4J_CONFIG \
  com.mycompany.kabuki.core.Actor \
  -a com.mycompany.kabuki.actors.IndexActor \
  -u $BROKER_URL -i index -o write &
# single instance of WriteActor
java -cp $CLASSPATH -Dlog4j.configuration=$LOG4J_CONFIG \
  com.mycompany.kabuki.core.Actor \
  -a com.mycompany.kabuki.actors.WriteActor \
  -u $BROKER_URL -i write &
wait

As you can see, you assign the inbox and outbox to different Actors by using the -i and -o parameters. The script assumes that the JMS broker has been set up. For the JMS Broker, I used a stock Apache ActiveMQ - just downloaded it and started it with the default configuration.

Kabuki Internals

As you know, the Kabuki "framework" consists of a single Actor class that all Kabuki Actors must extend. Here is the code for the Actor superclass:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
// Source: src/main/java/com/mycompany/kabuki/core/Actor.java
package com.mycompany.kabuki.core;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

/**
 * Superclass that all Actors implementations must extend.
 * @author Sujit Pal (spal@healthline.com)
 * @version $Revision$
 */
public abstract class Actor<I,O> {

  public static enum Type {
    READ_ONLY, WRITE_ONLY, READ_WRITE
  };
  
  public static final String SHUTDOWN_COMMAND = "SHUTDOWN";

  private static final String PAYLOAD_KEY = "payload";
  private static final String COMMAND_KEY = "command";
  private static final long SHUTDOWN_DELAY = 5000; // 5s
  
  protected final Logger logger = Logger.getLogger(getClass());
  
  private String brokerUrl;
  private String inbox;
  private String outbox;
  
  private Connection connection;
  // payload is p2p - since we want only one actor at a tier to pick
  // up the incoming payload
  private Session payloadSession;
  private Queue payloadInbox;
  private Queue payloadOutbox;
  private MessageConsumer payloadReceiver;
  private MessageProducer payloadSender;
  // command is pub-sub, since we want all actors in a tier to pick
  // up the incoming command
  private Session commandSession;
  private Topic commandTopic;
  private MessageConsumer commandSubscriber;
  private MessageProducer commandPublisher;
  
  public abstract Type type();
  public abstract void init() throws Exception;
  public abstract O perform(I input) throws Exception;
  public abstract void destroy() throws Exception;

  public void setBrokerUrl(String brokerUrl) {
    this.brokerUrl = brokerUrl;
  }
  
  public void setInbox(String inbox) {
    this.inbox = inbox;
  }
  
  public void setOutbox(String outbox) {
    this.outbox = outbox;
  }
  
  protected final void start() throws Exception {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
    connection = factory.createConnection();
    commandSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    payloadSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // set up payload queue destinations and friends
    payloadInbox = payloadSession.createQueue(
      StringUtils.join(new String[] {PAYLOAD_KEY, inbox}, "."));
    payloadReceiver = payloadSession.createConsumer(payloadInbox);
    payloadReceiver.setMessageListener(new PayloadListener());
    if (type() == Type.WRITE_ONLY || type() == Type.READ_WRITE) {
      payloadOutbox = payloadSession.createQueue(
        StringUtils.join(new String[] {PAYLOAD_KEY, outbox}, "."));
      payloadSender = payloadSession.createProducer(payloadOutbox);
    }
    // set up command topic destination and friends
    commandTopic = commandSession.createTopic(
      StringUtils.join(new String[] {
      COMMAND_KEY, getClass().getSimpleName()}, ".")); 
    commandSubscriber = commandSession.createConsumer(commandTopic);
    commandSubscriber.setMessageListener(new CommandListener());
    commandPublisher = commandSession.createProducer(commandTopic);
    // start your engine
    connection.start();
    logger.info("Actor Started: " + getClass().getName());
    // call the init() hook
    init();
  }

  protected void shutdown() throws Exception {
    if (payloadOutbox != null) {
      MapMessage shutdownMessage = payloadSession.createMapMessage();
      shutdownMessage.setString(COMMAND_KEY, SHUTDOWN_COMMAND);
      payloadSender.send(shutdownMessage);
    }
    if (type() == Type.WRITE_ONLY) {
      logger.info("Waiting " + SHUTDOWN_DELAY + "ms...");
      try { Thread.sleep(SHUTDOWN_DELAY); }
      catch (InterruptedException e) { /* NOOP */ }
      stop();
    }
  }

  protected final void send(O output) throws Exception {
    if (type() != Type.WRITE_ONLY) {
      throw new IllegalAccessError(
        "send() can only be called from WRITE_ONLY Actors");
    }
    if (payloadSender != null) {
      MapMessage payload = payloadSession.createMapMessage();
      payload.setObject(PAYLOAD_KEY, output);
      payloadSender.send(payload);
    }
  }

  private final void stop() throws Exception {
    closeQuietly(payloadReceiver);
    closeQuietly(commandSubscriber);
    closeQuietly(payloadSender);
    closeQuietly(commandPublisher);
    commandSession.close();
    payloadSession.close();
    connection.close();
    // call the destroy hook
    destroy();
    logger.info("Actor stopped: " + getClass().getName());
  }

  private void closeQuietly(MessageProducer producer) {
    if (producer != null) {
      try { producer.close(); }
      catch (JMSException e) { /* NOOP */ }
    }
  }
  
  private void closeQuietly(MessageConsumer consumer) {
    if (consumer != null) {
      try { consumer.close(); }
      catch (JMSException e) { /* NOOP */ }
    }
  }
  
  private final class PayloadListener implements MessageListener {
    @SuppressWarnings("unchecked")
    @Override public void onMessage(Message message) {
      try {
        MapMessage payload = (MapMessage) message;
        // check to see if this is a shutdown message. If so, send to
        // the command topic for this actor 
        String command = (String) payload.getObject(COMMAND_KEY);
        if (SHUTDOWN_COMMAND.equals(command)) {
          // send the shutdown command to the command topic
          TextMessage shutdownMessage = commandSession.createTextMessage();
          shutdownMessage.setText(SHUTDOWN_COMMAND);
          commandPublisher.send(shutdownMessage);
          // pass it on to the next tier
          shutdown();
        } else {
          I input = (I) payload.getObject(PAYLOAD_KEY);
          O output = null;
          if (input != null) {
            try { 
              output = perform(input);
            } catch (Exception e) {
              logger.error(e);
            }
          }
          if (output != null && payloadSender != null) {
            MapMessage outputPayload = payloadSession.createMapMessage();
            outputPayload.setObject(PAYLOAD_KEY, output);
            payloadSender.send(outputPayload);
          }
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  };

  private final class CommandListener implements MessageListener {
    @Override public void onMessage(Message message) {
      try {
        TextMessage command = (TextMessage) message;
        if (SHUTDOWN_COMMAND.equals(command.getText())) {
          stop();
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  };

  // these methods are called from the shell script to start up an instance
  // of an Actor.
  
  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws Exception {
    CommandLineParser clp = new BasicParser();
    Options options = new Options();
    options.addOption("h", "help", false, "Print this message");
    options.addOption("a", "actor", true, "Full class name for Actor");
    options.addOption("i", "input", true, "Inbox");
    options.addOption("o", "output", true, "Outbox (optional for READ_ONLY)");
    options.addOption("u", "brokerUrl", true, "ActiveMQ Broker URL");
    CommandLine cl = clp.parse(options, args);
    if (cl.hasOption("h")) {
      printUsage(null, options);
      System.exit(0);
    }
    String actorClassName = null;
    if (cl.hasOption("a")) {
      actorClassName = cl.getOptionValue("a");
    }
    String brokerUrl = null;
    if (cl.hasOption("u")) {
      brokerUrl = cl.getOptionValue("u");
    }
    String inputAlias = null;
    if (cl.hasOption("i")) {
      inputAlias = cl.getOptionValue("i");
    }
    String outputAlias = null;
    if (cl.hasOption("o")) {
      outputAlias = cl.getOptionValue("o");
    }
    // validation: broker url must be defined
    if (StringUtils.isEmpty(brokerUrl)) {
      printUsage("Broker URL must be defined", options);
    }
    // validation: actor class name must be defined
    if (StringUtils.isEmpty(brokerUrl)) {
      printUsage("Actor class must be defined", options);
    }
    Actor actor = 
      (Actor) Class.forName(actorClassName).newInstance();
    // validation: 
    // all actors must have a payload inbox
    // only read-only actors may or may not have a payload outbox
    if (StringUtils.isEmpty(inputAlias)) {
      printUsage("No Inbox specified for actor:" + actorClassName, options);
    }
    if (StringUtils.isEmpty(outputAlias)) {
      if (actor.type() != Type.READ_ONLY) {
        printUsage("No Outbox specified for actor:" + actorClassName, options);
      }
    }
    actor.setBrokerUrl(brokerUrl);
    actor.setInbox(inputAlias);
    actor.setOutbox(outputAlias);
    actor.start();
  }

  private static void printUsage(String message, Options options) {
    if (StringUtils.isNotEmpty(message)) {
      System.out.println("ERROR: " + message);
    }
    HelpFormatter formatter = new HelpFormatter();
    formatter.defaultWidth = 80;
    formatter.printHelp("java " + Actor.class.getName() + 
      " [-h|-a class -u url -i input [-o output]]", options);
  }
}

Internally, each Actor sets up two sessions, one for publish-subscribe messaging and another for point-to-point messaging. Both sessions have asynchronouse Listeners (MessageListeners) which listen for messages on the incoming Queue or Topic until the sessions are closed.

The point-to-point session is used by the Actor to read incoming messages off the Inbox Queue and write the output of perform() onto its Outbox Queue. Point-to-point ensures that a mesage is processed by an Actor only once.

The publish-subscribe session is used for sending the SHUTDOWN command - if there are multiple instances of an Actor at a particular tier in the pipeline, then we want a single SHUTDOWN command to percolate to all the Actors in the tier. This is achieved by setting up a single command Topic per tier (using the simple class name of the actor as part of the Topic name) and having all the Actors in this tier subscribe to the Topic.

Ordering (ie, the SHUTDOWN should be processed after all payload is consumed) is achieved by sending the SHUTDOWN command in the same format as the payload (ie a MapMessage). The payload listener checks to see if the message is a SHUTDOWN message, and if so, writes out a message to the command Topic, which is then consumed by the command listener on every Actor in that tier, causing them to terminate.

Conclusion

I haven't tried this out with a real world scenario yet, but my demo seem to work fine, which is encouraging. While I have used JMS before and also played with various Actor framework flavors, I am by no means an authority on either subject. If you are, or if you notice flaws in my design, would appreciate your letting me know.

6 comments (moderated to prevent spam):

Anonymous said...

You should look at Akka remote Actors
http://akkasource.org/

Sujit Pal said...

Thanks for the link... from a preliminary look at Akka, it seems it does what I want and much more. Definitely going to look at it further.

Anonymous said...

Maybe this is what you are looking for:
http://capsulefw.sourceforge.net

Sujit Pal said...

Thanks for the link, from a very preliminary read of the documentation, it seems to be lighter weight than akka, but built around similar concepts.

Anonymous said...

Quick one,

I am a bit confused as it is the first time I see a messageListener being used without the assistance of a EJBContainer or other lightweight alternatives (for example: SimpleMessageListenerContainer from Spring).

Could you tell me why it works? and why does spring need these lightweight containers then?

Cheers and congrats for the post!

Sujit Pal said...

Hi, thanks. To answer your question of why it works...well, MessageListener is part of the JMS spec (and implementations) and can be used standalone. Don't know too much about EJB containers, but with Spring, usually they provide wrappers in order that the wrapper beans can be easily wired up as this page demonstrates.