Tuesday, October 14, 2014

Spring JMS with ActiveMQ - hello world example - receive message using message listener adapter

This tutorial will demo how to receive message by using Spring JMS and ActiveMQ. This tutorial uses Spring JMS's MessageListenerAdapter. Compare to receiving message using MessageListener interface, there are 2 advantages you can get:

  1. Use any POJO class as real message listener, no need to implements any interface in your Java class.
  2. Easy to send message back in most cases.

There are other ways to  receive message from ActiveMQ broker using Spring JMS, examples are provided here

The example on how to send out message with Spring JMS and ActiveMQ is in this article.

1. What you need

  • JDK 1.7+
  • Maven 3.2.1
  • ActiveMQ 5.10.0
  • Spring 4.1.0.RELEASE  (acquired by Maven)
We’ll use maven to manage all dependencies.  To write code there is no need for ActiveMQ binary, since maven will take care of the ActiveMQ library we need. But to run the code, we need the ActiveMQ binary, In this example  we'll run the ActiveMQ broker on a machine of IP 192.168.203.143 with default port 61616. If you run the broker in a different IP or port, don't forget to change the broker URL in Spring configuration file.

2. Configure the maven  pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.samples</groupId>
<artifactId>spring-jms-activemq-receive-direct</artifactId>
<version>0.0.1-SNAPSHOT</version>

<properties>

<!-- Spring Version -->
<spring-framework.version>4.1.0.RELEASE</spring-framework.version>

<!-- ActiveMQ Version -->
<activemq.version>5.10.0</activemq.version>

</properties>

<dependencies>
<!-- Spring Artifacts-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-framework.version}</version>
</dependency>

<!-- ActiveMQ Artifacts -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>

</dependencies>

<!-- Use JDK 1.7+ -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

As you can see from the maven pom.xml file, we use 3 key artifacts in this example, spring-context, spring-jms and activemq-all.  The first 2 belong to spring framework and the last one is ActiveMQ  implementation for jms.  The final part of the pom file specified the JDK version for compiling.



3. Define the Java Class


We are going to define 2 classes. The first one is a simple spring bean:
package com.shengwang.demo;

import org.springframework.stereotype.Service;

@Service
/**
* POJO class, have handleMessage(...) method.
* The return of handleMessage(...) will be
* automatically send back to message.getJMSReplyTo().
*/
public class JmsMessageListener {

public String handleMessage(String text) {
System.out.println("Received: " + text);
return "ACK from handleMessage";
}
}

The above JmsMessageListener Class is just a POJO java class, which has neither parent class nor interface implementation. The JmsMessageListener annotated by @Service as spring bean. The default listener method that will be invoked is "handleMessage" for MessageListenerAdapter, but can be changed to different method  in Spring Configuration.

Anything return from handleMessage will be send out as a message back to JMS broker. The destination will be the ReplyTo header of the incoming message. If it's not set, reply message will be send to default destination.

One thing need to mention is something called MessageConvertor. MesssageConvertor will be called before and after the method handleMessage. MessageConvertor is set in Spring configuration for MessageListenerAdapter bean. If not specified, a default one from spring will be used. The default Spring  MessageConvertor converts received message to normal Java data types as input parameter of  handleMessage before invoking.  After handleMessage finishes, it converts return to a kind of  JMS message.

In this demo, a received ActiveMQTextMessage  object is transferred to a String object before calling handleMessage. A return String object is transferred back to a TextMessage object.

How or where is this JmsMessageListener  class registered for receiving message? It's all in the spring configuration file. we'll come to this in the below. Now let’s take a glance at the main class.

package com.shengwang.demo;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DemoMain {

public static void main(String[] args) {
// create Spring context
ApplicationContext ctx = new ClassPathXmlApplicationContext("app-context.xml");

// sleep for 1 second
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// close application context
((ClassPathXmlApplicationContext)ctx).close();
}
}
In the main, one thing to notice is there is no trace of the message listener we just defined. Because the message listener will be call asynchronously , you do (can)  not to invoke it explicitly.

