Sunday, November 30, 2008

JMS Tutorial : Topic Subscriber Client

This post continues my JMS tutorial series. The client below subscribes to a topic on an ActiveMQ broker and prints each message it receives.

The jndi.properties file:

# START SNIPPET: jndi 

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616

# use the following property to specify the JNDI names the connection factory
# should appear as. The client code looks up "topicConnectionFactry" (spelled
# exactly as below), so the name here and the name in the lookup must match.
connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
#topic.MyTopic = example.MyTopic
topic.sample.data = sample.data

# END SNIPPET: jndi
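
To make the `topic.[jndiName] = [physicalName]` convention concrete, here is a minimal sketch that reads the same kind of entries with plain `java.util.Properties`. It is an illustration only and not how ActiveMQ parses the file internally; the class `JndiTopicNames` and its methods are hypothetical names I made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/** Illustration only: hypothetical helper, not ActiveMQ's own parsing code. */
final class JndiTopicNames {

    private static final String TOPIC_PREFIX = "topic.";

    /** Loads properties-format text, skipping '#' comment lines. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns jndiName -> physicalName for every "topic.[jndiName]" entry. */
    static Map<String, String> topicBindings(Properties props) {
        Map<String, String> bindings = new LinkedHashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(TOPIC_PREFIX)) {
                String jndiName = key.substring(TOPIC_PREFIX.length());
                bindings.put(jndiName, props.getProperty(key).trim());
            }
        }
        return bindings;
    }

    /** Splits the comma-separated connectionFactoryNames value. */
    static List<String> connectionFactoryNames(Properties props) {
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty("connectionFactoryNames", "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Properties props = parse(String.join("\n",
                "connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry",
                "#topic.MyTopic = example.MyTopic",
                "topic.sample.data = sample.data"));
        System.out.println("Topics: " + topicBindings(props));
        System.out.println("Factories: " + connectionFactoryNames(props));
    }
}
```

The commented-out `topic.MyTopic` line is skipped, so only `sample.data` ends up as a topic binding. That is the name the client passes to `jndiContext.lookup(...)`.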

import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Subscribes to the "sample.data" topic and hands each message to
 * MyTopicMessageListener.
 *
 * @author shreyas.purohit
 */
public class SampleJMSConsumer implements Runnable {

    public void run() {
        Context jndiContext = null;
        TopicConnectionFactory connectionFactory = null;
        TopicConnection connection = null;
        TopicSession session = null;
        TopicSubscriber consumer = null;
        Topic destination = null;
        // JNDI name of the topic, as registered in jndi.properties
        String sourceName = "sample.data";

        /*
         * Create a JNDI API InitialContext object
         */
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            e.printStackTrace();
            System.exit(1);
        }

        /*
         * Look up connection factory and destination.
         */
        try {
            // The name must match an entry in connectionFactoryNames exactly
            connectionFactory = (TopicConnectionFactory) jndiContext
                    .lookup("topicConnectionFactry");
            destination = (Topic) jndiContext.lookup(sourceName);
        } catch (NamingException e) {
            e.printStackTrace();
            System.exit(1);
        }

        try {
            connection = connectionFactory.createTopicConnection();
            session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            consumer = session.createSubscriber(destination);
            connection.start();
            // Pause briefly before attaching the listener
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the interruption is not lost
                Thread.currentThread().interrupt();
            }
            MessageListener listener = new MyTopicMessageListener();
            consumer.setMessageListener(listener);
            // Keep the thread alive for 50 seconds so that the consumer has
            // sufficient time to receive messages
            try {
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    // Closing the connection also closes its session and subscriber
                    connection.close();
                } catch (JMSException e) {
                    // Ignored: nothing more can be done while shutting down
                }
            }
        }
    }

}
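
The consumer above stays alive for a fixed 50 seconds with `Thread.sleep(50000)`. One alternative, if you know how many messages to expect, is to wait on a `CountDownLatch` that the listener counts down, with the 50 seconds as an upper bound. The sketch below shows only that waiting pattern with plain strings standing in for JMS messages; `LatchWaitSketch`, `countingListener` and `awaitMessages` are hypothetical names, and wiring this into a real `MessageListener` is left as an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Illustration only: a stand-in for the JMS listener, using plain strings. */
final class LatchWaitSketch {

    /** Returns a listener that prints each message and counts the latch down once. */
    static Consumer<String> countingListener(CountDownLatch latch) {
        return message -> {
            System.out.println("Received: " + message);
            latch.countDown();
        };
    }

    /**
     * Waits until the latch reaches zero or the timeout elapses.
     * Returns true only if all expected messages arrived in time.
     */
    static boolean awaitMessages(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the wait did not complete
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);
        Consumer<String> listener = countingListener(latch);

        // Simulate the provider delivering two messages on another thread
        Thread delivery = new Thread(() -> {
            listener.accept("first");
            listener.accept("second");
        });
        delivery.start();

        boolean allReceived = awaitMessages(latch, 50000);
        System.out.println("All messages received: " + allReceived);
    }
}
```

With this pattern the thread returns as soon as the expected messages have arrived instead of always sleeping for the full interval. The trade-off is that you must know the expected count up front.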

The message listener:
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

/**
 * Prints the payload and correlation ID of every ObjectMessage it receives.
 *
 * @author shreyas.purohit
 */
public class MyTopicMessageListener implements MessageListener {

    /* (non-Javadoc)
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    public void onMessage(Message message) {
        if (message instanceof ObjectMessage) {
            try {
                // Print it out
                ObjectMessage objectMessage = (ObjectMessage) message;
                System.out.println("Received message in listener: " + objectMessage.getObject());
                System.out.println("Correlation ID: " + objectMessage.getJMSCorrelationID());
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        } else {
            // Only ObjectMessage is expected on this topic
            System.out.println("~~~~Error in format~~~");
        }
    }

}

The JMS Application:
/**
 * Entry point: starts the topic consumer on its own thread.
 *
 * @author shreyas.purohit
 */
public class JMSApp {

    /**
     * @param args not used
     */
    public static void main(String[] args) {
        runInNewthread(new SampleJMSConsumer());
    }

    public static void runInNewthread(Runnable runnable) {
        Thread consumerThread = new Thread(runnable);
        // A non-daemon thread keeps the JVM alive until run() returns
        consumerThread.setDaemon(false);
        consumerThread.start();
    }
}

3 comments:

  1. Good post. I would like to share one link that also covers a JMS tutorial.

    http://binodsuman.blogspot.com/2009/06/jms-easy-example-get-start-with-jms-jms.html

    Thanks,

    Binod Suman

  2. What imports are required for this?

    Also, what is on your build path, and which version of JMS.jar are you pointing to: an old Sun one, or something internal included in ActiveMQ?

  3. I recently tried something similar with AMQ 5.5, using Groovy, and it works. As for jars: ActiveMQ All for sure, plus log4j and slf4j; I guess that should do. I did not look closely, since most of the jars ship with the AMQ distribution itself, and you can copy over any that an exception shows to be missing.
