Reputation: 6414
Problem description:
My program creates a Netty server and a client, and then makes 2^17 connections to that server. At some point the client starts to receive this exception:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
.
The english equivalent is:
java.io.IOException: An existing connection was forcibly closed by the remote host
Obviously, it is not desirable for the server to forcibly close existing connections.
Steps to reproduce:
For the convenience of anyone willing to reproduce this problem, I've created this "single runnable Java file" program that reproduces it; it needs only the netty-all-4.1.12.Final.jar
dependency. It starts a Netty server on a free port, then creates a client, performs the requests, waits a while to give the server a chance to process them, and then prints statistics: how many connections were made, how many connections the server processed, how many connections were lost, and how many and what kind of exceptions the server and the client each encountered.
package netty.exception.tst;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyException {
/**
 * Starts a server on a free port, fires a burst of client connections at it, waits, and then
 * prints how many connections the server saw together with the exceptions each side recorded.
 *
 * @param args ignored
 * @throws InterruptedException if the thread is interrupted while the server binds or during
 *         the wait before shutdown
 */
public static void main(String[] args) throws InterruptedException {
    System.out.println("starting server");
    NettyServer server = new NettyServer(0);
    int port = server.getPort();
    System.out.println("server started at port: " + port);
    System.out.println("staring client");
    NettyClient client = new NettyClient();
    System.out.println("client started");
    int noOfConnectionsToPerform = 1 << 17;
    System.out.println("performing " + noOfConnectionsToPerform + " connections");
    // The outcome of connect() is delivered through the returned future. Discarding it hides
    // every connection that never got established, so count those failures explicitly.
    final AtomicInteger failedConnects = new AtomicInteger(0);
    ChannelFutureListener countFailedConnect = future -> {
        if (!future.isSuccess()) {
            failedConnects.incrementAndGet();
        }
    };
    for (int n = 0; n < noOfConnectionsToPerform; n++) {
        // send a request
        client.getBootstrap().connect("localhost", port).addListener(countFailedConnect);
    }
    System.out.println("client performed " + noOfConnectionsToPerform + " connections");
    System.out.println("wait a bit to give a chance for server to finish processing incoming requests");
    Thread.sleep(80000);
    System.out.println("shutting down server and client");
    server.stop();
    client.stop();
    System.out.println("stopped, server received: " + server.connectionsCount() + " connections");
    int numberOfLostConnections = noOfConnectionsToPerform - server.connectionsCount();
    if (numberOfLostConnections > 0) {
        System.out.println("Where do we lost " + numberOfLostConnections + " connections?");
    }
    if (failedConnects.get() > 0) {
        System.out.println("client failed to establish " + failedConnects.get() + " connections");
    }
    System.out.println("srerver exceptions: ");
    printExceptions(server.getExceptions());
    System.out.println("client exceptions: ");
    printExceptions(client.getExceptions());
}
/**
 * Writes every recorded exception to standard output together with its occurrence count,
 * or a single notice when nothing was recorded.
 *
 * @param exceptions occurrence count keyed by the text printed for the exception
 */
private static void printExceptions(Map<String, Integer> exceptions) {
    if (exceptions.isEmpty()) {
        System.out.println("There was no exceptions");
        return;
    }
    exceptions.forEach((stackTrace, occurrences) -> {
        System.out.println("There was " + occurrences + " times this exception:");
        System.out.println(stackTrace);
    });
}
/**
 * Server side of the test: accepts connections, counts every channel that becomes active and
 * records every exception that reaches the child pipeline.
 */
public static class NettyServer {
    private final ChannelFuture channelFuture;
    private final EventLoopGroup bossGroup;
    private final EventLoopGroup workerGroup;
    private final AtomicInteger connections = new AtomicInteger(0);
    private final ExceptionCounter exceptionCounter = new ExceptionCounter();

    /**
     * Creates the event loop groups and binds the server socket.
     *
     * @param port port to bind to; {@code main} passes 0 and then reads the actual port back
     *        through {@link #getPort()}
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public NettyServer(int port) throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeServerHandler() {
                            @Override
                            public void channelActive(final ChannelHandlerContext ctx) {
                                connections.incrementAndGet();
                                super.channelActive(ctx);
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                exceptionCounter.countException(cause);
                                super.exceptionCaught(ctx, cause);
                            }
                        });
                    }
                }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
        boolean bound = false;
        try {
            channelFuture = serverBootstrap.bind(port).sync();
            bound = true;
        } finally {
            if (!bound) {
                // The caller never receives an instance to call stop() on when the bind
                // fails, so the groups created above must be released here.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    /** @return the local port the server channel is bound to */
    public int getPort() {
        return ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    }

    /** @return the number of child channels that have become active so far */
    public int connectionsCount() {
        return connections.get();
    }

    /** @return a snapshot of the exceptions recorded by the child pipelines, with their counts */
    public Map<String, Integer> getExceptions() {
        return exceptionCounter.getExceptions();
    }

    /**
     * Requests a graceful shutdown of both event loop groups and waits for them to terminate.
     * If the wait is interrupted, it is abandoned and the thread's interrupt flag is restored.
     */
    public void stop() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        try {
            bossGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Client side of the test: owns a shared {@link Bootstrap} whose channels each get a
 * {@link TimeClientHandler} that also records every exception reaching the pipeline.
 */
public static class NettyClient {
    private final Bootstrap bootstrap;
    private final EventLoopGroup workerGroup;
    private final ExceptionCounter exceptionCounter = new ExceptionCounter();

    /** Creates the event loop group and configures the bootstrap; no connection is made here. */
    public NettyClient() {
        workerGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new TimeClientHandler() {
                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                        exceptionCounter.countException(cause);
                        super.exceptionCaught(ctx, cause);
                    }
                });
            }
        });
    }

    /** @return the configured bootstrap used to open connections */
    public Bootstrap getBootstrap() {
        return bootstrap;
    }

    /**
     * Requests a graceful shutdown of the event loop group and waits for it to terminate.
     * If the wait is interrupted, it is abandoned and the thread's interrupt flag is restored.
     */
    public void stop() {
        workerGroup.shutdownGracefully();
        try {
            workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /** @return a snapshot of the exceptions recorded by the client pipelines, with their counts */
    public Map<String, Integer> getExceptions() {
        return exceptionCounter.getExceptions();
    }
}
/**
 * Writes a single 32-bit timestamp to every channel as soon as it becomes active and closes
 * the channel once that write has completed. Any exception also closes the channel.
 */
public static class TimeServerHandler extends ChannelInboundHandlerAdapter {
    /** Sends the current time (seconds, offset by 2208988800) and closes after the write. */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        final ByteBuf payload = ctx.alloc().buffer(4);
        payload.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        final ChannelFuture writeDone = ctx.writeAndFlush(payload);
        writeDone.addListener((ChannelFutureListener) completed -> {
            assert writeDone == completed;
            ctx.close();
        });
    }

    /** Closes the channel on any error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}
/**
 * Accumulates inbound bytes until a full 32-bit timestamp has arrived, then closes the
 * channel. Any exception also closes the channel.
 *
 * <p>The accumulation buffer is a plain instance field. Keeping it in a {@code ThreadLocal}
 * only worked if every callback of a handler ran on the same thread, and it broke outright
 * once the field was made {@code static}.
 */
public static class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    /** Allocates the accumulation buffer when the handler joins a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4);
    }

    /** Releases the accumulation buffer, if one was allocated, when the handler is removed. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        ByteBuf accumulated = buf;
        buf = null;
        if (accumulated != null) {
            accumulated.release();
        }
    }

    /**
     * Appends the received bytes to the accumulation buffer and closes the channel once at
     * least four bytes are available.
     *
     * @param msg the inbound message; cast to {@link ByteBuf} and always released here
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        try {
            buf.writeBytes(m);
        } finally {
            // Release the inbound buffer even if copying it fails.
            m.release();
        }
        if (buf.readableBytes() >= 4) {
            // Consume the timestamp; this test only needs the exchange to complete, so the
            // decoded value ((value - 2208988800L) * 1000L milliseconds) is not used.
            buf.readUnsignedInt();
            ctx.close();
        }
    }

    /** Closes the channel on any error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}
/**
 * Counts exceptions, grouping them by their full printed stack trace.
 * Counting is backed by a {@link ConcurrentHashMap} of {@link AtomicInteger} counters.
 */
public static class ExceptionCounter {
    private final Map<String, AtomicInteger> exceptions = new ConcurrentHashMap<>();

    /**
     * Records one occurrence of the given exception.
     *
     * @param cause the exception to count; its printed stack trace is the grouping key
     */
    public void countException(Throwable cause) {
        StringWriter writer = new StringWriter();
        cause.printStackTrace(new PrintWriter(writer));
        String stackTrace = writer.toString();
        // Creates the counter on first sight of this trace, then bumps it.
        exceptions.computeIfAbsent(stackTrace, key -> new AtomicInteger(0)).incrementAndGet();
    }

    /**
     * Returns a copy of the current counts.
     *
     * @return an unmodifiable map from printed stack trace to number of occurrences
     */
    public Map<String, Integer> getExceptions() {
        Map<String, Integer> newMap = exceptions.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
        return Collections.unmodifiableMap(newMap);
    }
}
}
The output is:
starting server
server started at port: 56069
staring client
client started
performing 131072 connections
client performed 131072 connections
wait a bit to give a chance for server to finish processing incoming requests
shutting down server and client
stopped, server received: 34735 connections
Where do we lost 96337 connections?
srerver exceptions:
There was no exceptions
client exceptions:
There was 258 times this exception:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
at io.netty.buffer.WrappedByteBuf.writeBytes(WrappedByteBuf.java:813)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:745)
There was 30312 times this exception:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:745)
Questions:
When I change, in TimeClientHandler, the field declaration of private ThreadLocal<ByteBuf> buf
to be also static, I get a null pointer exception in TimeClientHandler.handlerRemoved.
Why? This is very strange: is this class somehow replicated, or are the threads from NioEventLoopGroup
somehow strange?
Environment:
Upvotes: 0
Views: 1713
Reputation: 21
There is a limit of 64k ports per IP address, so you cannot open 2^17 ports. Also, since each socket uses a file handle, you might be hitting the limit on the maximum number of open files per process. See "Max open files" for a running process.
Upvotes: 1