Using Pipes for Communication Between Threads

Sometimes, the communication pattern between threads is very simple. A producer thread generates a stream of bytes. A consumer thread reads and processes that byte stream. If no bytes are available for reading, the consumer thread blocks. If the producer generates data much more quickly than the consumer can handle it, the pipe's internal buffer fills up and the write operation of the producer thread blocks until the consumer has caught up. The Java programming language has a convenient pair of classes, PipedInputStream and PipedOutputStream, to implement this communication pattern. (There is another pair of classes, PipedReader and PipedWriter, for the case in which the producer thread generates a stream of Unicode characters instead of bytes.)
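The character-based pair works the same way as the byte-based pair. The following is a minimal sketch, not part of the original listing: the class name PipedTextDemo and the method sendThroughPipe are made up for illustration, and the sketch assumes Java 8 or later for the lambda syntax. A producer thread writes lines into a PipedWriter, and the calling thread reads them back from the connected PipedReader until the writer closes its end.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; the name is not from the book.
public class PipedTextDemo
{
   /**
      Sends the given lines through a character pipe and returns
      the lines that arrived at the reading end.
   */
   public static List<String> sendThroughPipe(List<String> lines)
      throws IOException, InterruptedException
   {
      PipedWriter pipeOut = new PipedWriter();
      PipedReader pipeIn = new PipedReader(pipeOut);

      // Producer: writes every line, then closes the pipe
      // (try-with-resources) to signal the end of the stream.
      Thread producer = new Thread(() ->
      {
         try (PrintWriter out = new PrintWriter(pipeOut))
         {
            for (String line : lines) out.println(line);
         }
      });
      producer.start();

      // Consumer (the calling thread): readLine blocks until data is
      // available and returns null once the writer has been closed.
      List<String> received = new ArrayList<>();
      try (BufferedReader in = new BufferedReader(pipeIn))
      {
         String line;
         while ((line = in.readLine()) != null) received.add(line);
      }
      producer.join();
      return received;
   }

   public static void main(String[] args) throws Exception
   {
      System.out.println(sendThroughPipe(Arrays.asList("alpha", "beta", "gamma")));
   }
}
```

Closing the writing end matters here: it is what lets the reading loop terminate instead of waiting for more data.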

The principal reason to use pipes is to keep each thread simple. The producer thread simply sends its results to a stream and forgets about them. The consumer simply reads the data from a stream, without having to care where it comes from. By using pipes, you can connect multiple threads with each other without worrying about thread synchronization.

Example 1–13 is a program that shows off piped streams. We have a producer thread that emits random numbers at random times, a filter thread that reads the input numbers and continuously computes the average of the data, and a consumer thread that prints each average that differs from the last printed value by more than a small threshold. (You'll need to press Ctrl+C to stop this program.) Figure 1–18 shows the threads and the pipes that connect them. UNIX users will recognize these pipe streams as the equivalent of pipes connecting processes in UNIX.

Piped streams are only appropriate if the communication between the threads is on a low level, such as a sequence of numbers as in this example. In other situations, you can use queues. The producing thread inserts objects into the queue, and the consuming thread removes them.
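The text does not name a particular queue class. As one possible illustration, the sketch below assumes the BlockingQueue interface and the LinkedBlockingQueue class from java.util.concurrent; the class name QueueHandoffDemo, the method handOff, and the DONE sentinel are hypothetical. The producer inserts objects with put, which blocks when the bounded queue is full, and the consumer removes them with take, which blocks when the queue is empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the name is not from the book.
public class QueueHandoffDemo
{
   // Hypothetical end marker. new String(...) guarantees an object that is
   // distinct from every item the caller can pass in.
   private static final String DONE = new String("DONE");

   /**
      Passes the items from a producer thread to the calling thread
      through a bounded queue and returns them in arrival order.
   */
   public static List<String> handOff(List<String> items) throws InterruptedException
   {
      // capacity 10 is an arbitrary choice for this sketch
      BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);

      Thread producer = new Thread(() ->
      {
         try
         {
            for (String item : items) queue.put(item); // blocks while the queue is full
            queue.put(DONE);
         }
         catch (InterruptedException e)
         {
            Thread.currentThread().interrupt();
         }
      });
      producer.start();

      List<String> received = new ArrayList<>();
      while (true)
      {
         String item = queue.take(); // blocks while the queue is empty
         if (item == DONE) break;    // identity check against the end marker
         received.add(item);
      }
      producer.join();
      return received;
   }

   public static void main(String[] args) throws InterruptedException
   {
      System.out.println(handOff(Arrays.asList("first", "second", "third")));
   }
}
```

Unlike a pipe, the queue carries whole objects, so neither thread has to encode or decode a byte stream.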

Figure 1–18: A sequence of pipes

Example 1–13: PipeTest.java

    import java.io.*;
    import java.util.*;

    /**
       This program demonstrates how multiple threads communicate
       through pipes.
    */
    public class PipeTest
    {
       public static void main(String[] args)
       {
          try
          {
             /* set up pipes */
             PipedOutputStream pout1 = new PipedOutputStream();
             PipedInputStream pin1 = new PipedInputStream(pout1);

             PipedOutputStream pout2 = new PipedOutputStream();
             PipedInputStream pin2 = new PipedInputStream(pout2);

             /* construct threads */
             Producer prod = new Producer(pout1);
             Filter filt = new Filter(pin1, pout2);
             Consumer cons = new Consumer(pin2);

             /* start threads */
             prod.start();
             filt.start();
             cons.start();
          }
          catch (IOException e)
          {
             // connecting the pipes failed; report it instead of ignoring it
             System.out.println("Error: " + e);
          }
       }
    }

    /**
       A thread that writes random numbers to an output stream.
    */
    class Producer extends Thread
    {
       /**
          Constructs a producer thread.
          @param os the output stream
       */
       public Producer(OutputStream os)
       {
          out = new DataOutputStream(os);
       }

       public void run()
       {
          try
          {
             while (true)
             {
                double num = rand.nextDouble();
                out.writeDouble(num);
                out.flush();
                // pause for a random time of less than one second
                sleep(rand.nextInt(1000));
             }
          }
          catch (IOException e)
          {
             // the pipe is broken; stop instead of failing again on every pass
             System.out.println("Error: " + e);
          }
          catch (InterruptedException e)
          {
             // interrupted while sleeping; let the thread end
          }
       }

       private DataOutputStream out;
       private Random rand = new Random();
    }

    /**
       A thread that reads numbers from a stream and writes their
       average to an output stream.
    */
    class Filter extends Thread
    {
       /**
          Constructs a filter thread.
          @param is the input stream
          @param os the output stream
       */
       public Filter(InputStream is, OutputStream os)
       {
          in = new DataInputStream(is);
          out = new DataOutputStream(os);
       }

       public void run()
       {
          try
          {
             for (;;)
             {
                double x = in.readDouble();
                total += x;
                count++; // at least 1 from here on, so the division is safe
                out.writeDouble(total / count);
                out.flush(); // notify the waiting consumer promptly
             }
          }
          catch (IOException e)
          {
             // the pipe is broken; stop instead of failing again on every pass
             System.out.println("Error: " + e);
          }
       }

       private DataInputStream in;
       private DataOutputStream out;
       private double total = 0;
       private int count = 0;
    }

    /**
       A thread that reads numbers from a stream and
       prints out those that deviate from the previously
       printed value by more than a threshold value.
    */
    class Consumer extends Thread
    {
       /**
          Constructs a consumer thread.
          @param is the input stream
       */
       public Consumer(InputStream is)
       {
          in = new DataInputStream(is);
       }

       public void run()
       {
          try
          {
             for (;;)
             {
                double x = in.readDouble();
                if (Math.abs(x - oldx) > THRESHOLD)
                {
                   System.out.println(x);
                   oldx = x;
                }
             }
          }
          catch (IOException e)
          {
             // the pipe is broken; stop instead of failing again on every pass
             System.out.println("Error: " + e);
          }
       }

       private double oldx = 0;
       private DataInputStream in;
       private static final double THRESHOLD = 0.01;
    }

java.io.PipedInputStream

  • PipedInputStream()
    creates a new piped input stream that is not yet connected to a piped output stream.

  • PipedInputStream(PipedOutputStream out)
    creates a new piped input stream that reads its data from a piped output stream.

    Parameters:
      out    the source of the data

  • void connect(PipedOutputStream out)
    attaches a piped output stream from which the data will be read.

    Parameters:
      out    the source of the data

java.io.PipedOutputStream

  • PipedOutputStream()
    creates a new piped output stream that is not yet connected to a piped input stream.

  • PipedOutputStream(PipedInputStream in)
    creates a new piped output stream that writes its data to a piped input stream.

    Parameters:
      in    the destination of the data

  • void connect(PipedInputStream in)
    attaches a piped input stream to which the data will be written.

    Parameters:
      in    the destination of the data
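The no-argument constructors and connect are useful when the two ends of a pipe are created in different places and joined later. The sketch below is an illustration only; the class name PipeConnectDemo and its two methods are hypothetical. It shows that writing to a pipe before it is connected fails with an IOException, and that a pipe joined with connect then transfers data normally.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class; the name is not from the book.
public class PipeConnectDemo
{
   /** Returns true if writing to an unconnected pipe is rejected. */
   public static boolean writeBeforeConnectFails()
   {
      PipedOutputStream out = new PipedOutputStream();
      try
      {
         out.write(1);
         return false;
      }
      catch (IOException e)
      {
         return true; // the stream reports that the pipe is not connected
      }
   }

   /** Creates both ends unconnected, joins them, and passes one byte through. */
   public static int connectThenTransfer() throws IOException, InterruptedException
   {
      PipedOutputStream out = new PipedOutputStream();
      PipedInputStream in = new PipedInputStream();
      out.connect(in); // in.connect(out) would have the same effect

      Thread writer = new Thread(() ->
      {
         try
         {
            out.write(42);
            out.close(); // closing signals the end of the stream to the reader
         }
         catch (IOException e)
         {
            System.out.println("Error: " + e);
         }
      });
      writer.start();

      int value = in.read(); // blocks until the byte arrives
      writer.join();
      in.close();
      return value;
   }

   public static void main(String[] args) throws Exception
   {
      System.out.println(writeBeforeConnectFails()); // prints true
      System.out.println(connectThenTransfer());     // prints 42
   }
}
```

A pipe needs to be connected only once, from either end. Calling connect on a stream that is already connected throws an IOException.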
