Friday, October 10, 2014

Spring JMS with ActiveMQ - hello world example - receive message using direct message listener

This tutorial will demo how to receive message by using Spring JMS and ActiveMQ. This tutorial uses class directly implements Spring JMS's MessageListener or SessionAwareMessageListener interface. There are other ways to  receive message from ActiveMQ broker using Spring JMS, examples are provided here.

The example on how to send out message with Spring JMS and ActiveMQ is in this article

1. What you need

  • JDK 1.7+
  • Maven 3.2.1
  • ActiveMQ 5.10.0
  • Spring 4.1.0.RELEASE  (acquired by Maven)
We’ll use maven to manage all dependencies.  To write code there is no need for ActiveMQ binary, since maven will take care of the ActiveMQ library we need. But to run the code, we need the ActiveMQ binary, In this example  we'll run the ActiveMQ broker on a machine of IP 192.168.203.143 with default port 61616. If you run the broker in a different IP or port, don't forget to change the broker URL in Spring configuration file.

2. Configure the maven  pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.samples</groupId>
<artifactId>spring-jms-activemq-receive-direct</artifactId>
<version>0.0.1-SNAPSHOT</version>

<properties>

<!-- Spring -->
<spring-framework.version>4.1.0.RELEASE</spring-framework.version>

<!-- ActiveMQ -->
<activemq.version>5.10.0</activemq.version>

</properties>

<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-framework.version}</version>
</dependency>

<!-- ActiveMQ Artifacts -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>

</dependencies>

<!-- Use JDK 1.7+ -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>


As you can see from the maven pom.xml file, we use 3 key artifacts in this example, spring-context, spring-jms and activemq-all.  The first 2 belong to spring framework and the last one is ActiveMQ  implementation for jms.  The final part of the pom file specified the JDK version for compiling.



3. Define the Java Class


We are going to define 2 classes. The first one is a simple spring bean:
package com.shengwang.demo;

import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.stereotype.Service;

@Service
/**
* Listener Implement Spring SessionAwareMessageListener Interface
*
*/
public class JmsMessageListener implements SessionAwareMessageListener {

@Override
public void onMessage(TextMessage message, Session session) throws JMSException {
// This is the received message
System.out.println("Receive: "+message.getText());

// Let's prepare a reply message - a "ACK" String
ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("ACK");

// Message send back to the replyTo address of the income message.
// Like replying an email somehow.
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(textMessage);
}
}



In the above JmsMessageListener Class, it implements Spring's SessionAwareMessageListener interface and the method defined in the interface, the onMessage method. This onMessage method will be called asynchronously on every message arriving. Look through the code, we add @service annotation to make JmsMessageListener class a Spring bean. The onMessage takes 2 parameters.  First one is the message, second one is the Session which you can use to send  out a reply message.


You may be aware of that the JmsMessageListener class implements the interface SessionAwareMessageListener instead of interface MessageListener. The only difference is SessionAwareMessageListener will give one more parameter for method onMessage, Session, which can be used to send message back in you listener.


In the onMessage method, we deal with the income message, print out the content to screen, then create a text message and finally send it back. We use the replyTo info of the income message as the destination to send out.  This somehow really seems like the mechanism of replying a email.  If there is no replyTo info set in the message header of the income message, you need to explicit define a destination to reply.



How or where is this JmsMessageListener  class registered for receiving message? It's all in the spring configuration file. we'll come to this in the below. Now let’s take a glance at the main class.

package com.shengwang.demo;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DemoMain {

public static void main(String[] args) {
// create Spring context
ApplicationContext ctx = new ClassPathXmlApplicationContext("app-context.xml");

// sleep for 1 second
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// close application context
((ClassPathXmlApplicationContext)ctx).close();
}
}

In the main, one thing to notice is there is no trace of the message listener we just defined. Because the message listener will be call asynchronously , you do (can)  not to invoke it explicitly.
Also we use ClassPathXmlApplicationContext to lookup all spring beans in spring configuration file called “app-context.xml”.  We wait for 1 second to let our message listener do it's job, then at the end of main, we close the spring context.



4. spring configuration


