The title of this post is probably unfair, since the authors of OSWorkflow provide a built-in JMSMessage action to do Fire-and-Forget style calls to a JMS Queue or Topic. However, what I describe here takes that integration one step further, by marrying event-based OSWorkflow processing with Asynchronous with Callback style JMS calls using Apache ActiveMQ. My application needs to fire long-running batch jobs which are dependent on each other. Rather than have a human operator fire them off in the right sequence, the idea is to build a workflow that captures these dependencies, then submit the jobs asynchronously to a JMS topic. As each job completes, the JMS listener which executes these jobs on the other end sends a callback to the workflow, which allows it to fire the next job in the dependency graph.
The workflow is fairly complex: it contains two splits and two corresponding joins. See my previous post for a more thorough discussion of the workflow itself. I have reproduced the graph below for completeness:
There are three main components here. The first is the WorkflowRunner, which loads the OSWorkflow configuration for the workflow into memory. It is called once from an external client (probably a web user) to kick off the workflow. Each workflow step that is responsible for executing a batch job is tied to a custom JmsAction, which writes a message to the request.topic Topic. At the far end of the Topic is a JmsServer component, which delegates to Java processes representing each individual batch job. Once a job is complete, the JmsServer writes a message to the response.topic Topic. This message is picked up by the correct JmsAction, which then publishes an event into the Spring ApplicationContext. The WorkflowRunner picks this event up and calls process() again, and this repeats until the workflow is complete. The diagram below illustrates this flow.
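To make the round trip concrete, here is a minimal in-memory sketch of the request and callback loop. It uses only java.util.concurrent as a stand-in for ActiveMQ, Spring events and OSWorkflow, and it runs a simple linear chain of jobs rather than the full graph. The class and method names (CallbackFlowSketch, submit, runChain) are hypothetical and do not appear in the real code in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** In-memory stand-in for the request.topic / response.topic round trip. */
class CallbackFlowSketch {

  /** Plays the role of JmsServer: runs the "job" on another thread, then calls back. */
  static void submit(ExecutorService server, String actionName, Consumer<String> callback) {
    server.execute(() -> {
      // a real server would run the batch job here
      callback.accept(actionName); // stands in for the message on response.topic
    });
  }

  /** Fires each job from the callback of the previous one and returns the completion order. */
  static List<String> runChain(List<String> jobs) {
    ExecutorService server = Executors.newSingleThreadExecutor();
    List<String> completed = Collections.synchronizedList(new ArrayList<String>());
    CountDownLatch done = new CountDownLatch(1);
    try {
      fire(server, jobs, 0, completed, done);
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("chain did not finish in time");
      }
      return new ArrayList<String>(completed);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      server.shutdown();
    }
  }

  /** Plays the role of WorkflowRunner.process(): submit the next job, or finish. */
  private static void fire(ExecutorService server, List<String> jobs, int index,
      List<String> completed, CountDownLatch done) {
    if (index == jobs.size()) {
      done.countDown(); // nothing left to run
      return;
    }
    submit(server, jobs.get(index), finishedJob -> {
      completed.add(finishedJob);
      fire(server, jobs, index + 1, completed, done); // callback moves the chain forward
    });
  }

  public static void main(String[] args) {
    System.out.println(runChain(Arrays.asList("t0", "t1", "t2"))); // prints [t0, t1, t2]
  }
}
```

The caller only submits the first job; every later job is started from a callback, which is the same shape as the WorkflowRunner and JmsAction interaction described above.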
I describe each of the individual components below. First, the OSWorkflow workflow definition file. Each of the steps is mapped to a JmsAction instance. This is the only difference from the previous week, when everything was mapped to a MockAction instance.
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE workflow PUBLIC
"-//OpenSymphony Group//DTD OSWorkflow 2.8//EN"
"http://www.opensymphony.com/osworkflow/workflow_2_8.dtd">
<workflow>
<initial-actions>
<action id="0" name="start">
<pre-functions>
<function type="class">
<arg name="class.name">com.opensymphony.workflow.util.Caller</arg>
</function>
<function type="spring">
<arg name="bean.name">jmsAction</arg>
<arg name="action.name">t0</arg>
</function>
</pre-functions>
<results>
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="1"/>
</results>
</action>
</initial-actions>
<steps>
<step id="1" name="p1">
<actions>
<action id="1" name="t1">
<results>
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="26"/>
</results>
<post-functions>
<function type="spring">
<arg name="bean.name">jmsAction</arg>
<arg name="action.name">t1</arg>
</function>
</post-functions>
</action>
</actions>
</step>
<step id="26" name="s26">
<actions>
<action id="26" name="split26">
<results>
<unconditional-result old-status="Finished" owner="${caller}" split="26"/>
</results>
</action>
</actions>
</step>
<step id="2" name="p2">
<actions>
<action id="2" name="t2">
<results>
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="34"/>
</results>
<post-functions>
<function type="spring">
<arg name="bean.name">jmsAction</arg>
<arg name="action.name">t2</arg>
</function>
</post-functions>
</action>
</actions>
</step>
<step id="34" name="s34">
<actions>
<action id="34" name="split34">
<results>
<unconditional-result old-status="Finished" owner="${caller}" split="34"/>
</results>
</action>
</actions>
</step>
<step id="3" name="p3">
<actions>
<action id="3" name="t3">
<results>
<unconditional-result old-status="Finished" owner="${caller}" join="43"/>
</results>
<post-functions>
<function type="spring">
<arg name="bean.name">jmsAction</arg>
<arg name="action.name">t3</arg>
</function>
</post-functions>
</action>
</actions>
</step>
<step id="4" name="p4">
<actions>
<action id="4" name="t4">
<results>
<unconditional-result old-status="Finished" owner="${caller}" join="43"/>
</results>
<post-functions>
<function type="spring">
<arg name="bean.name">jmsAction</arg>
<arg name="action.name">t4</arg>
</function>
</post-functions>
</action>
</actions>
</step>
<step id="5" name="p5">
<actions>
<action id="5" name="t5">
<results>
<unconditional-result old-status="Finished" owner="${caller}" join="75"/>
</results>
<post-functions>
<function type="spring">
<arg name="bean.name">jmsAction</arg>
<arg name="action.name">t5</arg>
</function>
</post-functions>
</action>
</actions>
</step>
<step id="6" name="p6">
<actions>
<action id="6" name="t6">
<results>
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="7"/>
</results>
<post-functions>
<function type="spring">
<arg name="bean.name">jmsAction</arg>
<arg name="action.name">t6</arg>
</function>
</post-functions>
</action>
</actions>
</step>
<step id="7" name="p7">
<actions>
<action id="7" name="t7">
<results>
<unconditional-result old-status="Finished" owner="${caller}" join="75"/>
</results>
<post-functions>
<function type="spring">
<arg name="bean.name">jmsAction</arg>
<arg name="action.name">t7</arg>
</function>
</post-functions>
</action>
</actions>
</step>
<step id="8" name="p8">
<actions>
<action id="8" name="t8">
<results>
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="9"/>
</results>
<post-functions>
<function type="spring">
<arg name="bean.name">jmsAction</arg>
<arg name="action.name">t8</arg>
</function>
</post-functions>
</action>
</actions>
</step>
<step id="9" name="stop">
<actions>
<action id="9" name="t9" finish="true">
<results>
<unconditional-result old-status="Finished" status="Complete"
owner="${caller}"/>
</results>
</action>
</actions>
</step>
</steps>
<splits>
<split id="26">
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="2"/>
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="6"/>
</split>
<split id="34">
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="3"/>
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="4"/>
</split>
</splits>
<joins>
<join id="43">
<conditions type="AND">
<condition type="beanshell">
<arg name="script"><![CDATA[
"Finished".equals(jn.getStep(3).getStatus()) &&
"Finished".equals(jn.getStep(4).getStatus())
]]>
</arg>
</condition>
</conditions>
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="5"/>
</join>
<join id="75">
<conditions type="AND">
<condition type="beanshell">
<arg name="script"><![CDATA[
"Finished".equals(jn.getStep(5).getStatus()) &&
"Finished".equals(jn.getStep(7).getStatus())
]]>
</arg>
</condition>
</conditions>
<unconditional-result old-status="Finished" status="Queued" owner="${caller}"
step="8"/>
</join>
</joins>
</workflow>
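Reading the splits and joins in the definition above as job dependencies, the order in which jobs become runnable can be worked out with a few lines of plain Java. This is only an illustrative sketch: the job names are taken from the action.name arguments, the class JobGraphSketch is hypothetical, and the t0 job fired by the initial action and the final stop action are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** The job dependencies encoded by the splits and joins in the workflow definition. */
class JobGraphSketch {

  /** Maps each job to the jobs that must finish before it may start. */
  static Map<String, List<String>> prerequisites() {
    Map<String, List<String>> deps = new LinkedHashMap<String, List<String>>();
    deps.put("t1", Collections.<String>emptyList());
    deps.put("t2", Arrays.asList("t1"));        // split 26
    deps.put("t6", Arrays.asList("t1"));        // split 26
    deps.put("t3", Arrays.asList("t2"));        // split 34
    deps.put("t4", Arrays.asList("t2"));        // split 34
    deps.put("t5", Arrays.asList("t3", "t4"));  // join 43
    deps.put("t7", Arrays.asList("t6"));
    deps.put("t8", Arrays.asList("t5", "t7"));  // join 75
    return deps;
  }

  /** Groups jobs into waves; every job in a wave has all its prerequisites in earlier waves. */
  static List<List<String>> waves(Map<String, List<String>> deps) {
    Set<String> finished = new HashSet<String>();
    List<List<String>> waves = new ArrayList<List<String>>();
    while (finished.size() < deps.size()) {
      List<String> wave = new ArrayList<String>();
      for (Map.Entry<String, List<String>> entry : deps.entrySet()) {
        boolean notYetRun = !finished.contains(entry.getKey());
        if (notYetRun && finished.containsAll(entry.getValue())) {
          wave.add(entry.getKey());
        }
      }
      if (wave.isEmpty()) {
        throw new IllegalStateException("cycle in job dependencies");
      }
      finished.addAll(wave); // only mark the wave finished after it is fully collected
      waves.add(wave);
    }
    return waves;
  }

  public static void main(String[] args) {
    // prints [[t1], [t2, t6], [t3, t4, t7], [t5], [t8]]
    System.out.println(waves(prerequisites()));
  }
}
```

Jobs in the same wave are the ones the workflow can have in flight at the same time, which is what the two splits buy us.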
Next up is the Spring applicationContext.xml file which ties this all together. It also contains some definitions for the JmsAction and JmsServer beans. This is shown below:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-2.0.xsd">
<bean class="org.springframework.beans.factory.annotation.RequiredAnnotationBeanPostProcessor"/>
<!-- OSWorkflow -->
<bean id="workflowStore"
class="com.opensymphony.workflow.spi.memory.MemoryWorkflowStore"/>
<bean id="workflowFactory"
class="com.opensymphony.workflow.spi.hibernate.SpringWorkflowFactory"
init-method="init">
<property name="resource" value="workflow-defs.xml"/>
<property name="reload" value="true"/>
</bean>
<bean id="workflowConfiguration"
class="com.opensymphony.workflow.config.SpringConfiguration">
<property name="factory" ref="workflowFactory"/>
<property name="store" ref="workflowStore"/>
</bean>
<bean id="workflowTypeResolver" class="com.opensymphony.workflow.util.SpringTypeResolver">
<property name="functions">
<map>
<entry key="jmsAction"><ref bean="jmsAction"/></entry>
</map>
</property>
</bean>
<bean id="workflow" class="com.opensymphony.workflow.basic.BasicWorkflow"
scope="prototype">
<constructor-arg><value>testuser</value></constructor-arg>
<property name="configuration" ref="workflowConfiguration"/>
<property name="resolver" ref="workflowTypeResolver"/>
</bean>
<bean id="workflowRunner" class="com.mycompany.myapp.workflow.WorkflowRunner"
scope="prototype">
<property name="workflow" ref="workflow"/>
</bean>
<!-- JMS -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
</property>
</bean>
<bean id="jmsAction" class="com.mycompany.myapp.workflow.JmsAction" init-method="init"
destroy-method="destroy" scope="prototype">
<property name="jmsConnectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsServer" class="com.mycompany.myapp.workflow.JmsServer">
<property name="jmsConnectionFactory" ref="jmsConnectionFactory"/>
</bean>
</beans>
As mentioned above, on the OSWorkflow side, we have a WorkflowRunner component. It implements ApplicationListener so it can respond to callbacks from the JmsAction components. Moving through the workflow takes place in the process() method. The onApplicationEvent() recursively calls the process() method based on events received from the JmsAction instances.
// WorkflowRunner.java
package com.mycompany.myapp.workflow;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import com.opensymphony.workflow.Workflow;
import com.opensymphony.workflow.WorkflowException;
import com.opensymphony.workflow.spi.Step;
/**
* This class is called once by the client code, then the events that are
* sent back from the Actions in the Workflow will move the workflow forward
* until it ends.
*/
public class WorkflowRunner implements ApplicationListener {
private final Log log = LogFactory.getLog(getClass());
private String workflowName;
private Workflow workflow;
private Map<String,Object> inputs;
private Set<Integer> alreadyExecuted = new HashSet<Integer>();
@Required
public void setWorkflow(Workflow workflow) {
this.workflow = workflow;
}
public void setWorkflowName(String workflowName) {
this.workflowName = workflowName;
}
public void setInputs(Map<String,Object> inputs) {
this.inputs = inputs;
}
public long init(int initialActionId) throws Exception {
long workflowId = workflow.initialize(workflowName, initialActionId, inputs);
return workflowId;
}
@SuppressWarnings({"unused","unchecked"})
public void process(long workflowId) {
List<Step> currentSteps = workflow.getCurrentSteps(workflowId);
for (Step currentStep : currentSteps) {
int[] availableActions = workflow.getAvailableActions(workflowId, inputs);
for (int availableAction : availableActions) {
if (alreadyExecuted.contains(availableAction)) {
continue;
}
try {
alreadyExecuted.add(availableAction);
System.out.println("Sending action.id=" + availableAction + " to JmsServer");
workflow.doAction(workflowId, availableAction, inputs);
} catch (WorkflowException e) {
log.error("Exception in (workflow,action.id)=(" + workflowName + "," +
availableAction + "). Workflow stopped", e);
}
}
}
}
public void onApplicationEvent(ApplicationEvent event) {
if (event.getSource() instanceof Long) {
process((Long) event.getSource());
}
}
}
Not too many surprises so far for people who have read my previous blog post. The JmsAction is our bridge between OSWorkflow and JMS, so it has to implement multiple interfaces. It implements FunctionProvider so it can be injected as a function into the workflow XML configuration, ApplicationContextAware so it can publish events back into Spring's context where they can be picked up by WorkflowRunner, and MessageListener so it can listen for JMS messages. The code is shown below and is fairly self-explanatory; comments are inlined where I felt more explanation may be helpful.
// JmsAction.java
package com.mycompany.myapp.workflow;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import com.opensymphony.module.propertyset.PropertySet;
import com.opensymphony.workflow.FunctionProvider;
import com.opensymphony.workflow.WorkflowException;
import com.opensymphony.workflow.spi.WorkflowEntry;
/**
* Publishes a job request to a JMS topic. Listens for a callback from the
* JMS consumer module and passes the event back to the WorkflowRunner event
* listener to move the workflow forward.
*/
public class JmsAction implements FunctionProvider, ApplicationContextAware, MessageListener {
private final Log log = LogFactory.getLog(getClass());
private ConnectionFactory jmsConnectionFactory;
private String actionName;
private ApplicationContext applicationContext;
private Connection connection;
private Session session;
private MessageProducer messageProducer;
private MessageConsumer messageConsumer;
@Required
public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
this.jmsConnectionFactory = jmsConnectionFactory;
}
public void setActionName(String actionName) {
this.actionName = actionName;
}
/**
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
*/
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* @see com.opensymphony.workflow.FunctionProvider#execute(java.util.Map, java.util.Map, com.opensymphony.module.propertyset.PropertySet)
*/
@SuppressWarnings("unchecked")
public void execute(Map transientVars, Map args, PropertySet ps) throws WorkflowException {
setActionName((String) args.get("action.name"));
WorkflowEntry workflowEntry = (WorkflowEntry) transientVars.get("entry");
final long workflowId = workflowEntry.getId();
try {
MapMessage message = session.createMapMessage();
message.setString("action.name", actionName);
message.setString("workflow.id", String.valueOf(workflowId));
message.setString("topic.name", "request.topic");
messageProducer.send(message);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
/**
* Listens for a MapMessage from the JmsServer module about job completion.
* The message carries the action name, the workflow id and a status, which
* is Success when the job completed normally.
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
@SuppressWarnings("serial")
public void onMessage(Message message) {
if (MapMessage.class.isInstance(message)) {
MapMessage mapMessage = MapMessage.class.cast(message);
try {
String actionName = mapMessage.getString("action.name");
if (actionName.equals(this.actionName)) {
// only respond to events meant for this Action
String status = mapMessage.getString("status");
String workflowId = mapMessage.getString("workflow.id");
if (status.equals("Success")) {
applicationContext.publishEvent(new ApplicationEvent(Long.valueOf(workflowId)) {});
} else {
log.error("Action " + actionName + " failed, see server error log for details");
}
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
protected void init() throws Exception {
connection = jmsConnectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic requestTopic = session.createTopic("request.topic");
Topic responseTopic = session.createTopic("response.topic");
messageProducer = session.createProducer(requestTopic);
messageConsumer = session.createConsumer(responseTopic);
messageConsumer.setMessageListener(this);
connection.start();
}
protected void destroy() throws Exception {
session.close();
connection.close();
}
}
I started off using Spring's JmsTemplate because I wanted to learn how to use it, but gave up when I could not find a clean way of registering a JmsAction as a listener. If any of you have used JmsTemplate for beans which are both publishers and subscribers, please let me know. Example code (or links to example code) would be greatly appreciated.
One more thing to note is that unlike the WorkflowRunner, which will have only a single instance per workflow, there will be many instances of JmsAction for a given workflow. We specify that its scope is prototype (built every time it is accessed from the Spring context), and that its lifecycle methods are init() and destroy(), all in the Spring configuration above.
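Because every JmsAction instance subscribes to the same response topic, each one sees every callback and has to ignore the ones meant for other actions. The following in-memory sketch shows that filtering idea in isolation. InMemoryTopic and ActionSubscriber are hypothetical stand-ins for the JMS topic and for JmsAction; only the action.name and workflow.id keys come from the real code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ActionFilterSketch {

  /** Minimal stand-in for a topic: every subscriber sees every message. */
  static class InMemoryTopic {
    private final List<Consumer<Map<String, String>>> subscribers =
        new ArrayList<Consumer<Map<String, String>>>();

    void subscribe(Consumer<Map<String, String>> subscriber) {
      subscribers.add(subscriber);
    }

    void publish(Map<String, String> message) {
      for (Consumer<Map<String, String>> subscriber : subscribers) {
        subscriber.accept(message);
      }
    }
  }

  /** Plays the role of one JmsAction instance bound to a single action name. */
  static class ActionSubscriber {
    private final String actionName;
    final List<String> handledWorkflowIds = new ArrayList<String>();

    ActionSubscriber(String actionName) {
      this.actionName = actionName;
    }

    void onMessage(Map<String, String> message) {
      // only respond to callbacks meant for this action
      if (actionName.equals(message.get("action.name"))) {
        handledWorkflowIds.add(message.get("workflow.id"));
      }
    }
  }

  /** Builds a callback message with the same keys the real code uses. */
  static Map<String, String> response(String actionName, String workflowId) {
    Map<String, String> message = new HashMap<String, String>();
    message.put("action.name", actionName);
    message.put("workflow.id", workflowId);
    return message;
  }

  /** Two subscribers on one topic; only the one named t2 reacts to a t2 callback. */
  static String demo() {
    InMemoryTopic responseTopic = new InMemoryTopic();
    ActionSubscriber t1 = new ActionSubscriber("t1");
    ActionSubscriber t2 = new ActionSubscriber("t2");
    responseTopic.subscribe(t1::onMessage);
    responseTopic.subscribe(t2::onMessage);
    responseTopic.publish(response("t2", "42"));
    return "t1=" + t1.handledWorkflowIds + " t2=" + t2.handledWorkflowIds;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints t1=[] t2=[42]
  }
}
```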
Our final component is the JmsServer. All this does currently is to print that it is "executing" something to stdout. In a real application, it would start another Java batch process and wait for it to complete before sending the callback. The code is shown below:
// JmsServer.java
package com.mycompany.myapp.workflow;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
/**
* Listens for JMS requests, services them, and returns a callback
* when the job is completed.
*/
public class JmsServer implements MessageListener {
private ConnectionFactory jmsConnectionFactory;
private Connection connection;
private Session session;
private MessageProducer messageProducer;
private MessageConsumer messageConsumer;
public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
this.jmsConnectionFactory = jmsConnectionFactory;
}
/**
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
public void onMessage(Message message) {
if (! MapMessage.class.isInstance(message)) {
return;
}
try {
MapMessage mapMessage = MapMessage.class.cast(message);
final String actionName = mapMessage.getString("action.name");
final String workflowId = mapMessage.getString("workflow.id");
// this is where we will delegate to some kind of executor in a real app
System.out.println("Executing job:" + actionName);
// send the callback after the job is done
MapMessage responseMessage = session.createMapMessage();
responseMessage.setString("action.name", actionName);
responseMessage.setString("workflow.id", workflowId);
responseMessage.setString("topic.name", "response.topic");
responseMessage.setString("status", "Success");
System.out.println("Sending callback from server");
messageProducer.send(responseMessage);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
protected void init() throws Exception {
System.out.println("Initializing server");
connection = jmsConnectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic requestTopic = session.createTopic("request.topic");
Topic responseTopic = session.createTopic("response.topic");
// the server answers on response.topic and listens on request.topic
messageProducer = session.createProducer(responseTopic);
messageConsumer = session.createConsumer(requestTopic);
messageConsumer.setMessageListener(this);
System.out.println("Server started");
connection.start();
}
protected void destroy() throws Exception {
System.out.println("Shutting down server");
session.close();
connection.close();
System.out.println("Done");
}
public void run() throws Exception {
init();
try {
for (;;) {
Thread.sleep(500L);
}
} finally {
destroy();
}
}
}
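The comment in onMessage() above marks the spot where a real application would delegate to some kind of executor. One possible shape for that delegation, sketched with the JDK's ProcessBuilder, is shown below. BatchJobLauncherSketch and its methods are hypothetical, and treating a zero exit code as Success is an assumption about how the batch jobs report their outcome.

```java
import java.io.IOException;
import java.util.List;

/** Launches an external batch job and maps its exit code to a callback status. */
class BatchJobLauncherSketch {

  /** Assumption: exit code 0 means the job succeeded, anything else is a failure. */
  static String statusFor(int exitCode) {
    return exitCode == 0 ? "Success" : "Failure";
  }

  /** Starts the given command, waits for it to end, and returns the status string. */
  static String runJob(List<String> command) {
    try {
      Process process = new ProcessBuilder(command)
          .inheritIO() // let the job write to the server's stdout and stderr
          .start();
      return statusFor(process.waitFor());
    } catch (IOException e) {
      return "Failure"; // the command could not be started
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "Failure";
    }
  }
}
```

The returned string could be placed in the status field of the response message. Note that waiting for a long job inside onMessage() would block message delivery on that session, so a real server would probably hand the work to a separate thread.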
To test this, we start the following two unit tests, which run forever until they are manually stopped with a Ctrl-C at the command line. The reason I had to do this, instead of packaging the whole thing into a single JUnit test as I had done before, was that I noticed that the unit test completed before the workflow had a chance to complete. Since the whole thing is event driven after the first call, there is no straightforward way in this setup to keep the test running until the workflow is complete. In real life, this is not a problem because these are likely to be long-lived server processes anyway.
The first JUnit test, JmsServerTest, starts the JmsServer end of the setup and loops forever, sleeping for 0.5 seconds between iterations. I run this in one console window.
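If a single self-terminating test is preferred, one option (not what the tests in this post do) is to have the test thread wait on a java.util.concurrent.CountDownLatch that is released when the final callback arrives. The sketch below shows only the waiting mechanism. CompletionLatchSketch is a hypothetical class, and where workflowCompleted() would be called from in the real application is left as an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Lets a test thread block until an event-driven workflow signals completion. */
class CompletionLatchSketch {
  private final CountDownLatch finished = new CountDownLatch(1);

  /** Hypothetical hook: call this from the code that observes the last callback. */
  void workflowCompleted() {
    finished.countDown();
  }

  /** Blocks until completion or timeout; returns true only if completion was signalled. */
  boolean awaitCompletion(long timeout, TimeUnit unit) {
    try {
      return finished.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Simulates the callback arriving on another thread while the caller waits. */
  static boolean demo() {
    CompletionLatchSketch latch = new CompletionLatchSketch();
    new Thread(latch::workflowCompleted).start();
    return latch.awaitCompletion(5, TimeUnit.SECONDS);
  }
}
```

A test built this way would assert on the return value of awaitCompletion(), so a workflow that stalls fails the test after the timeout instead of hanging.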
// JmsServerTest.java
package com.mycompany.myapp.workflow;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class JmsServerTest {
private JmsServer jmsServer;
@Before
public void setUp() throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
jmsServer = (JmsServer) context.getBean("jmsServer");
}
@Test
public void runJmsServer() throws Exception {
jmsServer.run();
}
}
This JUnit test starts the OSWorkflow end of the setup and after the first call to process(), also loops forever. I run this in a second console window.
// WorkflowRunnerWithJmsActionsTest.java
package com.mycompany.myapp.workflow;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class WorkflowRunnerWithJmsActionsTest {
private WorkflowRunner runner;
private Map<String,Object> inputs = new HashMap<String,Object>();
private long workflowId;
@Before
public void setUp() throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
runner = (WorkflowRunner) context.getBean("workflowRunner");
runner.setWorkflowName("workflow-def-1");
runner.setInputs(inputs);
workflowId = runner.init(0);
}
@Test
public void runWorkflow() throws Exception {
runner.process(workflowId);
for (;;) {
Thread.sleep(500L);
}
}
}
Running these produces the following output on the client end, and a similar output on the server end. Looking at the output, it is clear that the setup works, i.e., the output is what you would expect given the dependencies designed into the workflow.
Sending action.id=1 to JmsServer
Sending action.id=26 to JmsServer
Sending action.id=2 to JmsServer
Sending action.id=6 to JmsServer
Sending action.id=34 to JmsServer
Sending action.id=7 to JmsServer
Sending action.id=3 to JmsServer
Sending action.id=4 to JmsServer
Sending action.id=5 to JmsServer
Sending action.id=8 to JmsServer
Sending action.id=9 to JmsServer
A nice tool that helped me debug my code is ActiveMQ's web-based administration interface. Here you can see how many messages were received and sent on each of the topics. You can also delete messages, which is very helpful after unsuccessful runs. It is available on port 8161 (0.0.0.0:8161) of the machine where ActiveMQ is started.
So there you have it folks. A simple, asynchronous, event-driven way to manage dependencies among multiple batch jobs.
Hi, very nice article!
I would like to try to run your example, but I am new to all this. Is it possible for you to make the whole code available (with ant, or maven script) so that it can be built and tested?
Thanks a lot
Hi, thanks for the compliment.
To answer your question, sorry, no I cannot, because I don't have the code anymore. However, to run the examples, just do the following.
1) Create a maven project with archetype:create.
2) Create src/main/resources and copy over the applicationContext.xml and the osworkflow.xml file into it.
3) Copy over the Java source classes into src/main/java, creating any necessary packages (from the package declaration).
4) Copy over the Java test files into src/test/java, as above wrt package names.
5) Create a pom.xml (or an ant build.xml, if that makes more sense) with the necessary library dependencies.
6) To run the test, use mvn test -Dtest=JunitTestShortClassName.