4.3 Services in Threads

Many tasks compute results or provide services that are not immediately used by their clients, but are eventually required by them. In these situations, unlike those involving oneway messages, a client's actions at some point become dependent on completion of the task.

This section describes some of the available design alternatives: adding callbacks to oneway messages, relying on Thread.join, building utilities based on Futures, and creating worker threads. Section 4.4 revisits and extends these techniques in the context of improving the performance of computationally intensive tasks on parallel processors.

4.3.1 Completion Callbacks

From the perspective of pure oneway message passing, the most natural way to deal with completion is for a client to activate a task via a oneway message to a server, and for the server later to indicate completion by sending a oneway callback message to the caller. This efficient, asynchronous, notification-based style applies best in loosely coupled designs in which completion of the service triggers some independent action in the client. Completion callback designs are sometimes structurally identical to Observer designs (see 3.5.2).

Figure 4-33

For example, consider an application that offers several features, of which one or more require that a certain file be read in first. Because IO is relatively slow, you don't want to disable other features while the file is being read in — this would decrease responsiveness. One solution is to create a FileReader service that asynchronously reads in the file, and then issues a message back to the application when it has completed, so that the application can proceed with the feature(s) that require it.

4.3.1.1 Interfaces

To set up such a FileReader, or any other service using completion callbacks, you must first define a client interface for callback messages. The methods in this interface are substitutes of sorts for the kinds of return types and exceptions that would be associated with procedural versions of the service. This usually requires two kinds of methods, one associated with normal completion, and one associated with failure that is invoked upon any exception.

Figure 4-34

Additionally, callback methods often require an argument indicating which action completed, so that the client can sort them out when there are multiple calls. In many cases this can be accomplished simply by sending back some of the call arguments. In more general schemes, the service hands back a unique identifier (usually known as a cookie) both as the return value for the initial request and as an argument in any callback method. Variants of this technique are used behind the scenes in remote invocation frameworks that implement procedural calls via asynchronous messages across networks.
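
The cookie scheme just described can be sketched as follows. This is only an illustration under stated assumptions: LookupClient, LookupService, and CookieClient are hypothetical names that do not appear elsewhere in this section, and the "service" merely upper-cases a string in a new thread. The point is that the value returned by the request and the value passed to the callback are the same identifier, which the client uses to match replies to requests.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical callback interface: the cookie says which request completed.
interface LookupClient {
    void lookupCompleted(long cookie, String result);
}

// Hypothetical service: hands back a cookie at once, then calls back later.
class LookupService {
    private long lastCookie = 0;

    synchronized long lookup(final String key, final LookupClient client) {
        final long cookie = ++lastCookie;
        new Thread(new Runnable() {
            public void run() {
                // stand-in for the real, slow work
                client.lookupCompleted(cookie, key.toUpperCase());
            }
        }).start();
        return cookie;
    }
}

class CookieClient implements LookupClient {
    private final LookupService service = new LookupService();
    private final Map<Long, String> pending = new HashMap<Long, String>();
    private final Map<String, String> results = new HashMap<String, String>();

    // Holding the lock across both steps keeps an early callback from
    // being handled before its cookie has been recorded.
    synchronized void request(String key) {
        long cookie = service.lookup(key, this);
        pending.put(cookie, key);
    }

    public synchronized void lookupCompleted(long cookie, String result) {
        String key = pending.remove(cookie); // match the reply to its request
        results.put(key, result);
        notifyAll();
    }

    // Blocks until n replies have arrived, then returns the reply for key.
    synchronized String resultFor(String key, int n) {
        try {
            while (results.size() < n) wait();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
        return results.get(key);
    }

    public static void main(String[] args) {
        CookieClient c = new CookieClient();
        c.request("first");
        c.request("second");
        System.out.println(c.resultFor("second", 2)); // prints SECOND
    }
}
```

Because the callback runs in the service's thread, the client records the cookie and handles the callback under the same lock; otherwise a fast reply could arrive before the client knows which request it belongs to.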

In the case of FileReader, we could use interfaces such as:

interface FileReader {
 void read(String filename, FileReaderClient client);
}

interface FileReaderClient {
 void readCompleted(String filename, byte[ ] data);
 void readFailed(String filename, IOException ex);
}

4.3.1.2 Implementations

There are two styles for implementing these interfaces, depending on whether you'd like the client or the server to create the thread that performs the service. Generally, if the service can be useful without running in its own thread, then control should be assigned to clients.

In the more typical case in which the use of threads is intrinsic to completion callback designs, control is assigned to the service method. Note that this causes callback methods to be executed in the thread constructed by the service, not one directly constructed by the client. This can lead to surprising results if any code relies on thread-specific properties such as ThreadLocal and java.security.AccessControlContext (see 2.3.2) that are not known by the service.

Here we could implement a client and server using a service-creates-thread approach as:

class FileReaderApp implements FileReaderClient { // Fragments
 protected FileReader reader = new AFileReader();

 public void readCompleted(String filename, byte[ ] data) {
  // ... use data ...
 }

 public void readFailed(String filename, IOException ex){
  // ... deal with failure ...
 }

 public void actionRequiringFile() {
  reader.read("AppFile", this);
 }

 public void actionNotRequiringFile() { ... }
}

class AFileReader implements FileReader {

 public void read(final String fn, final FileReaderClient c) {
  new Thread(new Runnable() {
   public void run() { doRead(fn, c); }
  }).start();
 }

 protected void doRead(String fn, FileReaderClient client) {
  byte[ ] buffer = new byte[1024]; // just for illustration
  try {
   FileInputStream s = new FileInputStream(fn);
   try { s.read(buffer); }
   finally { s.close(); } // release the file even if the read fails
   if (client != null) client.readCompleted(fn, buffer);
  }
  catch (IOException ex) {
   if (client != null) client.readFailed(fn, ex);
  }
 }
}

The service class here is written to deal with a null client argument, thus accommodating clients that do not need callbacks. While this is not particularly likely here, callbacks in many services can be treated as optional. As an alternative, you could define and use a NullFileReaderClient class that contains no-op versions of the callback methods (see Further Readings). Also, as far as the service is concerned, the callback target might as well be any object at all, for example some helper of the object that requests the service. You can also replace callbacks with event notifications using the techniques illustrated in 3.1.1.6.
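
A NullFileReaderClient along the lines mentioned above might look like the following minimal sketch. The FileReaderClient interface is repeated so that the fragment stands alone; the orNull helper and the NullClientDemo class are assumptions introduced only for illustration.

```java
import java.io.IOException;

interface FileReaderClient {
    void readCompleted(String filename, byte[] data);
    void readFailed(String filename, IOException ex);
}

// Null Object: callback methods that deliberately do nothing.
class NullFileReaderClient implements FileReaderClient {
    public void readCompleted(String filename, byte[] data) { }
    public void readFailed(String filename, IOException ex) { }
}

class NullClientDemo {
    // Hypothetical helper: a service can normalize its client argument once
    // and then invoke callbacks without any further null checks.
    static FileReaderClient orNull(FileReaderClient c) {
        return (c != null) ? c : new NullFileReaderClient();
    }

    public static void main(String[] args) {
        FileReaderClient c = orNull(null);
        c.readCompleted("AppFile", new byte[0]);             // no-op
        c.readFailed("AppFile", new IOException("ignored")); // no-op
    }
}
```

With this in place, the null tests in a method such as doRead could be replaced by a single normalization step at the point where the request is accepted.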

4.3.1.3 Guarding callback methods

In some applications, clients can process completion callbacks only when they are in particular states. Here, the callback methods themselves should contain guards that suspend processing of each incoming callback until the client can deal with it.

For example, suppose we have a FileReaderClient that initiates a set of asynchronous file reads and needs to process them in the order issued. This construction mimics how remote invocations are usually handled: Typically each request is assigned a sequence number, and replies are processed in sequence order. This can be a risky strategy, since it will cause indefinite suspension of ready callbacks if one or more of them never completes. This drawback could be addressed by associating time-outs with the waits.

class FileApplication implements FileReaderClient { // Fragments
 private String[ ] filenames;
 private int currentCompletion; // index of ready file

 public synchronized void readCompleted(String fn, byte[ ] d) {
  // wait until ready to process this callback
  while (!fn.equals(filenames[currentCompletion])) {
   try { wait(); } 
   catch(InterruptedException ex) { return; }
  }
  // ... process data... 
  // wake up any other thread waiting on this condition:
  ++currentCompletion;
  notifyAll();
 }

 public synchronized void readFailed(String fn, IOException e){
  // similar... 
 }

 public synchronized void readfiles() {
  currentCompletion = 0;
  for (int i = 0; i < filenames.length; ++i)
   reader.read(filenames[i],this);
 }
}
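
One way to associate time-outs with these waits is sketched below. It is a stand-alone illustration rather than a drop-in replacement: the OrderedCompletions class, its timeoutMillis parameter, and the boolean return value are assumptions made for the example. A callback that has waited too long for its turn simply gives up and reports this to its caller.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client that handles completions in issue order, but lets a
// callback give up if its turn has not come within timeoutMillis.
class OrderedCompletions {
    private final String[] filenames;
    private final long timeoutMillis;
    private int currentCompletion = 0;           // index of next file to process
    private final List<String> processed = new ArrayList<String>();

    OrderedCompletions(String[] filenames, long timeoutMillis) {
        this.filenames = filenames;
        this.timeoutMillis = timeoutMillis;
    }

    // Returns true if fn was processed in turn, false on time-out or interrupt.
    public synchronized boolean readCompleted(String fn, byte[] data) {
        long deadline = System.nanoTime() + timeoutMillis * 1000000L;
        while (!isTurnOf(fn)) {
            long remaining = (deadline - System.nanoTime()) / 1000000L;
            if (remaining <= 0) return false;    // waited long enough
            try { wait(remaining); }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        processed.add(fn);                       // ... process data ...
        ++currentCompletion;
        notifyAll();                             // wake callbacks waiting their turn
        return true;
    }

    private boolean isTurnOf(String fn) {
        return currentCompletion < filenames.length
            && fn.equals(filenames[currentCompletion]);
    }

    synchronized List<String> processed() {
        return new ArrayList<String>(processed);
    }

    public static void main(String[] args) {
        OrderedCompletions oc =
            new OrderedCompletions(new String[] { "a", "b" }, 50);
        System.out.println(oc.readCompleted("b", new byte[0])); // false: "a" did not arrive in time
        System.out.println(oc.readCompleted("a", new byte[0])); // true
        System.out.println(oc.readCompleted("b", new byte[0])); // true
        System.out.println(oc.processed());                     // [a, b]
    }
}
```

The remaining wait time is recomputed on each pass through the loop, so wake-ups caused by other callbacks do not extend the overall time-out. What a timed-out callback should do next (retry, skip the missing file, or report failure) is an application-level decision that this sketch leaves open.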

4.3.2 Joining Threads

While completion callbacks are very flexible, they are at best awkward to use when a caller just needs to wait out a particular task that it started.

Figure 4-35

If an operation occurring in some thread A cannot continue until some thread B completes, you can block thread A via any of the waiting and notification techniques discussed in Chapter 3. For example, assuming the existence of a Latch (see 3.4.2) named terminated accessible from both threads A and B, thread A may wait via terminated.acquire(), and thread B may execute terminated.release() upon otherwise completing its task.
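
The latch-based handshake can be sketched as below. The Latch class here is a minimal stand-in written for this example, assuming only the acquire and release methods named above; the class described in 3.4.2 may differ in detail. LatchDemo and runOnce are likewise hypothetical.

```java
// Minimal stand-in for a Latch: once released, it stays released.
class Latch {
    private boolean latched = false;

    public synchronized void acquire() throws InterruptedException {
        while (!latched) wait();
    }

    public synchronized void release() {
        latched = true;
        notifyAll();
    }
}

class LatchDemo {
    // The calling thread plays the role of thread A; the thread it starts is B.
    static String runOnce() {
        final Latch terminated = new Latch();
        final StringBuffer log = new StringBuffer();

        Thread b = new Thread(new Runnable() {
            public void run() {
                log.append("B done;");  // ... B's task ...
                terminated.release();   // signal completion
            }
        });
        b.start();

        try {
            terminated.acquire();       // A blocks until B releases
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        log.append("A continues");
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints B done;A continues
    }
}
```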

However, there is usually no reason to set up your own waiting and notification constructions, since this functionality is already provided by Thread.join: The join method blocks the caller while the target isAlive. Terminating threads automatically perform notifications. The monitor object used internally for this waiting and notification is arbitrary and may vary across JVM implementations. In most, the target Thread object itself is used as the monitor object. (This is one reason for not extending class Thread to add run methods that invoke waiting or notification methods.) In cases where these details of Thread.join don't fit the needs of a particular application, you can always fall back to the manual approach.

Either Thread.join or explicitly coded variants can be used in designs where a client needs a service to be performed but does not immediately rely on its results or effects. (This is sometimes known as deferred-synchronous invocation.) This is often the case when the service task is time-consuming and can benefit from CPU and/or IO parallelism, so that running it in a separate thread can improve overall throughput.

One common application is image processing. Obtaining the raw data for an image from a file or socket and then converting it into a form that can be displayed are time-consuming operations that involve both CPU and IO processing. Often, this processing can be overlapped with other display-related operations.

A version of this strategy is used by java.awt.MediaTracker and related classes, which should be used when they apply. Here, we'll illustrate a more generic version that can be extended and refined in various ways to support specialized applications.

To set this up, suppose there is a generic Pic interface for images, and a Renderer interface describing services that accept a URL pointing to image data and ultimately return a Pic. (In a more realistic setting, the render method would surely also throw various failure exceptions. Here, we will assume that it simply returns null on any failure.) Also, assume existence of a StandardRenderer class implementing interface Renderer.

Thread.join can be used to write clients such as the following PictureApp class (which invokes several made-up methods just for the sake of illustration). It creates a Runnable waiter object that both initiates the rendering thread and keeps track of the returned result.

Figure 4-36

While it is common practice, the use of unsynchronized (or direct) access of internal result fields as seen in the waiter object is a bit delicate. Since access is not synchronized, correctness relies on the fact that both thread termination and the join method intrinsically employ synchronized methods or blocks (see 2.2.7).

interface Pic {
 byte[ ] getImage();
}

interface Renderer {
 Pic render(URL src);
}

class PictureApp {                // Code sketch
 // ...
 private final Renderer renderer = new StandardRenderer();

 public void show(final URL imageSource) {

  class Waiter implements Runnable {
   private Pic result = null;
   Pic getResult() { return result; }
   public void run() { 
    result = renderer.render(imageSource); 
   }
  };

  Waiter waiter = new Waiter();
  Thread t = new Thread(waiter); 
  t.start();

  displayBorders(); // do other things 
  displayCaption(); // while rendering

  try { 
   t.join();
  }
  catch(InterruptedException e) { 
   cleanup();
   return; 
  }

  Pic pic = waiter.getResult();
  if (pic != null)
   displayImage(pic.getImage());
  else {
   // ... deal with assumed rendering failure
  }
 }
}

Thread.join returns control to the caller whether the thread completed successfully or abnormally. For simplicity of illustration, nullness of the result field is used here to indicate any kind of failure, including cancellation of the renderer. The version in 4.3.3.1 illustrates a more responsible approach.

4.3.3 Futures

The operations underlying join-based constructions can be packaged in a more convenient and structured fashion by:

  • Creating Futures — "virtual" data objects that automatically block when clients try to invoke their field accessors before their computation is complete. A Future acts as an "IOU" for a given data object.

  • Creating versions of service methods that start up one or more threads and then return Future objects that are unblocked when computations complete.

Because the mechanics surrounding futures are built into data access and service methods, they can be applied in a general fashion only if both the data objects and the service methods are defined using interfaces, not classes. However, if the associated interfaces are defined, Futures are easy to set up. For example, a Future-based AsynchRenderer can employ proxies around concrete implementation classes (see 1.4.2):

Figure 4-37

class AsynchRenderer implements Renderer {
 private final Renderer renderer = new StandardRenderer();

 static class FuturePic implements Pic { // nested class
  private Pic pic = null;
  private boolean ready = false;
  synchronized void setPic(Pic p) {
   pic = p; 
   ready = true;
   notifyAll();
  }

  public synchronized byte[ ] getImage() {
   while (!ready) 
    try { wait(); } 
    catch (InterruptedException e) { return null; }
   return (pic != null) ? pic.getImage() : null; // null pic: rendering failed
  }
 }
 
 public Pic render(final URL src) {
  final FuturePic p = new FuturePic();
  new Thread(new Runnable() { 
   public void run() { p.setPic(renderer.render(src)); }
  }).start();
  return p;
 }
}

For illustration, AsynchRenderer uses explicit waiting and notification operations based on a ready flag rather than relying on Thread.join.

Applications relying on this class can be written in a simple fashion:

class PictureAppWithFuture {          // Code sketch
 private final Renderer renderer = new AsynchRenderer();

 public void show(final URL imageSource) {
  Pic pic = renderer.render(imageSource);

  displayBorders(); // do other things ... 
  displayCaption(); 

  byte[ ] im = pic.getImage();
  if (im != null)
   displayImage(im);
  else {
   // ... deal with assumed rendering failure
  }
 }
}

4.3.3.1 Callables

Most designs based on Futures take exactly the form illustrated in class AsynchRenderer. The construction and use of such classes can be further standardized and automated by stepping up to a blander interface.

In the same way that interface Runnable describes any pure action, a Callable interface can be used to describe any service method that returns an Object result and may throw an Exception:

interface Callable {
 Object call() throws Exception;
}

The use of Object here (awkwardly) accommodates, for example, methods that produce several values by stuffing them into array objects. Any arguments are supplied when the Callable object is constructed, for example as final variables captured by an anonymous class.

While there are other options, it is most convenient to package up support mechanics via a single class that coordinates usage. The following FutureResult class shows one set of choices. (It is a streamlined version of one in the util.concurrent package available from the online supplement.)

The FutureResult class maintains methods to get the result Object that is returned, or the Exception that is thrown by a Callable. Unlike our Pic versions where all failures were just indicated via null values, it deals with interruptions more honestly by throwing exceptions back to clients attempting to obtain results.

To differentiate properly between exceptions encountered in the service versus those encountered trying to execute the service, exceptions thrown by the Callable are repackaged using java.lang.reflect.InvocationTargetException, a general-purpose class for wrapping one exception inside another.

Also, for the sake of generality, the FutureResult does not itself create threads. Instead, it supports a method, setter, that returns a Runnable that users can then execute within a thread or any other kind of Executor (see 4.1.4). This makes Callables usable within lightweight executable frameworks that are otherwise set up to handle tasks initiated via oneway messages. As an alternative strategy, you could set up a Caller framework that is otherwise similar to Executor, but is more specialized to the needs of service-style tasks, for example supporting methods to schedule execution, check status, and control responses to exceptions.

class FutureResult {               // Fragments
 protected Object value = null;
 protected boolean ready = false;
 protected InvocationTargetException exception = null;

 public synchronized Object get() 
  throws InterruptedException, InvocationTargetException {

  while (!ready) wait();

  if (exception != null)
   throw exception;
  else
   return value;
 }

 public Runnable setter(final Callable function) {
  return new Runnable() {
   public void run() {
    try {
     set(function.call());
    }
    catch(Throwable e) {
     setException(e);
    }
   }
  };
 }

 synchronized void set(Object result) {
  value = result;
  ready = true;
  notifyAll();
 }

 synchronized void setException(Throwable e) {
  exception = new InvocationTargetException(e);
  ready = true;
  notifyAll();
 }

 // ... other auxiliary and convenience methods ...
}

The FutureResult class can be used directly to support generic Futures or as a utility in constructing more specialized versions. As an example of direct use:

class PictureDisplayWithFutureResult {      // Code sketch

 private final Renderer renderer = new StandardRenderer();
 // ...
 
 public void show(final URL imageSource) {

  try {
   FutureResult futurePic = new FutureResult();
   Runnable command = futurePic.setter(new Callable() {
    public Object call() { 
     return renderer.render(imageSource);
    }
   });
   new Thread(command).start();

   displayBorders(); 
   displayCaption();

   displayImage(((Pic)(futurePic.get())).getImage());
  }

  catch (InterruptedException ex) { 
   cleanup();
   return; 
  }
  catch (InvocationTargetException ex) { 
   cleanup(); 
   return;
  }
 }
}

This example demonstrates some of the minor awkwardnesses introduced by reliance on generic utilities that help standardize usage protocols. This is one reason that you might want to use FutureResult in turn to construct a more specialized and easier-to-use version with the same methods and structure as the AsynchRenderer class.
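
As a sketch of such a specialized version, the following class has the same shape as AsynchRenderer but delegates its waiting and notification to a FutureResult. It is only one possible arrangement. FutureResultRenderer and the demo class are hypothetical names, the FutureResult shown is a condensed copy of the fragments above, Callable is assumed to declare a no-argument call method (matching how setter invokes it), and a trivial stub stands in for StandardRenderer.

```java
import java.lang.reflect.InvocationTargetException;
import java.net.URL;

interface Pic { byte[] getImage(); }
interface Renderer { Pic render(URL src); }
interface Callable { Object call() throws Exception; }

// Condensed copy of the FutureResult fragments.
class FutureResult {
    private Object value = null;
    private boolean ready = false;
    private InvocationTargetException exception = null;

    public synchronized Object get()
            throws InterruptedException, InvocationTargetException {
        while (!ready) wait();
        if (exception != null) throw exception;
        return value;
    }

    public Runnable setter(final Callable function) {
        return new Runnable() {
            public void run() {
                try { set(function.call()); }
                catch (Throwable e) { setException(e); }
            }
        };
    }

    synchronized void set(Object result) {
        value = result; ready = true; notifyAll();
    }

    synchronized void setException(Throwable e) {
        exception = new InvocationTargetException(e); ready = true; notifyAll();
    }
}

// Same client-visible structure as AsynchRenderer, with FutureResult inside.
class FutureResultRenderer implements Renderer {
    private final Renderer renderer;
    FutureResultRenderer(Renderer r) { renderer = r; }

    public Pic render(final URL src) {
        final FutureResult future = new FutureResult();
        new Thread(future.setter(new Callable() {
            public Object call() { return renderer.render(src); }
        })).start();

        return new Pic() {                 // proxy that blocks on first access
            public byte[] getImage() {
                try {
                    Pic pic = (Pic) future.get();
                    return (pic != null) ? pic.getImage() : null;
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
                catch (InvocationTargetException e) { return null; }
            }
        };
    }
}

class FutureResultRendererDemo {
    static byte[] demo() {
        Renderer stub = new Renderer() {   // hypothetical stand-in for StandardRenderer
            public Pic render(URL src) {
                return new Pic() {
                    public byte[] getImage() { return new byte[] { 1, 2, 3 }; }
                };
            }
        };
        // The stub ignores its argument, so null serves as the URL here.
        return new FutureResultRenderer(stub).render(null).getImage();
    }

    public static void main(String[] args) {
        System.out.println(demo().length); // prints 3
    }
}
```

Clients of this class see only the Renderer and Pic interfaces, so the casts and exception handling stay in one place. The cost is that failures are again collapsed into null results, as in the earlier Pic-based examples.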

4.3.4 Scheduling Services

As discussed in 4.1.4, worker thread designs can sometimes improve performance compared to thread-per-task designs. They can also be used to schedule and optimize execution of service requests made by different clients.

As a famous example, consider a class controlling read and write access for a disk containing many cylinders but only one read/write head. The interface for the service contains just read and write methods. In practice, it would surely use file block indicators instead of raw cylinder numbers and would deal with or throw various IO exceptions, here abbreviated as a single Failure exception.

interface Disk {
 void read(int cylinderNumber, byte[ ] buffer) throws Failure;
 void write(int cylinderNumber, byte[ ] buffer) throws Failure;
}

Rather than servicing access requests in the order they are made, it is faster on average to sweep the head across the cylinders, accessing cylinders in ascending order and then resetting the head position back to the beginning after each sweep. (Depending in part on the type of disk, it may be even better to arrange requests in both ascending and descending sweeps, but we will stick to this version.)

This policy would be tricky to implement without some kind of auxiliary data structure. The enabling condition for a request to execute is:

    Wait until the cylinder number of this request is the smallest among all waiting requests whose cylinder numbers are at or beyond the current head position, or, if the head is beyond every waiting request, the smallest among all waiting requests.

This condition is too awkward, inefficient, and possibly even deadlock-prone to try to coordinate across a set of otherwise independent clients. But it can be implemented fairly easily with the help of an ordered queue employed by a single worker thread. Tasks can be added to the queue in cylinder-based order, then executed when their turns arrive. This "elevator algorithm" is easiest to arrange by using a two-part queue, one for the current sweep and one for the next sweep.
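
The ordering produced by a two-part queue can be seen in a small single-threaded model. This is a simplified sketch, not the framework itself: SweepOrder is a hypothetical class that tracks only cylinder numbers and uses java.util.PriorityQueue in place of hand-built ordered lists, with no tasks, threads, or synchronization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of the two-part queue, holding cylinder numbers only.
class SweepOrder {
    private PriorityQueue<Integer> thisSweep = new PriorityQueue<Integer>();
    private PriorityQueue<Integer> nextSweep = new PriorityQueue<Integer>();
    private int currentCylinder = 0;

    void put(int cylinder) {
        if (cylinder >= currentCylinder)
            thisSweep.add(cylinder);   // still ahead of the head
        else
            nextSweep.add(cylinder);   // already passed: wait for next sweep
    }

    int take() {                       // PRE: not empty
        if (thisSweep.isEmpty()) {     // sweep finished: swap queues
            PriorityQueue<Integer> tmp = thisSweep;
            thisSweep = nextSweep;
            nextSweep = tmp;
        }
        currentCylinder = thisSweep.remove();
        return currentCylinder;
    }

    boolean isEmpty() {
        return thisSweep.isEmpty() && nextSweep.isEmpty();
    }
}

class SweepOrderDemo {
    static List<Integer> demo() {
        SweepOrder q = new SweepOrder();
        q.put(50); q.put(10); q.put(80);
        List<Integer> served = new ArrayList<Integer>();
        served.add(q.take());          // serves 10; head is now at cylinder 10
        q.put(5);                      // behind the head: deferred
        q.put(60);                     // ahead of the head: joins this sweep
        while (!q.isEmpty()) served.add(q.take());
        return served;
    }

    public static void main(String[] args) {
        System.out.println(demo());    // prints [10, 50, 60, 80, 5]
    }
}
```

Requests that arrive behind the head never delay the sweep in progress, and each sweep visits its cylinders in ascending order.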

Figure 4-38

The resulting framework combines Future-like constructs with the worker thread designs from 4.1.4. To set this up, we can define a Runnable class to include the extra bookkeeping associated with DiskTasks. The queue class uses the semaphore-based approach discussed in 3.4.1, but here applied to ordered linked lists. The server class constructs a worker thread that runs tasks from the queue. The public service methods create tasks, place them on the queue, and then wait them out before returning to clients.

abstract class DiskTask implements Runnable {
 protected final int cylinder;    // read/write parameters
 protected final byte[ ] buffer;
 protected Failure exception = null;    // to relay out
 protected DiskTask next = null;      // for use in queue
 protected final Latch done = new Latch(); // status indicator

 DiskTask(int c, byte[ ] b) { cylinder = c; buffer = b; }

 abstract void access() throws Failure; // read or write

 public void run() {
  try { access(); }
  catch (Failure ex) { setException(ex); }
  finally { done.release(); }
 }

 void awaitCompletion() throws InterruptedException {
  done.acquire();
 }

 synchronized Failure getException() { return exception; }
 synchronized void setException(Failure f) { exception = f; }
}

class DiskReadTask extends DiskTask {
 DiskReadTask(int c, byte[ ] b) { super(c, b); }
 void access() throws Failure { /* ... raw read ... */ }
}

class DiskWriteTask extends DiskTask {
 DiskWriteTask(int c, byte[ ] b) { super(c, b); }
 void access() throws Failure { /* ... raw write ... */ }
}

class ScheduledDisk implements Disk {
 protected final DiskTaskQueue tasks = new DiskTaskQueue();

 public void read(int c, byte[ ] b) throws Failure {
  readOrWrite(new DiskReadTask(c, b));
 }

 public void write(int c, byte[ ] b) throws Failure {
  readOrWrite(new DiskWriteTask(c, b));
 }

 protected void readOrWrite(DiskTask t) throws Failure {
  tasks.put(t);
  try {
   t.awaitCompletion();
  }
  catch (InterruptedException ex) {
   Thread.currentThread().interrupt(); // propagate
   throw new Failure(); // convert to failure exception
  }
  Failure f = t.getException();
  if (f != null) throw f;
 }

 public ScheduledDisk() {   // construct worker thread
  new Thread(new Runnable() {
   public void run() {
    try {
     for (;;) {
      tasks.take().run();
     }
    }
    catch (InterruptedException ie) {} // die
   }
  }).start();
 }  
}

class DiskTaskQueue {
 protected DiskTask thisSweep = null;
 protected DiskTask nextSweep = null;
 protected int currentCylinder = 0;

 protected final Semaphore available = new Semaphore(0);

 void put(DiskTask t) {
  insert(t);
  available.release();
 }

 DiskTask take() throws InterruptedException {
  available.acquire();
  return extract();
 }

 synchronized void insert(DiskTask t) {
  boolean current = (t.cylinder >= currentCylinder); // determine queue
  DiskTask q = current ? thisSweep : nextSweep;
  if (q == null || t.cylinder < q.cylinder) { // t becomes the new head
   t.next = q;
   if (current) thisSweep = t; else nextSweep = t;
   return;
  }
  DiskTask trail = q;      // ordered linked list insert
  q = trail.next;
  for (;;) {
   if (q == null || t.cylinder < q.cylinder) {
    trail.next = t; t.next = q;
    return;
   }
   else {
    trail = q; q = q.next;
   }
  }
 }

 synchronized DiskTask extract() { // PRE: not empty
  if (thisSweep == null) {      // possibly swap queues
   thisSweep = nextSweep;
   nextSweep = null;
  }
  DiskTask t = thisSweep;
  thisSweep = t.next;
  currentCylinder = t.cylinder;
  return t;
 }
}

4.3.5 Further Readings

ABCL was among the first concurrent object-oriented languages to offer Futures as a language construct. See:

    Yonezawa, Akinori, and Mario Tokoro. Object-Oriented Concurrent Programming, MIT Press, 1988.

Futures are known as wait-by-necessity constructions in Eiffel// (a parallel extension to Eiffel). See:

    Caromel, Denis, and Yves Roudier. "Reactive Programming in Eiffel//", in Jean-Pierre Briot, Jean-Marc Geib and Akinori Yonezawa (eds.) Object Based Parallel and Distributed Computing, LNCS 1107, Springer Verlag, 1996.

Futures and related constructs in the Scheme and Multilisp programming languages are described in:

    Dybvig, R. Kent and Robert Hieb. "Engines from Continuations", Computer Languages, 14(2):109-123, 1989.

    Feeley, Marc. An Efficient and General Implementation of Futures on Large Scale Shared-Memory Multiprocessors, PhD Thesis, Brandeis University, 1993.

Additional techniques associated with completion callbacks in networking applications are described in:

    Pyarali, Irfan, Tim Harrison, and Douglas C. Schmidt. "Asynchronous Completion Token", in Robert Martin, Dirk Riehle, and Frank Buschmann (eds.), Pattern Languages of Program Design, Volume 3, Addison-Wesley, 1999.

The Null Object pattern is often useful for simplifying callback designs in which clients do not always require callback messages. See:

    Woolf, Bobby. "Null Object", in Robert Martin, Dirk Riehle, and Frank Buschmann (eds.), Pattern Languages of Program Design, Volume 3, Addison-Wesley, 1999.