Also we use ClassPathXmlApplicationContext to lookup all spring beans in spring configuration file called “app-context.xml”.  We wait for 1 second to let our message listener do it's job, then at the end of main, we close the spring context.



4. spring configuration


Our spring configuration file is named “app-context.xml”. It stays in the man resource path /src/main/resources/ to store the spring configuration file. In the configuration file, we define beans in this order:

Connection Factory  bean (ActiveMQ) –> cached Connection Factory  bean ( Spring jms) –> MessageListenerAdapter bean(Spring jms) ->MessageListenerContainer bean(Spring jms).

The cached connection factory bean is necessary because the Spring will open/close connection on every send or receive action.  So in practical there will always a cached connection factory bean beside the direct connection factory bean.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">


<context:component-scan base-package="com.shengwang.demo" />


<!-- =============================================== -->
<!-- JMS Common, Define Jms connection factory -->
<!-- =============================================== -->
<!-- Activemq connection factory -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- brokerURL -->
<constructor-arg index="0" value="tcp://192.168.203.143:61616" />
</bean>

<!-- Pooled Spring connection factory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
</bean>



<!-- ============================================================= -->
<!-- JMS receive. -->
<!-- Define MessageListenerAdapter and MessageListenerContainer -->
<!-- ============================================================= -->
<bean id="messageListenerAdapter"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="jmsMessageListener" />
</bean>

<bean id="messageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destinationName" value="Send2Recv" />
<property name="messageListener" ref="messageListenerAdapter" />
</bean>

</beans>

As you can see from the Spring configuration file, <context:component-scan ...> is used to let spring find out all beans automatically. In our example we have defined only one bean, jmsMessageListener . Next we define connection factories. The ActiveMQ's connection factory bean works as an input of Spring's pooled connection factory. These 2 connection factory beans will be needed both for sending and receiving messages. 

Next we wrap the POJO class JmsMessageListener by Spring MessageListenerAdapter. Our jmsMessageListener bean is constructor parameter of messageListenerAdapter bean.

Finally we define MessageListenerContainer bean. Spring use MessageListenerContainer to manager messageListener beans. After connection factory beans, we define a bean of spring's DefaultMessageListenerContainer . There are 3 parameters set for this bean. connectionFactory is for connection, destinationName is the Queue's name to receive message from.  messageListener refer to our messageListenerAdapter bean which we wrapped our JmsMessageListener  POJO class with Spring MessageMessageListenerAdapter. So it's here we let spring context know where to receive  messages, and what  to do when a message arrives.

Everything is almost done now. Let's review the directory structure for our maven project.
image



5. Run the code


Before you can run the code, you need to make sure the ActiveMQ broker is running. So our jave code can connect to it and receive message from it.   Make sure the broker IP and port are correct in your spring configuration file "app-context.xml". Run the ActiveMQ broker like with this command.
ACTIVEMQ_INSTALL_DIR/bin/activemq start

Now you can run you main class.  you can run it from IDE such as eclipse, or you can run it direct in command line by using maven.
cd spring-jms-activemq-send
# run mvn from project directory
mvn exec:java -Dexec.mainClass="com.shengwang.demo.DemoMain"

ActiveMQ broker can be managed by browser on port 8161.  My ActiveMQ broker is running on IP 192.168.203.143, so I can access URL http://192.168.203.143:8161/admin to browse all queues and all messages in the queues. If there is no message in the broker, the main will sleep 1 second, do nothing and exit.  To better show how the message listener works, you can either run the sender example in this example, which will send a message to destination "Send2Recv", or create a message manually from ActiveMQ Web Console by using browser (Don't forget to set "ReplyTo" in the created message's header, cause we need it to send reply back in this example).

For demo, I send 2 messages with different ReplyTo in its message header.

image 
Then we run the message listener main. After running the main of this message listener example, the pending messages in Queue "Send2Recv" are gone. Two messages are replied to different destinations.

image


The content of the message is the return String of listener method handleMessage in class JmsMessageListener.

image

3 comments:

Powered by Blogger.

About The Author

My Photo

Has been a senior software developer, project manager for 10+ years. Dedicate himself to Alcatel-Lucent and China Telecom for delivering software solutions.

Pages

Unordered List