Ryan Shelley
Ryan Shelley

Reputation: 187

How to use ChunkedStream properly

Here's my use case... I have an upstream service that sends my Netty app data over the network, and that data needs to be published to multiple clients connected to Netty. The data pushed to the clients must be HTTP "Transfer-Encoding: chunked."

I found ChunkedStream and though that maybe I could create a PipedInputStream and a PipedOutputStream (connected to the PipedInputStream) and write the ChunkedStream to the channel. Then when data is received from my upstream service I could write the data into the PipedOutputStream of the channels and it'd be sent to the clients:

In channelConnected

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
ctx.getChannel().write( new PersistentChunkedStream(in) );

Separate thread publishes data to a connected channels

ChannelBuffer buff = ChannelBuffers.copiedBuffer("FOO",CharsetUtil.UTF_8);
out.write( buff.array() );
channel.get(ChunkedWriteHandler.class).resumeTransfer();

I had to extend ChunkedStream to return null from nextChunk if there are 0 bytes available (to "suspend" the write without the thread hanging), so I call resumeTransfer after I write to the PipedOutputStream of the associated channel. When I debug and step through the code, I can see flush of ChunkedWriteHandler being called, which does call:

Channels.write(ctx, writeFuture, chunk, currentEvent.getRemoteAddress());

with the bytes I wrote into the PipedOutputStream, but it's never received by the client.

HTTP curl

~ $ curl -vN http://localhost:8080/stream
* About to connect() to localhost port 8080 (#0)
*   Trying 127.0.0.1... connected
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /stream HTTP/1.1
> User-Agent: curl/7.19.7 (universal-apple-darwin10.0) libcurl/7.19.7 OpenSSL/0.9.8r zlib/1.2.3
> Host: localhost:8080
> Accept: */*
> 
< HTTP/1.1 200 OK
< Transfer-Encoding: chunked
< 
### NOTE: NO "FOO" TRANSMIT BACK ###

Any thoughts? Maybe there's a better way to accomplish this?

Upvotes: 3

Views: 7087

Answers (3)

aaron
aaron

Reputation: 168

I know this is an old question, but hopefully this helps someone.

The ChunkedStream does NOT imply HTTP Chunking...it's an unfortunate naming collision best I can tell. Chunked streams are there just to avoid loading an entire item into memory, effectively the ChunkedWriter calls back into the ChunkedStream after each chunk to ask for more data.

As it turns out, you CAN use the ChunkedStream paradigm to create something that does HTTP chunking for you from a standard input stream. The code below implements ChunkedInput and takes an InputStream. It also automatically appends the trailing http chunk to indicate EOF, but does so only once as per the ChunkedInput spec.

public class HttpChunkStream implements ChunkedInput {

private static final int CHUNK_SIZE = 8192;
boolean eof = false;

InputStream data;

HttpChunkStream (InputStream data) {
    this.data= data;
}

byte[] buf = new byte[CHUNK_SIZE];
@Override
public Object nextChunk() throws Exception {
    if (eof)
        return null;        
    int b = data.read(buf);
    if (b==-1) {
        eof=true;
        return new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER);           
    }                   
    DefaultHttpChunk c = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(buf,0,b));       
    return c;
}

@Override
public boolean isEndOfInput() throws Exception {
    return eof;
}

@Override
public boolean hasNextChunk() throws Exception {
    return isEndOfInput()==false;
}

@Override
public void close() throws Exception {
    Closeables.closeQuietly(data);              
}

}

Upvotes: 3

Ryan Shelley
Ryan Shelley

Reputation: 187

Just to add some more content to the answer provided by Norman.

When sending arbitrary chunked data, you must first send a new DefaultHttpResponse (one time only):

HttpResponse res = new DefaultHttpResponse();
res.setChunked(true);
res.setHeader(Names.TRANSFER_ENCODING, Values.CHUNKED);
channel.write(res);

Then anytime you want to write to the channel with an arbitrary chunk, call:

HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(str.getBytes(CharsetUtil.UTF_8)));
channel.write(chunk);

Upvotes: 3

Norman Maurer
Norman Maurer

Reputation: 23557

I wonder why you even want to use the PipedInputStream / PipedOutputStream. I think it would be away cleaner / easier to just call Channel.write(..) directly without your data. Just be aware to submit as much data as you can in Channel.write(..), as its an expensive operation.

You can call Channel.write(..) from any thread that you want, as its thread-safe.

Upvotes: 4

Related Questions