JDK-4028322 : java.io.PipedInputStream doesn't reliably support multiple writers
  • Type: Bug
  • Component: core-libs
  • Sub-Component: java.io
  • Affected Version: 1.1,1.1.5,1.2.0
  • Priority: P4
  • Status: Closed
  • Resolution: Won't Fix
  • OS: generic,windows_nt
  • CPU: generic,x86
  • Submitted: 1997-01-28
  • Updated: 2019-04-27
  • Resolved: 1999-06-29
Description

Name: mc57594			Date: 01/27/97


Below is a program to reproduce the problem:
// This program demonstrates how the PipedInputStream object is not
// thread safe.   
//
// What *should* happen is that the consumer thread should run 
// and print notification when a read() is completed from each of
// the 4 producer threads.  What *does* happen is the consumer reads
// the data from the first producer, then is thrown a Pipe broken
// IOException.
// Ignoring the IOException is not a valid option, as the pipe will
// continue to receive exceptions at a rate of about 1/second, instead
// of just blocking.
//
// PipedInputStream 'remembers' the thread ID of the
// writer when the receive() method of the object is called to write
// data into the pipe.  When the read() method is called, and there
// is no data available to be read, PipedInputStream should block.
// Before it will block, it checks to see if the writer thread
// is still alive (presumably to see if there is hope of ever unblocking
// again).  Unfortunately, if there is more than one thread that will
// be writing to the pipe, and the last one that visited the receive()
// method has terminated, the pipe will not block, and it will throw
// a broken pipe IOException instead.
//
// There should be 1) a way to specify several threads that are 'eligible'
// to write to the pipe, 2) a way to selectively disable the isAlive() check
// in the read() method altogether, and/or 3) access to the writeSide member
// to allow the user to focus the isAlive() check on a different thread.
// None of the 'useful' bits of PipedInputStream are public, so the only
// way to subclass it and alter this behavior is to write something that 
// resides in the package java.io, which is not a 'clean' solution.
//
// Pat Mancuso
// Cabletron Systems, Inc.
// ###@###.### 

import java.io.*;

/* 
 * The producer class sleeps a specified time, then writes into the pipe.
 */
class Producer extends Thread
{
  PipedOutputStream pos = null;
  int wait = 0;

  public Producer(PipedOutputStream p, int w) 
  {
    pos = p;
    wait = w;
  }

  public void run()
  {
    byte b[] = new byte[1];
    b[0]='a';

    try {
      System.out.println("Producer sleeping "+wait+" seconds...");
      sleep(wait*1000);
      System.out.println("...slept "+wait+" seconds, writing");
    } catch (Exception e) {
      System.out.println("Producer sleep interrupted!");
      return;
    }

    try {
      pos.write(b, 0, 1);
    } catch (IOException e) {
      System.out.println("producer caught IOException: "+e.toString());
    }
  }
}

/* 
 * The consumer class should read from the pipe forever
 */
class Consumer extends Thread
{
  PipedInputStream pis;

  public Consumer(PipedInputStream p)
  {
    pis = p;
  }

  public void run()
  {
    int i;

    while (true) {
      try {
        i = pis.read();
        System.out.println("consumer read()");
      } catch (IOException e) {
        System.out.println("consumer caught IOException: "+e.toString());
        // comment out the 'return' below to retry after each exception.
        // this 'works', but wasteful, as the consumer is constantly
        // receiving exceptions at a rate of about 1/second.
        return;
      }
    }
  }
}

public class PipeTest
{
  static public PipedInputStream pis = null;
  static public PipedOutputStream pos = null;

  public static void main(String argv[]) {

    try {
      pis = new PipedInputStream();
      pos = new PipedOutputStream(pis);
    } catch (IOException e) {
      System.out.println("main() caught IOException: "+e.toString());
    }


    // set up consumer - it will block on the read() since
    // writeSide is null initially.
    Consumer consumer = new Consumer(pis);
    
    // set up several producers to wake up at various times
    Producer producer1 = new Producer(pos, 0);
    Producer producer2 = new Producer(pos, 5);
    Producer producer3 = new Producer(pos, 10);
    Producer producer4 = new Producer(pos, 15);

    // start everything off
    consumer.start();
    producer1.start();
    producer2.start();
    producer3.start();
    producer4.start();
    
  }
  
}
======================================================================

Comments
EVALUATION This is a known and longstanding problem. The spec for PipedInputStream/PipedOutputStream is not clear on what the behaviour should be in the face of multiple writers/readers. In particular, for the read method of PipedInputStream, the spec says: "If a thread was providing data bytes to the connected piped output stream, but the thread is no longer alive, then an IOException is thrown." It is not clear what should happen when there are multiple writer threads and one of them dies. The current implementation simply remembers the most recent writer thread that the reader thread has received data from, and if that writer thread dies before any other writer thread has had a chance to reset the writer, an IOException is thrown.
We have considered the option of keeping track of all the writer/reader threads that have ever connected to the pipe, and throwing an IOException on the next IO operation if any one of them has died. The problem with this approach is that readers cannot read once a single writer dies, even when other writers are still alive.
The other approach is to throw the exception only when all the writer threads have died. This would allow reader threads to continue to read when there is data in the buffer and at least one writer thread is alive. The problem with this approach is that it does not handle the case where all the existing writers have died, but a new writer thread later tries to write to the buffer.
Either of the above approaches would require bookkeeping for all the threads that have tried to write to the pipe, even threads that are already dead. Weak references could be used to implement this. Another problem with both approaches is that they might break existing applications that depend on the current implementation. To solve this problem, we could special-case the single-writer, single-reader case and make it work the same way as before, but that would add complexity that may not be justified.
From the above analysis, we decided it is better not to fix the problem in the existing implementation. We will probably tighten the documentation and specify that these two classes can only handle the single reader/writer case. For multiple reader/writer cases, users can do their own synchronization on top of these two classes.
11-06-2004
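
One way to layer multiple writers on top of these two classes, sketched here under assumptions rather than taken from the JDK or from this report: let the producers hand their data to a queue, and let a single long-lived forwarder thread be the only thread that ever writes to the PipedOutputStream. The pipe then only ever records one writer thread, and that thread stays alive until the funnel is closed. The class name FunnelWriter, the STOP sentinel and the thread name are hypothetical.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical helper (not part of java.io): funnels writes from many
 * threads through one forwarder thread, so the pipe sees a single writer.
 */
class FunnelWriter implements AutoCloseable {
    // Sentinel object that tells the forwarder to stop; compared by identity.
    private static final byte[] STOP = new byte[0];

    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final Thread forwarder;

    FunnelWriter(PipedOutputStream pos) {
        forwarder = new Thread(() -> {
            // Closing the stream on exit lets the reader see end of stream (-1).
            try (PipedOutputStream out = pos) {
                while (true) {
                    byte[] chunk = queue.take();
                    if (chunk == STOP) {
                        break;
                    }
                    out.write(chunk);
                    out.flush();
                }
            } catch (IOException | InterruptedException e) {
                // The reader went away or we were interrupted; stop forwarding.
            }
        }, "pipe-forwarder");
        forwarder.start();
    }

    /** May be called from any thread; the caller is free to terminate afterwards. */
    void write(byte[] data) throws InterruptedException {
        queue.put(data.clone());
    }

    /** Drains queued data into the pipe, closes the pipe, and waits for the forwarder. */
    @Override
    public void close() throws InterruptedException {
        queue.put(STOP);
        forwarder.join();
    }
}
```

With this arrangement the Producer threads in the description would call funnel.write(b) instead of pos.write(b, 0, 1), and their termination would no longer be visible to the pipe. The cost is one extra thread and one extra copy per write.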

WORK AROUND Name: mc57594 Date: 01/27/97 Subclass PipedInputStream and fool it into thinking the lastWrite thread still exists by setting lastWrite to the current thread (Thread.currentThread()) if you get a Broken pipe IOException. This must be done inside the java.io package, however.
======================================================================
11-06-2004

PUBLIC COMMENTS The PipedInputStream class does not always work properly when more than one thread writes to the corresponding PipedOutputStream.
10-06-2004

SUGGESTED FIX That PipedInputStream tries to keep track of its reader and writer threads seems to me to be misguided. This works fine if the pipe is only meant to connect two threads, as in Unix, but in the more general case it breaks down, as demonstrated by the program in the description. I think a valid fix would be to remove the code that makes sure the readSide and writeSide threads are alive. Since those tests are the only use of the readSide and writeSide fields, the fields can also be removed. -- mr@eng 1/28/97
28-01-1997
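
To illustrate what a pipe without the readSide/writeSide liveness checks might look like, here is a minimal standalone sketch. It is not the java.io implementation and not a patch to it: the class name UncheckedPipe, its methods and the 1024-byte buffer size are all assumptions made for the example. The reader blocks until data arrives, and end of stream is signalled only by an explicit close, never by inspecting threads.

```java
import java.io.IOException;

/**
 * Hypothetical in-memory pipe that never inspects reader or writer threads.
 * End of stream is signalled only by an explicit closeWriter() call.
 */
class UncheckedPipe {
    private final byte[] buf = new byte[1024]; // circular buffer
    private int head = 0;                      // index of the next byte to read
    private int count = 0;                     // number of buffered bytes
    private boolean writerClosed = false;

    /** Blocks while the buffer is full; any thread may call this. */
    public synchronized void write(int b) throws IOException, InterruptedException {
        while (count == buf.length && !writerClosed) {
            wait();
        }
        if (writerClosed) {
            throw new IOException("Pipe closed");
        }
        buf[(head + count) % buf.length] = (byte) b;
        count++;
        notifyAll(); // wake any reader waiting for data
    }

    /** Blocks until a byte is available; returns -1 once closed and drained. */
    public synchronized int read() throws InterruptedException {
        while (count == 0) {
            if (writerClosed) {
                return -1;
            }
            wait();
        }
        int b = buf[head] & 0xFF;
        head = (head + 1) % buf.length;
        count--;
        notifyAll(); // wake any writer waiting for space
        return b;
    }

    /** Marks the write side as finished and wakes all waiting threads. */
    public synchronized void closeWriter() {
        writerClosed = true;
        notifyAll();
    }
}
```

The trade-off is the one the evaluation implies: if every writer dies without calling closeWriter(), a reader of this sketch blocks forever instead of receiving an IOException.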