4.2 Composing Oneway Messages

Many interprocess and distributed designs involve groups of objects exchanging oneway messages (see 1.2 and 4.5). Similar techniques may be applied within individual concurrent programs. In fact, as discussed in 4.1, a larger range of design options is available in concurrent programs than in distributed systems. Messages need not be restricted to, say, socket-based commands. Concurrent programs may also employ lighter alternatives including direct invocations and event-based communication.

However, this wide range of options also introduces opportunities for creating chaotic, difficult-to-understand designs. This section describes some simple program-level (or subsystem-level) structuring techniques that tend to produce well-behaved, readily understandable, and readily extensible designs.

A flow network is a collection of objects that all pass oneway messages transferring information and/or objects to each other along paths from sources to sinks. Flow patterns may occur in any kind of system or subsystem supporting one or more series of connected steps or stages, in which each stage plays the role of a producer and/or consumer. Broad categories include:

    Control systems. External sensor inputs ultimately cause control systems to generate particular effector outputs. Applications such as avionics control systems contain dozens of kinds of inputs and outputs. For a plainer example, consider a skeletal thermostatic heater control:

Figure 4-9

    Assembly systems. Newly created objects undergo a series of changes and/or become integrated with other new objects before finally being used for some purpose; for example, an assembly line for Cartons:

Figure 4-10

    Dataflow systems. Each stage transforms or otherwise processes data. For example, in pipelined multimedia systems, audio and/or video data is processed across multiple stages. In publish-subscribe systems, possibly many data sources send information to possibly many consumers. In Unix pipes-and-filters shell programs, stages send character data, as in a simple spell checker:

Figure 4-11

    Workflow systems. Each stage represents an action that needs to be performed according to some set of business policies or other requirements; for example, a simple payment system:

Figure 4-12

    Event systems. Stages pass around and ultimately execute code associated with objects representing messages, user inputs, or simulated physical phenomena. The beginnings of many event systems take the form:

Figure 4-13

4.2.1 Composition

The development of flow networks entails two main sets of concerns: design of the data being passed around, and design of the stages that do the passing.

Representations

Flow networks pass around representational components: families of values or objects representing the things the flow is all about. In the introductory examples, temperatures, cardboard sheets, words, invoices, and events are the basic kinds of values and objects passed across connected stages. Often these components are interesting objects in their own right that can perform services, communicate with other objects, and so on. But when viewed as the raw material for a flow, they are treated as mere passive representations, providing data or information rather than behavior.

While they play similar roles in the overall design of a flow system, different categories of representation types affect the details of the rest of the system:

  • Information types representing the state of the world (for example, values such as temperature readings, maintained as scalars or immutable ADT objects) differ from most others in that it is often acceptable to reuse old or current best-estimate values if necessary. In essence, producers have an inexhaustible supply of such values.

  • Event indicators normally can be used at most once, although they may be passed around many times before being used.

  • Mutable resource types (such as cartons) may be transferred (see 2.3.4) from each stage to the next, ensuring that each object is being operated upon by at most one stage at any given time.

  • Alternatively, if the identities of mutable representation objects do not matter, they can be copied across stages as needed. Copy-based approaches are more often used in distributed flow networks in which ownership cannot be transferred across stages simply by assigning reference fields.

  • Artificial data types can be used for control purposes. For example, a special null token may be used as a terminator that triggers cancellation and shutdown. Similarly, a special keepalive can be sent to inform one stage that another still exists. Alternatively, a distinct set of sideband control methods can be employed across stages. Sideband controls are methods used to set stages into different modes that influence their main processing. For example, a thermostat Comparator may have a separate control to change its threshold.

Stages

Stages in well-behaved flow networks all obey sets of constraints that are reminiscent of those seen in electrical circuit design. Here is one conservative set of composition rules that generate a small number of basic kinds of stages:

    Directionality. Flow maintains a single directionality, from sources to sinks. There are no loops or back-branches from consumers to producers. This results in a directed acyclic graph (DAG) of information or object flow.

    Interoperability. Methods and message formats are standardized across components, normally through conformance to a small set of interfaces.

    Connectivity. Stages maintain fixed connectivity: consumers may receive messages only from known producers, and vice versa. So, for example, while a web service may have any number of anonymous clients, a given TemperatureComparator object may be designed to receive temperature update messages only from a designated TemperatureSensor object.

    Connectivity is usually arranged by maintaining direct references from producers to consumers or vice versa, or by having them share access to a Channel. Alternatively, a network may be based on constrained use of blackboards, multicast channels, or JavaSpaces (see 4.1.6) in which producers specially tag messages destined for particular consumers.

    Transfer protocols. Every message transfers information or objects. Once a stage has transferred a mutable object, it never again manipulates that object. When necessary, special buffer stages may be interposed to hold elements transferred out from one stage that cannot yet be accepted by other stages.

    Transfer protocols typically rely on the basic put and take operations described in 2.3.4. When all messages involve put-based transfers, networks are normally labeled as push flow; when they involve take-based transfers, they are normally labeled as pull flow; when they involve channels supporting both put and take (and possibly exchange), they can take various mixed forms.

    Threads. Stages may implement oneway message passing using any of the patterns described in 4.1, as long as every (potentially) simultaneously live connection from a given producer to a given consumer employs a different thread or thread-based message-sending construction.

    It is rarely necessary to satisfy this requirement by issuing every message, or every stream of messages from a producer to a consumer, in a different thread. You can instead exploit connectivity rules to use threads only as needed. Most sources in push-based systems intrinsically employ threads. Additionally, any push stage with multiple successors that may ultimately hit a Combiner stage must issue the messages independently. Otherwise, if a thread is blocked at the combine point, there may be a possibility that the Combiner will never see the other inputs necessary to unblock it.

    Conversely, most sinks in pull-based systems intrinsically employ thread-based message constructions, as do stages involved in split/join connections proceeding from the opposite direction pictured above.

  • Sources have no predecessors.

  • Sinks have no successors.

  • Linear stages have at most one predecessor and one successor.

  • Routers send a message to one of their successors.

  • Multicasters send messages to all their successors.

  • Collectors accept messages from one of their predecessors at a time.

  • Combiners require messages from all their predecessors.


Figure 4-14
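The distinction between push, pull, and channel-based transfers drawn under Transfer protocols can be made concrete with a small sketch. This is an illustration only, not code from the text: the names PushConsumer, PullProducer, and FlowStyles are hypothetical, and java.util.concurrent.LinkedBlockingQueue is used here as a stand-in for the kind of Channel the rules refer to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical push-style stage: its predecessor hands it items via put.
interface PushConsumer<T> { void put(T item); }

// Hypothetical pull-style stage: its successor requests items via take.
interface PullProducer<T> { T take(); }

class FlowStyles {
  // Push flow: the source drives the transfer by invoking put downstream.
  static List<Integer> pushFlow() {
    List<Integer> received = new ArrayList<>();
    PushConsumer<Integer> sink = item -> received.add(item);
    for (int i = 1; i <= 3; i++) sink.put(i * 10);
    return received;
  }

  // Pull flow: the sink drives the transfer by invoking take upstream.
  static List<Integer> pullFlow() {
    int[] counter = {0};
    PullProducer<Integer> source = () -> ++counter[0] * 10;
    List<Integer> received = new ArrayList<>();
    for (int i = 0; i < 3; i++) received.add(source.take());
    return received;
  }

  // Mixed flow: a producer thread puts into a shared channel while the
  // consumer takes from it, so neither side invokes the other directly.
  static List<Integer> channelFlow() {
    BlockingQueue<Integer> channel = new LinkedBlockingQueue<>();
    Thread producer = new Thread(() -> {
      try {
        for (int i = 1; i <= 3; i++) channel.put(i * 10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status
      }
    });
    producer.start();
    List<Integer> received = new ArrayList<>();
    try {
      for (int i = 0; i < 3; i++) received.add(channel.take());
      producer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return received;
  }
}
```

In all three variants the same values travel from source to sink; what changes is which side initiates each transfer, and therefore which side needs a thread of its own.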

These rules can be liberalized in various ways. In fact, you can adopt any set of composition rules you like. But the listed constraints serve to eliminate large classes of safety and liveness problems while also satisfying common reusability and performance goals: unidirectional flow avoids deadlock, connectivity management avoids unwanted interleavings across different flows, transfer protocols avoid safety problems due to inadvertent sharing without the need for extensive dynamic synchronization, and interface conformance assures type safety while still permitting interoperability among components.

Scripting

Adoption of a standard set of composition rules makes possible the construction of higher-level tools that arrange for stages to operate cooperatively, without otherwise imposing centralized dynamic synchronization control. Composition of flow networks can be treated as a form of scripting in the usual sense of the word: semi-automated programming of the code that glues together instances of existing object types. This is the kind of programming associated with languages such as JavaScript, Visual Basic, Unix shells, and FlowMark (a workflow tool). Development of a scripting tool, or integration with an existing one, is an optional step in building systems based around flows.

This architecture is analogous to that of GUI builders consisting of a base set of widgets, packers and layout managers, code to instantiate a particular GUI, and a visual scripter that helps set it all up. Alternatively, it may be possible to script flows through direct manipulation tools by which, for example, components communicate instantly once dragged-and-dropped to connect with others.
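Even without a visual tool, the "glue" role of a flow script can be approximated in ordinary code. The sketch below is a hypothetical illustration: Step, LinkedStep, Upcase, Exclaim, ListSink, and FlowScript are invented names, and String payloads stand in for whatever representation a real flow would carry. The only assumption taken from the composition rules is that every stage conforms to one small interface and has a single attachable successor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical common stage interface (String payloads for brevity).
interface Step { void put(String item); }

// Hypothetical base class for stages with exactly one successor.
abstract class LinkedStep implements Step {
  private Step next;
  synchronized void attach(Step s) { next = s; }
  synchronized Step next() { return next; }
}

class Upcase extends LinkedStep {
  public void put(String item) { next().put(item.toUpperCase()); }
}

class Exclaim extends LinkedStep {
  public void put(String item) { next().put(item + "!"); }
}

class ListSink implements Step {
  final List<String> items = new ArrayList<>();
  public synchronized void put(String item) { items.add(item); }
}

class FlowScript {
  // Glue code: attach each stage to the one after it, ending at the sink,
  // and return the head of the chain so callers know where to feed input.
  static Step chain(Step sink, LinkedStep... stages) {
    Step successor = sink;
    for (int i = stages.length - 1; i >= 0; i--) {
      stages[i].attach(successor);
      successor = stages[i];
    }
    return successor;
  }
}
```

With these definitions, FlowScript.chain(sink, new Upcase(), new Exclaim()) returns the Upcase stage, and putting "box" into it leaves "BOX!" in the sink. A scripting tool would generate or replace this kind of helper rather than change the stages themselves.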

4.2.2 Assembly Line

The remainder of this section illustrates the design and implementation of flow systems via an example assembly line applet that builds a series of "paintings" in a style vaguely reminiscent of the artists Piet Mondrian and Mark Rothko. Only the principal classes are given here. Some include unimplemented method declarations. The full code may be found in the online supplement, which also includes other application-level examples of flow-based systems.

Figure 4-15

Representations

To start out, we need some base representation types. In this system, all elements can be defined as subclasses of abstract class Box, where every Box has a color and a size, can display itself when asked, and can be made to deeply clone (duplicate) itself. The color mechanics are default-implemented. Others are left abstract, to be defined differently in different subclasses:

import java.awt.Color;
import java.awt.Dimension;
import java.awt.Graphics;
import java.awt.Point;

abstract class Box {
 protected Color color = Color.white;

 public synchronized Color getColor()    { return color; }
 public synchronized void setColor(Color c) { color = c; }
 public abstract Dimension size();
 public abstract Box duplicate();         // clone
 public abstract void show(Graphics g, Point origin);// display
}

The overall theme of this example is to start off with sources that produce simple basic boxes, and then push them through stages that paint, join, flip, and embed them to form the paintings. BasicBoxes are the raw material:

class BasicBox extends Box {
 protected final Dimension size;

 public BasicBox(int xdim, int ydim) {
  size = new Dimension(xdim, ydim);
 }

 public synchronized Dimension size() { return size; }

 public void show(Graphics g, Point origin) {
  g.setColor(getColor());  // draw in this box's color
  g.fillRect(origin.x, origin.y, size.width, size.height);
 }

 public synchronized Box duplicate() {
  Box p = new BasicBox(size.width, size.height);
  p.setColor(getColor());  // the copy keeps the same color
  return p;
 }
}

Two fancier kinds of boxes can be made by joining two existing boxes side by side and adding a line-based border surrounding them. Joined boxes can also flip themselves. All this can be done either horizontally or vertically. The two resulting classes can be made subclasses of JoinedPair to allow sharing of some common code:

abstract class JoinedPair extends Box {
 protected Box fst; // one of the boxes
 protected Box snd; // the other one

 protected JoinedPair(Box a, Box b) {
  fst = a;
  snd = b;
 }

 public synchronized void flip() { // swap fst/snd
  Box tmp = fst; fst = snd; snd = tmp;
 }

 // other internal helper methods
}

class HorizontallyJoinedPair extends JoinedPair {

 public HorizontallyJoinedPair(Box l, Box r) {
  super(l, r);
 }

 public synchronized Box duplicate() {
  HorizontallyJoinedPair p =
   new HorizontallyJoinedPair(fst.duplicate(),
                              snd.duplicate());
  p.setColor(getColor());  // the copy keeps the same color
  return p;
 }

 // ... other implementations of abstract Box methods
}

class VerticallyJoinedPair extends JoinedPair {
 // similar
}

The final kind of fancy box wraps one Box within a border:

class WrappedBox extends Box {
 protected Dimension wrapperSize;
 protected Box inner;

 public WrappedBox(Box innerBox, Dimension size) {
  inner = innerBox;
  wrapperSize = size;
 }

 // ... other implementations of abstract Box methods
}

Interfaces

Looking ahead to how we might want to string stages together, it is worthwhile to standardize interfaces. We'd like to be able to connect any stage to any other stage for which it could make sense, so we want bland, noncommittal names for the principal methods.

Since we are doing oneway push-based flow, these interfaces mainly describe put-style methods. In fact, we could just call them all put, except that this doesn't work very well for two-input stages. For example, a VerticalJoiner needs two put methods, one supplying the top Box and one the bottom Box. We could avoid this by designing Joiners to take alternate inputs as the tops and bottoms, but this would make them harder to control. Instead, we'll use the somewhat ugly but easily extensible names putA, putB, and so on:

Figure 4-16

interface PushSource {
 void produce();
}

Figure 4-17

interface PushStage {
 void putA(Box p);
}

Figure 4-18

interface DualInputPushStage extends PushStage {
 void putB(Box p);
}

Adapters

We can make the "B" channels of DualInputPushStages completely transparent to other stages by defining a simple Adapter class that accepts a putA but relays it to the intended recipient's putB. In this way, most stages can be built to invoke putA without knowing or caring that the box is being fed into some successor's B channel:

Figure 4-19

class DualInputAdapter implements PushStage {
 protected final DualInputPushStage stage;

 public DualInputAdapter(DualInputPushStage s) { stage = s; }

 // relay an ordinary putA to the recipient's B channel
 public void putA(Box p) { stage.putB(p); }

}

Sinks

Sinks have no successors. The simplest kind of sink doesn't even process its input, and thus serves as a way to throw away elements. In the spirit of Unix pipes and filters, we can call it:

Figure 4-20

class DevNull implements PushStage {
 public void putA(Box p) { }  // discard the box
}
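A sink need not discard its input. As a minimal sketch, the hypothetical RecordingSink below keeps every box it receives, which can be handy when checking what a flow actually delivers. The Box and PushStage declarations here are bare stand-ins so that the fragment compiles on its own; in the assembly line they would be the types defined in the text.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal stand-ins so the sketch compiles on its own; the real Box and
// PushStage are the ones defined in the text.
class Box { }
interface PushStage { void putA(Box p); }

// Hypothetical sink that records every box it is given.
class RecordingSink implements PushStage {
  private final List<Box> received = new ArrayList<>();

  public synchronized void putA(Box p) { received.add(p); }

  // Returns a snapshot copy, so callers never observe the list while
  // another thread is appending to it.
  public synchronized List<Box> received() {
    return Collections.unmodifiableList(new ArrayList<>(received));
  }
}
```

Because putA and received are both synchronized, several upstream threads can deliver to the same sink safely.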

More interesting sinks require more interesting code. For example, in the applet used to produce the image shown at the beginning of this section, the Applet subclass itself was defined to implement PushStage. It served as the ultimate sink by displaying the assembled objects.

Connections

Interfaces standardize on the method names for stages but do nothing about the linkages to successors, which must be maintained using some kind of instance variables in each stage object. Except for sinks such as DevNull, each stage has at least one successor. There are several implementation options, including:

  • Have each object maintain a collection object holding all its successors.

  • Use a master connection registry that each stage interacts with to find out its successor(s).

  • Create the minimal representation: define a base class for stages with exactly one successor and one for those with exactly two successors.

The third option is simplest and works fine here. (In fact, it is always a valid option. Stages with three or more outputs can be built by cascading those for only two. Of course, you wouldn't want to do this if most stages had large and/or variable numbers of successors.)

This leads to base classes that support either one or two links and have one or two corresponding attachment methods, named using a similar ugly suffix convention (attach1, attach2). Because connections are dynamically assignable, they are accessed only under synchronization:

Figure 4-21

class SingleOutputPushStage {
 private PushStage next1 = null;
 protected synchronized PushStage next1() { return next1; }
 public synchronized void attach1(PushStage s) { next1 = s; }
}

Figure 4-22

class DualOutputPushStage extends SingleOutputPushStage {
 private PushStage next2 = null;
 protected synchronized PushStage next2() { return next2; }
 public synchronized void attach2(PushStage s) { next2 = s; }
}

Linear stages

Now we can build all sorts of classes that extend either of the base classes, simultaneously implementing any of the standard interfaces. The simplest transformational stages are linear, single-input/single-output stages. Painters, Wrappers, and Flippers are merely:

Figure 4-23

class Painter extends SingleOutputPushStage
       implements PushStage {
 protected final Color color; // the color to paint boxes

 public Painter(Color c) { color = c; }

 public void putA(Box p) {
  p.setColor(color);   // paint the box
  next1().putA(p);     // pass it to the successor
 }
}

Figure 4-24

class Wrapper extends SingleOutputPushStage
       implements PushStage {
 protected final int thickness;

 public Wrapper(int t) { thickness = t; }

 public void putA(Box p) {
  Dimension d = new Dimension(thickness, thickness);
  next1().putA(new WrappedBox(p, d));
 }
}

Figure 4-25

class Flipper extends SingleOutputPushStage
       implements PushStage {
 public void putA(Box p) {
  if (p instanceof JoinedPair)
   ((JoinedPair) p).flip();
  next1().putA(p);     // pass through, flipped or not
 }
}


Painter and Wrapper stages apply to any kind of Box. But Flippers only make sense for JoinedPairs: if a Flipper receives something other than a JoinedPair, it just passes it through. In a more "strongly typed" version, we might instead choose to drop boxes other than JoinedPairs, perhaps by sending them to DevNull.

Combiners

We have two kinds of Combiners, horizontal and vertical Joiners. Like the representation classes, these classes have enough in common to factor out a superclass. Joiner stages block further inputs until they can combine one item each from putA and putB. This can be implemented via guard mechanics that hold up acceptance of additional items from putA until existing ones have been paired up with those from putB, and vice versa:

Figure 4-26

abstract class Joiner extends SingleOutputPushStage
           implements DualInputPushStage {
 protected Box a = null; // incoming from putA
 protected Box b = null; // incoming from putB

 protected abstract Box join(Box p, Box q);

 protected synchronized Box joinFromA(Box p) {
  while (a != null)       // wait until last consumed
   try { wait(); }
   catch (InterruptedException e) { return null; }
  a = p;
  return tryJoin();
 }

 protected synchronized Box joinFromB(Box p) { // symmetrical
  while (b != null)
   try { wait(); }
   catch (InterruptedException ie) { return null; }
  b = p;
  return tryJoin();
 }

 protected synchronized Box tryJoin() {
  if (a == null || b == null) return null; // cannot join
  Box joined = join(a, b);       // make combined box
  a = b = null;             // forget old boxes
  notifyAll();             // allow new puts
  return joined;
 }

 public void putA(Box p) {
  Box j = joinFromA(p);
  if (j != null) next1().putA(j);
 }

 public void putB(Box p) {
  Box j = joinFromB(p);
  if (j != null) next1().putA(j);
 }
}

class HorizontalJoiner extends Joiner {
 protected Box join(Box p, Box q) {
  return new HorizontallyJoinedPair(p, q);
 }
}

class VerticalJoiner extends Joiner {
 protected Box join(Box p, Box q) {
  return new VerticallyJoinedPair(p, q);
 }
}

Collectors

A Collector accepts messages on either channel and relays them to a single successor:

Figure 4-27

class Collector extends SingleOutputPushStage
        implements DualInputPushStage {
 public void putA(Box p) { next1().putA(p); }
 public void putB(Box p) { next1().putA(p); }
}

If for some reason we needed to impose a bottleneck here, we could define an alternative form of collector in which these methods are declared as synchronized. This could also be used to guarantee that at most one activity is progressing through a given collector at any given time.

Dual output stages

Our multiple-output stages should generate threads or use one of the other options discussed in 4.1 to drive at least one of their outputs (it doesn't matter which). This maintains liveness when elements are ultimately passed to Combiner stages (here, the Joiners). For simplicity of illustration, the following classes create new Threads. Alternatively, we could set up a simple worker thread pool to process these messages.
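The worker-pool alternative mentioned above can be sketched as follows. This is an assumption-laden illustration, not the text's code: it uses java.util.concurrent.ExecutorService as the pool, String payloads instead of Boxes, and the hypothetical names TextStage, TextJoiner, PooledCloner, and PooledClonerDemo. TextJoiner is a simplified combiner in the style of a Joiner, included only so the example can show both outputs of one stage meeting at a combine point.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for PushStage, carrying Strings instead of Boxes.
interface TextStage { void putA(String item); }

// Simplified combiner: each input slot blocks new arrivals until its
// previous occupant has been paired and consumed.
class TextJoiner implements TextStage {
  private final TextStage next;
  private String a, b;
  TextJoiner(TextStage next) { this.next = next; }

  public void putA(String s) { put(s, true); }
  public void putB(String s) { put(s, false); }

  private void put(String s, boolean intoA) {
    String joined;
    synchronized (this) {
      try {
        while ((intoA ? a : b) != null) wait(); // slot still occupied
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;                                 // give up on interrupt
      }
      if (intoA) a = s; else b = s;
      if (a == null || b == null) return;       // partner not here yet
      joined = a + "|" + b;
      a = b = null;
      notifyAll();                              // allow new puts
    }
    next.putA(joined);                          // deliver outside the lock
  }
}

// Cloner variant: output 1 runs in the caller's thread, output 2 is
// handed to a worker pool instead of a freshly created Thread.
class PooledCloner implements TextStage {
  private final TextStage next1, next2;
  private final ExecutorService pool;
  PooledCloner(TextStage n1, TextStage n2, ExecutorService pool) {
    next1 = n1; next2 = n2; this.pool = pool;
  }
  public void putA(String item) {
    final String copy = item + "'";             // stands in for duplicate()
    next1.putA(item);
    pool.execute(() -> next2.putA(copy));
  }
}

class PooledClonerDemo {
  static List<String> run() {
    List<String> out = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(3);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    TextJoiner joiner = new TextJoiner(s -> { out.add(s); done.countDown(); });
    TextStage cloner = new PooledCloner(joiner, joiner::putB, pool);
    try {
      for (String s : new String[] {"x", "y", "z"}) cloner.putA(s);
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdown();
    }
    return out;
  }
}
```

A single worker thread suffices in this small network because only one output is driven by the pool. In larger networks the pool would need enough threads that tasks blocked at a Combiner cannot starve the tasks that would unblock them, so pool sizing remains a design decision that this sketch does not settle.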

Alternators output alternate inputs to alternate successors:

Figure 4-28

class Alternator extends DualOutputPushStage
            implements PushStage {
 protected boolean outTo2 = false; // control alternation

 protected synchronized boolean testAndInvert() {
  boolean b = outTo2;
  outTo2 = !outTo2;
  return b;
 }

 public void putA(final Box p) {
  if (testAndInvert())
   next1().putA(p);       // this turn goes to successor 1
  else {
    new Thread(new Runnable() {
     public void run() { next2().putA(p); }
    }).start();           // successor 2 is driven by a new thread
  }
 }
}

Cloners multicast the same element to both successors:

Figure 4-29

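class Cloner extends DualOutputPushStage
          implements PushStage {

 public void putA(Box p) {
  final Box p2 = p.duplicate();
  next1().putA(p);        // original goes to successor 1
  new Thread(new Runnable() {
   public void run() { next2().putA(p2); }
  }).start();             // copy goes to successor 2 in a new thread
 }
}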


A Screener is a stage that directs all inputs obeying some predicate to one channel, and all others to the other:

Figure 4-30

We can build a generic Screener by encapsulating the BoxPredicate to check in an interface and implementing it, for example, with a class that makes sure that a Box fits within a given (symmetric, in this case) bound. The Screener itself accepts a BoxPredicate and uses it to direct outputs:

interface BoxPredicate {
 boolean test(Box p);
}

class MaxSizePredicate implements BoxPredicate {

 protected final int max; // max size to let through

 public MaxSizePredicate(int maximum) { max = maximum; }

 public boolean test(Box p) {
  return p.size().height <= max && p.size().width <= max;
 }
}

class Screener extends DualOutputPushStage
        implements PushStage {

 protected final BoxPredicate predicate;
 public Screener(BoxPredicate p) { predicate = p; }

 public void putA(final Box p) {
  if (predicate.test(p)) {
   new Thread(new Runnable() {
     public void run() { next1().putA(p); }
   }).start();          // boxes passing the test go to successor 1
  }
  else
   next2().putA(p);     // all others go to successor 2
 }
}

Sources

Here is a sample source, one that produces BasicBoxes of random sizes. For convenience, it is also equipped with an autonomous loop run method repeatedly invoking produce, interspersed with random production delays:

Figure 4-31

class BasicBoxSource extends SingleOutputPushStage
           implements PushSource, Runnable {

 protected final Dimension size;   // maximum sizes
 protected final int productionTime; // simulated delay

 public BasicBoxSource(Dimension s, int delay) {
  size = s;
  productionTime = delay;
 }

 protected Box makeBox() {
  return new BasicBox((int)(Math.random() * size.width) + 1,
            (int)(Math.random() * size.height) + 1);
 }

 public void produce() {
  next1().putA(makeBox());   // push a new box to the successor
 }

 public void run() {
  try {
   for (;;) {
    produce();
    Thread.sleep((int)(Math.random() * 2* productionTime));
   }
  }
  catch (InterruptedException ie) { } // die
 }

}

Coordination

Without a scripting tool based on these classes, we have to program assembly lines by manually creating instances of desired stages and linking them together. This is easy in principle, but tedious and error-prone in practice because of the lack of visual guidance about what stages are connected to what.

Here's a fragment of the flow used in the applet that produced the image displayed at the beginning of this section:

Figure 4-32

The code setting this up may be found in the online supplement. The main constructor mostly consists of many lines of the form:

Stage aStage = new Stage();

This is followed by invoking start on threads running all the sources.
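The overall shape of such setup code (create the stages, link them, then start the source threads) can be sketched in miniature. The classes below are hypothetical stand-ins that carry ints instead of Boxes so the fragment is self-contained; only the attach1/next1 convention is taken from the base classes shown earlier.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for PushStage, carrying ints instead of Boxes.
interface IntStage { void putA(int item); }

// Stand-in for a base class with one dynamically assignable successor.
class LinkableStage {
  private IntStage next1;
  protected synchronized IntStage next1() { return next1; }
  public synchronized void attach1(IntStage s) { next1 = s; }
}

// A linear stage: transform the item and pass it on.
class Doubler extends LinkableStage implements IntStage {
  public void putA(int item) { next1().putA(item * 2); }
}

// A source with its own run loop, pushing 1..count to its successor.
class CountingSource extends LinkableStage implements Runnable {
  private final int count;
  CountingSource(int count) { this.count = count; }
  public void run() {
    for (int i = 1; i <= count; i++) next1().putA(i);
  }
}

class AssemblyDemo {
  static List<Integer> run() {
    List<Integer> out = new CopyOnWriteArrayList<>();

    // 1. Create the stages.
    CountingSource source = new CountingSource(3);
    Doubler doubler = new Doubler();
    IntStage sink = item -> out.add(item);

    // 2. Link each stage to its successor.
    source.attach1(doubler);
    doubler.attach1(sink);

    // 3. Start a thread for each source, then wait for it to finish.
    Thread t = new Thread(source);
    t.start();
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return out;
  }
}
```

All links are established before any source thread starts, so no stage ever sees a missing successor. With one source and only linear stages, the items arrive at the sink in production order.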

4.2.3 Further Readings

Flow patterns often serve as the computational versions of use cases, scenarios, scripts, and related concepts from high-level object-oriented analysis. Most of the books on OO design and on design patterns listed in 1.3.5 and 1.4.5 describe issues relevant to the analysis, design, and implementation of flow-based systems. Domain-specific issues surrounding packet networking, telecommunications, and multimedia systems, which often require more elaborate flow-based designs, are discussed in the texts on concurrent and distributed systems in 1.2.5.
