Reputation: 712
I develop a netty http server, but when I write the response in the method ChannelInboundHandlerAdapter.channelRead0, my response result comes from another server and the size of the result is unknown, so its http response headers maybe has content-length or chunked. so I use a buffer, if it's enough (read up full data) regardless of content-length or chunked, I use content-length, otherwise I use chunked.
How I hold the write channel of first connection then pass it to the seconde Handler inorder to write the response. ( I just directly pass ctx to write but nothing returns)
How I conditionally decide write chunked data to channel or normal data with content-length (it seems not to work to add ChunkWriteHandler if chunk is needed when channelRead0.
take a simple code for example:
```java
EventLoopGroup bossGroup = new NioEventLoopGroup();
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception
{
System.out.println("Start, I accept client");
ChannelPipeline pipeline = ch.pipeline();
// Uncomment the following line if you want HTTPS
// SSLEngine engine =
// SecureChatSslContextFactory.getServerContext().createSSLEngine();
// engine.setUseClientMode(false);
// pipeline.addLast("ssl", new SslHandler(engine));
pipeline.addLast("decoder", new HttpRequestDecoder());
// Uncomment the following line if you don't want to handle HttpChunks.
// pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder());
// Remove the following line if you don't want automatic content
// compression.
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast("deflater", new HttpContentCompressor());
pipeline.addLast("handler", new SimpleChannelInboundHandler<HttpObject>(){
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
{
System.out.println("msg=" + msg);
final ChannelHandlerContext ctxClient2Me = ctx;
// TODO: Implement this method
Bootstrap bs = new Bootstrap();
try{
//bs.resolver(new DnsAddressResolverGroup(NioDatagramChannel.class, DefaultDnsServerAddressStreamProvider.INSTANCE));
//.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
bs.resolver(DefaultAddressResolverGroup.INSTANCE);
}catch(Exception e){
e.printStackTrace();
}
bs.channel(NioSocketChannel.class);
EventLoopGroup cg = workerGroup;//new NioEventLoopGroup();
bs.group(cg).handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception
{
System.out.println("start, server accept me");
// TODO: Implement this method
ch.pipeline().addLast("http-request-encode", new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast("http-res", new SimpleChannelInboundHandler<HttpObject>(){
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
{
// TODO: Implement this method
System.out.println("target = " + msg);
//
if(msg instanceof HttpResponse){
HttpResponse res = (HttpResponse) msg;
HttpUtil.isTransferEncodingChunked(res);
DefaultHttpResponse resClient2Me = new DefaultHttpResponse(HttpVersion.HTTP_1_1, res.getStatus());
//resClient2Me.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
//resClient2Me.headers().set(HttpHeaderNames.CONTENT_LENGTH, "");
ctxClient2Me.write(resClient2Me);
}
if(msg instanceof LastHttpContent){
// now response the request of the client, it wastes x seconds from receiving request to response
ctxClient2Me.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
ctx.close();
}else if( msg instanceof HttpContent){
//ctxClient2Me.write(new DefaultHttpContent(msg)); write chunk by chunk ?
}
}
});
System.out.println("end, server accept me");
}
});
final URI uri = new URI("http://example.com/");
String host = uri.getHost();
ChannelFuture connectFuture= bs.connect(host, 80);
System.out.println("to connect me to server");
connectFuture.addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture cf) throws Exception
{
}
});
ChannelFuture connetedFuture = connectFuture.sync(); // TODO optimize, wait io
System.out.println("connected me to server");
DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
//req.headers().set(HttpHeaderNames.HOST, "");
connetedFuture.channel().writeAndFlush(req);
System.out.println("end of Client2Me channelRead0");
System.out.println("For the seponse of Me2Server, see SimpleChannelInboundHandler.channelRead0");
}
});
System.out.println("end, I accept client");
}
});
System.out.println("========");
ChannelFuture channelFuture = serverBootstrap.bind(2080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
```
Upvotes: 2
Views: 3535
Reputation: 452
After a bit of struggle trying to send response from non-Netty eventloop thread, I finally figured out the problem. If your client is closing the outputstream using
socketChannel.shutdownOutput()
then you need to set ALLOW_HALF_CLOSURE
property true in Netty so it won't close the channel.
Here's a sample server. The client is left as an exercise to the reader :-)
final ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.ALLOW_HALF_CLOSURE, true) // This option doesn't work
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
@Override
protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true); // This is important
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuffer byteBuffer = ((ByteBuf) msg).nioBuffer();
String id = ctx.channel().id().asLongText();
// When Done reading all the bytes, send response 1 second later
timer.schedule(new TimerTask() {
@Override
public void run() {
ctx.write(Unpooled.copiedBuffer(CONTENT.asReadOnlyBuffer()));
ctx.flush();
ctx.close();
log.info("[{}] Server time to first response byte: {}", id, System.currentTimeMillis() - startTimes.get(id));
startTimes.remove(id);
}
}, 1000);
}
}
}
});
Channel ch = b.bind("localhost", PORT).sync().channel();
ch.closeFuture().sync();
Ofcourse, as mentioned by others in the thread, you cannot send Strings, you need to send a ByteBuf using Unpooled.copiedBuffer
Upvotes: 1
Reputation: 712
ChannelInboundHandlerAdapter.channelRead(ChannelHandlerContext ctx, Object msg)
(msg is not released after returning automatically) or SimpleChannelInboundHandler.channelRead0(ChannelHandlerContext ctx, I msg)
(it releases the received messages automatically after returning) for later use. Maybe you can refer to the example at the end, pass the channel to another ChannelHandler
.All I/O operations are asynchronous.
All I/O operations in Netty are asynchronous. It means any I/O calls will return immediately with no guarantee that the requested I/O operation has been completed at the end of the call. Instead, you will be returned with a
ChannelFuture
instance which will notify you when the requested I/O operation has succeeded, failed, or canceled.
public interface Channel extends AttributeMap, Comparable<Channel> {
/**
* Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
* This method will not request to actual flush, so be sure to call {@link #flush()}
* once you want to request to flush all pending data to the actual transport.
*/
ChannelFuture write(Object msg);
/**
* Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
* This method will not request to actual flush, so be sure to call {@link #flush()}
* once you want to request to flush all pending data to the actual transport.
*/
ChannelFuture write(Object msg, ChannelPromise promise);
/**
* Request to flush all pending messages.
*/
Channel flush();
/**
* Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.
*/
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
/**
* Shortcut for call {@link #write(Object)} and {@link #flush()}.
*/
ChannelFuture writeAndFlush(Object msg);
}
HttpResponseEncoder
(it is a subclass of HttpObjectEncoder, which has a private filed private int state = ST_INIT;
to remember whether to encode HTTP body data as chunked) into ChannelPipeline
, the only thing to do is add a header 'transfer-encoding: chunked', e.g. HttpUtil.setTransferEncodingChunked(srcRes, true);
.```java
public class NettyToServerChat extends SimpleChannelInboundHandler<HttpObject> {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyToServerChat.class);
public static final String CHANNEL_NAME = "NettyToServer";
protected final ChannelHandlerContext ctxClientToNetty;
/** Determines if the response supports keepalive */
private boolean responseKeepalive = true;
/** Determines if the response is chunked */
private boolean responseChunked = false;
public NettyToServerChat(ChannelHandlerContext ctxClientToNetty) {
this.ctxClientToNetty = ctxClientToNetty;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
HttpResponseStatus resStatus = response.status();
//LOGGER.info("Status Line: {} {} {}", response.getProtocolVersion(), resStatus.code(), resStatus.reasonPhrase());
if (!response.headers().isEmpty()) {
for (CharSequence name : response.headers().names()) {
for (CharSequence value : response.headers().getAll(name)) {
//LOGGER.info("HEADER: {} = {}", name, value);
}
}
//LOGGER.info("");
}
//response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
HttpResponse srcRes = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
if (HttpUtil.isTransferEncodingChunked(response)) {
responseChunked = true;
HttpUtil.setTransferEncodingChunked(srcRes, true);
ctxNettyToServer.channel().write(srcRes);
//ctx.channel().pipeline().addAfter(CHANNEL_NAME, "ChunkedWrite", new ChunkedWriteHandler());
} else {
ctxNettyToServer.channel().write(srcRes);
//ctx.channel().pipeline().remove("ChunkedWrite");
}
}
if (msg instanceof LastHttpContent) { // prioritize the subclass interface
ctx.close();
LOGGER.debug("ctxNettyToServer.channel().isWritable() = {}", ctxNettyToServer.channel().isWritable());
Thread.sleep(3000);
LOGGER.debug("ctxNettyToServer.channel().isWritable() = {}", ctxNettyToServer.channel().isWritable());
if(!responseChunked){
HttpContent content = (HttpContent) msg;
// https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/SimpleChannelInboundHandler.java
// @see {@link SimpleChannelInboundHandler<I>#channelRead(ChannelHandlerContext, I)}
ctxNettyToServer.writeAndFlush(content.retain()).addListener(ChannelFutureListener.CLOSE);
}else{
ctxNettyToServer.close();
}
LOGGER.debug("ctxNettyToServer.channel().isWritable() = {}", ctxNettyToServer.channel().isWritable());
} else if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
// We need to do a ReferenceCountUtil.retain() on the buffer to increase the reference count by 1
ctxNettyToServer.write(content.retain());
}
}
}
```
Upvotes: 0