Home > Articles > Programming > Java

  • Print
  • + Share This
This chapter is from the book

Exploring the Competing Consumers Pattern

The example we concluded in the previous section tackled integration issues and how to handle external dependencies and enterprise environment test issues. We'll now visit a different example that explores an enterprise integration pattern.

The pattern we'll explore is explained in much greater detail in Gregor Hohpe and Bobby Woolf's excellent book Enterprise Integration Patterns (Addison-Wesley, 2004). We strongly recommend you get hold of a copy if you're serious about enterprise applications.

The Pattern

The pattern we'll examine is the Competing Consumers pattern. This pattern applies to message-driven applications. We have a queue that holds any number of messages that our application needs to process. To improve parallelism and efficiency, we'd like to process messages concurrently. This is particularly useful if message processing can be lengthy or expensive. By expensive processing, we generally mean something that is I/O bound or that involves blocking for a significant period waiting for some other part of the system to do its work while the CPU is idle. For instance, the component we explored in our earlier example communicates with a database and potentially a number of other external systems, so processing an individual message is fairly expensive.

The pattern name simply refers to what happens when firing up a number of instances to process messages. Each of the consumers will read messages off the queue and start processing. The consumers compete for messages since there is one source and more than one consumer.

Note that this approach obviously works only with message queues (point to point).1 Obviously it would not work with topics since in that case all subscribers receive every message, and they do not need to compete for any single message.

We have to make some implicit assumptions to use the Competing Consumers pattern. The most important one is that we cannot guarantee message ordering. Since we are processing messages in parallel and the consumers are competing, we know that messages are not processed serially, and thus we can make no guarantees as to which messages will be processed first. Of course, we could add an ordering property to each message and introduce a buffering processor that holds onto messages until a contiguous sequence is received, then deliver them to the component. For the sake of simplicity, however, we'll instead assume that ordering does not matter.

The other important issue is that we would most likely want to consume the messages transactionally: if a message cannot be processed, we would want to roll back any partial changes we've made to any transactional resources and ensure the message is rolled back to the queue correctly.

Listing 3-16 shows the implementation outline of the processor.

Listing 3-16. Example of a competing consumer

public class MessageProcessor implements MessageListener {

  private Log log = LogFactory.getLog(MessageProcessor.class);

  public void onMessage(Message message) {
    TextMessage msg = (TextMessage)message;
    try {
      String value = msg.getText();
      String someProp = msg.getStringProperty("prop");
      process(value, someProp);
    catch(JMSException e) {
      log.error("Error processing message", e);

  public void process(String value, String someProp) {
    // do a lot of expensive stuff, like a DB conversation

Since this processor is a MessageListener, the JMS provider (or EJB container) will invoke it asynchronously, so each instance will run in its own thread.

The Test

Having implemented this pattern, how do we test it? Before we start writing a test, it's important to first decide what exactly we're trying to test.

The first issue is that the consumers run concurrently, so we'll have to somehow simulate that in the test to match what happens in production. Once we've done that, we'd also like to verify the results. It doesn't matter which consumer ran first or which completed first. We would like to assert that with a given number of consumers, we have a set of known results that we can verify. For the sake of simplicity, we'll assume that the results are entries in a database that we can query.

Using the techniques discussed earlier, we can use a clean database for our test. It doesn't need to be completely empty; it can contain enough static data so that our consumer can do its job. However, it should not be shared with other tests, so we can easily determine the changes that the test caused.

Also, from what we learned earlier, our goal is not to test JMS functionality. We know, for example, that JMS supports transactions, which ensures that a consumer is transactional. We also know that specifying the transactional behavior is a configuration issue. The test in this case focuses on the business functionality. Chapter 4 will cover the details of testing-specific APIs such as JMS.

The component is already refactored so that the business functionality is not coupled to JMS, so we can ignore that aspect of it for the purposes of our test.

Listing 3-17 shows the test for the component.

Listing 3-17. Test for Competing Consumers pattern

private final List<Object[]> data =
  Collections.synchronizedList(new ArrayList<Object[]>());

public void populateData() {
data.add(new Object[]{"value1", "prop1"});
  data.add(new Object[]{"value2", "prop2"});
  data.add(new Object[]{"value3", "prop3"});

@Test(threadPoolSize = 3, invocationCount = 3,
public void runConcurrentProcessors(String value,
                                    String someProp) {
  // create processor
  MessageProcessor processor = new MessageProcessor();
  // invoke method to test
  processor.process(value, someProp);

@Test(dependsOnMethods = "runConcurrentProcessors")
public void verifyConcurrentProcessors() {
  // load data from db
  // verify that we have 3 results
  // verify that each of the 3 results matches our 3 inputs

@DataProvider(name = "concurrent-processor")
public Object[][] getProcessorData() {
  return new Object[][]{data.remove(data.size() - 1)};

The test uses a number of interesting approaches, so it's worth spending some time exploring them. For one thing, since at runtime we expect that the container for our component is multithreaded, we don't need to worry about firing up multiple instances since the container will most likely handle that. For our test, however, since the test is effectively the container, we do need to explicitly specify that we'd like multiple threads.

Our test is split into two tests, one that handles running the consumers and one that then verifies the results. Why the split? The reason is that runConcurrentProcessors is invoked multiple times, and we only want to verify results once all invocations of that method have completed. To express this ordering, we use the dependsOnMethods annotation attribute.

To express the desired parallelism, we use the threadPoolSize and invocationCount annotation attributes. We specify that we'd like three threads to run our test three times.

Having done that, we next turn our attention to generating data for the consumers. The test class is initialized with a list of test data. The populateData() method is run before the test and fills the list with the values we'd like the consumer threads to use. Note that configuration methods are run once, and only tests are run in parallel.

The run test takes in two parameters, denoting the data we'd like to pass to each consumer. The data comes from the concurrent-processor Data Provider.

This Data Provider is fulfilled by the getProcessorData() method. However, instead of just returning the set of three data items we want, we're removing items from the data list one at a time. Why is this?

To see why this works, we need to understand how Data Providers interact with concurrent tests (or don't, more accurately). When TestNG sees a Data Provider, it invokes the test once for every data item returned. Similarly, when we specify an invocation count, TestNG invokes the test the specified number of times.

Therefore, if we were to return the three data items we have in our Data Provider, each thread would then invoke the test three times, once for each data item. This would result in the test being invoked nine times, something we definitely do not want.

The solution therefore is to use a stack structure, and for each time the Data Provider is invoked, return one item and remove it from the list. The Data Provider will be invoked three times (due to the specified invocationCount) and each time will return a unique data item for the test, thus ensuring that each thread gets its own parameters when invoking the test.

Of course, instead of all that work with the Data Provider, we could have just run all three instances ourselves inline. While that might have worked with three instances, it very quickly becomes unwieldy if we have more.

As a design principle, it is crucial that we separate data concerns from functional ones. In this case, the specific data that the consumer runs with should be orthogonal to the actual test. This approach means that as our data needs to evolve and become more complex, the test itself does not need to be modified. Of course, we also need to apply some common sense here: If a test has only one data item, there is no need to externalize that into a Data Provider.

  • + Share This
  • 🔖 Save To Your Account