Home > Articles > Programming > Java

  • Print
  • + Share This
This chapter is from the book

Using the JMS Publish/Subscribe Model

Now we are going to look at an example of implementing the notification functionality using the Pub/Sub model. Although this example might seem like a stretch to require the Pub/Sub model, it will give you an idea of the differences between the two and what must be done differently for each model. Actually, there might even be a need for this design over the PTP model based on performing different tasks for different subscribers, so it might not be that much of a stretch.

As stated earlier, the Pub/Sub model in JMS involves using a topic as the destination rather than a queue. For this example, we are going to have two subscribers to the topic. One will be the handler that generates an e-mail message to the user and the second will be the subscriber that is responsible for logging the event for the administrator. Using separate subscribers allows us to modify the behavior that is taken for the admin notification separately from the user notification. We could possibly add a third subscriber where a page was sent to someone's pager. The point here is that using a Pub/Sub pattern allows you to specialize the behavior for each subscriber to the topic.

Creating the JMS Administered Objects

For this example, we will add a line to the auction_jms.properties file for the new destination. The resource file should look like the one in Listing 10.9 after adding the new line for the topic. We will not remove the lines from the Queue example from the previous sections. Listing 10.9 shows what the properties file should look after adding the new line.

Listing 10.9 The auction_jms.properties File with the Line for the Topic Added

AUCTION_CONNECTION_FACTORY=com.que.ejb20book.AuctionConnectionFactory
AUCTION_NOTIFICATION_QUEUE=com.que.ejb20book.EmailQueue
AUCTION_NOTIFICATION_TOPIC=com.que.ejb20book.AuctionNotificationTopic

To create the necessary administered objects for the topic, you will need to follow similar steps that you did when you created the queue for the last example. You don't have to create a new ConnectionFactory. We will reuse the one that you have already added.

Adding Subscribers to a Topic

We will support two subscribers for this example. One is the AuctionWinnerNotificationSubscriber and the other is the AuctionWinAdminNotificationSubscriber. Both classes extend an abstract super class called AuctionExternalNotificationSubscriber. For this example, we don't have a notification for a trailing bid as we did in the last chapter, but you could easily develop one by subclassing the abstract class as the other two do here. Listing 10.10 shows the abstract super class that both subscribers will extend. The only method is the onMessage method that is required by the MessageListener interface. The subscribers must implement this method to perform the functionality that is unique to that subscriber.

Listing 10.10 Source Code for AuctionExternalNotificationSubscriber.java

/**
 * Title:    AuctionExternalNotificationSubscriber
 * Description: This class is an abstract JMS Topic subscriber.
 */
package com.que.ejb20.notification;

import javax.jms.*;
import java.io.*;
import java.util.*;
import javax.naming.*;
import com.que.ejb20.services.email.*;

