Friday, January 02, 2009

More Java Actor Frameworks Compared

Over the past few weeks, I've been looking at various (Java- and Scala-based) Actor frameworks. To understand the API of these frameworks, I've been porting the same toy example, which consists of three pipelined Actors responding to a batch of requests shoved down one end of the pipe. To get a feel for how they compare performance-wise, I've also been measuring wallclock times for various request batch sizes. Mike Rettig (author of the Jetlang project) pointed out that the Jetlang numbers I published in last week's post looked wrong compared to the Scala numbers. Rajesh Karmani (one of the authors of the ActorFoundry project) also expressed surprise that the Kilim numbers were higher than Scala's.

Mike was kind enough to take a look at the Jetlang code, and he suggested that the excessive console IO done by the actors was making it perform worse than Scala. Taking the println() calls out of both the Scala and the Jetlang examples improved the performance of both significantly, and the Jetlang example ended up with lower elapsed times than the Scala examples. Apparently, the performance characteristics of Scala's println() were different enough from Java's System.out.println() to skew the results. This week, I have removed the console output from all the examples (after verifying that they work correctly) and republished the numbers.
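Rather than commenting out println() calls in every actor, the logging can be routed through one hook with a switch. The sketch below is a hypothetical illustration (BenchLog, ENABLED and printedCount() are my own names, not part of any of the frameworks). Because ENABLED is a compile-time constant, javac typically omits the guarded block altogether, so a disabled log() call does no console IO at all.

```java
// Hypothetical logging hook for the benchmarks: one switch instead of
// commenting out println() calls in every actor.
final class BenchLog {
  // Compile-time constant: when false, javac typically drops the guarded
  // block, so disabled logging does no console IO at all.
  static final boolean ENABLED = false;

  // Number of messages actually written to the console.
  private static int printed = 0;

  static void log(String message) {
    if (ENABLED) {
      System.out.println(message);
      printed++;
    }
  }

  static int printedCount() {
    return printed;
  }
}
```

Flipping ENABLED to true and recompiling restores the console output when checking that the actors work correctly.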

Tim Jansen (author of the Actor's Guild framework) was also kind enough to build me an Actor's Guild version of my example.

Rajesh also took a look at the code for the Kilim example at my request, and he pointed out several improvements that may make Kilim run faster. I have incorporated his suggestions into the rewritten code. He also pointed me to his ActorFoundry project, and, over the last couple of days, he has been of immense assistance (via email) in helping me to build an ActorFoundry version of my toy example.

This week, I provide updated code for Kilim and Jetlang, new code for Actor's Guild and ActorFoundry, and an elapsed time comparison across these frameworks (as well as the Scala examples from last week). In many ways, this post is largely due to the efforts of these three fine programmers. Thank you, gentlemen!

Kilim - updated

The original Kilim code for my example used a Message object that was passed around between the Actors. Since the Jetlang example just used a String message (which was really all that my example needed), I changed it over to use a String instead of the Message object, thereby removing the extra instanceof checks needed to distinguish between a regular message and a poison pill termination message. The code is explained in detail in my previous post, so I just post the updated code here.

// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/ActorManager.java
package com.mycompany.myapp.concurrent.kilim;

import java.util.concurrent.TimeUnit;

import kilim.ExitMsg;
import kilim.Mailbox;

public class ActorManager {

  public static final String STOP = "__STOP__";
  private static final int ACTOR_THREAD_POOL_SIZE = 2;
  
  public static void main(String[] args) {
    Mailbox<String> mb0 = new Mailbox<String>();
    Mailbox<String> mb1 = new Mailbox<String>();
    Mailbox<String> mb2 = new Mailbox<String>();
    Mailbox<ExitMsg> callback = new Mailbox<ExitMsg>();
    
    // instantiate actors
    DownloadActor downloadActor = new DownloadActor(ACTOR_THREAD_POOL_SIZE, mb0, mb1);
    IndexActor indexActor = new IndexActor(ACTOR_THREAD_POOL_SIZE, mb1, mb2);
    WriteActor writeActor = new WriteActor(ACTOR_THREAD_POOL_SIZE, mb2, null);
    
    // start the actors
    downloadActor.start();
    indexActor.start();
    writeActor.start();
    writeActor.informOnExit(callback);
    
    long start = System.nanoTime();
    int numTasks = 1000000;
    for (int i = 0; i < numTasks; i++) {
      String req = "Requested " + i;
      mb0.putnb(req);
      log(req);
    }
    
    // poison pill to stop the actors
    mb0.putnb(ActorManager.STOP);
    // block till the last actor has informed the manager that it exited
    callback.getb();
    long elapsed = System.nanoTime() - start;
    System.out.println("elapsed=" + TimeUnit.MILLISECONDS.convert(elapsed, TimeUnit.NANOSECONDS));
    System.exit(0);
  }
  
  public static void log(String message) {
//    System.out.println(message);
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/Actor.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.Mailbox;
import kilim.Task;
import kilim.pausable;

public abstract class Actor extends Task {

  private Mailbox<String> inbox;
  private Mailbox<String> outbox;
  
  public Actor(int numThreads, Mailbox<String> inbox, Mailbox<String> outbox) {
    this.inbox = inbox;
    this.outbox = outbox;
//    setScheduler(new Scheduler(numThreads));
  }

  @pausable
  public void execute() {
    for (;;) {
      String request = inbox.get();
      // this is custom poison pill handling code for our application
      if (request.equals(ActorManager.STOP)) {
        if (outbox != null) {
          outbox.put(request);
        }
        break;
      }
      // end of poison pill handling
      String response = act(request);
      ActorManager.log(response);
      if (outbox != null) {
        outbox.put(response);
      }
    }
  }
  
  public abstract String act(String request);
}
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/DownloadActor.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.Mailbox;

public class DownloadActor extends Actor {

  public DownloadActor(int numThreads, Mailbox<String> inbox, Mailbox<String> outbox) {
    super(numThreads, inbox, outbox);
  }

  @Override
  public String act(String request) {
    return request.replaceFirst("Requested ", "Downloaded ");
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/IndexActor.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.Mailbox;

public class IndexActor extends Actor {

  public IndexActor(int numThreads, Mailbox<String> inbox, Mailbox<String> outbox) {
    super(numThreads, inbox, outbox);
  }

  @Override
  public String act(String request) {
    return request.replaceFirst("Downloaded ", "Indexed ");
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/WriteActor.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.Mailbox;

public class WriteActor extends Actor {

  public WriteActor(int numThreads, Mailbox<String> inbox, Mailbox<String> outbox) {
    super(numThreads, inbox, outbox);
  }

  @Override
  public String act(String request) {
    return request.replaceFirst("Indexed ", "Wrote ");
  }
}
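For readers who want to try the String-message and poison-pill structure without setting up Kilim's weaver, here is a JDK-only sketch of the same three-stage pipeline. It is a hypothetical analogue, not Kilim: a LinkedBlockingQueue stands in for each Mailbox and a plain Thread for each Task, so every stage really does park a thread in take(), which is exactly what Kilim's @pausable weaving avoids. QueuePipeline, stage() and run() are my own illustrative names, and termination uses a CountDownLatch as in the Scala example rather than Kilim's informOnExit().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.UnaryOperator;

// JDK-only analogue of the Kilim pipeline (not Kilim code): a queue stands in
// for each Mailbox and a plain Thread for each Task.
final class QueuePipeline {
  static final String STOP = "__STOP__";

  // One stage: take a request, transform it, forward it. On the poison pill,
  // forward the pill (if there is a next stage) and exit.
  static void stage(BlockingQueue<String> inbox, BlockingQueue<String> outbox,
                    UnaryOperator<String> act, CountDownLatch stopped) {
    new Thread(() -> {
      try {
        for (;;) {
          String request = inbox.take();
          if (STOP.equals(request)) {
            if (outbox != null) {
              outbox.put(request);
            }
            break;
          }
          String response = act.apply(request);
          if (outbox != null) {
            outbox.put(response);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        stopped.countDown();   // like the latch in the Scala example
      }
    }).start();
  }

  // Pushes numTasks requests through download -> index -> write and returns
  // what the last stage produced, in order, without the poison pill.
  static List<String> run(int numTasks) {
    BlockingQueue<String> mb0 = new LinkedBlockingQueue<>();
    BlockingQueue<String> mb1 = new LinkedBlockingQueue<>();
    BlockingQueue<String> mb2 = new LinkedBlockingQueue<>();
    BlockingQueue<String> sink = new LinkedBlockingQueue<>();
    CountDownLatch stopped = new CountDownLatch(3);
    stage(mb0, mb1, s -> s.replaceFirst("Requested ", "Downloaded "), stopped);
    stage(mb1, mb2, s -> s.replaceFirst("Downloaded ", "Indexed "), stopped);
    stage(mb2, sink, s -> s.replaceFirst("Indexed ", "Wrote "), stopped);
    try {
      for (int i = 0; i < numTasks; i++) {
        mb0.put("Requested " + i);
      }
      mb0.put(STOP);
      stopped.await();   // every stage has seen the pill and exited
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    List<String> out = new ArrayList<>();
    sink.drainTo(out);
    out.remove(STOP);
    return out;
  }
}
```

A single STOP.equals(request) check is all it takes to recognize the pill, which is the simplification that switching from a Message object to a String buys.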

Jetlang - updated

The original code for my Jetlang example can be found here.

Mike rewrote my example quite a bit and made it part of the Jetlang distribution examples. You can browse the code in the Jetlang SVN repository. The main change is that the System.out.println() calls are refactored out into a Main.log() method (Main.java plays the same role as my ActorManager.java), whose body is commented out for benchmarking. Other changes include changing the Message object into a String, and adding channels and a listener to respond to the poison pill. Overall, the resulting code is more elegant than mine, so I've changed my code to reflect these changes.
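Jetlang delivers messages published on a channel to callbacks that run on a fiber, and Mike's version handles the poison pill with its own channel and listener. I don't reproduce the Jetlang API here; the following JDK-only sketch only mimics the idea, with a single-threaded ExecutorService standing in for a fiber. MiniChannel and ChannelPipeline are hypothetical names, and the actual Jetlang example may wire its stop listener differently. One detail the sketch does illustrate: each stage forwards the stop signal on the next hop's stop channel, instead of the manager signalling all stages at once, so no stage can see the stop signal before the data still in flight to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for a channel: each subscriber's callback is run on
// the single-threaded executor (our "fiber") it subscribed with.
final class MiniChannel<T> {
  private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

  void subscribe(ExecutorService fiber, Consumer<T> callback) {
    subscribers.add(message -> fiber.execute(() -> callback.accept(message)));
  }

  void publish(T message) {
    for (Consumer<T> subscriber : subscribers) {
      subscriber.accept(message);
    }
  }
}

final class ChannelPipeline {
  static List<String> run(int numTasks) {
    ExecutorService download = Executors.newSingleThreadExecutor();
    ExecutorService index = Executors.newSingleThreadExecutor();
    ExecutorService write = Executors.newSingleThreadExecutor();
    MiniChannel<String> requests = new MiniChannel<>();
    MiniChannel<String> downloaded = new MiniChannel<>();
    MiniChannel<String> indexed = new MiniChannel<>();
    // One stop channel per hop: a stage forwards the stop signal only after
    // everything it received earlier, so no data is overtaken.
    MiniChannel<Void> stopDownload = new MiniChannel<>();
    MiniChannel<Void> stopIndex = new MiniChannel<>();
    MiniChannel<Void> stopWrite = new MiniChannel<>();
    CountDownLatch stopped = new CountDownLatch(3);
    List<String> written = new ArrayList<>();   // only touched on the write fiber

    requests.subscribe(download,
        s -> downloaded.publish(s.replaceFirst("Requested ", "Downloaded ")));
    downloaded.subscribe(index,
        s -> indexed.publish(s.replaceFirst("Downloaded ", "Indexed ")));
    indexed.subscribe(write, s -> written.add(s.replaceFirst("Indexed ", "Wrote ")));
    stopDownload.subscribe(download, v -> { stopIndex.publish(null); stopped.countDown(); });
    stopIndex.subscribe(index, v -> { stopWrite.publish(null); stopped.countDown(); });
    stopWrite.subscribe(write, v -> stopped.countDown());

    for (int i = 0; i < numTasks; i++) {
      requests.publish("Requested " + i);
    }
    stopDownload.publish(null);
    try {
      stopped.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      download.shutdown();
      index.shutdown();
      write.shutdown();
    }
    return written;
  }
}
```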

Scala (loop/receive) - updated

The Scala versions (originally described here) remain almost unchanged, except that there is now a new function log in the ActorManager object, which all the Actors use to log messages. As in the Jetlang example, its body is commented out. I have also changed the while(true) loop in the previous example to use the loop method of Actor. Here is the code:

// Source: ActorManager.scala
package myjob

import java.lang._
import java.util.concurrent.CountDownLatch
import scala.actors._
import scala.actors.Actor._

object ActorManager {

  val latch = new CountDownLatch(3)
  def decrementLatch(): Unit = {
    latch.countDown
  }

  def log(message:String): Unit = {
    //println(message)
  }

  def main(args: Array[String]): Unit = {
    // start the actors
    DownloadActor.start
    IndexActor.start
    WriteActor.start
    // seed the download actor with requests
    val start = System.currentTimeMillis
    for (i <- 1 to 1000000) {
      val payload = "Requested " + i
      log(payload)
      DownloadActor ! payload
    }
    // ask them to stop
    DownloadActor ! StopMessage
    // wait for actors to stop
    latch.await
    println("elapsed = " + (System.currentTimeMillis - start))
  }
}

case object StopMessage

object DownloadActor extends Actor {
  def act() {
    loop {
      receive {
        case payload: String => {
          val newPayload = payload.replaceFirst("Requested ", "Downloaded ")
          ActorManager.log(newPayload)
          IndexActor ! newPayload
        }
        case StopMessage => {
          ActorManager.log("Stopping download")
          IndexActor ! StopMessage
          ActorManager.decrementLatch
          exit
        }
      }
    }
  }
}

object IndexActor extends Actor {
  def act() {
    loop {
      receive {
        case payload: String => {
          val newPayload = payload.replaceFirst("Downloaded ", "Indexed ")
          ActorManager.log(newPayload)
          WriteActor ! newPayload
        }
        case StopMessage => {
          ActorManager.log("Stopping Index")
          WriteActor ! StopMessage
          ActorManager.decrementLatch
          exit
        }
      }
    }
  }
}

object WriteActor extends Actor {
  def act() {
    loop {
      receive {
        case payload: String => {
          val newPayload = payload.replaceFirst("Indexed ", "Wrote ")
          ActorManager.log(newPayload)
        }
        case StopMessage => {
          ActorManager.log("Stopping Write")
          ActorManager.decrementLatch
          exit
        }
      }
    }
  }
}

Scala (loop/react) - updated

The loop/react version of the above Scala example simply replaces the loop/receive calls with loop/react. In the interests of brevity, I am not including it here - just change the receive call to react in three places and you have the loop/react version.
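The difference between the two versions is how a waiting actor uses threads: receive parks a thread until a message arrives, while react lets the actor give up its thread, so many actors can share a small pool. Scala implements react with its own internal machinery; the sketch below is only a hypothetical Java illustration of the event-based idea, in which sending a message schedules a drain task on a shared pool unless one is already scheduled. EventActor and EventActorDemo are made-up names, and the demo stops by counting replies instead of using a poison pill.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical event-based actor: no thread waits on its mailbox. send()
// schedules a drain task on a shared pool unless one is already scheduled,
// so the handler never runs concurrently with itself.
final class EventActor<T> {
  private final Queue<T> mailbox = new ConcurrentLinkedQueue<>();
  private final AtomicBoolean scheduled = new AtomicBoolean(false);
  private final ExecutorService pool;
  private final Consumer<T> handler;

  EventActor(ExecutorService pool, Consumer<T> handler) {
    this.pool = pool;
    this.handler = handler;
  }

  void send(T message) {
    mailbox.offer(message);
    trySchedule();
  }

  private void trySchedule() {
    if (scheduled.compareAndSet(false, true)) {
      pool.execute(this::drain);
    }
  }

  private void drain() {
    T message;
    while ((message = mailbox.poll()) != null) {
      handler.accept(message);
    }
    scheduled.set(false);
    // A message may have arrived between the last poll() and set(false).
    if (!mailbox.isEmpty()) {
      trySchedule();
    }
  }
}

// Three chained actors sharing a two-thread pool.
final class EventActorDemo {
  static List<String> run(int numTasks) {
    ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
      Thread t = new Thread(r);
      t.setDaemon(true);   // lets the JVM exit without an explicit shutdown
      return t;
    });
    List<String> written = new ArrayList<>();   // only touched by the write actor
    CountDownLatch done = new CountDownLatch(numTasks);
    EventActor<String> write = new EventActor<>(pool, s -> {
      written.add(s.replaceFirst("Indexed ", "Wrote "));
      done.countDown();
    });
    EventActor<String> index = new EventActor<>(pool,
        s -> write.send(s.replaceFirst("Downloaded ", "Indexed ")));
    EventActor<String> download = new EventActor<>(pool,
        s -> index.send(s.replaceFirst("Requested ", "Downloaded ")));
    for (int i = 0; i < numTasks; i++) {
      download.send("Requested " + i);
    }
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return written;
  }
}
```

Three actors run here on two threads, which a receive-style design with one blocked thread per actor could not do.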

Actor's Guild

The Actor's Guild framework provides a nice annotation based approach to build Actors. Methods that are marked as @Initializer roughly correspond to actor constructors, and methods annotated by @Message correspond roughly to the Actor.act() method. Both return an AsyncResult. @Message methods may take parameters. Actor's Guild provides an Actor class which all application Actors must extend. More information is available in the tutorial. The code (provided by Tim Jansen with some extra comments from me) is shown below:

// Source: src/main/java/com/mycompany/myapp/concurrent/actorsguild/ActorManager.java
package com.mycompany.myapp.concurrent.actorsguild;

import java.util.concurrent.TimeUnit;

import org.actorsguildframework.AsyncResult;
import org.actorsguildframework.DefaultAgent;

public class ActorManager {
  public static void main(String[] args) {
    DefaultAgent ag = new DefaultAgent();
    
    WriteActor writeActor = ag.create(WriteActor.class);
    IndexActor indexActor = ag.create(IndexActor.class).init(writeActor).get();
    DownloadActor downloadActor = 
      ag.create(DownloadActor.class).init(indexActor).get();

    // The original code allocated an array of AsyncResult[numberOfRequests]
    // and populated it by looping through the number of tasks and seeding the
    // downloadActor with its initial request. Although conceptually simpler, 
    // it needed a huge amount of memory and didn't scale well for 
    // numberOfRequests > 100,000. So the strategy is to batch the tasks
    // into blocks of 100,000 and submit until they are all processed.
    long start = System.nanoTime();
    int numberOfRequests = 1000000;
    int tasksDone = 0;
    while (tasksDone < numberOfRequests) {
      int batchSize = Math.min(numberOfRequests - tasksDone, 100000);
      AsyncResult[] results = new AsyncResult[batchSize];
      for (int i = 0; i < batchSize; i++) {
        results[i] = downloadActor.download(tasksDone + i, "Requested " + (tasksDone + i));
      }
      ag.awaitAllUntilError(results);
      tasksDone += batchSize;
    }
    long elapsed = System.nanoTime() - start;
    System.out.println("elapsed=" + TimeUnit.MILLISECONDS.convert(
      elapsed, TimeUnit.NANOSECONDS));
    ag.shutdown();
  }
  
  public static void log(String message) {
//    System.out.println(message);
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/actorsguild/DownloadActor.java
package com.mycompany.myapp.concurrent.actorsguild;

import org.actorsguildframework.Actor;
import org.actorsguildframework.AsyncResult;
import org.actorsguildframework.annotations.Initializer;
import org.actorsguildframework.annotations.Message;


public class DownloadActor extends Actor {
  public IndexActor indexActor;
  
  @Initializer
  public AsyncResult<DownloadActor> init(IndexActor indexActor) {
    this.indexActor = indexActor;
    return result(this);
  }
  
  @Message
  public AsyncResult<Void> download(int id, String payload) {
    String newPayload = payload.replaceFirst("Requested ", "Downloaded ");
    ActorManager.log(newPayload);
    return indexActor.index(id, newPayload);
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/actorsguild/IndexActor.java
package com.mycompany.myapp.concurrent.actorsguild;

import org.actorsguildframework.Actor;
import org.actorsguildframework.AsyncResult;
import org.actorsguildframework.annotations.Initializer;
import org.actorsguildframework.annotations.Message;

public class IndexActor extends Actor {
  public WriteActor writeActor;
  
  @Initializer
  public AsyncResult<IndexActor> init(WriteActor writeActor) {
    this.writeActor = writeActor;
    return result(this);
  }
  
  
  @Message
  public AsyncResult<Void> index(int id, String payload) {
    String newPayload = payload.replaceFirst("Downloaded ", "Indexed ");
    ActorManager.log(newPayload);
    return writeActor.write(id, newPayload);
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/actorsguild/WriteActor.java
package com.mycompany.myapp.concurrent.actorsguild;

import org.actorsguildframework.Actor;
import org.actorsguildframework.AsyncResult;
import org.actorsguildframework.annotations.Message;

public class WriteActor extends Actor {
  @Message
  public AsyncResult<Void> write(int id, String payload) {
    String newPayload = payload.replaceFirst("Indexed ", "Wrote ");
    ActorManager.log(newPayload);
    return noResult();
  }
}
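To make the @Message/AsyncResult style concrete for readers without the library, here is a hypothetical JDK-only analogue that uses Java 8's CompletableFuture in place of AsyncResult. It is not the Actor's Guild API: there is no bytecode enhancement, each actor is simply an object with its own single-threaded executor, and write() returns its payload rather than no result so the chain can be checked. FutureActors, newMailbox() and pipeline() are my own names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical JDK-only analogue of the Actor's Guild example (not its API):
// each actor owns a single-threaded executor, so its message bodies run one
// at a time, and each message method returns a future.
final class FutureActors {
  // Daemon threads so the sketch does not keep the JVM alive.
  static ExecutorService newMailbox() {
    return Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r);
      t.setDaemon(true);
      return t;
    });
  }

  static final class WriteActor {
    private final ExecutorService mailbox = newMailbox();

    // Returns the written payload (the original write() returns no result).
    CompletableFuture<String> write(int id, String payload) {
      return CompletableFuture.supplyAsync(
          () -> payload.replaceFirst("Indexed ", "Wrote "), mailbox);
    }
  }

  static final class IndexActor {
    private final ExecutorService mailbox = newMailbox();
    private final WriteActor writeActor;

    IndexActor(WriteActor writeActor) {
      this.writeActor = writeActor;
    }

    CompletableFuture<String> index(int id, String payload) {
      return CompletableFuture.supplyAsync(
              () -> payload.replaceFirst("Downloaded ", "Indexed "), mailbox)
          .thenCompose(p -> writeActor.write(id, p));   // forward and keep chaining
    }
  }

  static final class DownloadActor {
    private final ExecutorService mailbox = newMailbox();
    private final IndexActor indexActor;

    DownloadActor(IndexActor indexActor) {
      this.indexActor = indexActor;
    }

    CompletableFuture<String> download(int id, String payload) {
      return CompletableFuture.supplyAsync(
              () -> payload.replaceFirst("Requested ", "Downloaded "), mailbox)
          .thenCompose(p -> indexActor.index(id, p));
    }
  }

  // Wires the pipeline back to front, as ActorManager does.
  static DownloadActor pipeline() {
    return new DownloadActor(new IndexActor(new WriteActor()));
  }
}
```

As in the Actor's Guild version, the future returned by download() completes only when the whole chain down to write() has finished.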

Unlike Kilim, which uses bytecode enhancement as a post-compilation step, Actor's Guild uses bytecode enhancement at runtime to create several helper classes dynamically for each Actor. This is done once, the first time the Actor is created. Actor's Guild uses asm 3.1 (as opposed to asm-2.2.3 for Kilim) to do the bytecode enhancement.

The resulting code is quite easy to read. The initial version was even easier, but it pre-allocated an array of AsyncResult objects holding one result per request, so with a large number of requests my machine was thrashing even with a 2GB heap, and the times for 1 million tasks were quite high. So Tim changed it to submit the requests in batches, which yields much better times. Benchmarks aside, this idiom of breaking up a large concurrent job into smaller batches is quite neat, and could find uses in similar situations elsewhere.
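The batching idiom is easy to pull out into a helper. The sketch below is hypothetical: Batcher and runInBatches() are my names, and it uses CompletableFuture rather than Actor's Guild's AsyncResult and awaitAllUntilError(). It keeps at most batchSize futures alive at once, and it passes the global task index to the submit function so that each request gets a distinct number across batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

// Hypothetical helper for the batching idiom: start at most batchSize tasks,
// wait for all of them, then start the next batch. Returns the batch count.
final class Batcher {
  static int runInBatches(int total, int batchSize,
                          IntFunction<CompletableFuture<?>> submit) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be at least 1");
    }
    int done = 0;
    int batches = 0;
    while (done < total) {
      int size = Math.min(total - done, batchSize);
      List<CompletableFuture<?>> batch = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        batch.add(submit.apply(done + i));   // global index, not the in-batch one
      }
      // join() rethrows (wrapped in a CompletionException) if any task failed.
      CompletableFuture.allOf(batch.toArray(new CompletableFuture<?>[0])).join();
      done += size;
      batches++;
    }
    return batches;
  }
}
```

With total = 1,000,000 and batchSize = 100,000 the loop runs ten batches, mirroring the ActorManager above.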

ActorFoundry

ActorFoundry uses Kilim internally. It provides a runner application (called Foundry) within which the application Actors run. Like Actor's Guild, it relies on annotations, and messages correspond to methods in the Actors. Unlike Actor's Guild, messages are sent using a send() call whose parameters identify the target actor and the method name; it looks a bit like method invocation using Reflection. The methods that can be called as messages are marked with the @message annotation. In ActorFoundry, all components run within the foundry and must be Actors, so the ActorManager in my example is also an Actor. I wrote an initial version of the code based on the examples in the distribution; it didn't work, and Rajesh modified it so it would. Here is the code.

// Source: src/main/java/com/mycompany/myapp/concurrent/actorfoundry/ActorManager.java
package com.mycompany.myapp.concurrent.actorfoundry;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import osl.manager.Actor;
import osl.manager.ActorName;
import osl.manager.RemoteCodeException;
import osl.manager.annotations.message;

public class ActorManager extends Actor {
  
  private static final long serialVersionUID = -8621318190754146319L;

  private static final CountDownLatch latch = new CountDownLatch(3);
  
  @message
  public void boot(Integer tasks) {
    try {
      ActorName downloadActor = create(DownloadActor.class, self());
      // seed the download actor with numRequests tasks
      long start = System.nanoTime();
      for (int i = 0; i < tasks; i++) {
        String message = "Requested " + i;
        send(downloadActor, "download", message);
//        send(stdout, "println", message);
      }
      // send poison pill to terminate actors
      send(downloadActor, "stop");
      // wait for all the actors to terminate after getting the poison pill
      latch.await();
      long elapsed = System.nanoTime() - start;
      send(stdout, "println", "elapsed=" + 
        TimeUnit.MILLISECONDS.convert(elapsed, TimeUnit.NANOSECONDS));
      System.exit(0);
    } catch (RemoteCodeException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  
  @message
  public static void decrementLatch() {
    latch.countDown();
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/actorfoundry/DownloadActor.java
package com.mycompany.myapp.concurrent.actorfoundry;

import osl.manager.Actor;
import osl.manager.ActorName;
import osl.manager.RemoteCodeException;
import osl.manager.annotations.message;

public class DownloadActor extends Actor {

  private static final long serialVersionUID = -2311959419132224127L;

  private ActorName actorManager;
  private ActorName indexActor;
  
  public DownloadActor(ActorName manager) throws RemoteCodeException {
    actorManager = manager;
  }
  
  @message
  public void download(String message) throws RemoteCodeException {
    String newMessage = message.replaceFirst("Requested ", "Downloaded ");
    if (indexActor == null) {
      indexActor = create(IndexActor.class, actorManager);
    }
//    send(stdout, "println", newMessage);
    send(indexActor, "index", newMessage);
  }
  
  @message
  public void stop() throws RemoteCodeException {
    send(indexActor, "stop");
    ActorManager.decrementLatch();
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/actorfoundry/IndexActor.java
package com.mycompany.myapp.concurrent.actorfoundry;

import osl.manager.Actor;
import osl.manager.ActorName;
import osl.manager.RemoteCodeException;
import osl.manager.annotations.message;

public class IndexActor extends Actor {

  private static final long serialVersionUID = -7939186176349943105L;

  private ActorName actorManager;
  private ActorName writeActor;
  
  public IndexActor(ActorName manager) throws RemoteCodeException {
    actorManager = manager;
  }

  @message
  public void index(String message) throws RemoteCodeException {
    String newMessage = message.replaceFirst("Downloaded ", "Indexed ");
    if (writeActor == null) {
      writeActor = create(WriteActor.class, actorManager);
    }
//    send(stdout,"println",newMessage);
    send(writeActor, "write", newMessage);
  }
  
  @message 
  public void stop() throws RemoteCodeException {
    send(writeActor, "stop");
    ActorManager.decrementLatch();
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/actorfoundry/WriteActor.java
package com.mycompany.myapp.concurrent.actorfoundry;

import osl.manager.Actor;
import osl.manager.ActorName;
import osl.manager.RemoteCodeException;
import osl.manager.annotations.message;

public class WriteActor extends Actor {

  private static final long serialVersionUID = -4203081425372996186L;

  private ActorName actorManager;

  public WriteActor(ActorName manager) {
    actorManager = manager;
  }

  @message
  public void write(String message) throws RemoteCodeException {
    String newMessage = message.replaceFirst("Indexed ", "Wrote ");
//    send(stdout, "println", newMessage);
  }
  
  @message 
  public void stop() throws RemoteCodeException {
    ActorManager.decrementLatch();
  }
}

There is actually no need to comment out the console IO calls in this case, since stdout is an actor and processes the println() calls asynchronously, but I did so anyway for consistency with the other examples.
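ActorFoundry's send(target, "methodName", args) call looks up the message by name. The following hypothetical sketch shows that calling convention with plain reflection: a ReflectiveActor wraps an object, and send() finds a public method with the given name whose parameter types accept the arguments, then runs it on the actor's own single-threaded executor. It is not ActorFoundry code (in ActorFoundry itself, the generated *Executor classes presumably take the place of this runtime lookup), it ignores methods with primitive parameters for brevity, and Tally is a made-up target for the usage.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical name-based message dispatch, loosely modelled on the
// send(actor, "method", args...) style. Not ActorFoundry code.
final class ReflectiveActor {
  private final Object target;
  // One daemon thread per actor: messages are handled one at a time, in order.
  private final ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r);
    t.setDaemon(true);
    return t;
  });

  ReflectiveActor(Object target) {
    this.target = target;
  }

  CompletableFuture<Object> send(String methodName, Object... args) {
    return CompletableFuture.supplyAsync(() -> invoke(methodName, args), mailbox);
  }

  private Object invoke(String methodName, Object[] args) {
    for (Method m : target.getClass().getMethods()) {
      if (m.getName().equals(methodName) && accepts(m.getParameterTypes(), args)) {
        try {
          return m.invoke(target, args);
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException(e);
        }
      }
    }
    throw new IllegalArgumentException("no message named " + methodName);
  }

  // Primitive parameters are not handled, to keep the sketch short.
  private static boolean accepts(Class<?>[] types, Object[] args) {
    if (types.length != args.length) {
      return false;
    }
    for (int i = 0; i < types.length; i++) {
      if (types[i].isPrimitive() || (args[i] != null && !types[i].isInstance(args[i]))) {
        return false;
      }
    }
    return true;
  }
}

// Made-up target object whose public methods act as messages.
class Tally {
  private int total = 0;

  public void add(Integer n) {
    total += n;
  }

  public int total() {
    return total;
  }
}
```

Because every message runs on the actor's single thread, Tally needs no synchronization of its own, and a later "total" message sees the effect of earlier "add" messages.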

Processing the code so it can be run is quite complex: it involves code generation before compilation and post-processing (using the Kilim weaver) after compilation. Here is the snippet from my build.xml that selectively works on the ActorFoundry code in the compile target (it's mostly copied from the various targets in the build.xml file in the ActorFoundry distribution).

  <target name="compile" depends="get-deps" description="Compile the code">
    <mkdir dir="${maven.build.output}"/>
    <javac srcdir="${maven.src.dir}"
           destdir="${maven.build.output}" 
           excludes="**/package.html" 
           debug="true" 
           deprecation="true" 
           optimize="false">
      <classpath refid="build.classpath"/>
    </javac>
    <!-- check local constraints happens for af only -->
    <apt srcdir="${maven.src.dir}/com/mycompany/myapp/concurrent/actorfoundry"
         compile="false"
         classpathref="build.classpath"
         debug="true"
         factory="osl.foundry.preprocessor.LocalSynchConstAPF"
         factorypathref="build.classpath"/>
    <!-- code generation for af only -->
    <delete dir="${maven.src-gen.dir}"/>
    <mkdir dir="${maven.src-gen.dir}"/>
    <javadoc private="true"
         doclet="osl.foundry.preprocessor.ExecutorCodeGen"
         docletpathref="build.classpath"
         classpathref="build.classpath"
         sourcepath="${maven.src.dir}"
         packagenames="com.mycompany.myapp.concurrent.actorfoundry">
      <arg line="-outdir ${maven.src-gen.dir}"/>
    </javadoc>
    <!-- compile generated code: for af only -->
    <javac srcdir="${maven.src-gen.dir}"
           destdir="${maven.build.output}"
           debug="on"
           fork="on">
      <classpath refid="build.classpath"/>
    </javac>
    <!-- weaving happens for kilim and af files -->
    <java classname="kilim.tools.Weaver" fork="yes">
      <classpath refid="weave.classpath"/>
      <assertions>
        <enable/>
      </assertions>
      <arg value="-x"/>
      <arg value="ExInvalid|test"/>
      <arg value="-d"/>
      <arg value="${maven.build.output}"/>
      <arg line="${kilim.prefix}.ActorManager 
                    ${kilim.prefix}.Actor 
                    ${kilim.prefix}.DownloadActor 
                    ${kilim.prefix}.IndexActor 
                    ${kilim.prefix}.WriteActor 
                    ${af.prefix}.ActorManagerExecutor 
                    ${af.prefix}.DownloadActorExecutor 
                    ${af.prefix}.IndexActorExecutor 
                    ${af.prefix}.WriteActorExecutor"/>
    </java>
  </target>

If you are observant, you will notice that the *Executor class names I pass to the weaver appear nowhere in my code. These classes are generated from the Actors shown above by the ExecutorCodeGen doclet. To run the code, I use the following shell command. It names the class at the near end of the actor pipeline (ActorManager) and its boot message, and supplies the argument to the boot() method. The -open flag means that the foundry keeps running even after all the actors are finished, which is why my code calls System.exit(0) to terminate the run.

prompt$ java -cp $REPO/actorfoundry-1.0.jar:target/classes osl.foundry.FoundryStart \
    com.mycompany.myapp.concurrent.actorfoundry.ActorManager boot 1000000 -open

One thing to note about ActorFoundry is its license: it is not free for commercial use. There are plans to make the source repository visible to outsiders, and the distribution does come with lots of example code, but it would be nice if the project had a tutorial-style user guide to help new users get started.

Elapsed Time Comparison

I ran all the examples, increasing the number of tasks from 1 to 1,000,000 in powers of 10 (i.e., 1, 10, 100, ..., 1,000,000). The elapsed times, in milliseconds, are shown below.

#-Tasks      Kilim  Jetlang  Scala recv  Scala react  Actor's Guild  ActorFoundry
1                2       25          45           81              4           170
10               5       27          44           76             13           142
100             45       53          80          106             78           204
1,000          330      310         424          509            333           557
10,000         860      903        1921         1626           1141          1497
100,000       2429     2286        5601         4242           4740          3542
1,000,000    19716    18650       52601        34837          38834         24844

Please remember that these numbers are meaningless if you are trying to figure out which framework will perform best for your application. All that the actors in my example do is replace one string with another. The actors you write for a real-world application are likely to do something less trivial than that, which could cause threads to block and result in very different performance characteristics. To figure out which actor framework is best suited to your application, you should run your own benchmarks. Now that you've read this far, you are as familiar as I am with the various actor APIs, so writing your own benchmark should be fairly simple.
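Following two suggestions from the comments below (warming up before timing, and taking the median of several runs), a minimal timing helper could look like the following. It is a hypothetical sketch: Bench, medianMillis() and median() are my own names, and the warm-up count and number of runs are parameters to tune, not recommendations.

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

// Hypothetical timing helper: untimed warm-up runs absorb class loading and
// JIT compilation, then the median of several timed runs is reported.
final class Bench {
  static long medianMillis(Runnable workload, int warmups, int runs) {
    if (runs < 1) {
      throw new IllegalArgumentException("runs must be at least 1");
    }
    for (int i = 0; i < warmups; i++) {
      workload.run();
    }
    long[] millis = new long[runs];
    for (int i = 0; i < runs; i++) {
      long start = System.nanoTime();
      workload.run();
      millis[i] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
    return median(millis);
  }

  // Middle element after sorting (the upper middle for an even count).
  static long median(long[] values) {
    long[] sorted = values.clone();
    Arrays.sort(sorted);
    return sorted[sorted.length / 2];
  }
}
```

Note that the examples above are written as one-shot main() programs (several end in System.exit(0)), so they would need to be restructured into a repeatable method before they could be timed this way.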

Update - 2009-01-06

Philipp Haller (one of the people behind Scala's Actor framework) pointed out that version 2.7.3 includes some optimizations to Scala's internal thread pool implementation, so I reran the Scala examples with this version. The table below summarizes the results (elapsed times in milliseconds).

#-Tasks      Kilim  Jetlang  Scala recv  Scala react  Actor's Guild  ActorFoundry
1                2       25          24           90              4           170
10               5       27          55           47             13           142
100             45       53          48           75             78           204
1,000          330      310         327          382            333           557
10,000         860      903        1351         1570           1141          1497
100,000       2429     2286        5096         3274           4740          3542
1,000,000    19716    18650       32975        23469          38834         24844

Update - 2009-01-12

Mats Henrickson pointed out that the Scala example was doing redundant string concatenation that the Java examples were not, and also that it passed a Message object between the actors as opposed to the plain String used in the Java examples. He was kind enough to send me an updated version of the code, and he saw an 8% speedup in the Scala code on his box as a result of these changes (see comments below). I have made the updates to the Scala example above.

26 comments:

  1. Nice work.

    It would be interesting to see a Fork/Join implementation. Since the example is stateless, it should be rather trivial. Fork/Join doesn't follow the actor model, but for many stateless problems, such as this one, it is a better solution.

    For the small number of tasks, performance is affected by jvm class loading and thread initialization. A more accurate test would need to warm up the code before testing latency for a small number of tasks.

    Mike Rettig
    Jetlang Developer

  2. Very thorough comparison, thanks for doing it.

    Stephan
    --
    Programming is hard - http://blog.codemonkeyism.com
    http://twitter.com/codemonkeyism

  3. Very nice write up, and useful.

    Maybe someone will do an Erlang vs. Haskell vs. Lua vs. D comparison.

    .V

  4. Correct me if I'm wrong but I think one other thing to note about the above examples is that certain frameworks like Kilim (not sure about ActorFoundry) allow for blockable methods to be called within the actors without "blocking" the thread executing the Actor. Instead this framework will perform a transformation and push it on some kind of stack until it is unblocked allowing the thread to move on to process other actors.

    Contrast this to ActorGuild (I really adore the syntax) and JetLang where the onus is on the programmer to be careful about any blocking that could consume a thread. These frameworks, while good, are really more frameworks that enable you to implement the Actor style.

    Thanks for the great comparison. This is the second time I've found your blog and gotten some pretty valuable information from it :). I would love to hear anyone's comment on the above.

  5. Which version of Scala did you use? I just ran your benchmark using Scala 2.7.3.RC1 and it is about 2.3 times faster compared to Scala 2.7.2. This is due to a recent optimization of the internal thread pool. On my machine, I get the following numbers for 1000000 tasks:

    Scala 2.7.2: 27972 ms
    Scala 2.7.3.RC1: 11942 ms

    In each case I took the median of 5 runs. This was run on a dual-core Intel Core 2 @ 2.66GHz running Linux 2.6.15 and Sun's HotSpot Server VM 1.6.0-b105.

    So, if you used Scala 2.7.2 it would be great if you could re-run your benchmark using Scala 2.7.3.RC1. Thanks!

  6. @mrettig: Thanks Mike, I will try out doing this example with Fork/Join soon. I think in that case, the three "actors" will get coalesced into a single thread and I could split up the input tasks and coInvoke() them recursively.

    Yes, agreed on the classloading impact. However, these numbers were really not intended to be used as a benchmark, they were originally just intended to give (me) a feel for how each of these frameworks scaled, and once I got the numbers, it seemed interesting enough to share with the world.

    @Stephan.Schmidt: Thanks, and as above, the comparison should not be taken at its face value. If you are looking to choose one over the other, you should run your own benchmarks with your own actors.

    @netsql: Thanks, and it would probably be someone else :-). My 2008 quota for the "one new language a year" has been exhausted with Scala. Only partly kidding on that one... I don't actually have a quota, but I find that learning a new language is a significant time investment and less fun than building things, so I need to really like a language to keep at it. My last new language was Python almost 3 years ago; I tried Ruby about a year or so ago but gave up, and now I'm trying out Scala.

    @Jarrod: I think you are right, at least going by the Kilim docs, and the reason I didn't mention it is that I didn't think of it. However, it's an important distinction; glad you brought it up.

    From what I saw of ActorFoundry, the *Executor classes that are generated have Kilim's @pausable methods, so I'm guessing that ActorFoundry falls into the same category as Kilim. But I'll let Rajesh comment since he is likely to know more about this than me.

    @Rajesh: You were right about the for/until loop in the Scala examples, it was doing 1 less task than the others. I've changed it to a for/to loop which behaves like the Java for.
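The off-by-one comes from Scala's range semantics: `until` excludes its upper bound, while `to` includes it. A small check, assuming the loop started at 1 (the object and method names are illustrative only):

```scala
object LoopBounds {
  // `until` excludes its upper bound, so starting at 1 this runs n - 1 times.
  def countUntil(n: Int): Int = {
    var count = 0
    for (_ <- 1 until n) count += 1
    count
  }

  // `to` includes its upper bound, like Java's for (int i = 1; i <= n; i++).
  def countTo(n: Int): Int = {
    var count = 0
    for (_ <- 1 to n) count += 1
    count
  }
}
```

For a million requests, `countUntil(1000000)` gives 999999 and `countTo(1000000)` gives 1000000, which is the one-task difference noted above.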

    @Philipp Haller: Yes, the Scala version I used was 2.7.2-final. I have run the Scala examples again with 2.7.3-RC1, and, as you mentioned, the numbers improve significantly. I am going to publish an update to this blog post with the new numbers.

  7. Hi!

    Thanks for one of the most informative blogs in quite a while!

    I noticed your Scala code does the same string creation twice:

    for (i <- 1 to 1000000) {
      log("Requested " + i)
      DownloadActor ! Message(i, "Requested " + i)
    }

    A more performant version would be:

    for (i <- 1 to 1000000) {
      val payload = "Requested " + i
      log(payload)
      DownloadActor ! Message(i, payload)
    }

    It is actually the same in two of the actors:

    case Message(id, payload) => {
      ActorManager.log("Downloaded " + id)
      IndexActor ! Message(id, payload.replaceFirst("Requested ", "Downloaded "))
    }

    A fairer version (compared to the Java examples) would be:

    case Message(id, payload) => {
      val newPayload = payload.replaceFirst("Requested ", "Downloaded ")
      ActorManager.log(newPayload)
      IndexActor ! Message(id, newPayload)
    }

    All in all, this adds up to 3 million more string concatenations than in the Java examples. From what I can see, removing them improves performance by about 8%.

    The integer i also doesn't add anything to the example, so perhaps remove it and drop the Message class altogether, to make the code more similar to the Java examples?

  8. I removed the Message class from the Scala code and sent along a plain String, as in the Java examples, but it didn't improve performance by much. It did simplify the code a bit, though. If you send your email address to mats.henricson@crisp.se, I'll send you my updated code.

  9. Thank you Mats, I will update the code with your changes. I could kick myself for missing the 3M useless String concats btw, thanks for pointing it out :-). I've also changed the send operators to send a plain String, like in the code update you sent me. I reran the example on my box, and I am getting /much/ better results than my previous numbers, but that may be because there was nothing else running on the machine at the time...

  10. Just noticed that the following line is missing from the Scala version (compared to the last one you posted):

    case class Message(id:Int, payload:String)

    It would be great if you could upload all the Actor examples to GitHub or Bitbucket.

  11. Thanks steshaw, I've added the Message class back into the blog post. Apologies for dropping it, and now I know why my times were so much faster with Mats's changes - it was just ignoring the tasks and only processing the stop message :-).

    As for posting the code to repositories, the only problem is that people expect you to update it and make it better. One of the reasons I switched from writing open source code to blogging as a means of self-learning is that with open source software, I am stuck maintaining the same software while the world merrily passes me by, building cool shiny new things I don't have a chance to look at because I am so busy. It's easy enough to cut and paste from the blog post, and if something is spread across multiple posts, I find that people who are interested enough usually end up finding the various posts and cutting and pasting from there.

  12. @Mats: I applied your updates only partially. While I removed the Message case class and started sending plain Strings as messages, I forgot to change the receive/react case matches. So the Message case was never matched, and the code ran really fast. I would have caught that had I uncommented the println() call in log, but... With the cases now matching on the payload, my times are similar to the ones you reported. I've updated the code in the post to reflect the change I had missed.
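Why a stale case pattern makes the code look fast can be shown without any actor library, using plain partial functions of the same shape as the bodies passed to receive/react. This is a minimal sketch; the `STOP` message and all names are hypothetical stand-ins for whatever the real examples use.

```scala
object MessageMatching {
  // The removed case class, kept here only to show the stale match.
  case class Message(id: Int, payload: String)

  // Hypothetical stop message.
  val Stop = "STOP"

  // Stale handler: still matches on Message, so plain String tasks never match.
  val staleHandler: PartialFunction[Any, String] = {
    case Message(_, payload) => payload.replaceFirst("Requested ", "Downloaded ")
    case Stop                => "stopped"
  }

  // Updated handler: matches the plain String payload directly (Stop is checked first).
  val fixedHandler: PartialFunction[Any, String] = {
    case Stop            => "stopped"
    case payload: String => payload.replaceFirst("Requested ", "Downloaded ")
  }

  // Counts how many of the messages a handler would actually process.
  def handledCount(handler: PartialFunction[Any, String], msgs: Seq[Any]): Int =
    msgs.count(handler.isDefinedAt)
}
```

Given three "Requested ..." strings followed by the stop message, the stale handler accepts only the stop message, while the fixed handler accepts all four; that matches the symptom of only the stop message being processed.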

    @steshaw: I have removed the Message class again, since it's no longer needed. To bring parity between the Java and the Scala examples, we are now passing a plain String to the actors; thanks to Mats for the change. Unfortunately, I had applied his changes only partially, so it appeared that the Message class was still needed. Sorry for the confusion.

  13. @Small Business Web Site Design: thanks, glad it helped you :-).

  14. Your post is helpful and informative

  15. Thank you, glad you found it informative.

  16. Hello Sujit,

    Hope you are doing well.

    Thanks for your informative post comparing several actor frameworks. We have recently come up with a pure Java distributed parallel programming framework based on the actor pattern, called Korus.

    According to our benchmarks, it's the fastest of all the frameworks we have evaluated so far.

    I invite you to review at and share your valuable comments with us.

    Looking forward to hearing back from you.

    Thanks,
    duttaDOTsaurabhATgmailDOTcom

  17. Hi Saurabh, thanks for the info...I am not doing anything with actors anymore myself, but I am hoping that someone else would be able to pick this up.

  18. In the Java code (fixed version), compared with your first version, you have commented out "setScheduler(new Scheduler(numThreads));".
    This makes all actors run with the default number of threads set by Kilim. Wasn't one of the major points of your first version to show how to tune actors with different functionality for different CPU loads? For example, Internet downloads would require fewer truly concurrent threads than indexing, etc.

    Benjamin Lvovsky
    Java developer
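The idea of giving each kind of actor its own thread count can be sketched without Kilim. This sketch swaps in plain `java.util.concurrent` fixed-size thread pools, one per stage, instead of Kilim's `Scheduler`; the default thread counts follow the comment's suggestion and are hypothetical, not measured.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicInteger

object PerStagePools {
  // Runs n requests through two stages, each on its own fixed-size pool.
  // The default thread counts are hypothetical.
  def run(n: Int, downloadThreads: Int = 2, indexThreads: Int = 4): Int = {
    val downloadPool = Executors.newFixedThreadPool(downloadThreads)
    val indexPool = Executors.newFixedThreadPool(indexThreads)
    val done = new CountDownLatch(n)
    val indexed = new AtomicInteger
    try {
      for (i <- 1 to n) {
        downloadPool.execute { () =>
          val downloaded = ("Requested " + i).replaceFirst("Requested ", "Downloaded ")
          // Hand the result to the next stage's pool, like a message to the next actor.
          indexPool.execute { () =>
            if (downloaded.startsWith("Downloaded ")) indexed.incrementAndGet()
            done.countDown()
          }
        }
      }
      done.await() // wait until every request has passed through both stages
      indexed.get
    } finally {
      downloadPool.shutdown()
      indexPool.shutdown()
    }
  }
}
```

Tuning then comes down to changing `downloadThreads` and `indexThreads` independently; the trade-off raised in the reply below is that per-stage tuning makes it harder to run every framework under the same conditions.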

  19. Hi Ben, it's been a while, so I don't really remember, but I suspect it had to do with running all the different frameworks under the same set of conditions.

  20. Great. I will surely tweet this to my Twitter account. This will help a lot of users.

  21. I'm curious how a plain, no-framework Java version would compare. Translating your examples should be pretty easy and straightforward, as I've done in my blog:

    http://brixomatic.wordpress.com/2010/11/26/java-actor-framework-kilim-a-solution-without-a-problem/

  22. Hi brixomatic, interesting post, thanks for the link. Personally, I find the actor model itself more interesting than the frameworks that are built around it. For example, even though your code does not use Kilim, it uses a Calculator actor (i.e., a worker that is fed tasks via messages on a queue, and which puts its responses into another queue). I initially checked the frameworks out because I was learning Scala and wondered if Java had functionality to support this model. Among these, I liked Jetlang the best, but then I realized that I didn't really need them: the problem these frameworks are trying to solve is running many threads in the same JVM, while my problems are better served if they can be parallelized across multiple JVMs and machines, so I built this, which is also based on the actor model.
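The no-framework "actor" described above, a worker fed by messages on one queue that puts its responses on another, can be sketched with plain blocking queues. This is a minimal sketch, not brixomatic's code or any framework's API; the `STOP` poison-pill message, the stage names, and all identifiers are assumptions.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

object QueueWorkers {
  // Hypothetical poison-pill message that tells a worker to stop.
  val Stop = "STOP"

  // Starts a worker thread that takes messages from `in`, applies `f`,
  // and puts the responses on `out`; it forwards Stop and then exits.
  def startWorker(in: BlockingQueue[String], out: BlockingQueue[String])(f: String => String): Thread = {
    val worker = new Thread(() => {
      var running = true
      while (running) {
        val msg = in.take()
        if (msg == Stop) { out.put(Stop); running = false }
        else out.put(f(msg))
      }
    })
    worker.start()
    worker
  }

  // Chains two workers into a pipeline and collects results until Stop comes out the far end.
  def runPipeline(n: Int): Vector[String] = {
    val requests = new LinkedBlockingQueue[String]()
    val downloaded = new LinkedBlockingQueue[String]()
    val indexed = new LinkedBlockingQueue[String]()
    startWorker(requests, downloaded)(_.replaceFirst("Requested ", "Downloaded "))
    startWorker(downloaded, indexed)(_.replaceFirst("Downloaded ", "Indexed "))
    for (i <- 1 to n) requests.put("Requested " + i)
    requests.put(Stop)
    Iterator.continually(indexed.take()).takeWhile(_ != Stop).toVector
  }
}
```

With one worker per stage and FIFO queues, results come out in request order; running several workers on the same input queue would increase throughput but give up that ordering.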

  23. Hi Sujit,
    I have been searching a lot but have been unable to find a way to do multicast/broadcast over the Internet.
    Is there a way to achieve this? Basically, I am trying to broadcast a stock feed to millions of clients at the same time. The pub/sub model (even topic-based) doesn't scale out, and I am looking for a smarter solution.
    Appreciate your help.

  24. Nice post. Thanks for sharing

  25. You are welcome, glad it helped.

