Developing JMS ApplicationsThis chapter will review how to develop some simple JMS applications. Managed ObjectsEstablishing a connection to the JMS provider is one of the first actions a JMS application takes. The way that the connection is established is through the used of a Managed Object. It important that you understand why using Managed Objects is beneficial to an application. The JMS API recommends that the connection factory and destination objects be accessed via JNDI. Since they are objects retrieved from JNDI, they are called Managed Objects. They can be managed outside of the application code. The JMS connection factory being used by a client application can be changed by an administrator. The JMS connection factory objects can be though of as objects that store the configuration information that will allow a client to establish a connection to the JMS provider. Typically it will store information such as the host name and port under which where the JMS service is running. The two main reasons to use JNDI to look up the connection factory and the destination objects are:
Doing a JNDI LookupThis section will show you how to do a JNDI look up of a managed object. The first thing you must do before doing a JNDI lookup is to create an InitialContext object. The easiest way to initialize the InitialContext object is to use the default constructor. When the default constructor is used, the JNDI API will look for a jndi.properties file in the CLASSPATH and use the information in it to connect to the correct JNDI server. The configuration of the jndi.properties file will depend on the JMS provider that you use. You can use following ?jndi.properties? file to configure JNDI to use the JBoss JNDI server on the localhost machine: java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces java.naming.provider.url=localhost Learning by example is one of the best ways to pick up on new technology. If you have never done a JNDI lookup before, just take a look at the following listings and you will see how easy it is.
JBossMQ connection factories implement both the QueueConnectionFactory and the TopicConnectionFactory interfaces. This is the reason that both Listing 3.3 and Listing 3.4 lookup the same JNDI location. JBossMQ, by default, binds all topics under the ?topic/? JNDI sub-context and all queues under the ?queue/? JNDI sub-context. An administrator can change the location of where the managed objects are bound under JNDI. For more information about how to configure the JNDI locations the objects are bound to, please see JBossMQ 3.2.1 ? The Core Administration Guide. Listing 3.3 Looking up a QueueConnectionFactory try { InitialContext ic; QueueConnectionFactory cf; ic = new InitialContext(); cf = (QueueConnectionFactory)ic.lookup(?ConnectionFactory?); } catch (NamingException e) { System.out.println("JNDI lookup failed: " + e.toString()); } Listing 3.4 Looking up a TopicConnectionFactory try { InitialContext ic; TopicConnectionFactory cf; ic = new InitialContext(); cf = (TopicConnectionFactory)ic.lookup(?ConnectionFactory?); } catch (NamingException e) { System.out.println("JNDI lookup failed: " + e.toString()); } Listing 3.5 Looking up a Queue and a Topic. try { InitialContext ic; Queue q1; Topic t1; ic = new InitialContext(); q1 = (Queue)ic.lookup(?queue/testQueue?); t1 = (Topic)ic.lookup(?topic/testTopic?); } catch (NamingException e) { System.out.println("JNDI lookup failed: " + e.toString()); } Creating a JMS ProducerThis section shows you how you can create a simple application that acts as a JMS message producer for the Point-to-Point and the Pub-Sub messaging models. The sample programs in this section do not try to implement proper error handling to keep the source code easy to follow. A Pub-Sub Message ProducerListing 3.6 provides you the sample program that will send a ?Hello World!? message to the ?testQueue?. After all of the local variables have been declared, you see the following lines of code:
ctx = new InitialContext();
cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
destination = (Queue)ctx.lookup("queue/testQueue");
These line of code lookup the managed objects that the application will be working with from JNDI. For more information on JNDI lookups, see chapter 3.4. The default JBoss configuration has the following queues deployed: testQueue, A, B, C, D, DLQ, and ex. You can use them for testing and development. Once the connection factory object is obtained, it will be used to create the connection to the JMS provider. After that, the connection can then be used to establish a JMS session:
connection = cf.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
The first argument into the createQueueSession(...) method is weather or not the JMS session is going to be transacted. Our example passes in ?false? so that the session is not transacted. Transacted sessions will be covered in chapter 3.8. The second argument to the method sets up the acknowledgment mode. The acknowledgment mode only affects how messages are consumed so this setting does not affect our example. Acknowledgment modes are covered in chapter 3.6.3. Once the session has been created, it can be used to create the sender object and the message that the sender will be sending. In our example, the following code: sender = session.createSender(destination); message = session.createTextMessage(); message.setText("Hello World!"); creates the sender object and sets it up so that all messages that it sends go to the destination that was initially looked up via JNDI. The code also creates a TextMessage and sets the text of the message to ?Hello World!? At this point in the code all the objects that we need to send in the message have been initialized. Finally, the send method is to call on the sender object to send the message to the queue. sender.send(message); Once this method returns, the ?Hello World!? message will be sitting on the destination. One way to see if the message has made it to the destination is to use the JBoss jmx-console and view the MBean which manages the ?testQueue? destination. The MBean will be named jboss.mq.destination:name=testQueue,service=Queue. Once, you select the MBean, look for the QueueDepth MBean attribute. This attribute will tell you how many messages are sitting on the queue. Figure 8 MBean Attributes from a Queue MBean is displaying a queue that is holding two messages. Listing 3.6 Example: Sending a TextMessage to a Queue import javax.jms.*; import javax.naming.*; public class MessageToQueue { public static void main(String[] args) throws NamingException,JMSException { InitialContext ctx; QueueConnectionFactory cf; QueueConnection connection; QueueSession session; Queue destination; QueueSender sender; TextMessage message; ctx = new InitialContext(); cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory"); destination = (Queue)ctx.lookup("queue/testQueue"); connection = cf.createQueueConnection(); session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); sender = session.createSender(destination); message = session.createTextMessage(); message.setText("Hello World!"); System.out.println("Sending Message."); sender.send(message); connection.close(); System.out.println("Done."); } } A Pub-Sub Message ProducerThe previous example covered sending a ?Hello World!? message to queue. We will now cover sending the same message to a topic instead. If you compare the differences between sending a message to a Queue and sending a message to a Topic, you will notice the following differences:
Since there is direct mapping between the examples of the Point-to-Point case and the Pub-Sub case, these is no need to further explain the source code. The important thing to remember about this example is that the message is being sent to a Topic destination and therefore, the message will be delivered to all the consumers that are receiving messages from that destination. Listing 3.7 Example: Sending a TextMessage to a Topic import javax.jms.*; import javax.naming.*; /** * Sends a TextMessage to a Topic */ public class MessageToTopic { public static void main(String[] args) throws NamingException, JMSException { InitialContext ctx; TopicConnectionFactory cf; TopicConnection connection; TopicSession session; Topic destination; TopicPublisher publisher; TextMessage message; ctx = new InitialContext(); cf = (TopicConnectionFactory)ctx.lookup("ConnectionFactory"); destination = (Topic)ctx.lookup("topic/testTopic"); connection = cf.createTopicConnection(); session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); publisher = session.createPublisher(destination); message = session.createTextMessage(); message.setText("Hello World!"); System.out.println("Sending Message."); publisher.publish(message); connection.close(); System.out.println("Done."); } } Specifying the QoSThere are several factors that affect the QoS that a JMS provider will use while delivering a message to a consumer.
These factors can be configured on a per message basis when the messages are produced. The easiest way to set the QoS settings is to set the corresponding bean properties on the QueueSender or TopicPublisher object. The following example configures a publisher and a sender so that it sends persistent messages with a priority of 7 and a Time To Live value of one minute: QueueSender sender=... sender.setDeliveryMode(DeliveryMode.PERSISTENT); sender.setPriority(7); sender.setTimeToLive(60000); sender.send(message); .... TopicPublisher publisher=... publisher.setDeliveryMode(DeliveryMode.PERSISTENT); publisher.setPriority(7); publisher.setTimeToLive(60000); publisher.send(message); 4.3Creating a JMS Message ConsumerThis section will focus on how to create JMS message consumers. Many different options are available to a developer that is creating a message consumer. Consumers can:
Receiving a Message from a QueueWe will first describe how to create one of the simplest message consumers, a synchronous consumer that receives a text message from a queue. Listing 3.8 is the example program that will be reviewed. The first thing the application does is look up the managed objects using JNDI. More information about managed objects can be found in chapter 3.4.
ctx = new InitialContext();
cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory");
destination = (Queue)ctx.lookup("queue/testQueue");
Once the management objects have been looked up, we can establish our connection and session with the JMS provider. To keep it simple, the session that our example establishes is non-transactional and it auto acknowledges messages when they are received. A receiver is created to receive messages from the destination that was looked up via JNDI earlier.
connection = cf.createQueueConnection();
session = connection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
receiver = session.createReceiver(destination);
Now that all of the JMS objects have been initialized, our application is ready to receive a message from the queue. The code below shows you how the connection is started and how the receiver is used to get the next message from the destination. connection.start(); message = (TextMessage)receiver.receive(); JMS Provider not Delivering Messages? One of the most common errors that is often made when creating a consumer is forgetting to start the connection. You must start the connection so that the JMS provider knows that it is OK to start delivering messages to the consumers. The need to start() the connection is becomes more apparent once we cover asynchronous message delivery. The receive() method call will block until a the JMS provider delivers a message to the receiver. Since it is a blocking method call, the JMS provider is doing a synchronous message delivery. Listing 3.8 The source code for a Point-to-Point consumer import javax.jms.*; import javax.naming.*; /** * Receives a TextMessage from a Queue */ public class QueueToMessage { public static void main(String[] args) throws NamingException, JMSException { InitialContext ctx; QueueConnectionFactory cf; QueueConnection connection; QueueSession session; Queue destination; QueueReceiver receiver; TextMessage message; ctx = new InitialContext(); cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory"); destination = (Queue)ctx.lookup("queue/testQueue"); connection = cf.createQueueConnection(); session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); receiver = session.createReceiver(destination); System.out.println("Waiting For A Message."); connection.start(); message = (TextMessage)receiver.receive(); System.out.println("The message was: "+message.getText()); connection.close(); System.out.println("Done."); } } Receiving a Message from a TopicNow we will examine the counter-part to the previous example, a consumer that receives a TextMessage from a Topic. This example is almost the same as the Queue example except that was converted so that it could operate in the Pub-Sub model. Only two changes were needed.
import javax.jms.*; import javax.naming.*; /** * Receives a TextMessage from a Topic */ public class TopicToMessage { public static void main(String[] args) throws NamingException, JMSException { InitialContext ctx; TopicConnectionFactory cf; TopicConnection connection; TopicSession session; Topic destination; TopicSubscriber subscriber; TextMessage message; ctx = new InitialContext(); cf = (TopicConnectionFactory)ctx.lookup("ConnectionFactory"); destination = (Topic)ctx.lookup("topic/testTopic"); connection = cf.createTopicConnection(); session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); subscriber = session.createSubscriber(destination); System.out.println("Waiting For A Message."); connection.start(); message = (TextMessage)subscriber.receive(); System.out.println("The message was: "+message.getText()); connection.close(); System.out.println("Done."); } } Acknowledgement ModesJMS consumers should be careful about the acknowledgment mode that they use. The acknowledgment mode is specified when the session is created. Any receivers created using the same session will have the same acknowledgment mode. The acknowledgment mode controls when the JMS provider considers that a message has been delivered successfully and when the message can be backed out so that it can be redelivered. In general, if a client application terminates before a message is acknowledged, the message is backed out and redelivered to the consumer. If your session is transacted the acknowledgment mode setting is ignored. For more information about how JMS transactions work see chapter 3.8. The examples that you have seen so far have used the AUTO_ACKNOWLEDGE acknowledgment mode. For simple JMS applications where there is not a critical need to ensure complete processing of all messages, this setting is sufficient.
Asynchronous Message DeliveryThe consumers that have been described so far have been synchronous receivers. They are considered synchronous because the client application must do a blocking synchronous receive(...) call to get the next message. JMS also allows you to deliver messages asynchronously to objects that implement the MessageListener interface. The MessageListener interface only contains one method:
public void onMessage(Message msg);
An application only needs to implement the MessageListener interface and JMS provider will send messages asynchronously to the onMessage(...) method. Receiving an Asynchronous Message from a QueueWe will now review an example of an asynchronous Point-to-Point message consumer. Listing 3.9 contains the full source listing for the example. This example will pick up a TextMessage from the ?testQueue? display it on the console and exit. In this example we create a new AsynchQueueToMessage instance and call the run() method on it. public static void main(String[] args) throws NamingException, JMSException { new AsynchQueueToMessage().run(); } The run() method now does the bulk of initializing the JMS objects. You will notice that that the JMS objects are initialized the similar to how it was done in Listing 3.8. The main difference is that the local variables have become instance variables. public void run() throws NamingException, JMSException { ctx = new InitialContext(); cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory"); destination = (Queue)ctx.lookup("queue/testQueue"); connection = cf.createQueueConnection(); session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); receiver = session.createReceiver(destination); Near the end of the run() method is where you will notice a difference from our previous MessageToQueue example. This example calls the setMessageListener() method and passes a new instance of a custom MessageListener that will be defined a little later.
receiver.setMessageListener(new MyMessageListener());
Once all the objects have been initialized and ready to receive messages, the connection's start() method is called so that the MessageListener can receive it's messages. Are you sure you want to start that connection?Once you start to create MessageListener objects that are not trivial like our example, it is important that you do not call the start() method before the message listener has been fully initialized. Otherwise, the MessageListener will receive messages before it is ready to process the messages delivered to it.
System.out.println("Waiting For A Message.");
connection.start();
}
The final part of the example that we will examine is the implementation of the MessageListener. The MessageListener is implemented as inner classes. It's onMessage(...) method displays the TextMessage to the console and then closes the connection. class MyMessageListener implements MessageListener { public void onMessage(Message msg) { try { TextMessage message = (TextMessage)msg; System.out.println("The message was: "+message.getText()); connection.close(); System.out.println("Done."); } catch (JMSException e) { e.printStackTrace(); } } } What is keeping the application running? An advanced Java programmer may notice that the main thread of execution returns normally after the connection.start(); executes. Normally a program would terminate after the main thread of execution returns. The reason the program does not terminate is because the connection starts a non-daemon thread in the background. The program will continue to run until the connection is closed. Listing 3.9 The source code for a Asynchronous Point-to-Point consumer import javax.jms.*; import javax.naming.*; /** * Listens for a TextMessage from a Queue */ public class AsynchQueueToMessage { InitialContext ctx; QueueConnectionFactory cf; QueueConnection connection; QueueSession session; Queue destination; QueueReceiver receiver; public static void main(String[] args) throws NamingException, JMSException { new AsynchQueueToMessage().run(); } public void run() throws NamingException, JMSException { ctx = new InitialContext(); cf = (QueueConnectionFactory)ctx.lookup("ConnectionFactory"); destination = (Queue)ctx.lookup("queue/testQueue"); connection = cf.createQueueConnection(); session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); receiver = session.createReceiver(destination); receiver.setMessageListener(new MyMessageListener()); System.out.println("Waiting For A Message."); connection.start(); } class MyMessageListener implements MessageListener { public void onMessage(Message msg) { try { TextMessage message = (TextMessage)msg; System.out.println("The message was: "+message.getText()); connection.close(); System.out.println("Done."); } catch (JMSException e) { e.printStackTrace(); } } } } SelectorsSelectors are used pick out a subset of messages that a consumer is interested in receiving. When you know a destination will have more messages than it is interested in consuming, the consuming application should use a selector. The selector works like similar to an SQL WHERE clause. The selector can use all all the message header and property fields to formulate it's selector. You specify the selector when you create the consumer. Once a consumer is created with a selector, it cannot be changed. An example of a a consumer being created with a selector can be found below.
// For the point-to-point case:
receiver = session.createReceiver(destination, "JMSType = 'order' AND total>100 AND tax <1");
// For the pub-sub case: subscriber = session.createSubscriber(destination, "JMSType = 'order' AND total>100 AND tax <1", false); The consumer would only receive messages of type 'order' who's total is over 100 and who's tax is less than one. For more in depth information on selector, see the Message class in the JMS javadocs. Durable SubscriptionsYou you already learned from chapter 3.3 that a topic does not hold messages like queues do. Topics allow you to broadcast a message to multiple receivers. So what do you do if you want to broadcast a message to multiple consumers but you want those messages to be held for the clients that are not currently connected? The answer is that the clients needs to establish a durable subscription with the topic. A client establishes a durable subscription by taking the following actions:
The following code snippet shows you how a durable subscription could be established:
connection = cf.createTopicConnection("john", "needle");
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
subscriber =
session.createDurableSubscriber(destination,"sub1");
Once a durable subscription has been established for the first time, the JMS provider will will retain messages for the subscriber even if the client is not currently connected. When the client reconnects with the same durable subscription, all retained messages will be delivered. Once a client now longer needs the JMS provider to retain the messages of a durable subscription, the client should cancel it's durable subscription. A client would accomplish this by using the Session.unsubscribe() method: session.unsubscribe("sub1"); If a client just wants the change an existing durable subscription to receive messages from a different topic and/or a different message selector, then the client does not need to un subscribe the subscription first. When a new durable subscription is established using an existing subscription id, the previous durable subscription is canceled first. JMS MessagesThe messages that producers and consumers pass to each other are very important since in essence the message is the interface between the two components. The JMS spec specifies that JMS providers must support the message types listed in Table 3.10. A JMS message is made up of three parts:
The type of JMS message that your application will use will depend on the type of message body that it will need to send. Use Table 3.10 to determine the type of JMS message you need to use. Table 3.10 The Message types that the JMS API defines.
JMS TransactionsA JMS transactions allows a client application to perform multiple JMS operations as an atomic unit of work. In other words, it allows you to rollback or commit a set of messages that were sent or received. Doing a rollback undoes any JMS operations that were previously done by the session. Doing a commit finalizes all the work that was done by the session. The JMS session object is what is used to control the JMS transaction. When you create a Session object, the first argument specifies if the session will be transacted. The code sample below creates a transacted JMS session. // for the point-to-point case: session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); // for the pub-sub case: session = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); When you are working with a transacted JMS session, there are a few issues you need to be aware of:
The following code snippet is using a transacted session to publish two messages in a singe unit of work. If an exception occurs at anytime that the messages are being sent, the transaction is rolled back: try { publisher.publish(message1); publisher.publish(message2); session.commit(); } catch (Throwable e) { session.rollback(); } |
|
|||||||||||||||||||||||||||
© 2003 Core Developers Network Ltd "Core Developers Network", the stylized apple logo and "Core Associates" are trademarks of Core Developers Network Ltd. All other trademarks are held by their respective owners. Core Developers Network Ltd is not affiliated with any of the respective trademark owners. |