abstract public class AuctionExternalNotificationSubscriber
 implements Runnable, MessageListener {
  // The reference to the JNDI Context
  private InitialContext ctx = null;
  // Private static names for the Administered JMS Objects
  private static String connectionFactoryName = null;
  private static String topicName = null;

  private TopicConnectionFactory tcf = null;
  private TopicSubscriber subscriber = null;
  private TopicConnection topicConnection = null;
  private Topic topic = null;

 /**
  * Default Constructor
  */
 public AuctionExternalNotificationSubscriber() {
  super();
  loadProperties();
 }

 /**
  * This is the method that must be implemented from the MessageListener
  * interface. This method will be called when a message has arrived at the
  * Topic and the container calls this method and passes the Message.
  */
 abstract public void onMessage( Message msg );
 /**

Listing 10.10 Continued

  * The run method is necessary because this method implements the
  * Runnable interface to keep the thread alive and waiting for messages.
  * Otherwise, this would would not stay alive and would not be able to
  * listen for messages asyncronously.
  */
 public void run() {
  while( true ) {
   synchronized( this ){
    try{
     wait();
    }catch( InterruptedException ex ){
    }
   }
  }
 }
 // Private Accessors for Connection Factory Name
 private static String getConnectionFactoryName() {
  return connectionFactoryName;
 }

 // Private mutator for the Connection factory Name
 private static void setConnectionFactoryName( String name ) {
  connectionFactoryName = name;
 }

 // Private Accessors for the Topic Name
 private static String getTopicName() {
  return topicName;
 }

 // Private mutator for Topic Name
 private static void setTopicName( String name ) {
  topicName = name;
 }

 /**
  * This method is called to set up and initialize the necessary
  * Connection and Session references.
  */
 public void init( String msgSelector ) throws JMSException, NamingException {
  try{
   // Look up the jndi factory
    ctx = new InitialContext();

   // Get a connection to the QueueConnectionFactory
   tcf = (TopicConnectionFactory)ctx.lookup( getConnectionFactoryName() );

  // Create a connection
  topicConnection = tcf.createTopicConnection();

  // Create a session that is non-transacted and is notified automatically
  TopicSession ses =
   topicConnection.createTopicSession( false, Session.AUTO_ACKNOWLEDGE );

  // Look up a destination

Listing 10.10 Continued

  topic = (Topic)ctx.lookup( getTopicName() );

  // Create the receiver with a msgSelector. The msgSelector may
  // be null. The noLocal parameter is set so that this subscriber
  // will not receive copies of its own messages
  subscriber = ses.createSubscriber( topic, msgSelector, true );

  // It's a good idea to always put a finally block so that the
  // context is closed
  }catch( NamingException ex ) {
   ex.printStackTrace();
   System.exit( -1 );
  }finally {
   try {
    // Close up the JNDI connection since we have found what we needed
    ctx.close();
   }catch ( Exception ex ) {
    ex.printStackTrace();
   }
  }

  // Inform the received that the callbacks should be sent to this instance
  subscriber.setMessageListener( this );

  // Start listening
  topicConnection.start();
  System.out.println( "Listening on topic " + topic.getTopicName() );
 }

 /**
  * This method is called to load the JMS resource properties
  */
 private void loadProperties() {
  String connectionFactoryName = null;
  String topicName = null;

  // Uses a Properties file to get the properties for the JMS objects
  Properties props = new Properties();
  try {
   props.load(getClass().getResourceAsStream( "/auction_jms.properties" ));
  }catch( IOException ex ){
   ex.printStackTrace();
  }catch( Exception ex ){
   System.out.println( "Had a problem locating auction_jms.properties");
   ex.printStackTrace();
  }

  connectionFactoryName = props.getProperty( "AUCTION_CONNECTION_FACTORY" );
  topicName = props.getProperty( "AUCTION_NOTIFICATION_TOPIC" );

  // Set the JMS Administered values for this instance
  setConnectionFactoryName( connectionFactoryName );
  setTopicName( topicName );
 }
 /**

Listing 10.10 Continued

  * For now, this method only prints out that an email is to be sent.
  * You'll see later how to do this using the JavaMail API.
  */
 private void sendEmail( NotificationEmail email ) {
  /* Delegate the actual sending of the email message to the
    horizontal email service */
  try{
   EmailService.sendEmail( email );
  }catch( EmailException ex ){
   ex.printStackTrace();
  }
 }
}

Listings 10.11 and 10.12 show the concrete subclasses that extend the abstract class in Listing 10.10. Each one is designed to perform specific business logic when a message arrives at the topic.

Listing 10.11 Source Code for AuctionWinnerNotificationSubscriber.java

/**
 * Title:    AuctionWinnerNotificationSubscriber<p>
 * Description: A Topic subscriber to handle a notification for a winner of
 *        an Auction.<p>
 */
package com.que.ejb20.notification;

import javax.jms.*;
import java.io.*;
import java.util.*;
import javax.naming.*;
import com.que.ejb20.services.email.*;

