Reputation: 187
Here's my use case... I have an upstream service that sends my Netty app data over the network, and that data needs to be published to multiple clients connected to Netty. The data pushed to the clients must be HTTP "Transfer-Encoding: chunked."
I found ChunkedStream
and though that maybe I could create a PipedInputStream
and a PipedOutputStream
(connected to the PipedInputStream
) and write the ChunkedStream
to the channel. Then when data is received from my upstream service I could write the data into the PipedOutputStream
of the channels and it'd be sent to the clients:
In channelConnected
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
ctx.getChannel().write( new PersistentChunkedStream(in) );
Separate thread publishes data to a connected channels
ChannelBuffer buff = ChannelBuffers.copiedBuffer("FOO",CharsetUtil.UTF_8);
out.write( buff.array() );
channel.get(ChunkedWriteHandler.class).resumeTransfer();
I had to extend ChunkedStream
to return null
from nextChunk
if there are 0 bytes available (to "suspend" the write without the thread hanging), so I call resumeTransfer
after I write to the PipedOutputStream
of the associated channel. When I debug and step through the code, I can see flush
of ChunkedWriteHandler
being called, which does call:
Channels.write(ctx, writeFuture, chunk, currentEvent.getRemoteAddress());
with the bytes I wrote into the PipedOutputStream,
but it's never received by the client.
HTTP curl
~ $ curl -vN http://localhost:8080/stream
* About to connect() to localhost port 8080 (#0)
* Trying 127.0.0.1... connected
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /stream HTTP/1.1
> User-Agent: curl/7.19.7 (universal-apple-darwin10.0) libcurl/7.19.7 OpenSSL/0.9.8r zlib/1.2.3
> Host: localhost:8080
> Accept: */*
>
< HTTP/1.1 200 OK
< Transfer-Encoding: chunked
<
### NOTE: NO "FOO" TRANSMIT BACK ###
Any thoughts? Maybe there's a better way to accomplish this?
Upvotes: 3
Views: 7087
Reputation: 168
I know this is an old question, but hopefully this helps someone.
The ChunkedStream does NOT imply HTTP Chunking...it's an unfortunate naming collision best I can tell. Chunked streams are there just to avoid loading an entire item into memory, effectively the ChunkedWriter calls back into the ChunkedStream after each chunk to ask for more data.
As it turns out, you CAN use the ChunkedStream paradigm to create something that does HTTP chunking for you from a standard input stream. The code below implements ChunkedInput and takes an InputStream. It also automatically appends the trailing http chunk to indicate EOF, but does so only once as per the ChunkedInput spec.
public class HttpChunkStream implements ChunkedInput {
private static final int CHUNK_SIZE = 8192;
boolean eof = false;
InputStream data;
HttpChunkStream (InputStream data) {
this.data= data;
}
byte[] buf = new byte[CHUNK_SIZE];
@Override
public Object nextChunk() throws Exception {
if (eof)
return null;
int b = data.read(buf);
if (b==-1) {
eof=true;
return new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER);
}
DefaultHttpChunk c = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(buf,0,b));
return c;
}
@Override
public boolean isEndOfInput() throws Exception {
return eof;
}
@Override
public boolean hasNextChunk() throws Exception {
return isEndOfInput()==false;
}
@Override
public void close() throws Exception {
Closeables.closeQuietly(data);
}
}
Upvotes: 3
Reputation: 187
Just to add some more content to the answer provided by Norman.
When sending arbitrary chunked data, you must first send a new DefaultHttpResponse (one time only):
HttpResponse res = new DefaultHttpResponse();
res.setChunked(true);
res.setHeader(Names.TRANSFER_ENCODING, Values.CHUNKED);
channel.write(res);
Then anytime you want to write to the channel with an arbitrary chunk, call:
HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(str.getBytes(CharsetUtil.UTF_8)));
channel.write(chunk);
Upvotes: 3
Reputation: 23557
I wonder why you even want to use the PipedInputStream / PipedOutputStream. I think it would be away cleaner / easier to just call Channel.write(..) directly without your data. Just be aware to submit as much data as you can in Channel.write(..), as its an expensive operation.
You can call Channel.write(..) from any thread that you want, as its thread-safe.
Upvotes: 4