In this article, we will create a sample Spring JMS application that consumes messages from a Solace Messaging queue or topic. For easy setup we will use Spring Boot, and all the configuration will be Java based, following the JMS specification. The JMS listener we build here consumes messages from a Solace queue.
In previous articles we demonstrated several JMS examples and discussed JMS at length, but those articles integrated with Apache ActiveMQ, for example Spring Boot JMS Example with ActiveMQ. Here, we will build a similar implementation with Solace.
Project Structure
It's a simple Spring Boot project structure. You can visit http://start.spring.io to generate a sample Spring Boot project.
Following will be the final project structure that we will be building here.
Maven Dependency
Following are the extra dependencies that we have added in this example to enable Solace messaging.
pom.xml
<dependency>
    <groupId>com.solacesystems</groupId>
    <artifactId>sol-jms</artifactId>
    <version>10.4.0</version>
</dependency>
<dependency>
    <groupId>com.solacesystems</groupId>
    <artifactId>sol-jcsmp</artifactId>
    <version>10.4.0</version>
</dependency>
Spring Bean Configuration
Here, we define the connection factory and the connection-related beans, and register our message listener and exception listener. Once the application is initialised, it automatically starts listening to the Solace queue. This configuration creates a single connection to the queue.
BeanConfig.java
package com.devglan.soringbootsolaceexample.config;

import com.devglan.soringbootsolaceexample.listener.JmsExceptionListener;
import com.devglan.soringbootsolaceexample.listener.JmsMessageListener;
import com.solacesystems.jms.SolConnectionFactory;
import com.solacesystems.jms.SolJmsUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

@Configuration
@PropertySource({"classpath:application.properties"})
public class BeanConfig {

    private static final Logger logger = LoggerFactory.getLogger(BeanConfig.class);

    @Autowired
    private Environment environment;

    @Autowired
    private JmsExceptionListener exceptionListener;

    // Builds the Solace connection factory from the values in application.properties.
    @Bean
    public SolConnectionFactory solConnectionFactory() throws Exception {
        SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory();
        connectionFactory.setHost(environment.getProperty("solace.java.host"));
        connectionFactory.setVPN(environment.getProperty("solace.java.msgVpn"));
        connectionFactory.setUsername(environment.getProperty("solace.java.clientUsername"));
        connectionFactory.setPassword(environment.getProperty("solace.java.clientPassword"));
        connectionFactory.setClientID(environment.getProperty("solace.java.clientName"));
        return connectionFactory;
    }

    @Bean
    public JmsMessageListener jmsMessageListener() {
        return new JmsMessageListener();
    }

    // Opens the connection, registers both listeners and starts message delivery.
    @Bean(destroyMethod = "close")
    public Connection connection() {
        Connection connection = null;
        try {
            connection = solConnectionFactory().createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(environment.getProperty("solace.message.consumer.queue"));
            MessageConsumer messageConsumer = session.createConsumer(queue);
            messageConsumer.setMessageListener(jmsMessageListener());
            connection.setExceptionListener(exceptionListener);
            connection.start();
            logger.info("Connected. Awaiting message...");
        } catch (Exception e) {
            // Log at error level and keep the stack trace in the application log.
            logger.error("JMS connection failed with Solace.", e);
        }
        return connection;
    }
}
Following is the application.properties where we have defined our connection parameters.
application.properties
solace.message.consumer.queue=solacetestqueue
solace.java.host=127.67.13.9:55555
solace.java.clientUsername=solaceusername
solace.java.clientPassword=solacepassword
solace.java.msgVpn=solaceVPN
solace.java.clientName=solacetest
solace.java.messageAckMode=client_ack
solace.java.reapplySubscriptions=true
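Because BeanConfig reads every connection parameter with environment.getProperty(), a missing key silently becomes null and only shows up later as a connection failure. One way to catch this earlier is to check the keys up front. The following is a minimal sketch using only the JDK; the class SolacePropertiesCheck and its methods are hypothetical helpers and are not part of the original project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (an assumption for illustration, not part of the project).
class SolacePropertiesCheck {

    // The keys that BeanConfig reads through environment.getProperty(...).
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "solace.message.consumer.queue",
            "solace.java.host",
            "solace.java.msgVpn",
            "solace.java.clientUsername",
            "solace.java.clientPassword",
            "solace.java.clientName");

    // Parses text in the same key=value format as application.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

A check like missingKeys() could be called before the connection is created, so that the application fails with a message naming the missing key instead of a generic connection error.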
Solace Message Listener
The following is the message listener implementation. It assumes that the message in the queue is in JSON format. It converts the message into a Customer object and passes it to the service class, which then processes the data as per the business requirement.
The listener class implements MessageListener and overrides the onMessage() method.
JmsMessageListener.java
package com.devglan.soringbootsolaceexample.listener;

import com.devglan.soringbootsolaceexample.model.Customer;
import com.devglan.soringbootsolaceexample.service.SolaceMessageService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import java.io.IOException;

public class JmsMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(JmsMessageListener.class);

    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private SolaceMessageService messageService;

    @Override
    public void onMessage(Message message) {
        // Only text messages carrying a JSON payload are processed.
        if (!(message instanceof TextMessage)) {
            logger.info(message.toString());
            logger.info("Invalid message. Skipping ....");
            return;
        }
        try {
            String messageData = ((TextMessage) message).getText();
            logger.info(messageData);
            Customer customer = objectMapper.readValue(messageData, Customer.class);
            if (customer == null) {
                logger.error("Invalid message from the solace queue");
            } else {
                logger.info("Successfully parsed solace message to object.");
                messageService.processSolaceMessage(customer);
            }
        } catch (IOException e) {
            logger.error("Error while parsing JSON from Solace.", e);
        } catch (JMSException e) {
            logger.error("Error while reading the message from Solace.", e);
        }
    }
}
Following is our sample customer class.
Customer.java
public class Customer {

    private String id;
    private String firstName;
    private String lastName;
    private String query;

    //setters and getters
}
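The listener hands each parsed Customer to SolaceMessageService, which is not shown in this article. The following is a minimal sketch of what such a service might look like, using only the JDK. The body of processSolaceMessage(), the acceptedQueries list and the id check are assumptions for illustration; only the class name and the method name come from the listener code. In the actual project the class would also need to be registered as a Spring bean (for example with @Service) so that it can be autowired.

```java
import java.util.ArrayList;
import java.util.List;

// Same fields as the sample Customer class, with the accessors written out.
class Customer {
    private String id;
    private String firstName;
    private String lastName;
    private String query;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
    public String getQuery() { return query; }
    public void setQuery(String query) { this.query = query; }
}

// Hypothetical service implementation (an assumption, not the author's code).
class SolaceMessageService {

    // Stands in for real business logic: remembers the queries of accepted customers.
    private final List<String> acceptedQueries = new ArrayList<>();

    // Called by the listener for every successfully parsed message.
    public synchronized void processSolaceMessage(Customer customer) {
        String id = customer.getId();
        if (id == null || id.trim().isEmpty()) {
            // Assumed rule for this sketch: a customer without an id is skipped.
            System.err.println("Skipping customer without an id");
            return;
        }
        acceptedQueries.add(customer.getQuery());
    }

    // Returns a copy so callers cannot modify the internal list.
    public synchronized List<String> getAcceptedQueries() {
        return new ArrayList<>(acceptedQueries);
    }
}
```

The methods are synchronized because a JMS provider may deliver messages on its own thread while other parts of the application read the results.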
Message Exception Listener
To create an exception listener, we need to implement ExceptionListener from the javax.jms package. For this example we are simply logging the exception message.
JmsExceptionListener.java
package com.devglan.soringbootsolaceexample.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;

@Component
public class JmsExceptionListener implements ExceptionListener {

    private static final Logger logger = LoggerFactory.getLogger(JmsExceptionListener.class);

    @Override
    public void onException(JMSException e) {
        // The linked exception may be null, so fall back to the JMSException message.
        Exception linked = e.getLinkedException();
        logger.error("JMS exception: {}", linked != null ? linked.getMessage() : e.getMessage(), e);
    }
}
Running the Example
To start consuming messages from the Solace queue, we can simply run SoringBootSolaceExampleApplication.java as a Java application, and the application starts consuming messages from the queue.
SoringBootSolaceExampleApplication.java
package com.devglan.soringbootsolaceexample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SoringBootSolaceExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SoringBootSolaceExampleApplication.class, args);
    }
}
Conclusion
In this article, we created a sample Spring Boot application that consumes messages from a Solace Messaging queue. We also defined a custom exception listener. The source code for this implementation can be found on GitHub.