
Integrating Messaging in VB.NET

One effective way to increase the scalability and reliability of a distributed application is to move from a model where application requests are processed synchronously to one where some or all of the requests are processed asynchronously. Microsoft includes the Microsoft Message Queue (MSMQ) product in their server operating systems to provide the queueing infrastructure for applications to create and manipulate queues in addition to sending to and receiving messages from those queues. The Services Framework provides a namespace, System.Messaging, that encapsulates the functionality of MSMQ. This chapter examines the System.Messaging namespace – first in how queues are programmatically referenced and administered, and secondly in how messages are serialized, sent, and received by application programs.
This article is excerpted from Chapter 13, "Integrating with the Enterprise," from Building Distributed Applications with VB.NET.

This article assumes the reader has some experience with VB, the Windows environment, event-based programming, basic HTML, and scripting. This material is based on the Beta2 release version of Microsoft's .NET technology.

One of the particularly effective ways to increase the scalability and reliability of a distributed application is to move from a model where application requests are processed synchronously to one where some or all of the requests are processed asynchronously. As discussed in Chapter 8, Microsoft includes the Microsoft Message Queue (MSMQ) product as a service in their server operating systems to provide the queueing infrastructure for applications to create and manipulate queues in addition to sending to and receiving messages from those queues.

TIP

MSMQ is not installed by default in Windows 2000 Server. You can install it by using the "Configure Your Server" utility found in the Administrative Tools group and looking under the Advanced option.

Not surprisingly, the Services Framework provides a namespace, System.Messaging, that encapsulates the functionality of MSMQ. This section will examine the System.Messaging namespace, first in how queues are programmatically referenced and administered and secondly in how messages are serialized, sent, and received by application programs.

Administering Queues

Although the System.Messaging namespace contains over 20 classes, the central class is MessageQueue. This class contains both shared and instance methods to allow you to query for queues contained on a particular machine or across the network in addition to manipulating individual queues.

At the most basic level, the set of shared members includes Create, Delete, Exists, and several methods prefixed by Get that allow you to query for queues. For example, the following code uses the Exists method to determine whether a queue identified by the mstrPath variable exists and, if not, creates it. In either case the queue is then referenced by creating an instance of MessageQueue and passing in the identifier of the queue:

If Not MessageQueue.Exists(mstrPath) Then
 MessageQueue.Create(mstrPath, False)
End If

Dim oQueue As MessageQueue
oQueue = New MessageQueue(mstrPath, False)

Note that in the Create method, the second argument indicates whether the queue should be transactional, in other words use the Microsoft Distributed Transaction Coordinator (MSDTC) service to ensure message delivery. In the constructor of the MessageQueue object, the second argument specifies whether the first application to access the queue receives exclusive access to it. Note also that creating a new MessageQueue object using the New keyword does not create a new queue; it simply references one that already exists.

You'll notice that the path passed into these methods is simply a string that identifies the queue. In fact, the string can take one of three forms:

  • The path to the queue as returned by the Path property of the MessageQueue object. This is the typical approach in the form MachineName\QueueName. For private queues, it is MachineName\private$\QueueName. System queues such as Deadletter$ and Journal$ can also be accessed this way.

  • The format name returned by the FormatName property prefixed with "FormatName:". This is typically used for offline access to queues.

  • The label of the queue as returned by the Label property prefixed with "Label:". The Label property can be set for a queue to provide a description. Using the Label is not recommended since labels are not required to be unique and can thus cause exceptions to be thrown when sending messages or referencing a queue.
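
The three identifier forms listed above can be sketched as follows. The machine name server1 and the queue name Registrations are placeholders, and the direct format name shown is only one of the format-name styles MSMQ accepts. Constructing the objects does not by itself verify that the queues exist.

```vbnet
Imports System.Messaging

Module QueueIdentifierForms
    Sub Main()
        ' 1. Path: MachineName\QueueName, or MachineName\private$\QueueName
        '    for a private queue ("." stands for the local machine).
        Dim byPath As New MessageQueue(".\private$\Registrations")

        ' 2. Format name, prefixed with "FormatName:". server1 is a
        '    hypothetical machine name used only for illustration.
        Dim byFormatName As New MessageQueue( _
            "FormatName:DIRECT=OS:server1\private$\Registrations")

        ' 3. Label, prefixed with "Label:". Labels need not be unique, so
        '    this form can fail when more than one queue carries the label.
        Dim byLabel As New MessageQueue("Label:Registrations")
    End Sub
End Module
```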

In addition to creating and deleting queues, the MessageQueue class also provides query and enumeration methods to list the queues on a machine and the messages within a queue. To illustrate these capabilities, review the SetupQueues method shown in Listing 13.10.

Listing 13.10. Manipulating Queues. This method manipulates the private queues on the given machine.

Imports System.Messaging

Public Sub SetupQueues(ByVal pMachine As String)
  Dim oQueue As MessageQueue
  Dim arQueues() As MessageQueue

  ' Enable the connection cache
  MessageQueue.EnableConnectionCache = True

 Try
  ' List the private queues on the machine
  arQueues = MessageQueue.GetPrivateQueuesByMachine(pMachine)
  For Each oQueue In arQueues
   If Right(oQueue.Path, 1) <> "$" Then
    With oQueue
     .MaximumQueueSize = 2048
     .MaximumJournalSize = 4192
     .UseJournalQueue = True
     .EncryptionRequired = EncryptionRequired.None
    End With

    ' Purge the journals
    Dim oJournal As MessageQueue
    If MessageQueue.Exists(oQueue.Path & "\Journal$") Then
     oJournal = New MessageQueue(oQueue.Path & "\Journal$")
     oJournal.Purge()
    End If

    ' Delete acknowledgement messages
    Dim enMessages As MessageEnumerator = oQueue.GetEnumerator()
    While (enMessages.MoveNext())
     If enMessages.Current.MessageType = MessageType.Acknowledgment Then
      enMessages.RemoveCurrent()
     End If
    End While
   End If
  Next
 Catch e As MessageQueueException
   ' Log a message
 End Try

End Sub

First, you'll notice that the SetupQueues method uses the EnableConnectionCache shared property to enable the MessageQueue class to reuse read and write handles to queues, thereby increasing performance. You can clear the cache using the ClearConnectionCache shared method.

Next, the GetPrivateQueuesByMachine method is invoked with the name of the computer on which to retrieve the queues. The result is an array of MessageQueue objects stored in the arQueues variable, which can then be iterated over. Note that the Path property is parsed to determine whether the queue is a system queue (denoted with a $ at the end). If it is not, several properties are set: two that put storage limits on the queue and its journal, one that ensures copies of all messages are saved to a journal queue, and one that specifies that encryption is not required. These changes take effect immediately and will throw exceptions if the code does not have permissions.

NOTE

Each queue has a set of permissions, for example, "Set Properties," and "Get Properties" permissions that can be set in the MSMQ UI by right-clicking on the queue and selecting Properties. MSMQ can be administered in the Computer Management MMC console under Services and Applications. Permissions can also be programmatically manipulated using the SetPermissions and ResetPermissions methods of the MessageQueue class.
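
As a rough illustration of the programmatic route mentioned in this note, the following sketch grants one right to an account and then restores the defaults. The queue path and account name are supplied by the caller and are purely illustrative, and the method signatures are those of the released .NET Framework, which may differ from the Beta 2 build this material is based on.

```vbnet
Imports System.Messaging

Module QueuePermissionsSketch
    ' Grants the given account the right to receive messages from the queue.
    Sub GrantReceive(ByVal queuePath As String, ByVal account As String)
        Dim oQueue As New MessageQueue(queuePath)
        oQueue.SetPermissions(account, MessageQueueAccessRights.ReceiveMessage)
    End Sub

    ' Resets the queue's permission list to the operating system defaults.
    Sub RestoreDefaults(ByVal queuePath As String)
        Dim oQueue As New MessageQueue(queuePath)
        oQueue.ResetPermissions()
    End Sub
End Module
```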

Using the Path of the current queue, the journal queue, if it exists, is then referenced using its path name by the oJournal object. The entire collection of messages in the queue is then deleted using the Purge method.

Finally, the messages in a particular queue are traversed using a MessageEnumerator object. Simply put, the MessageEnumerator exposes a forward-only cursor of messages populated by one of several Get methods in the MessageQueue class. In this case, the GetEnumerator instance method simply returns an enumerator that traverses all the messages in the queue. The MessageEnumerator itself returns the current message (Message object) in the Current property and exposes methods such as MoveNext and RemoveCurrent to manipulate the list. In this case, the MessageType of each message is checked; if it is an acknowledgement message, it is deleted.

NOTE

Acknowledgement messages are special types of messages where the body of the message is empty. Acknowledgements can be automatically sent when the message reaches its destination queue or is successfully retrieved from the queue. MSMQ can send both positive and negative acknowledgements, for example a negative acknowledgement when the message is not retrieved in a set amount of time. To enable acknowledgement, you specify the queue to which acknowledgements should be sent in the AdministrationQueue property of the Message object and then set the AcknowledgeType property to one of the AcknowledgeTypes enumeration constants. You then check the administration queue as you would any other queue. We will discuss this later in the chapter.
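
A minimal sketch of requesting acknowledgements might look like the following. The two queue paths are parameters invented for illustration, and the particular combination of AcknowledgeTypes values is just one plausible choice.

```vbnet
Imports System.Messaging

Module AcknowledgementSketch
    ' destPath and adminPath are hypothetical queue paths supplied by the caller.
    Sub SendWithAck(ByVal destPath As String, ByVal adminPath As String)
        Dim oDest As New MessageQueue(destPath)
        Dim oAdmin As New MessageQueue(adminPath)

        Dim oMessage As New Message("payload")

        ' Acknowledgement messages will be delivered to this queue
        oMessage.AdministrationQueue = oAdmin

        ' Ask for a positive acknowledgement when the message arrives and a
        ' negative one if it is not received in time
        oMessage.AcknowledgeType = AcknowledgeTypes.PositiveArrival Or _
            AcknowledgeTypes.NegativeReceive

        oDest.Send(oMessage)
    End Sub
End Module
```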

The interesting aspect of the enumerator is that it is dynamic. In other words, if new messages with a lower priority than the Current message are added to the queue, they will be included in the cursor. To retrieve a static list of messages, you can use the GetAllMessages method.
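
For comparison with the dynamic enumerator, a static snapshot could be read along these lines. This is a sketch only: the queue path is a parameter chosen for illustration, and writing labels to the console stands in for whatever processing an application would do.

```vbnet
Imports System
Imports System.Messaging

Module SnapshotSketch
    Sub ListLabels(ByVal queuePath As String)
        Dim oQueue As New MessageQueue(queuePath)
        Dim oMessage As Message

        ' GetAllMessages returns a copy of the messages as an array; the
        ' messages themselves remain on the queue, and messages that arrive
        ' after the call are not reflected in the array
        For Each oMessage In oQueue.GetAllMessages()
            Console.WriteLine(oMessage.Label)
        Next
    End Sub
End Module
```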

Not only does the MessageQueue class support querying for private queues, it can also query for publicly available queues. The GetPublicQueues, GetPublicQueuesByCategory, GetPublicQueuesByLabel, and GetPublicQueuesByMachine methods all return an array of MessageQueue objects. However, the first method in this list is also overloaded to accept a MessageQueueCriteria object in which you can specify multiple criteria like those exposed by the other methods (Category, Label, and MachineName) in addition to when the queue was created (CreatedAfter, CreatedBefore) and last modified (ModifiedAfter, ModifiedBefore).

Alternatively, rather than return a static array of queues, the GetMessageQueueEnumerator can be used to create a dynamic cursor that can query the public queues based on criteria specified in a MessageQueueCriteria object. For example, the following code queries the network for queues that match the given category and adds the Path and CreateTime properties to a list to display to the user:

Dim enQueues As MessageQueueEnumerator
' The criteria object must be instantiated before its properties are set
Dim crMessage As New MessageQueueCriteria()

crMessage.Category = New Guid("00000000-0000-0000-0000-000000000002")
enQueues = MessageQueue.GetMessageQueueEnumerator(crMessage)

While enQueues.MoveNext()
  AddToList(enQueues.Current.Path, enQueues.Current.CreateTime)
End While

Note that the Category is simply a Guid that needn't be unique. As the name implies, you can use this property to categorize your queues.

Installing Queues

As shown in the previous discussion, the MessageQueue class can be used to create and delete queues programmatically. However, as with event logs, performance counters, and other system resources, the recommended technique is to install the resource if needed along with the application that uses it. Not surprisingly the System.Messaging namespace contains the MessageQueueInstaller class to do just that.

The MessageQueueInstaller class works like the installer classes discussed in Chapter 12. In order to use it, you first need to derive a class from Installer that will be run by the Installutil.exe utility when its RunInstaller attribute is set to True. Next you can declare a variable to hold an instance of the MessageQueueInstaller class and use it in the constructor of the derived Installer class to specify how the queue will be installed. For example, the following code would be contained in the New method of the derived Installer class:

Imports System.Configuration.Install
Imports System.Messaging
 
mMSMQInstaller = New MessageQueueInstaller()

With mMSMQInstaller
  .Label = "QuilogyDocQueue"
  .UninstallAction = UninstallAction.Remove
  .UseJournalQueue = True
  .Transactional = True
  .Path = ".\Private$\QuilogyDocs"
End With

Installers().Add(mMSMQInstaller)

In this example, the variable mMSMQInstaller is instantiated and its properties are set to those required for this queue. Note that only the Path property is required and here uses the "." to indicate that the queue will be installed on the local machine. In addition, the UninstallAction property is used to make sure the queue is removed from the system when this application is uninstalled. As with other installers, you must add it to the Installers collection using the Add method as shown here.
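
Putting the pieces together, the derived Installer class might be sketched as follows. The class name DocQueueInstaller is invented for illustration, and only the required Path property and the uninstall behavior are set.

```vbnet
Imports System.ComponentModel
Imports System.Configuration.Install
Imports System.Messaging

' RunInstaller(True) marks the class so that Installutil.exe will run it.
' DocQueueInstaller is a hypothetical name.
<RunInstaller(True)> _
Public Class DocQueueInstaller
    Inherits Installer

    Private mMSMQInstaller As MessageQueueInstaller

    Public Sub New()
        MyBase.New()

        mMSMQInstaller = New MessageQueueInstaller()
        With mMSMQInstaller
            ' "." installs the queue on the local machine
            .Path = ".\Private$\QuilogyDocs"
            ' Remove the queue when the application is uninstalled
            .UninstallAction = UninstallAction.Remove
        End With

        Installers.Add(mMSMQInstaller)
    End Sub
End Class
```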

TIP

Rather than setting all of the properties manually as is done in this case, you also have the option of passing a MessageQueue object to the overloaded constructor of MessageQueueInstaller. Doing so copies the queue properties to the newly created queue.

Sending and Receiving Messages

Once a queue has been referenced, applications use the MessageQueue object to place (send) messages on the queue and read (receive) messages from the queue.

Sending (Placing) Messages

To send (place) a message, all you need do is call the overloaded Send instance method, passing it the object to place on the queue. The object is then automatically serialized to either XML or a binary representation and placed in the Body of the Message object. How the serialization takes place is determined by the Formatter property of either the MessageQueue object or the Message being sent to the queue.

The System.Messaging namespace supports three types of formatters: XmlMessageFormatter (the default), ActiveXMessageFormatter, and BinaryMessageFormatter. By default, an instance of XmlMessageFormatter is created with the MessageQueue object and is assigned to the Formatter property. It is then used to serialize the message to a Stream and place it in the Body property of the Message object when the message is sent and again to deserialize it when the message is read from the queue. The ActiveXMessageFormatter can be used to serialize COM objects and allows interoperability with VB 6.0, while the BinaryMessageFormatter can be used to serialize a more compact (and complete) binary representation of the type.

By using this approach, the Send method can simply accept the System.Object data type and, as long as the object can be serialized to a binary format (by marking the class with the Serializable attribute) or to XML, it can be saved in the message body. For example, an instance of the QStudents class shown in Listing 13.10 can be sent to a queue like so:

Dim o As New QStudents()

o.AddStudent("Sammy", "Sossa", 1233, "Cubs")
o.AddStudent("Kerry", "Wood", 232, "Cubs")

Dim oQueue As New MessageQueue(".\private$\Registrations")
oQueue.Formatter = New BinaryMessageFormatter()
oQueue.Send(o)

In this case, the default XmlMessageFormatter is replaced with a BinaryMessageFormatter. In a similar fashion, by omitting the explicit population of the Formatter property, the QStudents object will be serialized to XML. The message body as viewed from the MSMQ snap-in can be seen in Figure 13.1.

Figure 13.1 A Serialized Message. This dialog shows the serialized MSMQ message in XML format.

NOTE

Note that the QStudents, Student, and Name classes each must have the Serializable attribute set, which is not shown in Listing 13.10.

Although using the automatic serialization provided by the formatters is the easiest way to place objects into a message, you can also directly populate the Body or BodyStream properties of the Message object itself. This is useful when you are placing data from files or other sources into the queue. To illustrate how this works, review the ProcessDocs method shown in Listing 13.11.

Listing 13.11. Writing to a Message. This method opens files and writes their contents directly to the Body of a Message object using the BodyStream property.

Imports System.IO
Imports System.Messaging

 Public Sub ProcessDocs(ByVal pPath As String, ByVal pQueue As String)

  ' Loop through all docs in a directory
  Dim oFile As FileInfo
  Dim strFile As String
  Dim oQueue As MessageQueue
  Dim fs As FileStream

  Try
   ' Ensure queue exists
   If Not MessageQueue.Exists(pQueue) Then
    Throw New ArgumentException("Queue " & pQueue & " does not exist.")
   End If

   ' Reference the queue
   oQueue = New MessageQueue(pQueue)

   'Go get each file
   For Each strFile In Directory.GetFiles(pPath, "*.xml")
    ' Open the file
    oFile = New FileInfo(strFile)
    fs = oFile.OpenRead

    ' Send the contents to the queue
    Dim oMessage As New Message()
    With oMessage
      .BodyStream = fs
      .Label = oFile.FullName
      .UseDeadLetterQueue = True
      .TimeToBeReceived = New TimeSpan(1, 0, 0, 0)
      .Priority = MessagePriority.Normal
    End With
    oQueue.Send(oMessage)
    fs.Close()
   Next

  Catch e As MessageQueueException
   ' Log the fact that an error occurred
  Catch e As Exception
   ' Log the fact that an error occurred
  End Try

 End Sub

You'll notice in Listing 13.11 that the method accepts a file path and the path to a queue. After determining that the queue is available and referencing it as oQueue, the directory is traversed for files with the .xml extension. As each file is encountered, it is opened for reading using the OpenRead method of the FileInfo object. This method returns a FileStream that can then be placed directly into the BodyStream property of the Message object. In addition, this method sets some of the properties of the Message, including the Priority and Label, the latter of which can be used as a description and a property to query on.

NOTE

The Message object also supports the AppSpecific and Extension properties that can be used to store application-specific data along with the message. A typical use for these properties is the storage of properties that describe the Body of the message but that are separate from it.
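
As a small sketch of this idea, the code below attaches a numeric code and a short tag to a message. The meaning assigned to each value (a document type code and a source tag) is an assumption made for illustration, not a convention defined by MSMQ.

```vbnet
Imports System.Messaging
Imports System.Text

Module MessageMetadataSketch
    Sub SendWithMetadata(ByVal queuePath As String, ByVal docXml As String)
        Dim oQueue As New MessageQueue(queuePath)
        Dim oMessage As New Message(docXml)

        ' AppSpecific holds a single Integer; here a hypothetical
        ' document type code
        oMessage.AppSpecific = 2

        ' Extension holds a byte array; here a hypothetical source tag
        oMessage.Extension = Encoding.UTF8.GetBytes("source=registrar")

        oQueue.Send(oMessage)
    End Sub
End Module
```

A receiving application would need to enable these properties in its property filter before reading them.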

The UseDeadLetterQueue and TimeToBeReceived properties work together to ensure that if the message is not read by another application before the time elapses as specified by the TimeSpan object (in this case 1 day), the message will be sent to the dead letter queue (MachineName\Deadletter$).

When the Send method is invoked, the Stream (in this case the FileStream) is read and the Body property of the Message is populated.

Receiving (Reading) Messages

The MessageQueue class also supports several methods for receiving (reading) messages from a queue. These methods fall into two categories: "peek" methods and "receive" methods.

The peek methods include Peek, PeekByCorrelationId, PeekById, BeginPeek, and EndPeek. In the first three cases, the method returns a Message object from the queue without removing it. In this way, an application can read a message before determining whether it needs to be processed. While Peek returns the first message in the queue, PeekByCorrelationId and PeekById search the queue to find the first message whose CorrelationId and Id properties match the given criteria, respectively.

NOTE

The CorrelationId property is used by acknowledgement, response, and report messages to reference the original message. In other words, it allows you to link an originating message with messages created in response.

All three methods are synchronous, and so they block the current thread until a message is received. In order to avoid blocking indefinitely, they are also overloaded to accept a TimeSpan argument that releases the thread and throws a MessageQueueException when the time expires. The BeginPeek and EndPeek methods allow asynchronous access to the first message in the queue so that the current thread is not blocked.
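
One way to wrap the timeout behavior is sketched below. The helper name TryPeek is invented, and the check against MessageQueueErrorCode.IOTimeout reflects the released .NET Framework; any other error is rethrown rather than treated as a timeout.

```vbnet
Imports System
Imports System.Messaging

Module PeekTimeoutSketch
    ' Returns the first message without removing it from the queue,
    ' or Nothing if no message arrives within the given number of seconds.
    Function TryPeek(ByVal oQueue As MessageQueue, _
                     ByVal seconds As Integer) As Message
        Try
            Return oQueue.Peek(TimeSpan.FromSeconds(seconds))
        Catch e As MessageQueueException
            If e.MessageQueueErrorCode = MessageQueueErrorCode.IOTimeout Then
                ' The timeout expired with no message available
                Return Nothing
            End If
            ' Anything else is a genuine error
            Throw
        End Try
    End Function
End Module
```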

The collection of "receive" methods includes analogous Receive, ReceiveByCorrelationId, ReceiveById, BeginReceive, and EndReceive. As you might imagine, the first three in this list behave analogously to the peek methods but have the effect of removing the message from the queue once it is read. The latter two methods are used to read messages asynchronously.

While reading simple messages synchronously is fairly straightforward, when receiving serialized objects as messages you must be aware of the type you wish to deserialize to. For example, in an earlier code snippet, an instance of QStudents was serialized to a Message using the BinaryMessageFormatter. To read this message, your code needs to set the Formatter property appropriately and then cast the Body of the Message to the appropriate type like so:

Dim oQueue As New MessageQueue(".\private$\Registrations")
Dim oNew As QStudents

oQueue.Formatter = New BinaryMessageFormatter()
oNew = CType(oQueue.Receive.Body, QStudents)

The oNew variable now contains the deserialized QStudents object.

TIP

To improve performance, you can set properties of the MessagePropertyFilter object exposed in the MessagePropertyFilter property of the MessageQueue instance you are reading from. Each property represents one of the message properties and can be set to True or False to specify whether the property is returned. By default only nine of the approximately 40 properties are returned.
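
A sketch of narrowing the filter follows. It assumes the released .NET Framework, in which the MessageQueue property that exposes the filter is named MessageReadPropertyFilter; the choice of Body and Label as the only properties retrieved is purely illustrative.

```vbnet
Imports System
Imports System.Messaging

Module PropertyFilterSketch
    Function ReceiveBodyAndLabel(ByVal queuePath As String) As Message
        Dim oQueue As New MessageQueue(queuePath)

        With oQueue.MessageReadPropertyFilter
            ' Turn every property off, then enable only those needed
            .ClearAll()
            .Body = True
            .Label = True
        End With

        ' Only the enabled properties are retrieved with the message
        Return oQueue.Receive(New TimeSpan(0, 0, 5))
    End Function
End Module
```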

While the previous code snippet used the BinaryMessageFormatter to serialize the object, using the XmlMessageFormatter is more flexible. The reason is that when using the BinaryMessageFormatter, the object is serialized using the type definition (including the version) of the class. This implies that when the object is deserialized, it must be cast to exactly the same type. In other words, the QStudents class must be publicly available in the Global Assembly Cache (GAC) or included as a private assembly and referenced by the receiving application.

By using the XmlMessageFormatter, the receiving application can create its own proxy class to handle the deserialized object. This class can be manually created or generated from XSD. However, when using this approach, the TargetTypes or TargetTypeNames property must be populated before receiving the message. This is required so that the XmlMessageFormatter knows into which object to deserialize the message body. For example, if the QStudents object was serialized to XML, as shown in Figure 13.1, it could be deserialized using the code

Dim oNew As QStudentsNew

oQueue.Formatter = New XmlMessageFormatter(New String() {"QStudentsNew"})
oNew = CType(oQueue.Receive.Body, QStudentsNew)

where QStudentsNew is a new class built from the same schema. In this case, the TargetTypeNames property (an array of Strings) is populated in the constructor. The overloaded constructor can also accept an array of Type objects.
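
The Type-based overload could be used as sketched here. The QStudentsNew class shown is a trivial stand-in with an invented Count field, since the real proxy class would be built from the schema.

```vbnet
Imports System
Imports System.Messaging

' Hypothetical stand-in for the proxy class described in the text
Public Class QStudentsNew
    Public Count As Integer
End Class

Module TargetTypesSketch
    Function ReceiveStudents(ByVal oQueue As MessageQueue) As QStudentsNew
        ' Populate TargetTypes through the constructor with an array of Type
        ' objects rather than an array of type names
        oQueue.Formatter = New XmlMessageFormatter( _
            New Type() {GetType(QStudentsNew)})

        Return CType(oQueue.Receive().Body, QStudentsNew)
    End Function
End Module
```

Passing Type objects lets the compiler catch a misspelled class name, whereas a misspelled string in TargetTypeNames would only surface at run time.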

One of the subtle advantages to serializing objects directly to the queue is that the objects can encapsulate required behavior. For example, the QStudents class can expose a SaveToDatabase method that knows how to persist the contents of the object to SQL Server using the System.Data.SqlClient namespace. This makes working with the object simple for the receiving application, which can simply deserialize the object and invoke the SaveToDatabase method. In this way, the receiving application needn't understand the internal structure of the QStudents object.

However, in other scenarios, you may wish to parse the message yourself, for example if it contains an XML document. To that end, just as the BodyStream property of the Message object can be populated with a Stream, it can also be used to access the Body of the message. For example, the RetrieveDocs method shown in Listing 13.12 drains the given queue of its messages and passes the BodyStream of the Message to the ExtractStudents method.

Listing 13.12. Draining the queue. This method drains the given queue by calling the Receive method and then passes the BodyStream to a method that processes it.

Imports System.Messaging

 Public Sub RetrieveDocs(ByVal pQueue As String)
  Dim oQueue As MessageQueue
  Dim oMessage As Message
  Dim flDone As Boolean = False

  Try
   ' Ensure queue exists
   If Not MessageQueue.Exists(pQueue) Then
    Throw New ArgumentException("Queue " & pQueue & " does not exist.")
   End If

   ' Reference the queue
   oQueue = New MessageQueue(pQueue)

   ' Drain the queue
   While Not flDone
    Try
      oMessage = oQueue.Receive(New TimeSpan(0, 0, 5))
      Call ExtractStudents(oMessage.BodyStream)
    Catch e As MessageQueueException
      flDone = True
    End Try
   End While

  Catch e As MessageQueueException
   ' Log the fact that an error occurred
  Catch e As Exception
   ' Log the fact that an error occurred
  End Try
 End Sub

Note that the Receive method is called with a timeout value of 5 seconds so that as soon as no new messages are received in a 5-second interval, the loop is exited. The ExtractStudents method is similar to that shown in Listing 13.1, with the exception that it has been modified to accept a Stream object rather than a file name like so:

Public Sub ExtractStudents(ByVal pStream As Stream)

In this way, you can easily take advantage of the many classes in the Services Framework that rely on streams.

Finally, the MessageQueue class follows the Services Framework pattern for asynchronous operations by exposing BeginPeek, BeginReceive and EndPeek, EndReceive methods. The Begin methods are overloaded and, as expected, do not block the current thread and return immediately. The methods spawn a background thread that waits until a message is found. When found, the application is notified through either the PeekCompleted or ReceiveCompleted event or an explicit AsyncCallback object passed to the method.

For example, to add the event handler for the ReceiveCompleted event and initiate the asynchronous process, the following code could be used:

oQueue = New MessageQueue(pQueue)
AddHandler oQueue.ReceiveCompleted, AddressOf MessageFound
oQueue.BeginReceive(New TimeSpan(0, 0, 5))

You'll notice that the BeginReceive method can also be passed a TimeSpan object; the ReceiveCompleted event fires if the time expires before a message is found. Although not shown here, an Object containing state information can also be passed to the Begin methods; it is returned in the AsyncState property of the AsyncResult object.

Within the event handlers, the MessageQueue from which the message is returned is populated in the first argument while the arguments are encapsulated in the ReceiveCompletedEventArgs object. The actual Message object can then be accessed by calling the EndReceive method, as illustrated by the template code that follows:

Public Sub MessageFound(ByVal s As Object, _
 ByVal args As ReceiveCompletedEventArgs)

  Dim oQueue As MessageQueue
  Dim oMessage As Message
  Dim oState As Object

  ' Retrieve the state if needed
  oState = args.AsyncResult.AsyncState()

  ' Retrieve the queue from which the message originated
  oQueue = CType(s, MessageQueue)

  Try
    oMessage = oQueue.EndReceive(args.AsyncResult)
    ' Process the message here
  Catch e As MessageQueueException
    ' Timeout expired
  End Try

End Sub

When using the asynchronous peek methods, there is an analogous PeekCompletedEventArgs object for use in the PeekCompleted event handler.

TIP

If you wish to continue receiving documents in the background, you can call BeginReceive at the end of the event handler.
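
A minimal sketch of this continuous pattern is shown below. The module and method names are invented for illustration, and writing the label to the console is a placeholder for real processing.

```vbnet
Imports System
Imports System.Messaging

Module ContinuousReceiveSketch
    Sub StartListening(ByVal queuePath As String)
        Dim oQueue As New MessageQueue(queuePath)
        AddHandler oQueue.ReceiveCompleted, AddressOf MessageFound

        ' Start the first asynchronous receive; no timeout is specified,
        ' so the operation waits until a message arrives
        oQueue.BeginReceive()
    End Sub

    Sub MessageFound(ByVal s As Object, _
                     ByVal args As ReceiveCompletedEventArgs)
        Dim oQueue As MessageQueue = CType(s, MessageQueue)

        Try
            Dim oMessage As Message = oQueue.EndReceive(args.AsyncResult)
            Console.WriteLine("Received: " & oMessage.Label)

            ' Re-arm the receive so the next message also raises the event
            oQueue.BeginReceive()
        Catch e As MessageQueueException
            ' Log the failure; the receive is not re-armed after an error
        End Try
    End Sub
End Module
```

Re-arming only after a successful EndReceive is a design choice made in this sketch so that a persistent queue error does not produce a tight loop of failing receives.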
