Friday, December 19, 2008

Concurrency, Actors and Kilim

Motivation

I don't do too much multithreaded Java programming, and what I do (or have done) is quite basic. Most of them have to do with either spawning off an asynchronous thread to do some work in background, or to break up large batch processes to run in multiple threads so they can use multiple CPUs and complete faster.

Some weeks ago, I attended a presentation (organized by EBig) given by some folks from Azul Systems, where they demo-ed their 108-core (2x52) RISC chip with a JDK optimized to work with the chip. These chips are designed for running very large concurrent Java apps. Not having something large or complex enough to run on such a beast, I was pretty much not the target market, so the talk for me was in the "nice to know" category.

The day after, I attended a (free) community event hosted by QCon, where people were asking about whether Actors would be included in Java 7, since it has already been ported to Scala, which is apparently regarded as the unofficial sandbox for new features in Java. Not knowing anything about Actors, and hearing all kinds of vague but good things about how it can simplify concurrent programming, I made a mental note to look at it once I got out.

Turns out that I had quite a bit of reading to do. Its something of a recursive problem - once I learned about the Actor Framework, I wanted to take a look at Erlang. Along the way, I learned the difference between concurrent and parallel programming, and was led to LINDA and Javaspaces. I also read a few theoretical papers on concurrent and parallel programming, which outlines some very interesting design patterns (listed below in References) that can help in building "safer" concurrent systems. Finally, I decided I also wanted to brush up on the Java 5 data structures for concurrency as well, so I ended up reading Java Concurrency in Practice once again.

Also turns out that there are a bunch of really smart people building various flavors of the Actor framework in Java, which removes or postpones the necessity for (not so smart) people like me from having to learn Scala or Erlang to be able to program using Actors. The three I found are Functional Java, Kilim and Jetlang. Of the three, I liked Kilim the best, because of the simplicity of its API, although it uses bytecode enhancement to retain this simplicity, which may be a turn-off for some folks. Since I learn best by doing, I decided to code up a simple hypothetical system that made sense to me (rather than try to apply a concurrent Fibonacci generator or a PingPong example to my world). I describe this in today's post.

Background

Kilim's design is based on the message-passing concurrency approach popularized by Erlang. The application is broken up into completely independent processes (Actors), each of which have their own thread pool and which communicate with each other through messages, thus eliminating shared state, and thus eliminating problems of data corruption and deadlocks.

Below is a quote from the Side Words blog that provides a very nice and succint definition of what an Actor is and how Actor-based programming differs from Object-oriented programming.

At first sight, both may look similar. However, they have profound differences. Conceptually, objects are just “dead” data with methods. On the opposite, actors are “alive” indepedent processes with transactional memory. An actor is basically like a server. It receives and sends messages, and can also start/stop other agents. By “transactional state”, we mean that state is associated to a “conversation”; not to the actor itself, as opposed to objects having a global shared state. As a consequence actors are free of concurrency issues.

My Test Application

My test application downloads pages from the world wide web, indexes them to build up a term vector for the document (described elsewhere on my blog), and then writes the vector into a database table. This is what my application would look like if it was a single-threaded application. Each of these components has a well-defined interface. The Download job takes a URL and produces a Java object with the title and body parsed out into member variables, the Index job takes this Java object and produces a Map of terms and counts, and the Write job takes this map and writes it to the database.

My first cut at parallelizing this to run on a multi-CPU box would be to simply split this up horizontally into a bunch of threads, each of which runs the entire pipe. The first two processes should be read-only with respect to shared data, and the last one inserts into the database within transactions. So assuming that I've taken standard precautions about making the read-only shared data explicitly immutable, it should be safe to parallelize.

My next step is to optimize this a bit further. Given that my Index task is likely to be almost completely CPU-bound and my Write task almost completely IO-bound, and the Download task perhaps a combination of the two, and that I have 8 CPUs on my machine, it may make sense to allocate 3 of them to Download, 4 of them to Index and 1 of them to Write. We could do this by splitting up the process into three actors - Download, Index and Write - which would pools of 3, 4 and 1 threads respectively.

The code

Here is the code using Kilim. An ActorManager is responsible for setting up and starting the Actors, and feeding them with requests. Once done, it sends a poison pill request STOP, which causes the Actors to pass the message on to the next Actor and terminate themselves. The last Actor will terminate and send back an event to the ActorManager on the callback Mailbox, which will cause the ActorManager itself to terminate.

