Wednesday, December 24, 2008

Java Concurrency with Jetlang Actors

Last week, I wrote about my experiments with Kilim, a Java actor framework. This week, I take my same toy example and rewrite it using Jetlang, another Java actor framework. Jetlang is a Java port of the Retlang actor framework for C#/.NET, which is inspired by Erlang actors. From what I can see from reading about Retlang, Jetlang is still missing some functionality, but its still in its early stages (version 0.1.6 at the time of writing this).

Like Kilim, concurrency in Jetlang is made safer using a shared-nothing approach. Unlike Kilim, message communication happens using publish-subscribe. Actor threads are modeled as Fibers and Mailboxes are modeled as Channels. Actors sign up to receive events from Channels using predefined callbacks such as onReceive(), onStop() etc, and they perform an action defined by act() in these callbacks. An actor would publish to a Channel within its act() method.

If you have read my previous post, and the previous paragraph, there is really not that much to the code. Like my Kilim example, a lot of it is adapted from the PingPong example in either distribution. There is an ActorManager which is responsible for setting everything up, which you can see below:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/ActorManager.java
package com.mycompany.myapp.concurrent.jetlang;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jetlang.channels.Channel;
import org.jetlang.channels.MemoryChannel;
import org.jetlang.core.Disposable;
import org.jetlang.fibers.Fiber;
import org.jetlang.fibers.PoolFiberFactory;

import com.mycompany.myapp.concurrent.kilim.Message;

public class ActorManager {

  public final static int NUM_ACTORS = 3;
  public final static String STOP = "__STOP__";
  
  public static void main(String[] args) {
    ExecutorService exec = Executors.newCachedThreadPool();
    PoolFiberFactory factory = new PoolFiberFactory(exec);

    // when the poison pill is received, the fiber.dispose() call will
    // call this and decrement the countdown latch. The onstop.await()
    // will block until the latch is zero, so that way the manager waits
    // for all the actors to complete before exiting
    final CountDownLatch onstop = new CountDownLatch(NUM_ACTORS);
    Disposable dispose = new Disposable() {
      public void dispose() {
        onstop.countDown();
      }
    };
    
    Fiber downloadFiber = factory.create();
    downloadFiber.add(dispose);
    DownloadActor downloadActor =  
      new DownloadActor(Channels.downloadChannel, Channels.indexChannel, 
      downloadFiber);
    
    Fiber indexFiber = factory.create();
    indexFiber.add(dispose);
    IndexActor indexActor = 
      new IndexActor(Channels.indexChannel, Channels.writeChannel, 
      indexFiber);
    
    Fiber writeFiber = factory.create();
    writeFiber.add(dispose);
    WriteActor writeActor = 
      new WriteActor(Channels.writeChannel, (Channel<Message>) null, 
      writeFiber);

    downloadActor.start();
    indexActor.start();
    writeActor.start();
    
    // seed the incoming channel with 10,000 requests
    for (int i = 0; i < 10000; i++) {
      String payload = "Requested " + i;
      System.out.println(payload);
      Channels.downloadChannel.publish(new Message(i, payload));
    }
    // send the poison pill to stop processing
    Channels.downloadChannel.publish(new Message(0, ActorManager.STOP));
    
    try { 
      onstop.await(); 
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    exec.shutdown();
  }
}

To stop the ActorManager, we rely on a similar poison pill approach that I used for the Kilim example. Each actor gets the poison pill, places it on its outbox and then terminates its Fiber by calling dispose(). This triggers off an onDispose() event which is decrements the count on the onstop CountDownLatch. The ActorManager waits on the CountDownLatch to terminate.

The Channels class is simply a holder class that holds final static instances of the channels. There is no difference in my case in having it here from instantiating them within the ActorManager.main(), but it looked like a good way to keep them by themselves since they are our "shared" resources here.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/Channels.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.channels.MemoryChannel;

import com.mycompany.myapp.concurrent.kilim.Message;

public class Channels {

  public static final Channel<Message> downloadChannel = 
    new MemoryChannel<Message>();
  public static final Channel<Message> indexChannel = 
    new MemoryChannel<Message>();
  public static final Channel<Message> writeChannel = 
    new MemoryChannel<Message>();
}

Next up is an abstract Actor class built out of a Fiber and Channels representing the Actors inbox and outbox. There is an abstract act() method that application specific Actor subclasses will implement.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/Actor.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.core.Callback;
import org.jetlang.fibers.Fiber;

import com.mycompany.myapp.concurrent.kilim.Message;

public abstract class Actor {

  private Channel<Message> inChannel;
  private Channel<Message> outChannel;
  private Fiber fiber;
  
  public Actor(Channel<Message> inChannel, 
               Channel<Message> outChannel, 
               Fiber fiber) {
    this.inChannel = inChannel;
    this.outChannel = outChannel;
    this.fiber = fiber;
  }
  
  public void start() {
    // set up subscription listener
    Callback<Message> onRecieve = new Callback<Message>() {
      public void onMessage(Message message) {
        act(message);
        if (outChannel != null) {
          outChannel.publish(message);
        }
        // process poison pill, dispose current actor and pass the message
        // on to the next actor in the chain (above)
        if (message.payload instanceof String &&
            ActorManager.STOP.equals(message.payload)) {
          fiber.dispose();
        }
      }
    };
    // subscribe to incoming channel
    inChannel.subscribe(fiber, onRecieve);
    // start the fiber
    fiber.start();
  }
  
  public abstract void act(Message message);
}

As before, the three actors are pretty trivial and pointless, so I just put them in here without much explanation.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/DownloadActor.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.fibers.Fiber;

import com.mycompany.myapp.concurrent.kilim.Message;

public class DownloadActor extends Actor {

