Pages

Tuesday, June 24, 2008

ActiveMQ 5.1.0 tutorial

It is always good to know at least one message brokers. ActiveMQ is one of those top brokers used actively industry wide. So, here is a small tutorial or tips to use ActiveMQ along with java messaging service.

Installations

I am using the following configuration on my system:

1. Windows XP
2. JDK 5 update 15
3. ActiveMQ 5.1.0

Download activeMQ from http://activemq.apache.org/. Unzip to any suitable location. And the installation is done!!!


Directory Structure

After you unzip, there are few basic files you need to know.

a. The bin folder contains the batch file, activemq.bat, using which you can start the server. It also contains activemq-admin.bat, using which you can get more details about activemq, like a listing of queues etc.

b. The conf folder contains the activemq.xml configuring the ActiveMQ broker. Here is where we can configure the transports, permanent queues etc.


Creating Temporary Queues

Firt run bin/activemq.bat. This should start ActiveMQ listening at port 61616. There is an admin console(http://localhost:8161/admin) that can be used to monitor the ActiveMQ server. It can also be used to create queues, but these are temporary queues. That means, once the server is shutdown, and restarted, the queues will have to be re created.

ActiveMQ is primarily build for creating queues dynamically. But, it is possible to create queues permanently in ActiveMQ 4.1 or above.


Creating Permanent Queues

The conf/activemq.xml is called broker configuration. By adding the below in that creates a permanent queue, i.e on restart of the server, the queue does exist. You dont have to create it again.

<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
<broker xmlns="http://activemq.apache.org/schema/core">
<destinations>
<queue physicalName="test.activemq.queue" />
</destinations>
</broker>
</beans>


Creating Dynamic Queues and configuration

There are two ways to create queues, and configure ActiveMQ dynamically.

a. Programatically, in java code.
b. Using JNDI

The preferred approach is JNDI. But, there is nothing wrong in knowing the first one either.

a. Programmatic usage in ActiveMQ

// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();

// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create the Queue
Destination destination = session.createQueue("test.prog.queue");

b. Using JNDI

Here, jndi.properties is used to configure. This should be present in the CLASSPATH for the queues, connection factory to be configured correctly.
This is taken from the activeMQ website and modified.

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616

# use the following property to specify the JNDI name the connection factory
# should appear as.
connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.jndiqueue.test = test.prog.queue


# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
#topic.MyTopic = example.MyTopic

Changing the ActiveMQ listening port

Copy and paste the conf/activemq.xml, and rename to newBroker.xml. change the port address in the XML:

<!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:51616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:51617"/>
<transportConnector name="stomp" uri="stomp://localhost:51613"/>
<transportConnector name="xmpp" uri="xmpp://localhost:51222"/>
</transportConnectors>

Run the activeMQ batch file with command:

activemq xbean:newBroker.xml

Or place the newBroker.xml in some other directory, say, c:\configs\activemq\newBroker.xml.

Run:

activemq xbean:file:c:/configs/activemq/newBroker.xml

which will start the broker at new ports.

Points to remember:

a. Below exception indicates you are using backslashes in the file: at commandline. change to forward slashes, it will resolve the exception.

ERROR: java.net.URISyntaxException: Illegal character in opaque part at index 13
: xbean:file:E:\soft\activemq\apache-activemq-5.1.0\conf\newBroker.xml
java.net.URISyntaxException: Illegal character in opaque part at index 13: xbean
:file:E:\soft\activemq\apache-activemq-5.1.0\conf\newBroker.xml
at java.net.URI$Parser.fail(URI.java:2816)
at java.net.URI$Parser.checkChars(URI.java:2989)

b. Keep the port number in check, a very large value will also result in a out of range exception.

Running two brokers on same machine

Other than changing the port as shown above, a new datastore has to be given. This is configured in the below XML snippet in the newBroker.xml.

<persistenceAdapter>
<amqPersistenceAdapter syncOnWrite="false" directory="${activemq.base}/newBrokerdata" maxFileLength="20 mb"/>
</persistenceAdapter>

Also, though not required, you can change the jetty servlet engine port of the newBroker.xml as below to say 8162.

<jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
<connectors>
<nioConnector port="8162"/>
</connectors>
<handlers>
<webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
<webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
<webAppContext contextPath="/fileserver" resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
</handlers>
</jetty>

Network of Brokers

We can have a network of brokers for load balancing and support even if one of the brokers fail due to any reason, say, network problem.

The network of broker can be established in two ways:

a. Statically listing the ips of the other broker.
b. using auto discovery mode.

Both the configuration uses the networkConnector element in the broker xml configuration.

a. For statically listing the ips, the below xml can be used in our newBroker.xml

<!-- The store and forward broker networks ActiveMQ will listen to -->
<networkConnectors>
<!-- by default just auto discover the other brokers -->
<!-- <networkConnector name="default-nc" uri="multicast://default"/> -->
<!-- Example of a static configuration:
<networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
-->
<networkConnector name="newBroker" uri="static://(tcp://localhost:61616)"/>
</networkConnectors>

If you observe the console, then the following line confirms that our brokers are running as a network of brokers.

INFO DemandForwardingBridge - Network connection between vm://localhost#0 and tcp://localhost/127.0.0.1:61616(localhost) has been established.

b. For auto discovrey mode, you can use the following XML.

<!-- The store and forward broker networks ActiveMQ will listen to -->
<networkConnectors>
<!-- by default just auto discover the other brokers -->
<networkConnector name="default-nc" uri="multicast://default"/>
<!-- Example of a static configuration:
<networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
-->
</networkConnectors>

Yes, you guessed right, this is the default configuration.

Remenber to have the below xml snipet with discoveryUri attribute in all your brokers. By default, this is present.

<transportConnector name="openwire" uri="tcp://localhost:51616" discoveryUri="multicast://default"/>

Client using failover over a network of broker

The failover protocol has to be used on the client side, so that, if any of the broker in the network of brokers fail, then the client can use an alternative broker which is up and running.

This is also just a configuration.

For our example, this will be in jndi.properties as:

java.naming.provider.url = failover:(tcp://localhost:61616,tcp://localhost:51616)

Or in code as:
String localURI = "tcp://localhost:61616";
String remoteURI = "tcp://localhost:51616";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:("+localURI+","+"remoteURI)");

25 comments:

  1. Very nice post mate, good job. Thanks..

    ReplyDelete
  2. Anonymous2:24 AM

    Great Post.

    ReplyDelete
  3. Anonymous1:10 PM

    thanks very much, I was looking for a good ActiveMQ head start

    ReplyDelete
  4. Anonymous11:25 PM

    consumer stops getting message.it seems like it is dead but the server shows that the consumer exists. I think it becomes dead after sometime's inactivity.
    How can i enforce the my listener for reconnection to the server again?

    Can you help me? Thanks in advance and sorry if i am asking very simple question.....

    ReplyDelete
  5. May be one of the ways to solve the problem is use Spring JMS support. I have seen this happening, and because I use spring when ever the connection terminates, Spring re-connects while I try to send the next message. Listening, I am not too sure. Other than this I have to find out a solution. Let me see. Please post a solution if you find here. It will be very helpful.

    ReplyDelete
  6. Anonymous8:58 AM

    How to create MDB to listen to Queue in AMQ? Any Help

    ReplyDelete
  7. In Spring(If you are looking at EJB3, then sorry, I have not worked on that),

    Its very simple, Create a java bean with say a method "process". Then, you can configure the MDB in the context.xml as :
    (i am giving only snippet, you can find lot of resources in web for that)

    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${BROKER_URL}"/>
    </bean>
    </property>
    </bean>

    <bean id="jmsQueueService" class="com.ssb.JMSService"/>

    <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="${QUEUE_NAME}"/>
    </bean>

    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate" ref="jmsQueueService"/>
    <property name="defaultListenerMethod" value="process"/>
    </bean>

    <bean id="queueContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="messageListener" ref="queueListener"/>
    <property name="destination" ref="queue"/>
    </bean>

    ReplyDelete
  8. Anonymous12:25 AM

    I don't have words to express my happiness for these summary points you have provided .....

    Great Job Mate.

    Thanks heaps.

    Rahul

    ReplyDelete
  9. Anonymous11:48 AM

    really good introduction.

    ReplyDelete
  10. Anonymous10:56 PM

    sorry for this question, but where is jndi.properties file?

    ReplyDelete
  11. you can create that file yourself and copy the contents given above..

    ReplyDelete
  12. Great tutorial! Thanks for writing this . . .

    ReplyDelete
  13. Anonymous5:37 AM

    can we access web page using Apache camel

    ReplyDelete
  14. can we access web page using ActiveMQ

    ReplyDelete
  15. satyam, ActiveMQ is a middleware technology to introduce async behavior in your application. I did not understand when you say ActiveMQ accessing web page? Please explain your usecase.

    ReplyDelete
  16. Gisbert writes a half year ago also a good tutorial.
    You can find it here: http://www.developers-blog.org/blog/default/2008/10/28/A-simple-ActiveMQ-example

    ReplyDelete
  17. Anonymous2:43 AM

    Hi Sacrosanct,
    I have a situation where the message producer will be in a weblogic server where the listener will be on another standalsone machine. How can I achieve this using ActiveMQ?

    ReplyDelete
  18. ActiveMQ is a middleware that can be run on any accessible machine. Just point the URL to that machine while connecting to the ActiveMQ. The whole purpose of activeMQ is to support async messaging between multiple remote or local system.

    ReplyDelete
  19. Anonymous9:54 PM

    Thanks that answers the listener part, but while sending meesage using Active MQ, it has to be from weblogic. Can you throw some more light on how to integrate the weblogic server with Active MQ to send messages?

    ReplyDelete
  20. One way to do that would be programatically using
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://external address for activeMQ:61616"); Just include the needed jars in your webapp.

    ReplyDelete
  21. Thank you for the tip. We're having some problems with ActiveMQ right now.

    ReplyDelete
  22. Nice and easy tutorial, Please keep posting tutorials on some other topic like this one. nice work

    ReplyDelete
  23. This realy helped me. Thanks dude. I was looking to see how to create the queues.

    ReplyDelete
  24. Anonymous8:02 AM

    Excellent.

    ReplyDelete
  25. Anonymous11:30 AM

    Its really a good post!..Was b=very helpful!.. Thanks

    ReplyDelete