public class AuctionWinnerNotificationSubscriber
 extends AuctionExternalNotificationSubscriber {

 public AuctionWinnerNotificationSubscriber() {
 }
 /**
  * The onMessage method here generates an email through the horizontal
  * email service.
  */
 public void onMessage( Message msg ) {
  if ( msg instanceof ObjectMessage) {
   try {
    Object obj = ((ObjectMessage)msg).getObject();
    if ( obj instanceof AuctionNotification ) {
     sendEmail( (AuctionNotification)obj );
    }
   } catch( JMSException ex ) {
    ex.printStackTrace();
   }
  }

Listing 10.11 Continued

 }
 /**
  * Delegate the sending of the email to the horizontal service.
  */
 private void sendEmail( AuctionNotification msg ) {
  NotificationEmail email = new NotificationEmail();
  email.setToAddress( msg.getNotificationEmailAddress() );
  email.setBody( msg.toString() );
  email.setFromAddress( "AuctionSite" );
  email.setSubject( msg.getNotificationSubject() );
  // Delegate to the horizontal service
  EmailService.sendEmail( email );
 }
 /**
  * Main Method
  * This is the main entry point that starts the Email listening for
  * messages in the Topic.
  */
 public static void main( String args[]) {
  // Create an instance of the client
  AuctionWinnerNotificationSubscriber subscriber = null;

  try {
   subscriber = new AuctionWinnerNotificationSubscriber();
   subscriber.init( "NotificationType = 'AuctionWinner'" );
  }catch( NamingException ex ){
   ex.printStackTrace();
  }catch( JMSException ex ){
   ex.printStackTrace();
  }

  // Start the client running
  Thread newThread = new Thread( subscriber );
  newThread.start();
 }
}

Listing 10.12 Source Code for AuctionWinAdminNotificationSubscriber.java

/**
 * Title:    AuctionWinAdminNotificationSubscriber<p>
 * Description: A Topic subscriber to handle a notification for a winner of
 *        an Auction.<p>
 */
package com.que.ejb20.notification;

import javax.jms.*;
import java.io.*;
import java.util.*;
import javax.naming.*;
import com.que.ejb20.services.logging.*;

public class AuctionWinAdminNotificationSubscriber
 extends AuctionExternalNotificationSubscriber {

Listing 10.12 Continued

 /**
  * Default Constructor
  */
 public AuctionWinAdminNotificationSubscriber() {
  super();
 }

 /**
  * If the Message is an ObjectMessage and is an instance
  * of AuctionNotification, then log the message to the
  * horizontal logging service.
  */
 public void onMessage( Message msg ) {
  if ( msg instanceof ObjectMessage) {
   try {
    Object obj = ((ObjectMessage)msg).getObject();
    if ( obj instanceof AuctionNotification ) {
     try{
      String msgStr = ((AuctionNotification)obj).getNotificationSubject();
      ILogger logger = new Logger();
      logger.logMessage( new LogMessage( msgStr, LogMessage.INFO ));
      logger.close();
     }catch( LoggingException ex ){
      ex.printStackTrace();
     }
    }
   } catch( JMSException ex ) {
    ex.printStackTrace();
   }
  }
 }
 /**
  * Main Method
  * This is the main entry point that starts the Email listening for
  * messages in the Queue.
  */
 public static void main( String args[]) {
  // Create an instance of the client
  AuctionWinAdminNotificationSubscriber subscriber = null;

  try {
   subscriber = new AuctionWinAdminNotificationSubscriber();
   subscriber.init( "" );
  }catch( NamingException ex ){
   ex.printStackTrace();
  }catch( JMSException ex ){
   ex.printStackTrace();
  }

  // Start the client running
  Thread newThread = new Thread( subscriber );
  newThread.start();
 }
} 

Both of the classes in Listings 10.11 and 10.12 extend the AuctionExternalNotificationSubscriber class and override the abstract onMessage method. This is to allow for each subclass to do something special when a message arrives.