The ActorManager instantiates and passes in Mailboxes to the Actors. Each actor gets a reference to its inbox and a reference to its neighbor's inbox (its outbox). The Write Actor does not need to forward the results of its computation, so it has a null Mailbox for its outbox.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/ActorManager.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.ExitMsg;
import kilim.Mailbox;

public class ActorManager {

  public static final String STOP = "__STOP__";
  private static final int ACTOR_THREAD_POOL_SIZE = 2;
  
  public static void main(String[] args) {
    Mailbox<Message> mb0 = new Mailbox<Message>();
    Mailbox<Message> mb1 = new Mailbox<Message>();
    Mailbox<Message> mb2 = new Mailbox<Message>();
    Mailbox<ExitMsg> callback = new Mailbox<ExitMsg>();
    
    // instantiate actors
    DownloadActor downloadActor = new DownloadActor(
      ACTOR_THREAD_POOL_SIZE, mb0, mb1);
    IndexActor indexActor = new IndexActor(
      ACTOR_THREAD_POOL_SIZE, mb1, mb2);
    WriteActor writeActor = new WriteActor(
      ACTOR_THREAD_POOL_SIZE, mb2, null);
    
    // start the actors
    downloadActor.start();
    indexActor.start();
    writeActor.start();
    writeActor.informOnExit(callback);
    
    for (int i = 0; i < 10000; i++) {
      String req = "Requested " + i;
      mb0.putnb(new Message(i, req));
      System.out.println(req);
    }
    
    // poison pill to stop the actors
    mb0.putnb(new Message(Integer.MAX_VALUE, ActorManager.STOP));
    // block till the last actor has informed the manager that it exited
    callback.getb();
    System.exit(0);
  }
}

The base class for an Actor in Kilim is the Task, it provides a @pausable execute() method which needs to be overriden and implemented by subclasses. Read the Kilim docs for more details. The boilerplate code for the Task.execute() method has been abstracted out into the abstract Actor class, which requires subclasses to implement the act(Message)::void method. Here is the code for my Actor abstract class. The execute() method is an infinite read-eval-write loop.

Notice that the Actor class I have is tailored to my application, in the sense that it can take only two Mailbox objects during construction, and the poison pill handling is not generic. The Kilim docs mention that the "type-system" is not available in this release (0.5.1) because it is being rewritten, so I am guessing that it will also contain a more generic Actor class.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/Actor.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.Mailbox;
import kilim.Scheduler;
import kilim.Task;
import kilim.pausable;

public abstract class Actor extends Task {

  private Mailbox<Message> inbox;
  private Mailbox<Message> outbox;
  
  public Actor(int numThreads, Mailbox<Message> inbox,
      Mailbox<Message> outbox) {
    this.inbox = inbox;
    this.outbox = outbox;
    setScheduler(new Scheduler(numThreads));
  }

  public @pausable void execute() {
    for (;;) {
      Message request = inbox.get();
      // this is custom poison pill handling code for our application
      if (request.payload instanceof String &&
          ((String) request.payload).equals(ActorManager.STOP)) {
        if (outbox != null) {
          outbox.put(request);
        }
        break;
      }
      // end of poison pill handling
      act(request);
      if (outbox != null) {
        outbox.put(request);
      }
    }
  }
  
  public abstract void act(Message request);
}

The Message class is just a holder for the various Actors to update as it passes through them. We have an id here to identify it for logging purposes and such, but the important part is the payload. Each actor reads the payload, and writes out another payload object for the next actor to consume.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/Message.java
package com.mycompany.myapp.concurrent.kilim;

public class Message {

  public int id;
  public Object payload;
  
  public Message(int id, Object payload) {
    this.id = id;
    this.payload = payload;
  }
}

And here are the various Actor implementations. Currently they are stubbed out to just change the message and pass it on, and print out the message. There is not much to describe here, so I just show them all together.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/DownloadActor.java
package com.healthline.multicore.concurrent.kilim;

import kilim.Mailbox;

public class DownloadActor extends Actor {

  public DownloadActor(int numThreads, Mailbox<Message> inbox,
      Mailbox<Message> outbox) {
    super(numThreads, inbox, outbox);
  }

