Queues and Executors

Next up is ConcurrentLinkedQueue, which is an unbounded thread-safe queue based on linked nodes. The ConcurrentLinkedQueue class is a good choice of structure when many threads share a common queue-based collection. This structure allows you to add and remove elements from the queue in a thread-safe manner. Because you add new nodes to the tail of the queue and remove nodes from the head, nodes are removed from the queue in the order in which they were added.
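The ordering guarantee described above can be seen in a few lines. What follows is a minimal single-threaded sketch, not one of the article's listings; the class name FifoOrderSketch and the helper roundTrip are made up for illustration. It relies only on the add() and poll() methods of ConcurrentLinkedQueue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the original listings.
class FifoOrderSketch {

    // Adds every item to the tail of a queue, then removes from the head until empty.
    static List<String> roundTrip(List<String> items) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        for (String item : items) {
            queue.add(item); // add() appends at the tail
        }

        List<String> drained = new ArrayList<String>();
        String head;
        // poll() removes and returns the head, or returns null when the queue is empty
        while ((head = queue.poll()) != null) {
            drained.add(head);
        }
        return drained;
    }

    public static void main(String[] args) {
        // prints [first, second, third]
        System.out.println(roundTrip(Arrays.asList("first", "second", "third")));
    }
}
```

The items come back in insertion order. With several threads adding at once, the relative order of items from different threads depends on scheduling, but each thread's own items still leave the queue in the order that thread added them.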

To test-drive the ConcurrentLinkedQueue class:

  1. Create an instance of ConcurrentLinkedQueue.
  2. Add some entries to the instance, preferably from different threads.
  3. Remove entries from the queue, preferably from different threads.

An example of such code is shown in Listing 5. Notice that the ConcurrentQueue class implements Runnable. I chose this approach so that I can use the Executor interface from java.util.concurrent, which we'll see in action shortly.

Listing 5 A concurrent-queue management class.

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentQueue implements Runnable {

    private ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<String>();

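    public String dequeueItem() {
       if (!concurrentLinkedQueue.isEmpty()) {
           System.out.println("Queue size: " + concurrentLinkedQueue.size());
       }
       // poll() returns null on an empty queue. remove() would throw
       // NoSuchElementException if another consumer emptied the queue
       // between the isEmpty() check and the removal.
       return concurrentLinkedQueue.poll();
    }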

    private void enqueueItem(String item) {
       System.out.println("Enqueueing item " + item);
       concurrentLinkedQueue.add(item);
    }

    public int getQueueSize() {
       // size() already returns 0 for an empty queue.
       return concurrentLinkedQueue.size();
    }

    public void run() {
       for (int i = 0; i < 10; i++) {
           enqueueItem("String " + i);

           try {
              Thread.sleep(1000);
           } catch (InterruptedException e) {
              // Restore the interrupt flag and stop producing.
              Thread.currentThread().interrupt();
              return;
           }
       }
    }
}

To access the queue object, we need a producer and consumer pair. Listing 5 has a producer in the run() method, where we simply enqueue 10 String items. What about the consumer? Listing 6 demonstrates a consumer client of the queue object. Notice that the ConcurrentQueueClient class in Listing 6 also implements Runnable.

Listing 6 A concurrent queue client.

public class ConcurrentQueueClient implements Runnable {

    private ConcurrentQueue concurrentQueue;

    public ConcurrentQueueClient(ConcurrentQueue concurrentQueue) {
       this.concurrentQueue = concurrentQueue;
    }

    public void run() {
       boolean stopCondition = (concurrentQueue.getQueueSize() == 0);

       while (!stopCondition) {
           for (int i = 0; i < concurrentQueue.getQueueSize(); i++) {
              System.out.println("Client dequeue item "
                     + concurrentQueue.dequeueItem());
              try {
                  Thread.sleep(1500);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
           }
           stopCondition = (concurrentQueue.getQueueSize() == 0);
       }

       System.out.println("Client thread exiting...");
    }
}

Listing 6 removes items from the queue and pauses after each one, using Thread.sleep(1500), which gives the producer time to add more entries to the queue. Finally, when the queue is empty, the run() method returns and the associated thread terminates.
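One consequence of stopping on an empty queue is worth noting: if the consumer happens to run before the producer has enqueued anything, it sees a size of zero and exits at once. The sketch below shows one possible way around this, under the assumption that producer and consumer share an AtomicBoolean flag. The class DrainUntilDoneSketch, the method produceAndConsume, and the flag name producerDone are all hypothetical and do not appear in the listings. The consumer here spins with Thread.yield() for brevity; it is an illustration of the idea, not a replacement for Listing 6.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original listings.
class DrainUntilDoneSketch {

    // Runs one producer and one consumer thread and returns what the consumer received.
    static List<String> produceAndConsume(final int itemCount) {
        final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        final AtomicBoolean producerDone = new AtomicBoolean(false);
        final List<String> consumed = new ArrayList<String>();

        Thread producer = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < itemCount; i++) {
                    queue.add("String " + i);
                }
                producerDone.set(true); // set only after the last add()
            }
        });

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    // Read the flag BEFORE polling: if it was already true,
                    // a null from poll() means the queue is empty for good.
                    boolean done = producerDone.get();
                    String item = queue.poll();
                    if (item != null) {
                        consumed.add(item);
                    } else if (done) {
                        return;
                    } else {
                        Thread.yield(); // queue is empty for now; let the producer run
                    }
                }
            }
        });

        consumer.start(); // started first on purpose: an empty queue no longer ends it
        producer.start();
        try {
            producer.join();
            consumer.join(); // after join(), reading 'consumed' from this thread is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Consumed " + produceAndConsume(10).size() + " items");
    }
}
```

The order of the two reads in the consumer matters. Checking the flag after a null poll() would leave a window in which the producer adds its last item and sets the flag between the two calls, and the consumer would then exit with that item still in the queue.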

Now we finally get to see why I use classes that implement Runnable: I need to execute the objects of these classes. To do that, I use a class that implements the Executor interface (see Listing 7).

Listing 7 The Executor-implementing class.

import java.util.concurrent.Executor;

public class ThreadPerTaskExecutor implements Executor {

    // Starts a new thread for every task it is given.
    public void execute(Runnable r) {
        new Thread(r).start();
    }

    public static void main(String[] args) {
        Executor executor = new ThreadPerTaskExecutor();

        ConcurrentQueue concurrentQueue = new ConcurrentQueue();
        executor.execute(concurrentQueue);
        executor.execute(new ConcurrentQueueClient(concurrentQueue));
    }
}

Listing 7 uses the ThreadPerTaskExecutor class to run the producer and consumer pair, each in its own thread. Listing 8 shows a sample execution.

Listing 8 A sample run—your figures will almost certainly differ!

Enqueueing item String 0
Queue size: 1
Client dequeue item String 0
Enqueueing item String 1
Queue size: 1
Client dequeue item String 1
Enqueueing item String 2
Enqueueing item String 3
Queue size: 2
Client dequeue item String 2
Enqueueing item String 4
Queue size: 2
Client dequeue item String 3
Enqueueing item String 5
Enqueueing item String 6
Queue size: 3
Client dequeue item String 4
Enqueueing item String 7
Queue size: 3
Client dequeue item String 5
Enqueueing item String 8
Enqueueing item String 9
Queue size: 4
Client dequeue item String 6
Queue size: 3
Client dequeue item String 7
Queue size: 2
Client dequeue item String 8
Queue size: 1
Client dequeue item String 9
Client thread exiting...

The sample run in Listing 8 illustrates interleaved execution of the two threads—one thread adds to the queue and the other removes items from the queue. This shows the producer-consumer pair in operation under the control of the Executor-based class called ThreadPerTaskExecutor.
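Because the producer and consumer are plain Runnable objects, the code that launches them depends only on the Executor interface. As a rough sketch of what that allows, the hypothetical class below (ExecutorSwapSketch, not one of the article's listings) hands the same tasks first to a fixed-size thread pool created with Executors.newFixedThreadPool and then to an executor that simply runs each task in the calling thread. The tasks only increment a counter; they stand in for the producer and consumer and are an assumption made to keep the example short.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original listings.
class ExecutorSwapSketch {

    // Submits two trivial tasks through whatever Executor it is handed.
    static void submitPair(Executor executor, final AtomicInteger counter) {
        Runnable task = new Runnable() {
            public void run() {
                counter.incrementAndGet();
            }
        };
        executor.execute(task);
        executor.execute(task);
    }

    // Runs the pair on a two-thread pool and returns how many tasks completed.
    static int runOnPool() {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            submitPair(pool, counter);
        } finally {
            pool.shutdown(); // accepts no new tasks; already submitted ones still run
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    // Runs the pair synchronously in the calling thread.
    static int runDirect() {
        AtomicInteger counter = new AtomicInteger();
        Executor direct = new Executor() {
            public void execute(Runnable r) {
                r.run();
            }
        };
        submitPair(direct, counter);
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Pool completed: " + runOnPool());
        System.out.println("Direct completed: " + runDirect());
    }
}
```

The submitPair method never learns which kind of executor it was given, which is the same decoupling that lets Listing 7 change its threading policy without touching the producer or the consumer.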
