ActiveMQ Producer and Consumer in Java

Use this post as a reference for those who work with Apache ActiveMQ middle-ware.

You can download the needed ActiveMQ release from here.

Use this link to get through the installation, configuration and starting of ActiveMQ: Getting Started

You should have Java installed in your machine.I am using ActiveMQ 5.9.0.

Once you bring up the ActiveMQ instance, provided you left all the configurations as default,ActiveMQ will support TCP connections at port 61616. We are going to make use of this port to make our Producer and Consumer classes connect to ActiveMQ.

Disclaimer: ActiveMQ instance and Producer and Consumer programs run on the same machine. Therefore programs can use the URL tcp://localhost:61616 to connect to the ActiveMQ instance.

Producer Class is nothing but a java class which will connect to ActiveMQ and send a message to it specifying the queue to which the message should be en-queued.

Consumer Class is the one which connects to ActiveMQ to retrieve a message from a particular queue.

Lets get to the code now. I used Eclipse IDE to create this java project.
Producer Class(AMQProducer):

import java.sql.Timestamp;
import java.util.Date;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class AMQProducer implements Runnable{
    // Use the same factory for all the producer threads.
    static ActiveMQConnectionFactory activeMQConnectionFactory = 
            new ActiveMQConnectionFactory("tcp://localhost:61616");
    @Override
    public void run() {
        try {
            // Create a JMS connection from the ActiveMQ server
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();

            // Create a session to send message
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // destination represents the message queue to which the message is en-queued
            Destination destination = session.createQueue("HelloMoto");

            // MessageProducer is used to send messages
            // Refer http://docs.oracle.com/javaee/1.4/api/javax/jms/MessageProducer.html for more
            MessageProducer messageProducer = session.createProducer(destination);

            // Sets the producer's default delivery mode. 
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Message defines the message header and the acknowledge method used for all JMS messages
            String text = "Hello Motorola from producer " + Thread.currentThread().hashCode() + "..."
            + new Timestamp((new Date()).getTime());
            Message message = session.createTextMessage(text);

            messageProducer.send(message);

            System.out.println("Producer Thread("+Thread.currentThread().hashCode()+
                    ") : Sent \'" + text + "\'");

            // Clean up 
            messageProducer.close();
            session.close();    
            connection.close();

        }
        catch (Exception e) {
            System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+") Exception occured.");
        }
    }    
}

Consumer Class(AMQConsumer.java):

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;
public class AMQConsumer implements Runnable, ExceptionListener  {
	// Use the same factory for all the consumer threads.
	static ActiveMQConnectionFactory activeMQConnectionFactory = 
			new ActiveMQConnectionFactory("tcp://localhost:61616");

	@Override
	public void run() {
		try {
			// Create a JMS connection from the ActiveMQ server
			Connection connection = activeMQConnectionFactory.createConnection();
			connection.setExceptionListener(this); // override "void onException(JMSException arg0)" method
			connection.start();

			// Create a session to receive message
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			Destination destination = session.createQueue("HelloMoto");

			// MessageConsumer is used to receive messages
			// Refer http://docs.oracle.com/javaee/1.4/api/javax/jms/MessageConsumer.html for more
			MessageConsumer messageConsumer = session.createConsumer(destination);

			System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+") : Waiting");

			// receive(long timeout) - Receives the next message that arrives within the specified
			// timeout interval
			Message message = messageConsumer.receive(10000);

			// if the received message is text message, display it on console
			if (message instanceof TextMessage ) {
				TextMessage textMessage = (TextMessage)message;
				System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+ 
						") : Recieved \'" + textMessage.getText() + "\'");
			} else {
				System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+
						") : Dont have any message to display");
			}

			// Clean up
			messageConsumer.close();
			session.close();
			connection.close();

		} catch (Exception e) {
			System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+") Exception occured.");
		}		
	}

	// If a JMS provider detects a serious problem with a connection, it informs the connection's
	// ExceptionListener,
	@Override
	public void onException(JMSException arg0) {
		System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+") JMS Exception occured.  "
				+ "Shutting down client.");
	}
}

Main Class(AMQMain):

import org.sree.activemqeg.AMQConsumer;
import org.sree.activemqeg.AMQProducer;

public class AMQMain {

	public static void main(String[] args) throws InterruptedException {
		(new Thread(new AMQProducer())).start();
		(new Thread(new AMQConsumer())).start();
		(new Thread(new AMQConsumer())).start();
		(new Thread(new AMQConsumer())).start();
		Thread.sleep(1000);
		(new Thread(new AMQProducer())).start();
		(new Thread(new AMQProducer())).start();
	}
}

Output:

Apache ActiveMQ Java Producer Client OutputNeglect the log4j warnings for the time being…

You can analyze the statistics of the various queues of ActiveMQ from a browser with the URL http://localhost:8161 (replace ‘localhost’ with the IP of the machine if ActiveMQ is in a different system). Have a look at the queue we created with our program.

ActiveMQ queue screenshot

Meet you in my next post “ActiveMQ Producer and Client in Java using Spring Framework” which will make things more configurable and professional. 😉