  public DownloadActor(Channel<Message> inChannel, 
                       Channel<Message> outChannel,
                       Fiber fiber) {
    super(inChannel, outChannel, fiber);
  }

  @Override
  public void act(Message message) {
    String payload = (String) message.payload;
    payload = payload.replaceFirst("Requested ", "Downloaded ");
    System.out.println(payload);
    message.payload = payload;
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/IndexActor.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.fibers.Fiber;

import com.mycompany.myapp.concurrent.kilim.Message;

public class IndexActor extends Actor {

  public IndexActor(Channel<Message> inChannel, 
                    Channel<Message> outChannel,
                    Fiber fiber) {
    super(inChannel, outChannel, fiber);
  }

  @Override
  public void act(Message message) {
    String payload = (String) message.payload;
    payload = payload.replaceFirst("Downloaded ", "Indexed ");
    System.out.println(payload);
    message.payload = payload;
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/WriteActor.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.fibers.Fiber;

import com.mycompany.myapp.concurrent.kilim.Message;

public class WriteActor extends Actor {

  public WriteActor(Channel<Message> inChannel, 
                    Channel<Message> outChannel,
                    Fiber fiber) {
    super(inChannel, null, fiber);
  }

  @Override
  public void act(Message message) {
    String payload = (String) message.payload;
    payload = payload.replaceFirst("Indexed ", "Wrote ");
    System.out.println(payload);
    message.payload = payload;
  }
}

Running the ActorManager produces the expected interleaved output, showing that the Actors are acting asynchronously.

I was curious about the assertion that Events are a bad idea for high-concurrency servers (PDF), so I figured that now I had two trivial versions of my application that did nothing substantial, it would be interesting to see if Kilim's wait-loop approach performed better than Jetlang's pub/sub approach. Here are the numbers - note that the times are just wall-clock times taken under very uncontrolled situations. The red line is for Kilim numbers and the green line is Jetlang's. So it looks like (at least in my test) the message passing overhead for Jetlang is slightly higher than Kilim, and increases with higher loads. However, what is an acceptable overhead for message passing would depend a lot on the application - it would probably be less important as the running times of the actors increases.

#-TasksKilimJetlang
1910
102649
10071105
1000605630
1000046485997
1000003707646426

Update - 2009-01-02

The performance numbers shown above are skewed due to the presence of Console IO in the code. While it is likely that real-world actors will do something more meaningful than replacing one string with another, possibly resulting in blocking similar to the System.out.println() calls above, we want to compare the frameworks themselves. Numbers with println() calls removed are provided in my post here.

Personally, now that I understand Jetlang a bit more than I did last week, here is some feedback. I found Jetlang initially slightly harder to understand than Kilim, but using it was simpler, mainly because there is no bytecode manipulation. I haven't used Jetlang enough to praise/blame it from a programming perspective, but here is a review of Retlang from a C#/Python developer that may be useful. Mike Rettig (the (original) author of Retlang and Jetlang) also posts interesting information on Jetlang in his blog.

Merry Christmas, everyone! Have a great holiday season.

5 comments (moderated to prevent spam):

Anonymous said...

Great post. It would be very interesting to see performance figures for Scala actors compared to these two.

Sujit Pal said...

Thanks Jonas, and yes, a Scala implementation would be the subject of one of my upcoming posts, trying to learn it over the holidays. Also, I had a bug in my GNUPlot script for the chart in the original post, I've posted a new chart that shows that Jetlang is actually less performant than Kilim.

Unknown said...

Thanks for taking some time to evaluate jetlang.

What code did you use for the performance tests? Is it exactly the same as your example? I'll take a look at the performance differences when I get a chance. I'll try to track down the differences. My guess is that it is not due to the overhead of message passing.

Also with Jetlang, you shouldn't need a base message class. Channels are templated so specific typed messages can be published on different channels. Therefore, the subscriber doesn't need to write a complicated switch statement to interpret different messages.

Retlang and Jetlang are not based on Scala actors. Both frameworks are similar in purpose, but were not influeced by scala.

Unknown said...

Having looked at the example a bit more, I think this should probably be implemented with fork/join.

Speed will be most helped with parallel task execution. Jetlang provides sequential task execution and is better suited for writing stateful concurrent services.

I could modify the design of the Jetlang example to give you much better performance, but that wouldn't make sense. Fork/Join is a much better solution for your needs.

Regards,

Mike

Sujit Pal said...

Hi Mike, thanks for the comments, and thanks for building Jetlang, it is very nice to work with. Answers to your questions are inlined with snips of your questions to provide context.

[What code was used to generate numbers...]
The code in the blog was used for the performance tests. The number of tasks was controlled using the upper bound in the for(int i=0;...) loop in ActorManager. However, all that each Actor does is print a message to the console.

[No need for base Message...]
This is actually an artefact of my original design. I wanted to have a way of tracking the Message object so I split it up into (id,payload), where the payload is updated by each Actor.

[Retlang and Jetlang not based on Scala...]
Sorry, this was my bad, I am going to fix it in the post.

[...should be implemented with Fork/Join]
From the little I know about Fork/Join, its a method for splitting up the problem recursively into smaller problems and invoking the smaller tasks in parallel. Once the problem becomes small enough to solve sequentially, it is, and the results are returned back to the parent and so on. I will look some more at Fork/Join, but not sure at the moment how I can model this.

The application I am trying to model is a pipeline (say a|b|c), where each process {a,b,c} can serve multiple (but similar) tasks in parallel. I want each task to be processed sequentially by a, b and c (in that order) so my thought was that Actors were actually the right approach for this?