In the case of the AuctionWinnerNotificationSubscriber class in Listing 10.11, the sendMail method is called on the EmailService component that is part of the horizontal service. With Listing 10.12, the logMessage method is called on the LogService that is also a part of the horizontal services. The horizontal service class that handles logging appears in Listing 10.13. For now, the logger will only print out a message to the console. This component will be developed further in Chapter 21.

Listing 10.13 The Horizontal Component for Logging

/**
 * Title:    LogService<p>
 * Description: Horizontal Service Component for Logging<p>
*/
package com.que.ejb20.services.logging;

public class LogService {

 public static void logMessage( String msg ) {
  System.out.println( msg );
 }
} 

The logMessage method in the LogService class in Listing 10.13 is extremely basic and will be modified and further developed in Chapter 21. For now, we are just trying to provide stubs for the classes that need to use them.

Sending Messages to a Topic

To help test the Pub/Sub example, the AuctionWinnerPublisher class will be used. Just as with the Queue example from before, we are going to use a regular Java client to help us test the example. This message publisher code would normally reside in the EJB container and be triggered when one of the events occurred that needed to generate a notification, but to keep the example simple, we will just use this class for now.

Listing 10.14 shows the AuctionWinnerPublisher that will we use. The publisher will execute these general steps:

  1. Locate the necessary JMS administered objects.

  2. Create a new JMS message.

  3. Publish the message to the topic.

  4. Exit.

Listing 10.14 Source Code for AuctionWinnerPublisher.java

/**
 * Title:    AuctionWinnerPublisher<p>
 * Description: This class is used to test the AuctionNotificationTopic<p>
*/
package com.que.ejb20.notification;

import javax.jms.*;
import java.io.*;
import java.util.*;
import javax.naming.*;

/**
 * This class can be used to test sending an AuctionWinnerNotification
 * to the AuctionNotificationTopic. All of the subscribers will get a
 * copy of the JMS Message, which encapsulates an AuctionWinnerNotification
 * object with the details of the Auction win. Only one message will be sent
 * each time this class is executed.
 *
 * Usage: java AuctionWinnerPublisher
 */
public class AuctionWinnerPublisher {

  // The reference to the JNDI Context
  private InitialContext ctx = null;
  // Private static names for the Administered JMS Objects
  private static String connectionFactoryName = null;
  private static String topicName = null;
  // Private instance references
  private TopicConnectionFactory tcf = null;
  private TopicConnection topicConnection = null;
  private TopicSession ses = null;
  private Topic topic = null;

 /**
  * Default Constructor
  */
 public AuctionWinnerPublisher() {
  super();
  loadProperties();
 }

 public void publishWinnerNotification( AuctionWinnerNotification winMsg ) {
  // Local reference to a TopicPublisher
  TopicPublisher publisher = null;

  try{
   // Lookup the jndi factory
   ctx = new InitialContext();

   // Get a connection to the QueueConnectionFactory
   tcf = (TopicConnectionFactory)ctx.lookup( getConnectionFactoryName() );

   // Create a connection
   topicConnection = tcf.createTopicConnection();

Listing 10.14 Continued

   // Create a session that is non-transacted and is notified automatically
   ses =
    topicConnection.createTopicSession( false, Session.AUTO_ACKNOWLEDGE );

   // Lookup a destination
   topic = (Topic)ctx.lookup( getTopicName() );

   // Create the publisher
   publisher = ses.createPublisher( topic );

   // Wrap the AuctionWinnerNotification inside of a JMS Message
   Message msg = ses.createObjectMessage( winMsg );

   // Set the property that will be used by the message selector
   msg.setStringProperty( "NotificationType", "AuctionWinner" );

   // Publish the message
   publisher.publish( msg );

   // Close the openresources
   topicConnection.close();

  }catch( NamingException ex ) {
   ex.printStackTrace();
   System.exit( -1 );
  }catch( JMSException ex ) {
   ex.printStackTrace();
  // It's a good idea to always put a finally block to ensure the
  // context is closed
  }finally {
   try {
    // Close up the JNDI connection since we have found what we needed
    ctx.close();
   }catch ( Exception ex ) {
    ex.printStackTrace();
   }
  }
 }

