Wednesday, December 05, 2007

JMS Patterns with ActiveMQ

Quite some time ago, before I started this blog, I used to have a Geocities (now Yahoo!) homepage where I would write articles about stuff I did, much like this blog here. One such article, "Java Patterns for MQ Series Clients", described various types of clients that could be written against an MQ-Series server. Unlike most of the other stuff I wrote there, it proved to be somewhat popular; a couple of people actually liked it enough to send me email about it.

I was working on a Java/MQ-Series project at the time, and I wrote it as part of learning the MQ-Series Java API. Although the JMS specification was out, and a JMS wrapper around the MQ-Series API was also available, it was still quite new and relatively untested, so a decision was made to code directly against the MQ-Series Java API (which itself was a wrapper over the MQ-Series C API) instead. Since then, no JMS project has come my way, so, even though I had read about the JMS API and was familiar with the concepts (having worked on its predecessor product), I had never worked with JMS. Since I mostly learn by doing, I figured that it would be instructive to try and code up the same scenarios described in my old article using JMS. This blog post is a result of that effort.

For the server, I chose Apache ActiveMQ, a free and open source JMS implementation. Setting it up was quite simple. All I had to do was download the latest stable binary distribution (4.1.1 in my case), untar it into a directory, then run its startup script to bring up a TCP listener on port 61616.

sujit@sirocco:~/tmp$ tar xvzf apache-activemq-4.1.1.tar.gz
sujit@sirocco:~/tmp$ cd apache-activemq-4.1.1
sujit@sirocco:~/tmp/apache-activemq-4.1.1$ bin/activemq
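
To confirm that the broker is actually accepting connections before running any of the clients, a quick check along the following lines can help. This is a minimal sketch using only the JDK; the class and method names (BrokerPortCheck, isListening) are made up for illustration and are not part of ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of ActiveMQ): reports whether anything
// accepts TCP connections on the given host and port.
final class BrokerPortCheck {

  static boolean isListening(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      // connection refused, timed out, or host unreachable
      return false;
    }
  }

  public static void main(String[] args) {
    // 61616 is the port of the ActiveMQ TCP listener mentioned above
    System.out.println("broker listening: "
        + isListening("localhost", 61616, 1000));
  }
}
```

A successful connection only shows that something is listening on the port, not that it is ActiveMQ, so treat it as a smoke test rather than a health check.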

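The diagram below describes my basic setup. There is a server which loops forever, reading messages off a request queue, doing some processing, and writing responses to a response queue. Clients communicate with the server by writing messages to the request queue, and optionally reading messages off the response queue. The design follows the point-to-point JMS style, although the code below actually creates both destinations with session.createTopic(), so strictly speaking they are JMS topics that happen to be named request.queue and response.queue.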

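The code for the server is shown below. It reads a (text) message from the request queue, prepends "Processed " to it, and places it on the response queue. If the request carries a JMS Correlation Id, the server copies it onto the response. The init() method sets up the connection, session, consumer and producer, and the destroy() method closes the session and connection.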

// MessageServer.java
package com.mycompany.mqpatterns;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MessageServer implements MessageListener {

  private final Log log = LogFactory.getLog(getClass());
  
  private Connection connection;
  private Session session;
  private MessageConsumer consumer;
  private MessageProducer producer;
  
  public void init() throws Exception {
    // set 'er up
    ActiveMQConnectionFactory connectionFactory = 
      new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = connectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // create our request and response queues
    Topic request = session.createTopic("request.queue");
    Topic response = session.createTopic("response.queue");
    // and attach a consumer and producer to them
    consumer = session.createConsumer(request);
    consumer.setMessageListener(this);
    producer = session.createProducer(response);
    // start your engines...
    connection.start();
  }
  
  public void destroy() throws Exception {
    session.close();
    connection.close();
  }

  public void onMessage(Message message)  {
    try {
      if (message instanceof TextMessage) {
        String messageText = ((TextMessage) message).getText();
        log.debug("Server: Got request [" + messageText + "]");
        Message responseMessage = 
          session.createTextMessage("Processed " + messageText);
        if (message.getJMSCorrelationID() != null) {
          // pass it through
          responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
        }
        producer.send(responseMessage);
      }
    } catch (JMSException e) {
      log.error(e);
    }
  }
}

On re-reading my original article, I found that there were actually only 3 basic patterns which I had covered, not 5, so I wrote implementations for these 3 patterns, which are described below. A lot of the code in the init() and destroy() methods is boilerplate that just sets up and tears down the connection, session, producers and consumers.

  • Fire And Forget
  • Pseudo Synchronous
  • Asynchronous with Callback

Fire And Forget

A real-life analog for this would be writing a letter and dropping it in the mail. You (the client) do not really know when or if it will reach its destination, and you do not expect a response or an acknowledgement from the receiver. The code for this is shown below:

// FireAndForgetClient.java
package com.mycompany.mqpatterns;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class FireAndForgetClient {

  private final Log log = LogFactory.getLog(getClass());
  
  private Connection connection;
  private Session session;
  private MessageProducer producer;
  private MessageConsumer consumer;
  
  public void init() throws Exception {
    // set 'er up
    ActiveMQConnectionFactory connectionFactory = 
      new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = connectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // create our request destination (this client never reads responses)
    Topic request = session.createTopic("request.queue");
    // and attach a producer to it
    producer = session.createProducer(request);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    // and start your engines...
    connection.start();
  }

  public void destroy() throws Exception {
    session.close();
    connection.close();
  }

  public void sendMessage(String messageText) throws Exception {
    producer.send(session.createTextMessage(messageText));
  }
}

Pseudo Synchronous

For all practical purposes, this client is synchronous. I guess the pseudo is only there to emphasise the fact that it is based on a medium that is inherently asynchronous. A real life analog to this would be a (half-duplex) telephone conversation.

// PseudoSynchronousClient.java
package com.mycompany.mqpatterns;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class PseudoSynchronousClient {

  private final Log log = LogFactory.getLog(getClass());
  
  private Connection connection;
  private Session session;
  private MessageProducer producer;
  private MessageConsumer consumer;
  
  private String response;

  public void init() throws Exception {
    // set 'er up
    ActiveMQConnectionFactory connectionFactory = 
      new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = connectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // create our request and response queues
    Topic request = session.createTopic("request.queue");
    Topic response = session.createTopic("response.queue");
    // and attach a consumer and producer to them
    producer = session.createProducer(request);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    consumer = session.createConsumer(response);
    // and start your engines...
    connection.start();
  }

  public void destroy() throws Exception {
    session.close();
    connection.close();
  }
  
  public String sendMessage(String messageText) throws Exception {
    try {
      log.info("Client: Send request [" + messageText + "]");
      producer.send(session.createTextMessage(messageText));
      Message response = consumer.receive();
      String responseText = ((TextMessage) response).getText(); 
      log.info("Client: Got response [" + responseText + "]");
      return responseText;
    } catch (JMSException e) {
      log.error("JMS Exception on client", e);
    }
    // the send or receive failed, so there is no response to return
    return null;
  }
}
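
One weakness of the listing above is that consumer.receive() blocks indefinitely and takes whichever message arrives next, so nothing ties a response to the request that produced it. In JMS itself, MessageConsumer.receive(long timeout) and a message selector on JMSCorrelationID are the usual tools for this. The sketch below only illustrates the underlying idea with plain JDK classes: it is an in-memory stand-in, not JMS code, and every name in it (CorrelatedRequestReply, Request, pending) is hypothetical. For brevity the "server" thread completes the waiting caller directly instead of going through a separate response queue.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// In-memory stand-in for the request/response destinations; NOT JMS code.
final class CorrelatedRequestReply {

  // a request tagged with the correlation id its reply must carry
  static final class Request {
    final String correlationId;
    final String text;

    Request(String correlationId, String text) {
      this.correlationId = correlationId;
      this.text = text;
    }
  }

  private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
  // callers waiting for a reply, keyed by correlation id
  private final Map<String, CompletableFuture<String>> pending =
      new ConcurrentHashMap<>();
  private final Thread server;

  CorrelatedRequestReply() {
    server = new Thread(() -> {
      try {
        while (true) {
          Request request = requestQueue.take();
          // hand the reply to the caller that sent this exact request
          CompletableFuture<String> waiter = pending.remove(request.correlationId);
          if (waiter != null) {
            waiter.complete("Processed " + request.text);
          }
        }
      } catch (InterruptedException e) {
        // stop() was called; let the thread end
      }
    });
    server.setDaemon(true);
    server.start();
  }

  // Blocks until the matching reply arrives; returns null on timeout.
  String sendMessage(String text, long timeoutMillis) throws InterruptedException {
    String correlationId = UUID.randomUUID().toString();
    CompletableFuture<String> reply = new CompletableFuture<>();
    pending.put(correlationId, reply);
    requestQueue.put(new Request(correlationId, text));
    try {
      return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException | ExecutionException e) {
      pending.remove(correlationId); // give up on this request
      return null;
    }
  }

  void stop() {
    server.interrupt();
  }

  public static void main(String[] args) throws InterruptedException {
    CorrelatedRequestReply client = new CorrelatedRequestReply();
    System.out.println(client.sendMessage("hello", 1000));
    client.stop();
  }
}
```

The caller still sees a blocking call, which is what makes the pattern "pseudo" synchronous, but each caller now waits only for its own reply and gives up after the timeout instead of hanging.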

Asynchronous With Callback

Like the Fire and Forget pattern, this is asynchronous, but unlike it, the client can specify that some action should be triggered when the server is done processing the request. A real-life analog for this case would be mailing a letter with a return receipt. The example I use here is to have an acknowledgement sent back to the client, which the client can store in a local database (an in-memory HashMap in this case). The caller will send a message to the server through the client and get back the message Id. The caller can then check on the status of the message.

The request and response messages are tied together using the JMS Correlation Id. We generate a UUID when the message is sent and put it in the JMS Correlation Id. If the server finds that the incoming message contains a JMS Correlation Id, it passes it through in its response.

// AsynchronousWithCallbackClient.java
package com.mycompany.mqpatterns;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class AsynchronousWithCallbackClient implements MessageListener {

  private final Log log = LogFactory.getLog(getClass());
  
  private Connection connection;
  private Session session;
  private MessageProducer producer;
  private MessageConsumer consumer;

  // to store acknowledgements as they are received. Should be replaced 
  // by a more persistent mechanism
  private Map<String,String> messageStatus = new HashMap<String,String>();
  
  public void init() throws Exception {
    // set 'er up
    ActiveMQConnectionFactory connectionFactory = 
      new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = connectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // create our request and response queues
    Topic request = session.createTopic("request.queue");
    Topic response = session.createTopic("response.queue");
    // and attach a consumer and producer to them
    producer = session.createProducer(request);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    consumer = session.createConsumer(response);
    consumer.setMessageListener(this);
    // and start your engines...
    connection.start();
  }

  public void destroy() throws Exception {
    session.close();
    connection.close();
  }
  
  public String sendMessage(String messageText) throws Exception {
    TextMessage message = session.createTextMessage(messageText);
    String messageId = UUID.randomUUID().toString();
    message.setJMSCorrelationID(messageId);
    producer.send(message);
    return messageId;
  }

  public String getStatus(String correlationId) {
    synchronized(this) {
      if (messageStatus.containsKey(correlationId)) {
        String status = messageStatus.get(correlationId);
        messageStatus.remove(correlationId);
        return status;
      } else {
        return null;
      }
    }
  }
  
  public void onMessage(Message message) {
    synchronized(this) {
      try {
        if (message instanceof TextMessage) {
          String originalMessageId = message.getJMSCorrelationID();
          String responseText = ((TextMessage) message).getText();
          messageStatus.put(originalMessageId, responseText);
        }
      } catch (JMSException e) {
        log.error("JMS Exception encountered on client", e);
      }
    }
  }
}
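
The listing above guards its HashMap with synchronized(this) and reads a status with a containsKey/get/remove sequence. As a possible simplification, the same bookkeeping can be sketched with a ConcurrentHashMap, whose remove() returns the old value (or null) in a single atomic step. The class below is hypothetical and JDK-only; its record() and take() methods are meant to correspond to what onMessage() and getStatus() do in the client.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical replacement for the synchronized HashMap in the client above.
final class MessageStatusStore {

  private final ConcurrentMap<String, String> statusById =
      new ConcurrentHashMap<>();

  // would be called from onMessage() with the correlation id and response text
  void record(String correlationId, String responseText) {
    // unlike HashMap, ConcurrentHashMap rejects null keys and values,
    // so responses without a correlation id are simply skipped here
    if (correlationId != null && responseText != null) {
      statusById.put(correlationId, responseText);
    }
  }

  // would be called from getStatus(); returns null if nothing has arrived yet
  String take(String correlationId) {
    return correlationId == null ? null : statusById.remove(correlationId);
  }
}
```

As in the original getStatus(), a status can be read only once: the first take() for an id returns the response text and later calls return null.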

Test Harness

To test these clients, I set up a JUnit test as follows.

// JmsClientPatternsTest.java
package com.mycompany.mqpatterns;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class JmsClientPatternsTest {

  private final Log log = LogFactory.getLog(getClass());
  
  private static MessageServer server;
  private static String[] messages = StringUtils.split(
    "The quick brown fox jumped over the lazy dog");

  private FireAndForgetClient fireAndForgetClient;
  private PseudoSynchronousClient pseudoSynchronousClient;
  private AsynchronousWithCallbackClient asyncWithCallbackClient;
  
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    server = new MessageServer();
    server.init();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    server.destroy();
  }

  @Before
  public void setUp() throws Exception {
    fireAndForgetClient = new FireAndForgetClient();
    fireAndForgetClient.init();
    pseudoSynchronousClient = new PseudoSynchronousClient();
    pseudoSynchronousClient.init();
    asyncWithCallbackClient = new AsynchronousWithCallbackClient();
    asyncWithCallbackClient.init();
  }

  @After
  public void tearDown() throws Exception {
    fireAndForgetClient.destroy();
    pseudoSynchronousClient.destroy();
    asyncWithCallbackClient.destroy();
  }

  @Test
  public void testFireAndForgetClient() throws Exception {
    for (String message : messages) {
      fireAndForgetClient.sendMessage(message);
    }
  }
  
  @Test
  public void testPseudoSynchronousClient() throws Exception {
    for (String message : messages) {
      String response = pseudoSynchronousClient.sendMessage(message);
      log.debug("response=" + response);
    }
  }

  @Test
  public void testAsynchronousWithCallbackClient() throws Exception {
    List<String> sentMessageIds = new ArrayList<String>();
    for (String message : messages) {
      String messageId = asyncWithCallbackClient.sendMessage(message);
      sentMessageIds.add(messageId);
    }
    Thread.sleep(2000);
    for (String sentMessageId : sentMessageIds) {
      String response = asyncWithCallbackClient.getStatus(sentMessageId);
      log.debug("response[" + sentMessageId + "]=" + response);
    }
  }
}
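
The fixed Thread.sleep(2000) in the last test either waits longer than necessary or, on a slow machine, not long enough. One alternative is to poll for each status until it shows up or a deadline passes. The helper below is a hypothetical sketch (the name Await.untilNonNull is made up) and assumes Java 8 or later for the Supplier interface.

```java
import java.util.function.Supplier;

// Hypothetical test helper: polls instead of sleeping for a fixed time.
final class Await {

  // Calls poll repeatedly until it returns a non-null value or the
  // timeout elapses; returns null if the timeout is reached first.
  static <T> T untilNonNull(Supplier<T> poll, long timeoutMillis,
      long intervalMillis) throws InterruptedException {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (true) {
      T value = poll.get();
      if (value != null) {
        return value;
      }
      if (System.nanoTime() - deadline >= 0) {
        return null;
      }
      Thread.sleep(intervalMillis);
    }
  }
}
```

In the test, the second loop could then call Await.untilNonNull(() -> asyncWithCallbackClient.getStatus(sentMessageId), 2000, 50) instead of sleeping first. Because getStatus() removes the entry it returns, each id should be polled by only one caller.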

Conclusion

Compared to what I remember of working with the Java API for MQ-Series, JMS is easier to work with. The API provides an onMessage(Message) strategy interface method that can be implemented in both the client and the server. With the Java API to MQ-Series, the behavior of tying a listener process to a queue had to be configured in the server. Also, the queues had to be manually defined prior to being accessed from client code. It is possible that enterprise setups would follow the same strategy, and I don't know enough about either MQ-Series or ActiveMQ administration to comment intelligently on that.

Obviously there is much more to JMS than this, but the JMS API is fairly simple and compact and can be easily learned. My take on the complexity of the JMS API is that it is similar to JDBC. You need to learn a few basic things to be able to start coding immediately, and the rest you can learn as you go. I found the simplicity of the API to be very attractive.

However, more than the JMS API itself, what I really like is the idea of neatly decoupling two components by putting a pair of queues in between. There are many applications that inherently lend themselves to this sort of design, and these applications typically scale better than those which don't.

Often, people get the design right, but end up implementing a home-grown, thread-based solution for the application. Since the average Java programmer is generally not very well versed in thread programming, this can lead to a maintenance bottleneck, with only one or two people in a group who know enough to debug and fix problems in the code. JMS provides the tools by which such designs can be implemented with no thread-based application code.

Thus, for a Java programmer, I think there is a high benefit to cost ratio in favor of learning JMS. Not only is it easy to learn, but it opens your eyes to patterns which take advantage of inherent asynchronicity in an application, and provides you tools to very easily implement these patterns.

18 comments (moderated to prevent spam):

said...

This's a great article for JMS. I translate it into Chinese in my Blog. If you don't agree, I will delete it.

The Chinese article link:
http://gocom.primeton.com/blog7441_14478.htm

Conan Zhang
MSN:bsspirit@163.com
Email:bsspirit@163.com

Sujit Pal said...

Hi Conan, no, it's totally fine. Thanks for letting me know, and I am honored that you think the post is worthy of translation.

Pints said...

Great article to read and understand the basics. But I am a newbie on JMS, so I am not sure how to execute these programs. My understanding is that I have ActiveMQ set up and running on my machine. I created both the server and client programs, then started the ActiveMQ console. Now do I have to run the server program first and then the client program? Please clarify.

thanks

Sujit Pal said...

Hi Pints, thanks for your comment and glad you liked it. Yes, you would start the server first, then the client.

Dilraj said...

Sujit hi, I am new to JMS. Can you please explain what the difference is between new ActiveMQConnectionFactory("vm://localhost"); and the way you created the factory? Thanks in advance, Dil

Sujit Pal said...

Hi Dilraj, I haven't used ActiveMQ much myself, so my answer may not be completely accurate, but the vm: prefix seems to indicate that the MQ listener is in the same JVM as the job publishers. The way I have created it is with a tcp: prefix, meaning that the MQ listener is a separate daemon process outside the client JVM. So in my case, it would be possible to host the VM on a separate machine if necessary, which may be good for scalability and maintenance. However, if you are building a self-contained MQ backed webapp (the servlet container is your daemon in that case), where clients submit (as durable messages) jobs to the in-VM MQ listener, then perhaps the vm: approach may be suitable.

Vladimir Bruha said...

This is great article,
I have been looking for the older article you mentioned ("Java Patterns for MQ Series Clients"), because I need some inspiration for my current Java development. I would like to create a set of Java stored procedures inside an Oracle 10g database with similar functionality. Unfortunately, the old link for the source code is not working anymore (http://www.geocities.com/sujitpal/downloads/artwb006.tar.gz). Could you send me a working URL for this download?

Vladimir Bruha
Email: vladimir.bruha@a-h.cz

Sujit Pal said...

Hi Vladimir, thank you for your kind words. As for the old article, sorry about the problem with the link, it's fixed now.

Kris Bravo said...

In your pseudo synchronous case I wasn't able to find anything which indicated that the response your client received corresponded to the request it sent. In other words, if you had two clients calling how do you know in your code example that you got the right response?

Sujit Pal said...

Hi Kris, the naming convention was from a book on MSMQ that I read long ago...the pseudo sync mode appears to be (to the client) a sync mode. The message is written to a queue and the client would block until a response came back. Looking back now, though, I think this wouldn't scale all that well. Internally, there would have to be some callback mechanism similar to the async-with-callback pattern.

Ilango said...

Nice article. My question to you: since the Message Server code has an onMessage, does it have to run in an EJB server as an MDB?

Sujit Pal said...

Thanks Ilango. You don't need an EJB server; all you need is one (or more) MessageListener implementations, which is what I am doing here with MessageServer.

Anonymous said...

hi sujit, this is absolutely good article for every jms learners. i need to send emails from my java code by using jms and activemq. i have established activemq connection and sent mail object(this mail object is a bean which contains all required information like host name, to address, from address, subject and so on to send mail.) to the destination queue. i have written code for message consumer process and mail delivery process(here i would get values from mail object where i set earlier while sending destination queue.) in a separate class. can ActiveMQ trigger this class method automatically? bcoz i would send my mail object values to activemq. it should take care of sending mails. the main reason for this is: if the mail server is down, my code hangs for some times. i want this process to happen behind the scenes with the help of activemq. thanks in advance.

Sujit Pal said...

Thanks, glad you like it. To answer your question, as long as your mail server is configured as a JmsListener that listens to the queue you are dropping the email beans into, then it should work. When you stop the listener, then your emails will accumulate in the queue, once the listener comes back up, it will consume from the queue and send out the email.

Sathishkumar said...

Hi Sujit, Merry Xmas.Thank you very much for your valuable reply. As U said emails will be accumulated in the queue when mail server is down, once it resumes, it will consume from the queue and send out the email. i am facing an issue here. when i use the receive() method of MessageConsumer object, the message is dequeued from the ActiveMQ.This receive() method returns my mail object. then i fetch these values to send mails.If the message is dequeued(by calling receive() method) but the mail server is down, it will not go into enqueue status from dequeue status. how will ActiveMQ send this mail again once the mail server resumes. Bcoz it will try to send mails only from enqueue status. thanks in advance.

Sujit Pal said...

Thanks Sathishkumar, a very Merry Christmas to you too. As far as I know, there is no framework level support for the scenario you described. However, you can deal with this in application code with try/catch. If the mail server is down it will throw a connection refused exception, in your catch block you attempt to submit the message periodically in a loop, until the message is consumed successfully, or send it to a backout queue as described on this SO page.

Namratha said...

Hi,

Can you please send me the code for sending email using jms and apachemq

Sujit Pal said...

Hi Namratha,

I don't have email sending over JMS Java code handy, but it should be pretty easy to write yourself. You can find a basic example of sending mail using JavaMail here. The JMS setup is similar, except that instead of the Transport.send(email) do a producer.send(email). This will write to the queue. Then you have a subscriber which will read off the queue and just do Transport.send(message) in its onMessage() method.