4.4 Parallel Decomposition

Parallel programs are specifically designed to take advantage of multiple CPUs for solving computation-intensive problems. The main performance goals are normally throughput and scalability — the number of computations that can be performed per unit time, and the potential for improvement when additional computational resources are available. However, these are often intertwined with other performance goals. For example, parallelism may also improve response latencies for a service that hands off work to a parallel execution facility.

Among the main challenges of parallelism in the Java programming language is to construct portable programs that can exploit multiple CPUs when they are present, while at the same time working well on single processors, as well as on time-shared multiprocessors that are often processing unrelated programs.

Some classic approaches to parallelism don't mesh well with these goals. Approaches that assume particular architectures, topologies, processor capabilities, or other fixed environmental constraints are ill suited to commonly available JVM implementations. While it is not a crime to build run-time systems with extensions specifically geared to particular parallel computers, and to write parallel programs specifically targeted to them, the associated programming techniques necessarily fall outside the scope of this book. Also, RMI and other distributed frameworks can be used to obtain parallelism across remote machines. In fact, most of the designs discussed here can be adapted to use serialization and remote invocation to achieve parallelism over local networks. This is becoming a common and efficient means of coarse-grained parallel processing. However, these mechanics also lie outside the scope of this book.

We instead focus on three families of task-based designs: fork/join parallelism, computation trees, and barriers. These techniques can yield very efficient programs that exploit multiple CPUs when present, yet still maintain portability and sequential efficiency. Empirically, they are known to scale well, at least up through dozens of CPUs. Moreover, even when these kinds of task-based parallel programs are tuned to maximally exploit a given hardware platform, they require only minor retunings to maximally exploit other platforms.

As of this writing, probably the most common targets for these techniques are application servers and compute servers that are often, but by no means always, multiprocessors. In either case, we assume that CPU cycles are usually available, so the main goal is to exploit them to speed up the solution of computational problems. In other words, these techniques are unlikely to be very helpful when programs are run on computers that are already nearly saturated.

4.4.1 Fork/Join

Fork/join decomposition relies on parallel versions of divide-and-conquer techniques familiar in sequential algorithm design. Solutions take the form:

pseudoclass Solver {               // Pseudocode 
 // ...
 Result solve(Param problem) {
  if (problem.size <= BASE_CASE_SIZE) 
   return directlySolve(problem);
  else {
   Result l, r;
   IN-PARALLEL {
    l = solve(leftHalf(problem));
    r = solve(rightHalf(problem));
   }
   return combine(l, r);
  }
 }
}

It takes some hard work and inspiration to invent a divide-and-conquer algorithm. But many common computationally intensive problems have known solutions of approximately this form. Of course, there may be more than two recursive calls, multiple base cases, and arbitrary pre- and post-processing surrounding any of the cases.

Familiar sequential examples include quicksort, mergesort, and many data structure, matrix, and image processing algorithms. Sequential recursive divide-and-conquer designs are easy to parallelize when the recursive tasks are completely independent; that is, when they operate on different parts of a data set (for example different sections of an array) or solve different sub-problems, and need not otherwise communicate or coordinate actions. This often holds in recursive algorithms, even those not originally intended for parallel implementation.
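As a minimal sketch of the independent-subproblem case, the following class sums disjoint sections of an array. It is not written against the FJTask framework discussed later in this section. It assumes the standard java.util.concurrent fork/join classes (ForkJoinPool and RecursiveTask) instead, and the class name ArraySum and the BASE_CASE_SIZE value are hypothetical choices made only for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class; uses java.util.concurrent, not FJTask.
class ArraySum extends RecursiveTask<Long> {
  static final int BASE_CASE_SIZE = 1_000;   // assumed cutoff, untuned
  final long[] data;
  final int lo, hi;                          // half-open range [lo, hi)

  ArraySum(long[] data, int lo, int hi) {
    this.data = data; this.lo = lo; this.hi = hi;
  }

  @Override
  protected Long compute() {
    if (hi - lo <= BASE_CASE_SIZE) {         // base case: solve directly
      long sum = 0;
      for (int i = lo; i < hi; ++i) sum += data[i];
      return sum;
    }
    int mid = (lo + hi) >>> 1;
    ArraySum left = new ArraySum(data, lo, mid);
    ArraySum right = new ArraySum(data, mid, hi);
    left.fork();                             // left half may run in parallel
    long r = right.compute();                // right half runs in this thread
    long l = left.join();                    // wait for the left half
    return l + r;                            // combine
  }

  static long sum(long[] data) {
    return ForkJoinPool.commonPool().invoke(new ArraySum(data, 0, data.length));
  }
}
```

The two subtasks touch disjoint index ranges and share no mutable state, so the only coordination needed is the final join.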

Additionally, there are recursive versions of algorithms (for example, matrix multiplication) that are not used much in sequential contexts, but are more widely used on multiprocessors because of their readily parallelizable form. And other parallel algorithms perform extensive transformations and preprocessing to convert problems into a form that can be solved using fork/join techniques. (See Further Readings in 4.4.4.)

The IN-PARALLEL pseudocode is implemented by forking and later joining tasks performing the recursive calls. However, before discussing how to do this, we first examine issues and frameworks that permit efficient parallel execution of recursively generated tasks.

4.4.1.1 Task granularity and structure

Many of the design forces encountered when implementing fork/join designs surround task granularity:

    Maximizing parallelism. In general, the smaller the tasks, the more opportunities for parallelism. All other things being equal, using many fine-grained tasks rather than only a few coarse-grained tasks keeps more CPUs busy, improves load balancing, locality and scalability, decreases the percentage of time that CPUs must idly wait for one another, and leads to greater throughput.

    Minimizing overhead. Constructing and managing an object to process a task in parallel, rather than just invoking a method to process it serially, is the main unavoidable overhead associated with task-based programming compared with sequential solutions. It is intrinsically more costly to create and use task objects than to create and use stack-frames. Additionally, the use of task objects can add to the amount of argument and result data that must be transmitted and can impact garbage collection. All other things being equal, total overhead is minimized when there are only a few coarse-grained tasks.

    Minimizing contention. A parallel decomposition is not going to lead to much speed-up if each task frequently communicates with others or must block waiting for resources held by others. Tasks should be of a size and structure that maintain as much independence as possible. They should minimize (in most cases, eliminate) use of shared resources, global (static) variables, locks, and other dependencies. Ideally, each task would contain simple straight-line code that runs to completion and then terminates. However, fork/join designs require at least some minimal synchronization. The main object that commences processing normally waits for all subtasks to finish before proceeding.

    Maximizing locality. Each subtask should be the only one operating on some small piece of a problem, not only conceptually but also at the level of lower-level resources and memory access patterns. Refactorings that achieve good locality of reference can significantly improve performance on modern heavily cached processors. When dealing with large data sets, it is not uncommon to partition computations into subtasks with good locality even when parallelism is not the main goal. Recursive decomposition is often a productive way to achieve this. Parallelism accentuates the effects of locality. When parallel tasks all access different parts of a data set (for example, different regions of a common matrix), partitioning strategies that reduce the need to transmit updates across caches often achieve much better performance.
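The tension between the first two forces can be made concrete with simple arithmetic. The sketch below only counts the leaf tasks that recursive halving would produce for a given cutoff; it does not run anything in parallel. The class name GranularityCount and the sizes used are hypothetical.

```java
// Hypothetical helper, for illustration only.
class GranularityCount {
  // Number of leaf tasks produced by recursively halving a problem of
  // the given size until each piece is no larger than threshold.
  static long leafTasks(long size, long threshold) {
    if (size <= threshold) return 1;
    long half = size / 2;
    return leafTasks(half, threshold) + leafTasks(size - half, threshold);
  }

  public static void main(String[] args) {
    long size = 1 << 20;   // 1,048,576 elements
    for (long threshold : new long[] {1 << 16, 1 << 10, 1 << 4}) {
      System.out.println("threshold " + threshold + ": "
          + leafTasks(size, threshold) + " leaf tasks");
    }
  }
}
```

For this problem size, the three thresholds yield 16, 1,024, and 65,536 leaf tasks respectively. The smallest threshold offers the most opportunities for load balancing but also pays the per-task overhead the most times.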

4.4.1.2 Frameworks

There is no general optimal solution to granularity and related task structuring issues. Any choice represents a compromise that best resolves the competing forces for the problem at hand. However, it is possible to build lightweight execution frameworks that support a wide range of choices along the continuum.

Thread objects are unnecessarily heavy vehicles for supporting purely computational fork/join tasks. For example, these tasks never need to block on IO, and never need to sleep. They require only an operation to synchronize across subtasks. Worker thread techniques discussed in 4.1.4 can be extended to construct frameworks efficiently supporting only the necessary constructs. While there are several approaches, for concreteness we'll limit discussion to a framework in util.concurrent that restricts all tasks to be subclasses of class FJTask. Here is a brief sketch of principal methods. More details are discussed along with examples in 4.4.1.4 through 4.4.1.7.

abstract class FJTask implements Runnable {
 boolean isDone();         // True after task is run
 void cancel();          // Prematurely set as done
 void fork();           // Start a dependent task
 void start();           // Start an arbitrary task
 static void yield();        // Allow another task to run
 void join();           // Yield caller until done
 static void invoke(FJTask t);   // Directly run t
 static void coInvoke(FJTask t, 
            FJTask u);  // Fork and join t and u
 static void coInvoke(FJTask[ ] tasks); // coInvoke all
 void reset();           // Clear to allow reuse
}

An associated FJTaskRunnerGroup class provides control and entry points into this framework. An FJTaskRunnerGroup is constructed with a given number of worker threads that should ordinarily be equal to the number of CPUs on a system. The class supports a method, invoke, that starts up a main task, which will in turn normally create many others.

FJTasks must employ only these task control methods, not arbitrary Thread or monitor methods. While the names of these operations are the same or similar to those in class Thread, their implementations are very different. In particular, there are no general suspension facilities. For example, the join operation is implemented simply by having the underlying worker thread run other tasks to completion until the target task is noticed to have completed (via isDone). This wouldn't work at all with ordinary threads, but is effective and efficient when all tasks are structured as fork/join methods.

These kinds of trade-offs make FJTask construction and invocation substantially cheaper than would be possible for any class supporting the full Thread interface. As of this writing, on at least some platforms, the overhead of creating, running, and otherwise managing an FJTask for the kinds of examples illustrated below is only between four and ten times that of performing equivalent sequential method calls.

The main effect is to lessen the impact of overhead factors when making choices about task partitioning and granularity. The granularity threshold for using tasks can be fairly small — on the order of a few thousand instructions even in the most conservative cases — without noticeably degrading performance on uniprocessors. Programs can exploit as many CPUs as are available on even the largest platforms without the need for special tools to extract or manage parallelism. However, success also depends on construction of task classes and methods that themselves minimize overhead, avoid contention, and preserve locality.
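In current JDKs, java.util.concurrent.ForkJoinPool plays a role comparable to FJTaskRunnerGroup. As a hedged sketch under that substitution (it is not the util.concurrent API described above), the following code sizes a pool to the number of available CPUs and uses invoke to start a main task that in turn creates many others. The names PoolSetup and LeafCounter are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example; ForkJoinPool stands in for FJTaskRunnerGroup.
class PoolSetup {
  // A main task that recursively spawns subtasks and counts the leaves.
  static class LeafCounter extends RecursiveTask<Integer> {
    final int depth;
    LeafCounter(int depth) { this.depth = depth; }

    @Override
    protected Integer compute() {
      if (depth == 0) return 1;               // leaf
      LeafCounter left = new LeafCounter(depth - 1);
      LeafCounter right = new LeafCounter(depth - 1);
      left.fork();
      return right.compute() + left.join();
    }
  }

  static int run(int depth) {
    // One worker thread per CPU reported by the JVM
    int nCpus = Runtime.getRuntime().availableProcessors();
    ForkJoinPool pool = new ForkJoinPool(nCpus);
    try {
      return pool.invoke(new LeafCounter(depth));  // blocks until done
    } finally {
      pool.shutdown();
    }
  }
}
```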

4.4.1.3 Defining tasks

Sequential divide-and-conquer algorithms can be expressed as fork/join-based classes via the following steps:

  1. Create a task class with:

    • Fields to hold arguments and results. Most should be strictly local to a task, never accessed from any other task. This eliminates the need for synchronization surrounding their use. However, in the typical case where result variables are accessed by other tasks, they should either be declared as volatile or be accessed only via synchronized methods.

    • A constructor that initializes argument variables.

    • A run method that executes the reworked method code.

  2. Replace the original recursive case with code that:

    • Creates subtask objects.

    • Forks each one to run in parallel.

    • Joins each of them.

    • Combines results by accessing result variables in the subtask objects.

  3. Replace (or extend) the original base case check with a threshold check. Problem sizes less than the threshold should use the original sequential code. This generalization of base case checks maintains efficiency when problem sizes are so small that task overhead overshadows potential gains from parallel execution. Tune performance by determining a good threshold size for the problem at hand.

  4. Replace the original method with one that creates the associated task, waits it out, and returns any results. (In the FJTask framework, the outermost call is performed via FJTaskRunnerGroup.invoke.)
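The four steps above can be sketched for a simple problem, finding the maximum element of a non-empty array. This version assumes java.util.concurrent.RecursiveAction rather than FJTask, so the run method becomes compute and the outermost call goes through ForkJoinPool. The class name MaxTask and the THRESHOLD value are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example; uses java.util.concurrent, not FJTask.
class MaxTask extends RecursiveAction {
  static final int THRESHOLD = 512;  // assumed value; tune per platform

  // Step 1: fields holding arguments and the result
  final int[] data;
  final int lo, hi;                  // half-open range [lo, hi), non-empty
  volatile int result;               // read by the parent, as in the text

  MaxTask(int[] data, int lo, int hi) {
    this.data = data; this.lo = lo; this.hi = hi;
  }

  // The original sequential code
  static int seqMax(int[] data, int lo, int hi) {
    int m = data[lo];
    for (int i = lo + 1; i < hi; ++i) if (data[i] > m) m = data[i];
    return m;
  }

  @Override
  protected void compute() {
    if (hi - lo <= THRESHOLD) {      // Step 3: threshold check
      result = seqMax(data, lo, hi);
    } else {                         // Step 2: create, fork, join, combine
      int mid = (lo + hi) >>> 1;
      MaxTask left = new MaxTask(data, lo, mid);
      MaxTask right = new MaxTask(data, mid, hi);
      invokeAll(left, right);        // forks and joins both subtasks
      result = Math.max(left.result, right.result);
    }
  }

  // Step 4: replacement for the original method (requires data.length > 0)
  static int max(int[] data) {
    MaxTask t = new MaxTask(data, 0, data.length);
    ForkJoinPool.commonPool().invoke(t);
    return t.result;
  }
}
```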

4.4.1.4 Fibonacci

We'll illustrate the basic steps with a very boring and unrealistic, but very simple classic example: recursively computing fib, the Fibonacci function. This function can be programmed sequentially as:

int seqFib(int n) {
 if (n <= 1) 
  return n;
 else 
  return seqFib(n-1) + seqFib(n-2);
}

Figure 4-39

This example is unrealistic because there is a much faster non-recursive solution for this particular problem, but it is a favorite for demonstrating both recursion and parallelism. Because it does so little other computation, it makes the basic structure of fork/join designs easier to see, yet it generates many recursive calls — at least fib(n) calls to compute fib(n). The first few values of the sequence are 0, 1, 1, 2, 3, 5, 8; fib(10) is 55; fib(20) is 6,765; fib(30) is 832,040; fib(40) is 102,334,155.
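For comparison, one common form of the faster non-recursive solution is a simple loop that performs n additions. The class name IterFib is hypothetical, and the int result is valid only for n up to 46, after which the value overflows.

```java
// Hypothetical helper showing a non-recursive solution.
class IterFib {
  static int fib(int n) {
    int a = 0, b = 1;            // a = fib(0), b = fib(1)
    for (int i = 0; i < n; ++i) {
      int next = a + b;          // advance the pair one position
      a = b;
      b = next;
    }
    return a;                    // a = fib(n)
  }
}
```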

Function seqFib can be transformed into a task class such as the following:

class Fib extends FJTask {
 static final int sequentialThreshold = 13; // for tuning 
 volatile int number;            // argument/result

 Fib(int n) { number = n; }

 int getAnswer() {
  if (!isDone()) 
   throw new IllegalStateException("Not yet computed");
  return number;
 }

 public void run() {
  int n = number;
  
  if (n <= sequentialThreshold)    // base case
   number = seqFib(n);
  else {
   Fib f1 = new Fib(n - 1);     // create subtasks
   Fib f2 = new Fib(n - 2);

   coInvoke(f1, f2);         // fork then join both

   number = f1.number + f2.number;  // combine results
  }
 }

 public static void main(String[ ] args) { // sample driver
  try {
   int groupSize = 2;  // 2 worker threads
   int num = 35;     // compute fib(35)
   FJTaskRunnerGroup group = new FJTaskRunnerGroup(groupSize);
   Fib f = new Fib(num);
   group.invoke(f);
   int result = f.getAnswer();
   System.out.println("Answer: " + result);
  }
  catch (InterruptedException ex) {} // die
 }
}

Notes:

  • The class maintains a field holding the argument for which to compute the Fibonacci function. Also, we need a variable to hold the result. However, as is fairly typical in such classes, there is no need to keep two variables. For economy (bearing in mind that many millions of Fib objects might be constructed in the course of a computation), we can micro-optimize to use one variable, and overwrite the argument with its result after it is computed. (This is the first of several hand-optimizations that are uncomfortably petty, but are shown here in order to demonstrate minor tweaks that can be pragmatically important in constructing efficient parallel programs.)

  • The number field is declared as volatile to ensure visibility from other tasks/threads after it is computed (see 2.2.7). Here and in subsequent examples, volatile fields are read and/or written only once per task execution, and otherwise held in local variables. This avoids interfering with potential compiler optimizations that are otherwise disabled when using volatile.

  • Alternatively, we could have synchronized access to the number field. But there is no good reason to do so. The use of volatile fields is much more common in lightweight parallel task frameworks than in general-purpose concurrent programming. Tasks usually do not require other synchronization and control mechanics, yet often need to communicate results via field access. The most common reason for using synchronized instead of volatile is to deal with arrays. Individual array elements cannot be declared as volatile. Processing arrays within synchronized methods or blocks is the simplest way to ensure visibility of array updates, even in the typical case in which locking is not otherwise required. An occasionally attractive alternative is instead to create arrays each of whose elements is a forwarding object with volatile fields.

  • The method isDone returns true after the completion of a run method that has been executed via invoke or coInvoke. It is used as a guard in getAnswer to help detect programming errors that could occur if the ultimate consumer of an answer tries to access it prematurely. (There is no chance of this happening here, but this safeguard helps avoid unintended usages.)

  • The sequentialThreshold constant establishes granularity. It represents the balance point at which it is not worth the overhead to create tasks, also reflecting the goal of maintaining good sequential performance. For example, on one set of runs on a four-CPU system, setting sequentialThreshold to 13 resulted in a 4% performance degradation versus seqFib for large argument values when using a single worker thread. But it sped up by a factor of at least 3.8 with four worker threads, processing several million Fib tasks per second.

  • Rather than wiring in a compile-time constant, we could have defined the threshold as a run-time variable and set it to a value based on the number of CPUs available or other platform characteristics. This is useful in task-based programs that do not scale linearly, as is likely to be true even here. As the number of CPUs increases, so do communication and resource management costs, which could be balanced by increasing the threshold.

  • The parallel analog of recursion is performed via a convenient method, coInvoke(FJTask t, FJTask u), which in turn acts as: t.fork(); invoke(u); t.join();

  • The fork method is a specialized analog of Thread.start. A forked task is always processed in stack-based LIFO order when it is run by the same underlying worker thread that spawned it, but in queue-based FIFO order with respect to other tasks if run by another worker thread running in parallel. This represents a cross of sorts between normal stack-based sequential calls, and normal queue-based thread scheduling. This policy (implemented via double-ended scheduling queues) is ideal for recursive task-based parallelism (see Further Readings), and more generally whenever dealing with strictly dependent tasks — those that are spawned either by the tasks that ultimately join them or by their subtasks.

  • In contrast, FJTask.start behaves more like Thread.start. It employs queue-based FIFO scheduling with respect to all worker threads. It is used, for example, by FJTaskRunnerGroup.invoke to start up execution of a new main task.

  • The join method should be used only for tasks initiated via fork. It exploits termination dependency patterns of fork/join subtasks to optimize execution.

  • The FJTask.invoke method runs the body of one task within another task, and waits out completion. Seen differently, it is the one-task version of coInvoke, an optimization of u.fork(); u.join().
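For readers working with the standard library rather than util.concurrent, the same design can be sketched with java.util.concurrent.RecursiveTask. This is an analogous version under that assumption, not the FJTask code above: the result is returned from compute rather than stored in a volatile field, and the fork, compute, join sequence mirrors the t.fork(); invoke(u); t.join() expansion of coInvoke. The class name FibTask is hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical analog of Fib using java.util.concurrent.
class FibTask extends RecursiveTask<Integer> {
  static final int SEQUENTIAL_THRESHOLD = 13;  // same value as in the text
  final int n;

  FibTask(int n) { this.n = n; }

  static int seqFib(int n) {
    return (n <= 1) ? n : seqFib(n - 1) + seqFib(n - 2);
  }

  @Override
  protected Integer compute() {
    if (n <= SEQUENTIAL_THRESHOLD)   // base case
      return seqFib(n);
    FibTask f1 = new FibTask(n - 1); // create subtasks
    FibTask f2 = new FibTask(n - 2);
    f1.fork();                       // corresponds to t.fork()
    int r2 = f2.compute();           // corresponds to invoke(u)
    int r1 = f1.join();              // corresponds to t.join()
    return r1 + r2;                  // combine results
  }

  public static void main(String[] args) {  // sample driver
    ForkJoinPool pool = new ForkJoinPool(2); // 2 worker threads
    try {
      System.out.println("Answer: " + pool.invoke(new FibTask(35)));
    } finally {
      pool.shutdown();
    }
  }
}
```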

Effective use of any lightweight executable framework requires the same understanding of support methods and their semantics as does programming with ordinary Threads. The FJTask framework exploits the symbiosis between recursion and parallel decomposition, and so encourages the divide-and-conquer programming style seen in Fib. However, the range of programming idioms and design patterns conforming to this general style is fairly broad, as illustrated by the following examples.

4.4.1.5 Linking subtasks

Fork/join techniques may be applied even when the number of forked subtasks varies dynamically. Among several related tactics for carrying this out, you can add link fields so that subtasks can be maintained in lists. After spawning all tasks, an accumulate (also known as reduction) operation can traverse the list sequentially, joining and using the results of each subtask.

Stretching the Fib example a bit, the FibVL class illustrates one way to set this up. This style of solution is not especially useful here, but is applicable in contexts in which a dynamic number of subtasks is created, possibly across different methods. Notice that the subtasks here are joined in the opposite order from that in which they were created. Since the processing order of results does not matter here, we use the simplest possible linking algorithm (prepending), which happens to reverse the order of tasks during traversal. This strategy applies whenever the accumulation step is commutative and associative with respect to results, so tasks can be processed in any order. If the order did matter, we would need to adjust list construction or traversal accordingly.

class FibVL extends FJTask {
 volatile int number; // as before
 final FibVL next;  // embedded linked list of sibling tasks

 FibVL(int n, FibVL list) { number = n; next = list; }

 public void run() {
  int n = number;
  if (n <= sequentialThreshold)
   number = seqFib(n);
  else {
   FibVL forked = null;        // list of subtasks

   forked = new FibVL(n - 1, forked); // prepends to list
   forked.fork();          

   forked = new FibVL(n - 2, forked);
   forked.fork();

   number = accumulate(forked);
  }
 }

 // Traverse list, joining each subtask and adding to result
 int accumulate(FibVL list) {
  int sum = 0;
  for (FibVL f = list; f != null; f = f.next) {
   f.join();
   sum += f.number;
  }
  return sum;
 }
}
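The linking tactic is easier to appreciate when the number of subtasks really does vary. The following sketch, which assumes java.util.concurrent rather than FJTask, forks one subtask per chunk of an array, so the subtask count depends on the array length. The class names ChunkSum and LinkedSum and the chunkSize parameter are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical leaf task: sums one chunk and links to a sibling.
class ChunkSum extends RecursiveTask<Long> {
  final int[] data;
  final int lo, hi;        // half-open range [lo, hi)
  final ChunkSum next;     // previously created sibling, or null

  ChunkSum(int[] data, int lo, int hi, ChunkSum next) {
    this.data = data; this.lo = lo; this.hi = hi; this.next = next;
  }

  @Override
  protected Long compute() {
    long s = 0;
    for (int i = lo; i < hi; ++i) s += data[i];
    return s;
  }
}

// Hypothetical parent task: forks a data-dependent number of subtasks.
class LinkedSum extends RecursiveTask<Long> {
  final int[] data;
  final int chunkSize;

  LinkedSum(int[] data, int chunkSize) {
    if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize");
    this.data = data; this.chunkSize = chunkSize;
  }

  @Override
  protected Long compute() {
    ChunkSum forked = null;                          // list of subtasks
    for (int lo = 0; lo < data.length; lo += chunkSize) {
      int hi = Math.min(lo + chunkSize, data.length);
      forked = new ChunkSum(data, lo, hi, forked);   // prepends to list
      forked.fork();
    }
    long sum = 0;                                    // accumulate
    for (ChunkSum t = forked; t != null; t = t.next)
      sum += t.join();                               // most recent first
    return sum;
  }

  static long sum(int[] data, int chunkSize) {
    return ForkJoinPool.commonPool().invoke(new LinkedSum(data, chunkSize));
  }
}
```

Because addition is commutative and associative, joining the chunks in reverse creation order does not affect the result.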

4.4.1.6 Callbacks

Recursive task-based fork/join parallelism may be extended to apply when other local synchronization conditions are used instead of join. In the FJTask framework, t.join() is implemented as an optimized version of:

while (!t.isDone()) yield();

Method yield here allows the underlying worker thread to process other tasks. (More specifically, in the FJTask framework, the thread will process at least one other task if one exists.)

Any other condition may be used in this construction rather than isDone, as long as you are certain that the predicate being waited for will eventually become true due to the actions of a subtask (or one of its subtasks, and so on). For example, rather than relying on join, task control can rely on counters that keep track of task creation and completion. A counter can be incremented on each fork and decremented when the forked task has produced a result. This and related counter-based schemes can be attractive choices when subtasks communicate back results via callbacks rather than via access to result fields. Counters of this form are small-scale, localized versions of the barriers discussed in 4.4.3.

Callback-based fork/join designs are seen, for example, in problem-solving algorithms, games, searching, and logic programming. In many such applications, the number of subtasks that are forked can vary dynamically, and subtask results are better captured by method calls than by field extraction.

Callback-based approaches also permit greater asynchrony than techniques such as the linked tasks in 4.4.1.5. This can lead to better performance when subtasks differ in expected duration, since the result processing of quickly completing subtasks can sometimes overlap with continued processing of longer ones. However, this design gives up all result ordering guarantees, and thus is applicable only when subtask result processing is completely independent of the order in which results are produced.

Callback counters are used in the following class FibVCB, which is not at all well-suited for the problem at hand but serves to exemplify techniques. This code illustrates a typical but delicate combination of task-local variables, volatiles, and locking in an effort to keep task control overhead to a minimum:

class FibVCB extends FJTask {
 // ...
 volatile int number = 0;    // as before
 final FibVCB parent;      // is null for outermost call
 int callbacksExpected = 0; 
 volatile int callbacksReceived = 0;

 FibVCB(int n, FibVCB p) { number = n; parent = p; }

 // Callback method invoked by subtasks upon completion
 synchronized void addToResult(int n) { 
  number += n;
  ++callbacksReceived;
 }

 public void run() { // same structure as join-based version
  int n = number;
  if (n <= sequentialThreshold) 
   number = seqFib(n);
  else {
   // Clear number so subtasks can fill in
   number = 0;
   // Establish number of callbacks expected
   callbacksExpected = 2;

   new FibVCB(n - 1, this).fork();
   new FibVCB(n - 2, this).fork();

   // Wait for callbacks from children
   while (callbacksReceived < callbacksExpected) yield(); 
  }

  // Call back parent
  if (parent != null) parent.addToResult(number); 
 }
}

Notes:

  • All mutual exclusion locking is restricted to small code segments protecting field accesses, as must be true for any class in a lightweight task framework. Tasks are not allowed to block unless they are sure they will be able to continue soon. In particular, this framework unenforceably requires that synchronized blocks not span forks and subsequent joins or yields.

  • To help eliminate some synchronization, the callback count is split into two counters, callbacksExpected and callbacksReceived. The task is done when they are equal.

  • The callbacksExpected counter is used only by the current task, so access need not be synchronized, and it need not be volatile. In fact, since exactly two callbacks are always expected in the recursive case and the value is never needed outside the run method, this class could easily be reworked in a way that eliminates all need for this variable. However, such a variable is needed in more typical callback-based designs where the number of forks may vary dynamically and may be generated across multiple methods.

  • The addToResult callback method must be synchronized to avoid interference problems when subtasks call back at about the same time.

  • So long as both number and callbacksReceived are declared as volatile, and callbacksReceived is updated as the last statement of addToResult, the yield loop test need not involve synchronization because it is waiting for a latching threshold that, once reached, will never change (see 3.4.2.1).

  • We could also define a reworked getAnswer method that uses these mechanics so that it returns an answer if all callbacks have been received. However, since this method is designed to be called by external (non-task) clients upon completion of the overall computation, there is no compelling reason to do this. The version from the original Fib class suffices.

  • Despite these measures, the overhead associated with task control in this version is greater than that of the original version using coInvoke. If you were to use it anyway, you would probably choose a slightly larger sequential threshold, and thus exploit slightly less parallelism.
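The standard library offers a related callback-style construct, java.util.concurrent.CountedCompleter (Java 8 and later). The sketch below is an analog under that assumption, not a translation of FibVCB: instead of spinning in a yield loop, each task keeps a pending count that is decremented as its children complete, and its onCompletion method runs when the count is exhausted. The class name FibCompleter is hypothetical.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

// Hypothetical callback-style analog using CountedCompleter.
class FibCompleter extends CountedCompleter<Integer> {
  static final int SEQUENTIAL_THRESHOLD = 13;
  final int n;
  FibCompleter left, right;  // set only in the recursive case
  int result;

  FibCompleter(FibCompleter parent, int n) {  // parent is null at the root
    super(parent);
    this.n = n;
  }

  static int seqFib(int n) {
    return (n <= 1) ? n : seqFib(n - 1) + seqFib(n - 2);
  }

  @Override
  public void compute() {
    if (n <= SEQUENTIAL_THRESHOLD) {
      result = seqFib(n);
    } else {
      left = new FibCompleter(this, n - 1);
      right = new FibCompleter(this, n - 2);
      setPendingCount(2);    // plays the role of callbacksExpected
      left.fork();
      right.fork();
    }
    // Leaf: completes now and notifies the parent.
    // Internal node: only decrements its own pending count.
    tryComplete();
  }

  // Invoked once the pending count has been used up
  @Override
  public void onCompletion(CountedCompleter<?> caller) {
    if (left != null) result = left.result + right.result;
  }

  @Override
  public Integer getRawResult() { return result; }

  static int fib(int n) {
    return ForkJoinPool.commonPool().invoke(new FibCompleter(null, n));
  }
}
```

As with FibVCB, this is heavier than the join-based version for this particular problem and serves only to show the mechanics.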

4.4.1.7 Cancellation

In some designs, there is no need for keeping counts of callbacks or exhaustively traversing through subtask lists. Instead, tasks complete when any subtask (or one of its subtasks, and so on) arrives at a suitable result. In these cases, you can avoid wasting computation by cancelling any subtasks in the midst of producing results that will not be needed.

The options here are similar to those seen in other situations involving cancellation (see 3.1.2). For example, subtasks can regularly invoke a method (perhaps isDone) in their parents that indicates that an answer has already been found, and if so to return early. They must also set their own status, so any of their subtasks can do the same. This can be implemented here using FJTask.cancel that just prematurely sets isDone status. This suppresses execution of tasks that have not yet been started, but has no effect on tasks in the midst of execution unless the tasks' run methods themselves detect updated status and deal with it.

When an entire set of tasks are all trying to compute a single result, an even simpler strategy suffices: Tasks may regularly check a global (static) variable that indicates completion. However, when there are many tasks, and many CPUs, more localized strategies may still be preferable to one that places so much pressure on the underlying system by generating many accesses to the same memory location, especially if it must be accessed under synchronization. Additionally, bear in mind that the total overhead associated with cancellation should be less than the cost of just letting small tasks run even if their results are not needed.

For example, here is a class that solves the classic N-Queens problem, searching for the placement of N queens that do not attack each other on a chessboard of size NxN. For simplicity of illustration, it relies on a static Result variable. Here tasks check for cancellation only upon entry into the method. They will continue looping through possible extensions even if a result has already been found. However, the generated tasks will immediately exit. This can be slightly wasteful, but may obtain a solution more quickly than a version that checks for completion upon every iteration of every task.

Note also here that the tasks do not bother joining their subtasks since there is no reason to do so. Only the ultimate external caller (in main) needs to wait for a solution; this is supported here by adding standard waiting and notification methods to the Result class. (Also, for compactness, this version does not employ any kind of granularity threshold. It is easy to add one, for example by directly exploring moves rather than forking subtasks when the number of rows is close to the board size.)

class NQueens extends FJTask {
 static int boardSize; // fixed after initialization in main
 // Boards are arrays where each cell represents a row,
 // and holds the column number of the queen in that row

 static class Result {     // holder for ultimate result
  private int[ ] board = null; // non-null when solved

  synchronized boolean solved() { return board != null; }

  synchronized void set(int[ ] b) { // Support use by non-Tasks
   if (board == null) { board = b; notifyAll(); }
  }

  synchronized int[ ] await() throws InterruptedException {
   while (board == null) wait();
   return board;
  }
 }
 static final Result result = new Result();
 public static void main(String[ ] args) {
  boardSize = ...;
  FJTaskRunnerGroup tasks = new FJTaskRunnerGroup(...);
  int[ ] initialBoard = new int[0]; // start with empty board
  tasks.execute(new NQueens(initialBoard)); 
  int[ ] board = result.await();
  // ...
 }

 final int[ ] sofar;      // initial configuration 

 NQueens(int[ ] board) { this.sofar = board; }

 public void run() {
  if (!result.solved()) {   // skip if already solved
   int row = sofar.length;

   if (row >= boardSize)   // done
    result.set(sofar);

   else {          // try all expansions

    for (int q = 0; q < boardSize; ++q) {
     
     // Check if queen can be placed in column q of next row
     boolean attacked = false;
     for (int i = 0; i < row; ++i) {
      int p = sofar[i];
      if (q == p || q == p - (row-i) || q == p + (row-i)) {
       attacked = true;
       break;
      }
     }
     
     // If so, fork to explore moves from new configuration
     if (!attacked) { 
      // build extended board representation
      int[ ] next = new int[row+1];
      for (int k = 0; k < row; ++k) next[k] = sofar[k];
      next[row] = q;
      new NQueens(next).fork();
     }
    }
   }
  }
 }
}
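A runnable variant of the same search can be sketched with java.util.concurrent. It departs from the listing above in two labeled ways: the shared result is held in an AtomicReference instead of a synchronized Result object, and each task joins its subtasks (via invokeAll) so that the external caller can simply wait on the root task and can also detect that no solution exists. As above, tasks check for completion only on entry. The class name QueensSearch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical analog of NQueens using java.util.concurrent.
class QueensSearch extends RecursiveAction {
  final int boardSize;
  final int[] sofar;                    // sofar[r] = column of queen in row r
  final AtomicReference<int[]> result;  // non-null once solved

  QueensSearch(int boardSize, int[] sofar, AtomicReference<int[]> result) {
    this.boardSize = boardSize; this.sofar = sofar; this.result = result;
  }

  @Override
  protected void compute() {
    if (result.get() != null) return;       // skip if already solved
    int row = sofar.length;
    if (row >= boardSize) {                 // done
      result.compareAndSet(null, sofar);    // first solution wins
      return;
    }
    List<QueensSearch> subtasks = new ArrayList<>();
    for (int q = 0; q < boardSize; ++q) {   // try all expansions
      boolean attacked = false;
      for (int i = 0; i < row; ++i) {
        int p = sofar[i];
        if (q == p || q == p - (row - i) || q == p + (row - i)) {
          attacked = true;
          break;
        }
      }
      if (!attacked) {                      // extend the board by one row
        int[] next = Arrays.copyOf(sofar, row + 1);
        next[row] = q;
        subtasks.add(new QueensSearch(boardSize, next, result));
      }
    }
    invokeAll(subtasks);                    // fork and join all expansions
  }

  // Returns one solution, or null if none exists.
  static int[] solve(int boardSize) {
    AtomicReference<int[]> result = new AtomicReference<>();
    ForkJoinPool.commonPool().invoke(
        new QueensSearch(boardSize, new int[0], result));
    return result.get();
  }
}
```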

4.4.2 Computation Trees

A number of computationally intensive algorithms involve tasks of the form:

For a fixed number of steps, or until convergence, do { 
 Update one section of a problem; 
 Wait for other tasks to finish updating their sections; 
}

Figure 4-40

Most often, such algorithms perform update operations on partitioned arrays, matrices, or image representations. For example, many physical dynamics problems involve repeated local updates to the cells of a matrix. Jacobi algorithms and related relaxation techniques repeatedly recalculate estimated values across neighboring cells, typically using an averaging formula such as:

void oneStep(double[ ][ ] oldMatrix, 
       double[ ][ ] newMatrix, int i, int j) {
 newMatrix[i][j] = 0.25 * (oldMatrix[i-1][j] + 
              oldMatrix[i][j-1] +
              oldMatrix[i+1][j] + 
              oldMatrix[i][j+1]);
}

Normally, to save space, two different matrices are swapped as newMatrix and oldMatrix across successive steps.
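As a minimal sequential sketch of this swapping scheme (the class and method names below are illustrative assumptions, not part of the original text), each pass reads from one matrix and writes to the other, and the two references then trade roles. It assumes both matrices start with identical, never-updated edge cells.

```java
// Hypothetical helper class; names are illustrative only.
class JacobiSwapSketch {

  // One full pass: read oldM, write the interior cells of newM.
  // Returns the largest change seen in any cell during the pass.
  static double pass(double[][] oldM, double[][] newM) {
    double maxDiff = 0.0;
    for (int i = 1; i < oldM.length - 1; ++i) {
      for (int j = 1; j < oldM[i].length - 1; ++j) {
        newM[i][j] = 0.25 * (oldM[i-1][j] + oldM[i][j-1] +
                             oldM[i+1][j] + oldM[i][j+1]);
        maxDiff = Math.max(maxDiff, Math.abs(newM[i][j] - oldM[i][j]));
      }
    }
    return maxDiff;
  }

  // Runs the given number of passes, swapping roles after each one.
  // Returns whichever matrix holds the most recent values.
  static double[][] relax(double[][] a, double[][] b, int steps) {
    double[][] oldM = a, newM = b;
    for (int s = 0; s < steps; ++s) {
      pass(oldM, newM);
      double[][] t = oldM; oldM = newM; newM = t; // swap roles, no copying
    }
    return oldM; // after the final swap, oldM refers to the latest results
  }

  public static void main(String[] args) {
    double[][] a = new double[3][3], b = new double[3][3];
    for (int i = 0; i < 3; ++i)
      for (int j = 0; j < 3; ++j)
        a[i][j] = b[i][j] = 1.0;   // edge cells fixed at 1.0
    a[1][1] = b[1][1] = 0.0;       // single interior cell starts at 0.0
    System.out.println(relax(a, b, 1)[1][1]); // prints 1.0
  }
}
```

Only references are exchanged, so the cost of a swap is independent of the matrix size.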

Algorithms requiring that all tasks periodically wait for all others to complete do not always scale quite as well as more loosely coupled fork/join designs. Even so, these algorithms are common, efficient, and amenable to significant parallel speedups.

4.4.2.1 Building and using trees

It would be inefficient to repeatedly apply fork/join decomposition in iterative designs in order to update sections in parallel. Because the sections are the same across iterations, they can be constructed just once and then repeatedly invoked so that on each iteration, the corresponding updates execute in the same order as would be produced by a recursive solution.

Computation trees are explicit representations of the tree-structured computations implicitly arising in fork/join recursion. These trees have two kinds of nodes, internal nodes and leaf nodes, corresponding to the recursive and base cases of a recursive solution. They can be constructed and used for iterative update problems via the following steps:

  1. Create a tree of task objects representing the recursive partitions, where:

    • Each internal node contains references to subpartitions, and has an update method that performs fork/join processing of each of them.

    • Each leaf node represents a finest-granularity partition, and has an update method that operates directly on it.

  2. For a fixed number of steps, or until convergence, do:

    • Execute the task performing the root partition's update method.

For example, the following code illustrates the highlights of a set of classes that perform Jacobi iteration using the averaging formula shown above. In addition to updating, this version also keeps track of the differences among computed cell values across iterations, and stops when the maximum difference is within a constant EPSILON. Also, like many programs of this form, this code assumes that the matrices have been set up with extra edge cells that are never updated, so boundary conditions never need to be checked. (Alternatives include recomputing edge values using special edge formulas after each pass, and treating edges as toroidally wrapping around the mesh.)

The recursive decomposition strategy used here is to divide the mesh into quadrants, stopping when the number of cells is at most leafCells, which serves as the granularity threshold. This strategy works well so long as the numbers of rows and columns in the matrix are approximately equal. If they are not, additional classes and methods could be defined to divide across only one dimension at a time. The approach here assumes that the matrix as a whole already exists, so rather than actually dividing up cells, task nodes just keep track of the row and column offsets of this matrix that each partition is working on.
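The quadrant rule can be tried out on its own before any tasks are involved. The following sketch, under the assumption that leafCells is at least 1, only records the row and column bounds of each leaf partition; the class name and the use of int arrays to hold bounds are illustrative choices, not part of the original design.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists leaf partitions produced by quadrant division.
class QuadPartitionSketch {

  // Appends {loRow, hiRow, loCol, hiCol} for every leaf partition.
  // Assumes leafCells >= 1.
  static void partition(int lr, int hr, int lc, int hc,
                        int leafCells, List<int[]> leaves) {
    if ((hr - lr + 1) * (hc - lc + 1) <= leafCells) {
      leaves.add(new int[] { lr, hr, lc, hc }); // small enough: a leaf
      return;
    }
    int mr = (lr + hr) / 2; // midpoints
    int mc = (lc + hc) / 2;
    partition(lr,     mr, lc,     mc, leafCells, leaves);
    partition(lr,     mr, mc + 1, hc, leafCells, leaves);
    partition(mr + 1, hr, lc,     mc, leafCells, leaves);
    partition(mr + 1, hr, mc + 1, hc, leafCells, leaves);
  }

  public static void main(String[] args) {
    List<int[]> leaves = new ArrayList<>();
    partition(1, 8, 1, 8, 16, leaves); // 8 x 8 interior, 16 cells per leaf
    for (int[] p : leaves)
      System.out.println("rows " + p[0] + ".." + p[1] +
                         ", cols " + p[2] + ".." + p[3]);
  }
}
```

With an 8 x 8 interior and a threshold of 16, this yields four 4 x 4 leaves. For strongly non-square regions the same rule can produce partitions with empty row or column ranges, which is one reason the text suggests dividing across a single dimension in that case.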

The subclass-based design used here reflects the different structure and behavior of internal versus leaf nodes. Both are subclasses of abstract base JTree:

abstract class JTree extends FJTask {
 volatile double maxDiff; // for convergence check
}

class Interior extends JTree {
 private final JTree[ ] quads;

 Interior(JTree q1, JTree q2, JTree q3, JTree q4) {
  quads = new JTree[ ] { q1, q2, q3, q4 };
 }

 public void run() { 
  coInvoke(quads); 
  double md = 0.0;
  for (int i = 0; i < quads.length; ++i) {
   md = Math.max(md,quads[i].maxDiff);
   quads[i].reset(); 
  }
  maxDiff = md;
 }
}

class Leaf extends JTree {
 private final double[ ][ ] A; private final double[ ][ ] B; 
 private final int loRow;  private final int hiRow;
 private final int loCol;  private final int hiCol; 
 private int steps = 0; 

 Leaf(double[ ][ ] A, double[ ][ ] B, 
    int loRow, int hiRow, int loCol, int hiCol) {
  this.A = A;  this.B = B;
  this.loRow = loRow; this.hiRow = hiRow;
  this.loCol = loCol; this.hiCol = hiCol;
 }

 public synchronized void run() {
  boolean AtoB = (steps++ % 2) == 0;
  double[ ][ ] a = (AtoB)? A : B;
  double[ ][ ] b = (AtoB)? B : A;
  double md = 0.0;
  for (int i = loRow; i <= hiRow; ++i) {
   for (int j = loCol; j <= hiCol; ++j) {
    b[i][j] = 0.25 * (a[i-1][j] + a[i][j-1] +
             a[i+1][j] + a[i][j+1]);
    md = Math.max(md, Math.abs(b[i][j] - a[i][j]));
   }
  }
  maxDiff = md;
 }
}

The driver class first builds a tree that represents the partitioning of its argument matrix. The build method could itself be parallelized. But because the base actions are just node constructions, the granularity threshold would be so high that parallelization would be worthwhile only for huge problem sizes.

The run method repeatedly sets the root task in motion and waits out completion. For simplicity of illustration, it continues until convergence or until maxSteps iterations have run, and in the latter case it simply returns without reporting anything. Among other changes necessary to turn this into a realistic program, you would need to initialize the matrices and report or otherwise deal with lack of convergence within the bounded number of iterations. Because each iteration entails a full synchronization point waiting for the root task to finish, it is relatively simple to insert additional operations that maintain or report global status between iterations.

class Jacobi extends FJTask {
 static final double EPSILON = 0.001; // convergence criterion
 final JTree root; 
 final int maxSteps;
 Jacobi(double[ ][ ] A, double[][] B, 
     int firstRow, int lastRow, int firstCol, int lastCol,
     int maxSteps, int leafCells) {
  this.maxSteps = maxSteps;
  root = build(A, B, firstRow, lastRow, firstCol, lastCol,
         leafCells);
 }

 public void run() {
  for (int i = 0; i < maxSteps; ++i) {
   invoke(root); 
   if (root.maxDiff < EPSILON) { 
    System.out.println("Converged");
    return;
   }
   else root.reset();
  }
 }

 static JTree build(double[ ][ ] a, double[ ][ ] b,
           int lr, int hr, int lc, int hc, int size) {
  if ((hr - lr + 1) * (hc - lc + 1) <= size) 
   return new Leaf(a, b, lr, hr, lc, hc);
  int mr = (lr + hr) / 2; // midpoints
  int mc = (lc + hc) / 2;
  return new Interior(build(a, b, lr,  mr, lc,  mc, size),
            build(a, b, lr,  mr, mc+1, hc, size),
            build(a, b, mr+1, hr, lc,  mc, size),
            build(a, b, mr+1, hr, mc+1, hc, size));
 }
}
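The classes above depend on the FJTask framework from util.concurrent, which is not part of the standard library. As a hedged, runnable approximation, the sketch below swaps in the standard java.util.concurrent fork/join classes, assuming that RecursiveAction can stand in for FJTask, invokeAll for coInvoke, and reinitialize for reset. All class names are illustrative, and the driver assumes a matrix with one ring of fixed edge cells.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Assumed mapping: RecursiveAction ~ FJTask, invokeAll ~ coInvoke,
// reinitialize ~ reset. Names below are hypothetical.
abstract class JNode extends RecursiveAction {
  volatile double maxDiff; // for convergence check
}

class InteriorNode extends JNode {
  private final JNode[] quads;
  InteriorNode(JNode... quads) { this.quads = quads; }

  @Override protected void compute() {
    invokeAll(quads);              // run all quadrants and wait for them
    double md = 0.0;
    for (JNode q : quads) {
      md = Math.max(md, q.maxDiff);
      q.reinitialize();            // make the node runnable again
    }
    maxDiff = md;
  }
}

class LeafNode extends JNode {
  private final double[][] A, B;
  private final int loRow, hiRow, loCol, hiCol;
  private int steps = 0;

  LeafNode(double[][] A, double[][] B,
           int loRow, int hiRow, int loCol, int hiCol) {
    this.A = A; this.B = B;
    this.loRow = loRow; this.hiRow = hiRow;
    this.loCol = loCol; this.hiCol = hiCol;
  }

  @Override protected void compute() {
    boolean aToB = (steps++ % 2) == 0; // alternate direction each pass
    double[][] a = aToB ? A : B;
    double[][] b = aToB ? B : A;
    double md = 0.0;
    for (int i = loRow; i <= hiRow; ++i) {
      for (int j = loCol; j <= hiCol; ++j) {
        b[i][j] = 0.25 * (a[i-1][j] + a[i][j-1] + a[i+1][j] + a[i][j+1]);
        md = Math.max(md, Math.abs(b[i][j] - a[i][j]));
      }
    }
    maxDiff = md;
  }
}

class JacobiFJSketch {
  static final double EPSILON = 0.001; // convergence criterion

  static JNode build(double[][] a, double[][] b,
                     int lr, int hr, int lc, int hc, int size) {
    if ((hr - lr + 1) * (hc - lc + 1) <= size)
      return new LeafNode(a, b, lr, hr, lc, hc);
    int mr = (lr + hr) / 2, mc = (lc + hc) / 2; // midpoints
    return new InteriorNode(build(a, b, lr, mr, lc, mc, size),
                            build(a, b, lr, mr, mc + 1, hc, size),
                            build(a, b, mr + 1, hr, lc, mc, size),
                            build(a, b, mr + 1, hr, mc + 1, hc, size));
  }

  // Returns the number of passes used, or -1 if maxSteps ran out first.
  static int solve(double[][] a, double[][] b, int maxSteps, int leafCells) {
    JNode root = build(a, b, 1, a.length - 2, 1, a[0].length - 2, leafCells);
    ForkJoinPool pool = new ForkJoinPool();
    try {
      for (int i = 0; i < maxSteps; ++i) {
        pool.invoke(root);         // one full pass over the tree
        if (root.maxDiff < EPSILON) return i + 1;
        root.reinitialize();
      }
      return -1;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    int n = 10;
    double[][] a = new double[n][n], b = new double[n][n];
    for (int k = 0; k < n; ++k) {  // fixed edge cells, same in both copies
      a[0][k] = a[n-1][k] = a[k][0] = a[k][n-1] = 1.0;
      b[0][k] = b[n-1][k] = b[k][0] = b[k][n-1] = 1.0;
    }
    System.out.println("passes: " + solve(a, b, 1000, 16));
  }
}
```

As in the original design, the tree is built once and the same node objects are re-run on every pass; only their completion state is reset between passes.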

4.4.3 Barriers

Recursive decomposition is a powerful and flexible technique, but does not always fit well with the structure of iterative problems, and usually requires adoption of a lightweight execution framework for efficient implementation. A more direct path to a solution of many iterative problems is first to divide the problem into segments, each with an associated task performing a loop that must periodically wait for other segments to complete. From the perspective of tree-based approaches, these designs flatten out all the internal nodes and just deal with the leaves.

Figure 4-41

As with recursive tasks, there are opportunities to specialize Threads to make them more attuned to the demands of parallel iteration (see Further Readings). However, there is usually less to be gained by doing so, in part because all thread construction overhead is restricted to the start-up phase. Here we illustrate the basic mechanics using regular Threads each executing a single Runnable. When using Threads, granularity thresholds must in general be substantially higher than when using lightweight executable classes (although still substantially lower than those needed in distributed parallel designs). But the basic logic of iterative algorithms is otherwise identical, regardless of granularity. In many iterative problems, little potential parallelism is wasted by using coarse granularities. When all threads perform approximately the same actions for approximately the same durations, creating only as many tasks as CPUs, or perhaps a small multiple of the number of CPUs, can work well.

While it is always possible to hand-craft the necessary control mechanics using waiting and notification constructs, it is both more convenient and less error-prone instead to rely on standardized synchronization aids that encapsulate these mechanics. The synchronization device of choice in iterative designs is a cyclic barrier. A cyclic barrier is initialized with a fixed number of parties that will be repeatedly synchronizing. It supports only one method, barrier, that forces each caller to wait until all parties have invoked the method, and then resets for the next iteration. A basic CyclicBarrier class can be defined as follows:

class CyclicBarrier {

 protected final int parties;
 protected int count;   // parties currently being waited for
 protected int resets = 0; // times barrier has been tripped

 CyclicBarrier(int c) { count = parties = c; }

 synchronized int barrier() throws InterruptedException { 
  int index = --count;
  if (index > 0) {    // not yet tripped
   int r = resets;    // wait until next reset

   do { wait(); } while (resets == r);

  }
  else {         // trip 
   count = parties;   // reset count for next time
   ++resets;
   notifyAll();     // cause all other parties to resume
  }

  return index;
 }
}

(The util.concurrent version of this class available from the online supplement deals more responsibly with interruptions and time-outs. Fancier versions that reduce memory contention on the lock and on the fields may be worth constructing on systems with very large numbers of processors.)

The CyclicBarrier.barrier method defined here returns the number of other threads that were still waiting when the barrier was entered, which can be useful in some algorithms. As another by-product, the barrier method is intrinsically synchronized, so it also serves as a memory barrier to ensure flushes and loads of array element values in its most typical usage contexts (see 2.2.7).
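To see the returned indices in action, the sketch below copies the barrier logic shown above into a class under the hypothetical name SimpleCyclicBarrier (renamed only to avoid confusion with other barrier classes) and sends a few threads through it once. Which thread receives which index depends on arrival order, so the demo sorts the results before printing them.

```java
import java.util.Arrays;

// Copy of the barrier logic from the text, under an illustrative name.
class SimpleCyclicBarrier {
  private final int parties;
  private int count;       // parties currently being waited for
  private int resets = 0;  // times barrier has been tripped

  SimpleCyclicBarrier(int c) { count = parties = c; }

  synchronized int barrier() throws InterruptedException {
    int index = --count;
    if (index > 0) {       // not yet tripped
      int r = resets;      // wait until next reset
      do { wait(); } while (resets == r);
    } else {               // trip
      count = parties;     // reset count for next time
      ++resets;
      notifyAll();         // cause all other parties to resume
    }
    return index;
  }
}

class BarrierIndexDemo {
  // Sends n threads through one barrier and returns their indices, sorted.
  static int[] indices(int n) throws InterruptedException {
    SimpleCyclicBarrier bar = new SimpleCyclicBarrier(n);
    int[] got = new int[n];
    Thread[] threads = new Thread[n];
    for (int i = 0; i < n; ++i) {
      final int id = i;
      threads[i] = new Thread(() -> {
        try { got[id] = bar.barrier(); }
        catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
      });
    }
    for (Thread t : threads) t.start();
    for (Thread t : threads) t.join(); // join makes the writes to got visible
    Arrays.sort(got);
    return got;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(Arrays.toString(indices(4))); // prints [0, 1, 2, 3]
  }
}
```

Each index is handed out exactly once per trip, and the last thread to arrive always receives zero.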

A barrier may also be construed as a simple consensus operator (see 3.6). It gathers "votes" among several threads about whether they should all continue to the next iteration. Release occurs when all votes have been collected and agreement has thus been reached. (However, unlike transaction frameworks, threads using this CyclicBarrier class are not allowed to vote "no".)

With barriers, many parallel iterative algorithms become easy to express. In the simplest cases, these programs might take the form (eliding all problem-specific details):

class Segment implements Runnable {      // Code sketch
 final CyclicBarrier bar; // shared by all segments
 Segment(CyclicBarrier b, ...) { bar = b; ...; }

 void update() { ... } 

 public void run() {
  // ...
  for (int i = 0; i < iterations; ++i) {
   update();
   bar.barrier();
  }
  // ...
 }
}

class Driver {  
 // ...
 void compute(Problem problem) throws ... {
  int n = problem.size / granularity;
  CyclicBarrier barrier = new CyclicBarrier(n);
  Thread[ ] threads = new Thread[n];

  // create
  for (int i = 0; i < n; ++i) 
   threads[i] = new Thread(new Segment(barrier, ...));

  // trigger
  for (int i = 0; i < n; ++i) threads[i].start();

  // await termination
  for (int i = 0; i < n; ++i) threads[i].join();
 }
}

This structure suffices for problems requiring known numbers of iterations. However, many problems require checks for convergence or some other global property between iterations. (Conversely, in a few chaotic relaxation algorithms you don't even need a barrier after each iteration, but can instead let segments free-run for a while between barriers and/or checks.)

One way to provide convergence checks is to rework the CyclicBarrier class to optionally run a supplied Runnable command whenever a barrier is about to be reset. A more classic approach, which illustrates a technique useful in other contexts as well, is to rely on the index returned by barrier. The caller obtaining index zero (as an arbitrary, but always legal choice) can perform the check while all others are quietly waiting for a second barrier.

For example, here is a barrier-based version of a segment class for the Jacobi problem described in 4.4.2. Collections of JacobiSegment objects can be initialized and run by a driver of the generic form given above.

class JacobiSegment implements Runnable {    // Incomplete
 // These are same as in Leaf class version:
 double[ ][ ] A;    double[ ][ ] B; 
 final int firstRow; final int lastRow;
 final int firstCol; final int lastCol;
 volatile double maxDiff; 
 int steps = 0;
 void update() { /* Nearly same as Leaf.run */ }

 final CyclicBarrier bar;
 final JacobiSegment[ ] allSegments; // for convergence check
 volatile boolean converged = false;

 JacobiSegment(double[ ][ ] A, double[ ][ ] B, 
        int firstRow, int lastRow, 
        int firstCol, int lastCol,
        CyclicBarrier b, JacobiSegment[ ] allSegments) {
  this.A = A;  this.B = B;
  this.firstRow = firstRow; this.lastRow = lastRow;
  this.firstCol = firstCol; this.lastCol = lastCol;
  this.bar = b;
  this.allSegments = allSegments;
 }

 public void run() {
  try {
   while (!converged) {
    update();
    int myIndex = bar.barrier(); // wait for all to update
    if (myIndex == 0) convergenceCheck();
    bar.barrier();       // wait for convergence check
   }
  }
  catch(Exception ex) { 
   // clean up ...
  }
 }

 void convergenceCheck() {
  for (int i = 0; i < allSegments.length; ++i) 
   if (allSegments[i].maxDiff > Jacobi.EPSILON) return; // EPSILON as defined in class Jacobi
  for (int i = 0; i < allSegments.length; ++i) 
   allSegments[i].converged = true;
 }
}
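The other option mentioned earlier, running a supplied command whenever the barrier trips, can be sketched with the standard java.util.concurrent.CyclicBarrier, whose two-argument constructor accepts such a command. This is a swapped-in class, not the CyclicBarrier defined in this section. To keep the sketch short it relaxes a one-dimensional array with fixed end cells rather than a matrix; the class, field, and method names are illustrative assumptions.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical sketch: convergence check performed by a barrier action.
class BarrierActionSketch {
  static final double EPSILON = 0.001;

  final double[][] cells;    // cells[0] and cells[1] alternate as old/new
  final double[] segDiff;    // largest change seen by each segment this pass
  final int nSeg;
  final CyclicBarrier bar;
  int pass = 0;              // written only by the barrier action
  boolean converged = false; // written only by the barrier action

  BarrierActionSketch(double[] init, int nSeg) {
    this.cells = new double[][] { init.clone(), init.clone() };
    this.segDiff = new double[nSeg];
    this.nSeg = nSeg;
    this.bar = new CyclicBarrier(nSeg, this::check);
  }

  // Runs in the last thread to arrive, before any waiting thread is released.
  void check() {
    double md = 0.0;
    for (double d : segDiff) md = Math.max(md, d);
    converged = md < EPSILON;
    ++pass;                  // also switches the old/new roles
  }

  void runSegment(int seg) {
    int n = cells[0].length - 2;       // number of interior cells
    int lo = 1 + seg * n / nSeg;       // first cell of this segment
    int hi = (seg + 1) * n / nSeg;     // last cell of this segment
    try {
      while (!converged) {
        double[] a = cells[pass % 2], b = cells[(pass + 1) % 2];
        double md = 0.0;
        for (int i = lo; i <= hi; ++i) {
          b[i] = 0.5 * (a[i-1] + a[i+1]);
          md = Math.max(md, Math.abs(b[i] - a[i]));
        }
        segDiff[seg] = md;
        bar.await();                   // wait for all updates and the check
      }
    } catch (InterruptedException | BrokenBarrierException ex) {
      Thread.currentThread().interrupt(); // give up on failure
    }
  }

  // Starts one thread per segment; returns the array with the latest values.
  double[] solve() throws InterruptedException {
    Thread[] threads = new Thread[nSeg];
    for (int s = 0; s < nSeg; ++s) {
      final int seg = s;
      threads[s] = new Thread(() -> runSegment(seg));
      threads[s].start();
    }
    for (Thread t : threads) t.join();
    return cells[pass % 2];
  }

  public static void main(String[] args) throws InterruptedException {
    double[] init = new double[10];
    init[9] = 9.0;                     // fixed ends 0 and 9, interior starts at 0
    double[] r = new BarrierActionSketch(init, 2).solve();
    System.out.println("middle cell: " + r[5]);
  }
}
```

Because the check runs inside the barrier, a single wait per iteration suffices here, whereas the index-based version above needs two.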

4.4.4 Further Readings

For a survey of approaches to high-performance parallel processing, see

    Skillicorn, David, and Domenico Talia, "Models and Languages for Parallel Computation", Computing Surveys, June 1998.

Most texts on parallel programming concentrate on algorithms designed for use on fine-grained parallel machine architectures, but also cover design techniques and algorithms that can be implemented using the kinds of stock multiprocessors most amenable to supporting a JVM. See, for example:

    Foster, Ian. Designing and Building Parallel Programs, Addison Wesley, 1995.

    Roosta, Seyed. Parallel Processing and Parallel Algorithms, Springer-Verlag, 1999.

    Wilson, Gregory. Practical Parallel Programming, MIT Press, 1995.

    Zomaya, Albert (ed.). Parallel and Distributed Computing Handbook, McGraw-Hill, 1996.

Pattern-based accounts of parallel programming include:

    Massingill, Berna, Timothy Mattson, and Beverly Sanders. A Pattern Language for Parallel Application Programming, Technical report, University of Florida, 1999.

    MacDonald, Steve, Duane Szafron, and Jonathan Schaeffer. "Object-Oriented Pattern-Based Parallel Programming with Automatically Generated Frameworks", in Proceedings of the 5th USENIX Conference on Object-Oriented Tools and Systems (COOTS), 1999.

The FJTask framework internally relies on a work-stealing task scheduler based on the one in Cilk, a C-based parallel programming framework. In work-stealing schedulers, each worker thread normally runs (in LIFO order) tasks that it constructs, but when idle steals (in FIFO order) those constructed by other worker threads. More details, including explanations of the senses in which this scheme is optimal for recursive fork/join programs, may be found in:

    Frigo, Matteo, Charles Leiserson, and Keith Randall. "The Implementation of the Cilk-5 Multithreaded Language", Proceedings of 1998 ACM SIGPLAN Conference on Programming Language Design and Implementation (PLDI), 1998.
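The ordering rule described above (own tasks in LIFO order, stolen tasks in FIFO order) can be illustrated with an ordinary double-ended queue. This is only a single-threaded sketch of the ordering, with illustrative names; it is not thread-safe and makes no claim about how FJTask or Cilk implement their queues.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of work-stealing order on one worker's queue.
class WorkStealingOrderSketch {

  // The owning worker pushes newly constructed tasks at the head...
  static <T> void push(Deque<T> queue, T task) { queue.addFirst(task); }

  // ...and also takes its own work from the head (LIFO: newest first).
  static <T> T popOwn(Deque<T> queue) { return queue.pollFirst(); }

  // An idle worker steals from the tail (FIFO: oldest first).
  static <T> T steal(Deque<T> queue) { return queue.pollLast(); }

  public static void main(String[] args) {
    Deque<String> queue = new ArrayDeque<>();
    push(queue, "A");  // oldest task
    push(queue, "B");
    push(queue, "C");  // newest task
    System.out.println(popOwn(queue)); // prints C
    System.out.println(steal(queue));  // prints A
    System.out.println(popOwn(queue)); // prints B
  }
}
```

In recursive fork/join programs the oldest task in a queue tends to sit nearest the root of the computation, so a steal from the tail usually hands the thief a comparatively large piece of work.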

The online supplement includes more realistic examples of the techniques discussed in this section. It also provides links to the Cilk package and related frameworks, including Hood (a C++ follow-on to Cilk) and Filaments (a C package that includes a specialized framework supporting barrier-based iterative computation).