 // Private Accessors for Connection Factory Name
 private static String getConnectionFactoryName() {
  return connectionFactoryName;
 }

 // Private mutator for the Connection factory Name
 private static void setConnectionFactoryName( String name ) {
  connectionFactoryName = name;
 }

 // Private Accessors for the Topic Name
 private static String getTopicName() {
  return topicName;
 }

 // Private mutator for Topic Name

Listing 10.14 Continued

 private static void setTopicName( String name ) {
  topicName = name;
 }

 /**
  * This method is called to load the JMS resource properties
  */
 private void loadProperties() {
  String connectionFactoryName = null;
  String topicName = null;

  // Uses a Properties file to get the properties for the JMS objects
  Properties props = new Properties();
  try {
   props.load(getClass().getResourceAsStream( "/auction_jms.properties" ));
  }catch( IOException ex ){
   ex.printStackTrace();
  }catch( Exception ex ){
   System.out.println( "Had a problem locating auction_jms.properties");
   ex.printStackTrace();
  }

  connectionFactoryName = props.getProperty( "AUCTION_CONNECTION_FACTORY" );
  topicName = props.getProperty( "AUCTION_NOTIFICATION_TOPIC" );

  // Set the JMS Administered values for this instance
  setConnectionFactoryName( connectionFactoryName );
  setTopicName( topicName );
 }

 /**
  * Main Method. This is the entry point to test sending an
  * AuctionWinnerNotification to the Topic
  */
 public static void main( String args[]) {

  // Get the email address passed in on the command line
  if ( args.length == 0 ) {
   System.out.println( "Usage: AuctionWinnerPublisher <emailAddress>");
   System.exit( 0 );
  }

  String emailAddress = args[0];

  AuctionWinnerPublisher publisher = null;
  // Create an instance of this class
  publisher = new AuctionWinnerPublisher();
  // Load the properties from the jms bundle so that we can
  // locate the ConnectionFactory and the Topic

  // Create the Winner Notification
  AuctionWinnerNotification msg = new AuctionWinnerNotification();
  // Fill in some details for the Auction Win
  msg.setAuctionName( "Some Auction Item" );
  msg.setNotificationEmailAddress( emailAddress );

Listing 10.14 Continued

  // Obviously there is no Internationalization supported here. This is
  // just for testing purposes.
  msg.setAuctionWinPrice( "$75.00" );
  // Publish the message to the Topic
  publisher.publishWinnerNotification( msg );
 }
}

Running the Topic Example

Running the Topic example is not much different from the Queue example seen earlier in this chapter. To run the Topic example, you will need to follow these steps:

  1. Start the JMS service with the administered topic objects for this example.

  2. Run the AuctionWinnerNotificationSubscriber client.

  3. Run the AuctionWinAdminNotificationSubscriber client.

  4. Run the AuctionWinnerPublisher program and provide an e-mail address.

You will need to make sure that you have both the JNDI and JMS services up and running before you run either the subscriber or publisher programs. Both client programs need to have the JNDI and JMS JAR files included in the classpath.

To start either of the subscriber programs, just run them like any other Java program:

java com.que.ejb20.notification.AuctionWinnerNotificationSubscriber
or
java com.que.ejb20.notification.AuctionWinAdminNotificationSubscriber

The program will tell you that it's listening on the topic.

NOTE

Remember, there might be a difference between the JNDI name and the actual name for a destination. For example, the JNDI name given to the topic is com.que.ejb20book.AuctionNotificationTopic, but the actual property name you assign it when setting it up in the JMS administration properties might be different. Don't confuse the two.

To create a notification using the publisher, run the AuctionWinnerPublisher and pass in an e-mail address like the following:

java com.que.ejb20.notification.AuctionWinnerPublisher me@foo.com

The AuctionWinnerPublisher will not display any output before exiting. Both of the subscribers should print out a message on their consoles.

