Use this post as a reference for those who work with Apache ActiveMQ middle-ware.
You can download the needed ActiveMQ release from here.
Use this link to get through the installation, configuration and starting of ActiveMQ: Getting Started
You should have Java installed in your machine.I am using ActiveMQ 5.9.0.
Once you bring up the ActiveMQ instance, provided you left all the configurations as default,ActiveMQ will support TCP connections at port 61616. We are going to make use of this port to make our Producer and Consumer classes connect to ActiveMQ.
Disclaimer: ActiveMQ instance and Producer and Consumer programs run on the same machine. Therefore programs can use the URL tcp://localhost:61616 to connect to the ActiveMQ instance.
Producer Class is nothing but a java class which will connect to ActiveMQ and send a message to it specifying the queue to which the message should be en-queued.
Consumer Class is the one which connects to ActiveMQ to retrieve a message from a particular queue.
Lets get to the code now. I used Eclipse IDE to create this java project.
Producer Class(AMQProducer):
import java.sql.Timestamp; import java.util.Date; import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; public class AMQProducer implements Runnable{ // Use the same factory for all the producer threads. static ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); @Override public void run() { try { // Create a JMS connection from the ActiveMQ server Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // Create a session to send message Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // destination represents the message queue to which the message is en-queued Destination destination = session.createQueue("HelloMoto"); // MessageProducer is used to send messages // Refer http://docs.oracle.com/javaee/1.4/api/javax/jms/MessageProducer.html for more MessageProducer messageProducer = session.createProducer(destination); // Sets the producer's default delivery mode. messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Message defines the message header and the acknowledge method used for all JMS messages String text = "Hello Motorola from producer " + Thread.currentThread().hashCode() + "..." + new Timestamp((new Date()).getTime()); Message message = session.createTextMessage(text); messageProducer.send(message); System.out.println("Producer Thread("+Thread.currentThread().hashCode()+ ") : Sent \'" + text + "\'"); // Clean up messageProducer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+") Exception occured."); } } }
Consumer Class(AMQConsumer.java):
import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; public class AMQConsumer implements Runnable, ExceptionListener { // Use the same factory for all the consumer threads. static ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); @Override public void run() { try { // Create a JMS connection from the ActiveMQ server Connection connection = activeMQConnectionFactory.createConnection(); connection.setExceptionListener(this); // override "void onException(JMSException arg0)" method connection.start(); // Create a session to receive message Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("HelloMoto"); // MessageConsumer is used to receive messages // Refer http://docs.oracle.com/javaee/1.4/api/javax/jms/MessageConsumer.html for more MessageConsumer messageConsumer = session.createConsumer(destination); System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+") : Waiting"); // receive(long timeout) - Receives the next message that arrives within the specified // timeout interval Message message = messageConsumer.receive(10000); // if the received message is text message, display it on console if (message instanceof TextMessage ) { TextMessage textMessage = (TextMessage)message; System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+ ") : Recieved \'" + textMessage.getText() + "\'"); } else { System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+ ") : Dont have any message to display"); } // Clean up messageConsumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+") Exception occured."); } } // If a JMS provider detects a serious problem with a connection, it informs the connection's // ExceptionListener, @Override public void onException(JMSException arg0) { System.out.println("Consumer Thread("+Thread.currentThread().hashCode()+") JMS Exception occured. " + "Shutting down client."); } }
Main Class(AMQMain):
import org.sree.activemqeg.AMQConsumer; import org.sree.activemqeg.AMQProducer; public class AMQMain { public static void main(String[] args) throws InterruptedException { (new Thread(new AMQProducer())).start(); (new Thread(new AMQConsumer())).start(); (new Thread(new AMQConsumer())).start(); (new Thread(new AMQConsumer())).start(); Thread.sleep(1000); (new Thread(new AMQProducer())).start(); (new Thread(new AMQProducer())).start(); } }
Output:
Neglect the log4j warnings for the time being…
You can analyze the statistics of the various queues of ActiveMQ from a browser with the URL http://localhost:8161 (replace ‘localhost’ with the IP of the machine if ActiveMQ is in a different system). Have a look at the queue we created with our program.
Meet you in my next post “ActiveMQ Producer and Client in Java using Spring Framework” which will make things more configurable and professional. 😉