Reputation: 569
I have a problem related to PipedInputStream
and PipedOutputStream
, and I don't know if I have misunderstood the design of these classes or if there is a bug in the java code in PipedInputStream.java
As far as I understand PipedInputStream
and PipedOutputStream
implement a mechanism that can be used to create a stream between two different threads. A producer thread writes something in the PipedOutputStream
and a consumer thread reads it in the connected PipedInputStream
. There is an internal buffer to allow a buffered communication. By default the size of this buffer is 1024 bytes.
If the consumer thread reads the PipedInputStream
and the buffer is empty, then the thread waits. If the producer thread writes the PipedOutputStream
and the buffer is full, then the thread also waits.
PipedInputStream
maintains the internal buffer, PipedOutputStream
only uses functions declared in PipedInputStream
.
All the fields related with the internal (circular) buffer in PipedInputStream
(the byte [] buffer
, the int in
and the int out
-as you can see in PipedInputStream.java
) are all declared protected
. PipedInputStream
injects data using 2 different PipedInputStream.receive
functions.
All InputStreams has two read versions: read()
and read(byte [], int, int)
. All OutputStreams has two write versions write(byte b)
and write(byte [], int, int)
. All have a single-byte version and a multi-byte version. PipedInputStream
and PipedOutputStream
have these functions.
PipedOutputStream.write(byte b)
uses PipedInputStream.receive(int b)
function to inject the byte in the connected PipedInputStream
. This receive function is declared protected
so you can overload this function and intercept any byte injection from PipedOutputStream
to the connected PipedInputStream
.
PipedOutputStream.write(byte b[], int offset, int len)
uses PipedInputStream.receive(byte [] b, int offset, int len)
to inject an array of bytes in the connected PipedInputStream
.
And here comes my problem: PipedInputStream.receive(byte [], int, int)
, the multi-byte counterpart of receive(int)
, is not declared protected as receive(int)
is, it has the default visibility (package visibility). So you cannot overload this function and intercept multibytes injection from PipedOutputStream
to the connected PipedInputStream
.
PipedInputStream.write(byte b[], int offset, int len)
does not invoke PipedInputStream.write(int b)
. So overloading receive(int)
has no effect when using receive(byte [],int, int)
.
As far as I understand, PipedInputStream.receive(byte[], int, int)
should be protected
as PipedInputStream.receive(int)
is. Its declaration:
synchronized void receive(byte [] b, int off, int len) throws IOException {
should be:
protected synchronized void receive(byte [] b, int off, int len) throws IOException {
PipeReader
and PipeWriter
(the character version of PipedInputStream
and PipedOutputStream
) declare the buffer fields and the receive method with package visibility (not protected!). Reader/Writer (since JDK1.1) in Java are newer than InputStream/OutputStream (since JDK1.0).
Is it a real bug in the design of PipedInputStream
?, is the protected
visibity in PipedInputStream
a design accident inherited from earlier Java versions?, or or am I completely lost?.
Thanks in advance.
PD: Here is an example where this problem appears. This program does not compile (by the mentioned receive visibility problem). In this example I try to create a PipedInputStream
subclass that allows the automatic extension of the buffer when needed. So, if buffer is empty and someone try to read the thread waits. But if buffer is full and someone try to write (using the connected PipedOutputStream
) the thread does not wait, but the buffer is extended to store more bytes. Consumers wait but producers don't.
I have my own functional implementations of this example, but I wanted to know if it cannot be implemented as a PipedInputStream
subclass.
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class ExtensiblePipedInputStream extends PipedInputStream {
/**
* Default extensions' size
*/
private static final int DEFAULT_EXTENSION = 1024;
/**
* The current extensions' size
*/
protected int extension = DEFAULT_EXTENSION;
// the same constructors than the super class (PipedInputStream)...
public ExtensiblePipedInputStream() {
super();
}
public ExtensiblePipedInputStream(PipedOutputStream src) throws IOException {
super(src);
}
public ExtensiblePipedInputStream(int pipeSize) {
super(pipeSize);
}
public ExtensiblePipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
super(src, pipeSize);
}
/**
* This function ensures the specified capacity in the internal buffer. If
* the specified capacity is less or equals than the current internal buffer
* capacity it does nothing. If the specified capacity is greater than the
* current one, then the buffer is extended to: at least allocate the new
* capacity. This function extends the buffer using multiple factors of
* extension size.
*
* @param capacity The capacity
* @throws IOException if an IO error occurs
* @throws IllegalArgumentException if capacity is negative
*/
public synchronized void ensureCapacity(int capacity) throws IOException, IllegalArgumentException {
if (capacity < 0) {
throw new IllegalArgumentException("capacity < 0");
}
if (capacity > buffer.length) {
int additionalSpace = capacity - buffer.length;
final int modExtension = additionalSpace % extension;
additionalSpace += (modExtension == 0) ? 0 : extension - modExtension;
setCapacity(buffer.length + additionalSpace);
}
}
/**
* Returns capacity of the internal buffer (the buffer's size).
*
* @return The capacity or the internal buffer
*/
public synchronized int getCapacity() {
return buffer.length;
}
/**
* Returns the size of the next buffer's extensions.
*
* @return The size of the next buffer's extensions.
*/
public synchronized int getExtension() {
return extension;
}
/**
* This function extends and invokes PipedInputStream.receive. It only avoid
* writers block by extending the internal buffer when needed.
*
* @param b The byte to be received
* @throws IOException if an IO error occurs
*/
@Override
protected synchronized void receive(int b) throws IOException {
ensureCapacity(available() + 1);
super.receive(b);
}
/**
* MY PROBLEM!!!!
*
* this function is not posible!
*
* PipedInputStream.receive(byte[], int, int)
* has not protected visibility, it has package visibility!!!!!
*
* Why?
*
* @param b The array of bytes to be received
* @param off The offset in the array of bytes.
* @param len The number of bytes to be received.
* @throws IOException If an IO error occurs
*/
@Override
protected synchronized void receive(byte b[], int off, int len) throws IOException {
ensureCapacity(available() + len);
super.receive(b, off, len);
}
/**
* Changes the size of the internal buffer. The new size must be greater or
* equals than the number of bytes stored in the internal buffer
* (available())
*
* @param capacity The new size of the internal buffer.
* @throws IOException If an IO error occurs.
* @throws IllegalArgumentException If capacity < available()
*/
public synchronized void setCapacity(int capacity) throws IOException, IllegalArgumentException {
final int available = available();
if (capacity < available) {
throw new IllegalArgumentException("capacity < available");
}
final byte[] nbuf = new byte[capacity];
if (available > 0) {
final int firstTransferAmount = Math.min(available, buffer.length - out);
System.arraycopy(buffer, out, nbuf, 0, firstTransferAmount);
if (in > 0) {
System.arraycopy(buffer, 0, nbuf, firstTransferAmount, in);
}
out = 0;
in = (available == capacity) ? 0 : available;
}
buffer = nbuf;
}
/**
* Set the size of future extensions. It must be a value greater than 0.
*
* @param extension The size of future extensions.
* @throws IllegalArgumentException If extension <= 0
*/
public synchronized void setExtension(int extension) throws IllegalArgumentException {
if (extension <= 0) {
throw new IllegalArgumentException("extension <= 0");
}
this.extension = extension;
}
}
Upvotes: 1
Views: 1818
Reputation: 718906
An API design that does not support a particular use-case (especially one that involves subclassing) could be described as "limited" or "restricted" ... but it is a stretch to call this a bug.
If we accept the dubious proposition1 that what you are trying to implement is worthwhile, then I think your analysis is correct. Creating a version of the piped input / output streams with expandable buffers is more work than it would be if the modifiers were different.
But ... it is not impossible. You can achieve what you want by a more large-scale overriding of methods (e.g. the read
and write
methods), or by simply starting from scratch.
The other thing to note is that this (StackOverflow) is not the right place to report Java "bugs" or make requests for enhancement. If you really feel strongly about this, try developing and submitting a patch with your proposed change to the OpenJDK team. But bear in mind that anything that breaks compatibility (including compatibility with existing customer subclasses of PipedInputStream
and PipedOutputStream
) is likely to be rejected. This could this might explain why they haven't "fixed" this already!
1 - As Barry points out, the piped streams are blocking by design. A non-blocking version is potentially dangerous.
Upvotes: 0
Reputation: 3809
I think you've misunderstood the purpose of pipes. Pipes are for blocking communication between two different threads. The speed of the writer is expected to be limited to the speed of the reader which means a pipe is efficient in memory usage, but limits processing to the speed of the slowest component.
If you want asynchronous writing you should look at using a Queue - one of the versions in the java.util.concurrent package should suit.
Upvotes: 2