I am developing a Netty HTTP server. The response I write in SimpleChannelInboundHandler.channelRead0 comes from another server and its size is unknown, so the upstream response headers may carry either Content-Length or Transfer-Encoding: chunked. I therefore use a buffer: if it is large enough to hold the full upstream body (regardless of whether the upstream used Content-Length or chunked), I respond with Content-Length; otherwise I respond with chunked encoding.
How do I hold on to the write channel of the first connection and pass it to the second handler in order to write the response? (I passed ctx directly and wrote to it, but nothing was returned to the client.)
How do I conditionally decide whether to write chunked data or normal data with Content-Length to the channel? (Adding a ChunkedWriteHandler inside channelRead0 when chunking is needed does not seem to work.)
Take this simple code as an example:
EventLoopGroup bossGroup = new NioEventLoopGroup();
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<Channel>(){
protected void initChannel(Channel ch) throws Exception
System.out.println("Start, I accept client");
ChannelPipeline pipeline = ch.pipeline();
// Uncomment the following line if you want HTTPS
// SSLEngine engine =
// SecureChatSslContextFactory.getServerContext().createSSLEngine();
// engine.setUseClientMode(false);
// pipeline.addLast("ssl", new SslHandler(engine));
pipeline.addLast("decoder", new HttpRequestDecoder());
// Uncomment the following line if you don't want to handle HttpChunks.
// pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder());
// Remove the following line if you don't want automatic content
// compression.
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast("deflater", new HttpContentCompressor());
pipeline.addLast("handler", new SimpleChannelInboundHandler<HttpObject>(){
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
System.out.println("msg=" + msg);
final ChannelHandlerContext ctxClient2Me = ctx;
// TODO: Implement this method
Bootstrap bs = new Bootstrap();
//bs.resolver(new DnsAddressResolverGroup(NioDatagramChannel.class, DefaultDnsServerAddressStreamProvider.INSTANCE));
//.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
}catch(Exception e){
EventLoopGroup cg = workerGroup;//new NioEventLoopGroup(); ChannelInitializer<Channel>(){
protected void initChannel(Channel ch) throws Exception
System.out.println("start, server accept me");
// TODO: Implement this method
ch.pipeline().addLast("http-request-encode", new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast("http-res", new SimpleChannelInboundHandler<HttpObject>(){
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
// TODO: Implement this method
System.out.println("target = " + msg);
if(msg instanceof HttpResponse){
HttpResponse res = (HttpResponse) msg;
DefaultHttpResponse resClient2Me = new DefaultHttpResponse(HttpVersion.HTTP_1_1, res.getStatus());
//resClient2Me.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
//resClient2Me.headers().set(HttpHeaderNames.CONTENT_LENGTH, "");
if(msg instanceof LastHttpContent){
// now response the request of the client, it wastes x seconds from receiving request to response
}else if( msg instanceof HttpContent){
//ctxClient2Me.write(new DefaultHttpContent(msg)); write chunk by chunk ?
System.out.println("end, server accept me");
final URI uri = new URI("");
String host = uri.getHost();
ChannelFuture connectFuture= bs.connect(host, 80);
System.out.println("to connect me to server");
connectFuture.addListener(new ChannelFutureListener(){
public void operationComplete(ChannelFuture cf) throws Exception
ChannelFuture connetedFuture = connectFuture.sync(); // TODO optimize, wait io
System.out.println("connected me to server");
DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
//req.headers().set(HttpHeaderNames.HOST, "");;
System.out.println("end of Client2Me channelRead0");
System.out.println("For the response of Me2Server, see SimpleChannelInboundHandler.channelRead0");
System.out.println("end, I accept client");
ChannelFuture channelFuture = serverBootstrap.bind(2080).sync();
} finally {
After a bit of a struggle trying to send a response from a non-Netty event loop thread, I finally figured out the problem. If your client closes its output stream (a half-close), you need to set the ALLOW_HALF_CLOSURE property to true in Netty so that it won't close the channel.
Here's a sample server. The client is left as an exercise to the reader :-)
final ServerBootstrap b = new ServerBootstrap();, workerGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.ALLOW_HALF_CLOSURE, true) // This option doesn't work
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<>() {
protected void initChannel( ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {, true); // This is important
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuffer byteBuffer = ((ByteBuf) msg).nioBuffer();
String id =;
// When Done reading all the bytes, send response 1 second later
timer.schedule(new TimerTask() {
public void run() {
ctx.close();"[{}] Server time to first response byte: {}", id, System.currentTimeMillis() - startTimes.get(id));
}, 1000);
Channel ch = b.bind("localhost", PORT).sync().channel();
Of course, as mentioned by others in the thread, you cannot send Strings directly; you need to send a ByteBuf, for example one created with Unpooled.copiedBuffer.
You can keep the ChannelHandlerContext ctx passed to ChannelInboundHandlerAdapter.channelRead(ChannelHandlerContext ctx, Object msg) (msg is not released automatically after the method returns) or to SimpleChannelInboundHandler.channelRead0(ChannelHandlerContext ctx, I msg) (the received message is released automatically after the method returns) for later use. You can refer to the example at the end, which passes the ctx to another ChannelHandler.
All I/O operations are asynchronous.
All I/O operations in Netty are asynchronous. It means any I/O calls will return immediately with no guarantee that the requested I/O operation has been completed at the end of the call. Instead, you will be returned with a ChannelFuture instance which will notify you when the requested I/O operation has succeeded, failed, or canceled.
public interface Channel extends AttributeMap, Comparable<Channel> {
    /**
     * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
     * This method will not request to actual flush, so be sure to call {@link #flush()}
     * once you want to request to flush all pending data to the actual transport.
     */
    ChannelFuture write(Object msg);
    /**
     * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
     * This method will not request to actual flush, so be sure to call {@link #flush()}
     * once you want to request to flush all pending data to the actual transport.
     */
    ChannelFuture write(Object msg, ChannelPromise promise);
    /**
     * Request to flush all pending messages.
     */
    Channel flush();
    /**
     * Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.
     */
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    /**
     * Shortcut for call {@link #write(Object)} and {@link #flush()}.
     */
    ChannelFuture writeAndFlush(Object msg);
}
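To illustrate the asynchronous contract quoted above without pulling in Netty, here is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for ChannelFuture. The class AsyncCallbackSketch and the methods connectAsync and relay are hypothetical names invented for this illustration, not Netty APIs. The point is only that follow-up work is attached as a callback on the future instead of blocking for the result inside a handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// JDK-only analogy; CompletableFuture stands in for Netty's ChannelFuture.
final class AsyncCallbackSketch {

    // Hypothetical asynchronous "connect": returns immediately with a future.
    static CompletableFuture<String> connectAsync(String host) {
        if (host.isEmpty()) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("empty host"));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> "connected to " + host);
    }

    // Attaches the follow-up step as a callback that runs once the connect
    // has either succeeded or failed.
    static String relay(String host) {
        CompletableFuture<String> done = connectAsync(host).handle((state, err) -> {
            if (err != null) {
                // Dependent stages may wrap the cause in a CompletionException.
                Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                        ? err.getCause() : err;
                return "failed: " + cause.getMessage();
            }
            return state + ", request written";
        });
        // Blocking is acceptable here only because this demo runs on the
        // caller's thread, never on an I/O thread.
        return done.join();
    }

    public static void main(String[] args) {
        System.out.println(relay("example.org"));
        System.out.println(relay(""));
    }
}
```

In Netty the equivalent step would be a listener added to the ChannelFuture, with the write issued from inside that listener.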
Once HttpResponseEncoder (a subclass of HttpObjectEncoder, which has a private field private int state = ST_INIT; to remember whether to encode the HTTP body data as chunked) is in the ChannelPipeline, the only thing you need to do is add the header 'transfer-encoding: chunked', e.g. HttpUtil.setTransferEncodingChunked(srcRes, true);
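The buffering strategy from the question (use Content-Length when the whole upstream body fits in a buffer, otherwise fall back to chunked) can be sketched without Netty. This is only an illustration of the decision and of the HTTP/1.1 wire framing, under the assumption that the body arrives as a sequence of ASCII pieces. FramingSketch, frame and appendChunk are hypothetical names, not part of any library.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// JDK-only sketch of the Content-Length versus chunked decision.
final class FramingSketch {

    // Buffers pieces until bufferLimit would be exceeded. If everything
    // fits, the response is framed with Content-Length; otherwise the
    // buffered bytes and all later pieces are sent as chunks.
    static String frame(List<String> pieces, int bufferLimit) {
        StringBuilder out = new StringBuilder("HTTP/1.1 200 OK\r\n");
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        boolean chunked = false;
        for (String piece : pieces) {
            byte[] data = piece.getBytes(StandardCharsets.US_ASCII);
            if (!chunked && buffer.size() + data.length <= bufferLimit) {
                buffer.write(data, 0, data.length);
                continue;
            }
            if (!chunked) {
                // Buffer overflowed: commit to chunked and flush what we hold.
                chunked = true;
                out.append("Transfer-Encoding: chunked\r\n\r\n");
                appendChunk(out, buffer.toByteArray());
            }
            appendChunk(out, data);
        }
        if (chunked) {
            out.append("0\r\n\r\n"); // zero-length chunk terminates the body
        } else {
            out.append("Content-Length: ").append(buffer.size()).append("\r\n\r\n");
            out.append(new String(buffer.toByteArray(), StandardCharsets.US_ASCII));
        }
        return out.toString();
    }

    // Writes one chunk: hexadecimal size, CRLF, data, CRLF.
    static void appendChunk(StringBuilder out, byte[] data) {
        if (data.length == 0) {
            return; // an empty chunk would end the body prematurely
        }
        out.append(Integer.toHexString(data.length)).append("\r\n")
           .append(new String(data, StandardCharsets.US_ASCII)).append("\r\n");
    }

    public static void main(String[] args) {
        System.out.print(frame(Arrays.asList("hello", " world"), 64));
        System.out.println();
        System.out.print(frame(Arrays.asList("hello", " world"), 8));
    }
}
```

With Netty the encoder does the chunk framing for you, so the handler only has to pick the header before the first write; the sketch just makes the two outcomes visible.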
public class NettyToServerChat extends SimpleChannelInboundHandler<HttpObject> {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyToServerChat.class);
public static final String CHANNEL_NAME = "NettyToServer";
protected final ChannelHandlerContext ctxClientToNetty;
/** Determines if the response supports keepalive */
private boolean responseKeepalive = true;
/** Determines if the response is chunked */
private boolean responseChunked = false;
public NettyToServerChat(ChannelHandlerContext ctxClientToNetty) {
this.ctxClientToNetty = ctxClientToNetty;
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
HttpResponseStatus resStatus = response.status();
//"Status Line: {} {} {}", response.getProtocolVersion(), resStatus.code(), resStatus.reasonPhrase());
if (!response.headers().isEmpty()) {
for (CharSequence name : response.headers().names()) {
for (CharSequence value : response.headers().getAll(name)) {
//"HEADER: {} = {}", name, value);
//response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
HttpResponse srcRes = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
if (HttpUtil.isTransferEncodingChunked(response)) {
responseChunked = true;
HttpUtil.setTransferEncodingChunked(srcRes, true);;
//, "ChunkedWrite", new ChunkedWriteHandler());
} else {;
if (msg instanceof LastHttpContent) { // prioritize the subclass interface
LOGGER.debug(" = {}",;
LOGGER.debug(" = {}",;
HttpContent content = (HttpContent) msg;
// @see {@link SimpleChannelInboundHandler<I>#channelRead(ChannelHandlerContext, I)}
LOGGER.debug(" = {}",;
} else if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
// We need to do a ReferenceCountUtil.retain() on the buffer to increase the reference count by 1