Our spring configuration file is named “app-context.xml”. It stays in the man resource path /src/main/resources/ to store the spring configuration file. In the configuration file, we define beans in this order:

Connection Factory  bean (by jms provider ActiveMQ) –> cached Connection Factory  bean ( by Spring jms) –> MessageListenerContainer bean (by spring jms).

The cached connection factory bean is necessary because the Spring will open/close connection on every send or receive action.  So in practical there will always a cached connection factory bean beside the direct connection factory bean.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">


<context:component-scan base-package="com.shengwang.demo" />

<!-- =============================================== -->
<!-- JMS Common,Define JMS connection Factory -->
<!-- =============================================== -->
<!-- Activemq connection factory -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- brokerURL -->
<constructor-arg index="0" value="tcp://192.168.203.143:61616" />
</bean>

<!-- Pooled Spring connection factory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
</bean>



<!-- ============================================================= -->
<!-- JMS Receive,Define MessageListenerContainer -->
<!-- ============================================================= -->
<bean id = "messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="Send2Recv"/>
<property name="messageListener" ref="jmsMessageListener"/>
</bean>

</beans>



As you can see from the Spring configuration file, we use <context:component-scan ...> to let spring find out all beans automatically. In our example we have defined only one bean, jmsMessageListener . Next we define connection factories. The ActiveMQ's connection factory bean works as an input of Spring's pooled connection factory. These 2 connection factory beans will be needed both for sending and receiving messages. 



Spring use MessageListenerContainer to manager messageListener beans. After connection factory beans, we define a bean of spring's DefaultMessageListenerContainer . There are 3 parameters set for this bean. connectionFactory is for connection, destinationName is the Queue's name to receive message from.  messageListener refer to our jmsMessageListener bean which we defined in the Java class JmsMessageListener above. So it's here we let spring context know where to receive  messages, and what  to do when a message arrives.

Everything is almost done now. Let's review the directory structure for our maven project.
image



5. Run the code


Before you can run the code, you need to make sure the ActiveMQ broker is running. So our jave code can connect to it and receive message from it.   Make sure the broker IP and port are correct in your spring configuration file "app-context.xml". Run the ActiveMQ broker like with this command.
ACTIVEMQ_INSTALL_DIR/bin/activemq start

Now you can run you main class.  you can run it from IDE such as eclipse, or you can run it direct in command line by using maven.
cd spring-jms-activemq-send
# run mvn from project directory
mvn exec:java -Dexec.mainClass="com.shengwang.demo.DemoMain"


ActiveMQ broker can be managed by browser on port 8161.  My ActiveMQ broker is running on IP 192.168.203.143, so I can access URL http://192.168.203.143:8161/admin to browse all queues and all messages in the queues. If there is no message in the broker, the main will sleep 1 second, do nothing and exit.  To better show how the message listener works, you can either run the sender example in this example, which will send a message to destination "Send2Recv", or create a message manually from ActiveMQ Web Console by using browser (Don't forget to set "ReplyTo" in the created message's header, cause we need it to send reply back in this example).


image 
Then we run the message listener main. After running the main of this message listener example, the messages in Queue "Send2Recv" are gone. There are 2 new message on the Queue "Recv2Send", they are our ack reply message which we send back in the onMessage method of the listener.





image

7 comments:

  1. helped me a lot, thank man.
    just one typo >> JmsMessageListener implements SessionAwareMessageListener
    should be >> JmsMessageListener implements SessionAwareMessageListener {

    ReplyDelete
  2. thanks.
    public class JmsMessageListener implements SessionAwareMessageListener
    maybe "", right?

    ReplyDelete
  3. What If I want to persist the message object to the local database? Do I need to specify any other configuration in the xml?

    ReplyDelete
    Replies
    1. Here is the document about using mysql as persistence. http://activemq.apache.org/jdbc-support.html.

      <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
      <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
      <property name="username" value="activemq"/>
      <property name="password" value="activemq"/>
      <property name="poolPreparedStatements" value="true"/>
      </bean>

      Delete

Powered by Blogger.

About The Author

My Photo
Has been a senior software developer, project manager for 10+ years. Dedicate himself to Alcatel-Lucent and China Telecom for delivering software solutions.

Pages

Unordered List