  @Override
  public void act(Message request) {
    Object payload = request.payload;
    if (payload instanceof String) {
      String responsePayload = new String((String) payload);
      responsePayload = responsePayload.replaceFirst(
        "Requested ", "Downloaded ");
      request.payload = responsePayload;
    }
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/IndexActor.java
package com.healthline.multicore.concurrent.kilim;

import kilim.Mailbox;

public class IndexActor extends Actor {

  public IndexActor(int numThreads, Mailbox<Message> inbox,
      Mailbox<Message> outbox) {
    super(numThreads, inbox, outbox);
  }

  @Override
  public void act(Message request) {
    Object payload = request.payload;
    if (payload instanceof String) {
      String responsePayload = new String((String) payload);
      responsePayload = responsePayload.replaceFirst(
        "Downloaded ", "Indexed ");
      System.out.println(responsePayload);
      request.payload = responsePayload;
    }
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/WriteActor.java
package com.healthline.multicore.concurrent.kilim;

import kilim.Mailbox;

public class WriteActor extends Actor {

  public WriteActor(int numThreads, Mailbox<Message> inbox,
      Mailbox<Message> outbox) {
    super(numThreads, inbox, outbox);
  }

  @Override
  public void act(Message request) {
    Object payload = request.payload;
    if (payload instanceof String) {
      String responsePayload = new String((String) payload);
      responsePayload = responsePayload.replaceFirst("Indexed ", "Wrote ");
      System.out.println(responsePayload);
      request.payload = responsePayload;
    }
  }
}

Compiling and Weaving

Kilim enhances the bytecode created by the compilation process, in a process called Weaving. It uses ASM to do the enhancement. Since I use Maven, I could either run the Weaver from the command line using java, write a Maven plugin, or generate an Ant build.xml from Maven, update it and use that going forward. I chose the last approach. In case there are any other Maven users out there, I did mvn ant:ant to build the build.xml, then I added this snippet to the "compile" target. Its almost a cut-n-paste from the target in the Kilim distribution's build.xml file.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
  <target name="compile" depends="get-deps" description="Compile the code">
    ...
    <java classname="kilim.tools.Weaver" fork="yes">
      <classpath refid="weave.classpath"/>
      <assertions>
        <enable/>
      </assertions>
      <arg value="-x"/><!-- skip classes that match ExInvalid -->
      <arg value="ExInvalid|test"/>
      <arg value="-d"/>
      <arg value="${maven.build.output}"/>
      <arg line="${maven.build.output}"/>
    </java>
  </target>

I also added this "run" target to use ant to call the ActorManager. I call it using ant run -Drun=full.classname.

1
2
3
4
5
6
7
8
  <target name="run" depends="compile" description="Run named class">
    <java classname="${run}" fork="yes">
      <classpath>
        <path refid="build.classpath"/>
        <pathelement location="${maven.build.output}"/>
      </classpath>
    </java>
  </target>

The code processes 10,000 URLs, so the run actually creates 30,000 tasks (10,000 per Actor) using the 3 thread pools of 2 threads each per actor. The entire thing completes in about 12 seconds on my laptop (with a AMD Turion dual-core CPU).

Random Observations

Similarity with HTTP request-response

The Actor framework could be seen as an inter-thread version of web applications. A web request may ultimately be served by some sort of servlet, but it can go through multiple proxies to get there. At each state, the proxies would probably add or process some attributes of the request. So essentially, the request (message) is the only thing that changes as it goes through different proxies (actors).

Similarity with JMS

Message passing concurrency looks very similar to a JMS application. In JMS terms, Kilim can be thought of as using a point-to-point architecture, whereas Jetlang uses a publish-subscribe architecture. I will probably try building something similar with Jetlang and talk about it in an upcoming blog.

Differences from Erlang Actors

From the Erlang examples I've seen, an Erlang actor comes preloaded with its own incoming Mailbox. An Erlang actor exposes two operators, a receive and send. The receive implicitly reads its inbox and matches the message against a set of patterns. If a pattern matches, then the actor to send to is specified during the send operation. In Java (pseudo-code), an Erlang style actor would look something like this:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
  public class Actor {
    private Mailbox<Message> inbox;

    public Actor() {
      inbox = new Mailbox<Message>
    }

    public Message receive() {
      for (;;) {
        Message message = inbox.get();
        act(message);
      }
    }

    public void send(Message message) {
      inbox.put(message);
    }

    public void act(Message message);
  }

A subclass act(Message)::void implementation would look something like this:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  public class MyActor extends Actor {
    public void act(Message message) {
      if (Message instanceof Foo) {
        message = doPrefooOperation(message);
        actors.get(Foo).send(message);
      } else if (Message instanceof Bar) {
        message = doPrebarOperation(message);
        actors.get(Bar.class).send(message);
      } else {
        actors.get(Baz.class).send(message);
      }
    }
  }

This requires splitting up the @pausable execute method into three methods to make the API look pretty (and it doesn't work with Kilim - I tried it, and based on the Weaver's complaints, I ended up making almost everything @pausable, which caused other issues, so I gave up). It also introduces an additional shared resource, the actors registry to do the lookup of the Actor class. However, this functionality can be done with Kilim slightly differently. Since we know that our MyActor class can talk to three different Actors, we configure it with the references to their incoming mailboxes, something like this. In either case, we are telling the code where to send the processed message, the only difference is that in Erlang you tell it in its receive method and in Kilim you tell it during contruction.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  public class MyActor extends Task {
    public Mailbox<Message> inbox;
    public Mailbox<Message> fooInbox;
    public Mailbox<Message> barInbox;
    public Mailbox<Message> bazInbox;

    public MyActor(Mailbox<Message> inbox,
                   Mailbox<Message> fooInbox,
                   Mailbox<Message> barInbox,
                   Mailbox<Message> bazInbox) {
      // set these
    }

    public @pausable void execute() {
      for (;;) {
        Message message = inbox.get();
        if (message instanceof Foo) {
          message = doPrefooOperation(message);
          fooInbox.put(message);
        } else if (message instanceof Bar) {
          message = doPrebarOperation(message);
          barInbox.put(message);
        } else {
          bazInbox.put(message);
        }
      }
    }
  }

References

  • Java Concurrency in Practice. If you are going to do much concurrent Java programming, you may want to read this book. It has lots of tips on writing safe concurrent programs, as well as in-depth descriptions of the concurrency classes in Java 5.
  • Concurrency tips - from the Java Specialist site, written by Dr Heinz M Kabutz. A set of 10 commandments for programmers who write (or will write) multithreaded Java programs.
  • The Problem with Threads (PDF) by Edward E Lee. This is also available from the Kilim site. The article argues for identifying shared structures in concurrent code, then isolating them from the rest of the code via synchronized data structures.
  • How to Write Parallel Programs (PDF) by Nicholas Carriero and David Gelernter. It describes various ways to partition your code into concurrent/parallel modules which can be run simultaneously.

In addition, there is a list of interesting concurency articles maintained by Alex Miller that you may also want to take a look at.

6 comments (moderated to prevent spam):

Heinz Kabutz said...

Nice article. Thanks for the reference to my newsletter.

zehrer said...

Have you done tests for single thread performance or tried other multithread solutions?

Sujit Pal said...

Thank you, Dr Kabutz. You are welcome about the reference, and thank /you/ for the excellent set of tips on concurrency. Some time back I had to introduce a Thread.sleep(1000L) in my code, and I immediately thought of your broken doorbell tip :-).

Sujit Pal said...

@zehrer: I haven't run any performance numbers, since I was concentrating on the code usage part more. But in today's post (the next one from this one), I tried coding up the same trivial app with Jetlang and ran some tests with different number of tasks.

Anonymous said...

For some reason actor model does not seem like a natural extension to Java. I am no James Gosling to say whether Java is meant to do this or not. Perhaps should we shift the blame on to the frameworks which have made actor model like an after-thought on Java? These weaving strategy implementations like Kilim, ActorFoundry, Actors Guild etc… have to take some blame for making actor models look very unnatural to Java development – putting a kink in every development step and development tool. The leap of faith with actor models may perhaps never happen until it is made a part of Java platform itself (like how actors implementation is provided as part of the Scala standard library). For now, for die-hards Java nerds, I would suggest that among Java alternatives, Jetlang is the best because it provides a consistent model that does not feel misplaced or unnatural in the context of Java.

Sujit Pal said...

I like Jetlang too, and for the same reasons as you do. However, code weaving seems to be a common approach nowadays, and I guess once you get past that, Kilim's API is actually simpler. Haven't used either for high volume work though...