If you are having trouble running the example, see the "Troubleshooting" section at the end of this chapter for general JMS troubleshooting tips.

Durable Subscription

In terms of JMS, durability describes whether or not the JMS server will hold onto a JMS message if a subscriber is temporarily inactive. Message durability is different from message persistence. Durability is defined by the relationship that exists between a Topic subscriber and the JMS server. A subscriber that is set up as durable will have messages sent to it held by the server if the subscriber is temporarily distracted doing something else or its session becomes inactive for some reason. Durability can only be established for the Pub/Sub (Topic) message model.

Message persistence, on the other hand, is a relationship that is defined between a MessageProducer and the JMS server. Persistence can be established for both messaging models, as you'll see later in the "Message Persistence" section.

A cost overhead is involved with using durable subscribers. The subscriber registers the subscription with a unique identity that is retained by the JMS server. When a message arrives at the topic and one or more durable subscribers are inactive, the JMS server retains the messages for that subscription. When the subscriber becomes active again, the subscriber will receive the messages that are waiting for it. This is, of course, unless the message expires based on its time-to-live expiration date.

The Client ID

The client ID is a unique identifier that associates a JMS connection and the objects created through the connection with state that is maintained on behalf of a specific client. This is the means by which the JMS server knows how to deliver durable messages to a subscriber when it connects or becomes active.

You can define this ID in two ways:

  • Configure the TopicConnectionFactory with the client ID.

  • Set the client ID after acquiring a connection.

The JMS specification recommends that you set up a ConnectionFactory with a client-specific identifier and then the client looks up their specific ConnectionFactory when a connection is needed. Any connection obtained through this ConnectionFactory would be assigned this client ID.

The client can alternatively assign the ID after a connection is obtained from a ConnectionFactory. In this manner, a client-specific ConnectionFactory does not need to be configured.

CAUTION

Clients that use the alternative approach and use a default ConnectionFactory must remember to assign a unique client ID as soon as a connection is obtained from the factory. There is a chance that a unique ID already exists for a client. If this occurs, an exception will not be thrown and behavior is not predictable. Clients must be sure they have not already used a client ID for another connection. This is only if you are using durable subscriptions.

Although both message models use a client ID, only the Pub/Sub actually uses it. You will see a client ID for a QueueConnection, but JMS does not currently use them. Some vendors might be using client IDs for something internal to their JMS server for queues. Check with your vendor's documentation to be safe.

Creating Durable Subscribers

You can create durable topic subscribers by using one of the following two methods that exist on the TopicSession interface:

public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
public TopicSubscriber createDurableSubscriber(Topic topic, String name,
  String messageSelector, boolean noLocal)
throws JMSException

The name argument in the two method signatures is the unique client ID. You can also specify a messageSelector, which you saw in the section "Specify a Message Selector Query String" earlier in this chapter.

You can specify whether a client receives a copy of the messages it sends. This can happen because a message that is sent to a topic is distributed to all subscribers. If an application uses the same connection to both publish and subscribe to a topic, a client can receive the messages that it sends. To prevent this from happening, the client should set the noLocal argument above to true. The noLocal default is false, and therefore a subscriber can receive a copy of the messages that it sends.

Only one session can define a subscriber for a particular durable subscription at any given time. Multiple subscribers can access this subscription, but not at the same time.

Deleting Durable Subscribers

To delete a durable subscriber, you must use the following method on the TopicSession:

public void unsubscribe(String name) throws JMSException;

The name argument is the name of the durable subscriber that was used when the durable subscriber was created. You can't delete a durable subscriber if either of the following is true:

  • A TopicSubscriber is still active on the session

  • The subscriber is in the middle of a transacted message or has not acknowledged the incoming message

Modifying Durable Subscribers

To modify an existing durable subscription, you can optionally delete the existing durable subscriber and then re-create it using a new name. You also will get a new durable subscriber if you change either the messageSelector or noLocal values in the createDurableSubscriber method.

  • + Share This
  • 🔖 Save To Your Account