Krzysztof Cichocki
Krzysztof Cichocki

Reputation: 6414

Netty (netty-all-4.1.12.Final.jar) java.io.IOException: An existing connection was forcibly closed by the remote host

Problem description:

My program creates a Netty server and a client, and then makes 2^17 connections to that server. At some point the client starts to receive this exception:

java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta.

The English equivalent is:

java.io.IOException: An existing connection was forcibly closed by the remote host

Obviously, it is not desirable for the server to forcibly close existing connections.

Steps to reproduce:

For the convenience of anyone willing to reproduce this problem, I've created this "single runnable Java file" program that reproduces it; it needs only the netty-all-4.1.12.Final.jar dependency. It starts a Netty server on a free port, creates a client, performs the requests, and waits a bit to give the server a chance to process them. It then prints statistics: how many connections were made, how many connections the server processed, how many connections were lost, and how many and what kinds of exceptions the server and the client encountered.

package netty.exception.tst;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyException {

    /**
     * Runs the reproduction: starts a server, starts a client, issues 2^17 connect
     * attempts, sleeps for 80 seconds, shuts both sides down and prints the collected
     * statistics (connections seen by the server, and exceptions counted on each side).
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while the server binds
     *                              or during the fixed wait
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println("starting server");
        NettyServer server = new NettyServer(0);
        int port = server.getPort();
        System.out.println("server started at port: " + port);

        System.out.println("staring client");
        NettyClient client = new NettyClient();
        System.out.println("client started");

        int noOfConnectionsToPerform = 1 << 17;
        System.out.println("performing " + noOfConnectionsToPerform + " connections");
        for (int n = 0; n < noOfConnectionsToPerform; n++) {
            // Fire-and-forget: the ChannelFuture returned by connect() is deliberately dropped.
            // NOTE(review): a failed connect attempt is presumably reported only through that
            // future, so it would not show up in the client's exception statistics - confirm.
            client.getBootstrap().connect("localhost", port);
        }
        System.out.println("client performed " + noOfConnectionsToPerform + " connections");

        System.out.println("wait a bit to give a chance for server to finish processing incoming requests");
        // sleep() is static; call it on the class rather than through an instance.
        Thread.sleep(80000);

        System.out.println("shutting down server and client");
        server.stop();
        client.stop();

        System.out.println("stopped, server received: " + server.connectionsCount() + " connections");
        int numberOfLostConnections = noOfConnectionsToPerform - server.connectionsCount();
        if (numberOfLostConnections > 0) {
            System.out.println("Where do we lost " + numberOfLostConnections + " connections?");
        }

        System.out.println("srerver exceptions: ");
        printExceptions(server.getExceptions());
        System.out.println("client exceptions: ");
        printExceptions(client.getExceptions());
    }

    /**
     * Prints one two-line report per distinct exception: the number of occurrences,
     * followed by the key under which it was counted. Prints a single notice instead
     * when the map is empty.
     *
     * @param exceptions occurrence count per exception key
     */
    private static void printExceptions(Map<String, Integer> exceptions) {
        if (exceptions.isEmpty()) {
            System.out.println("There was no exceptions");
            return;
        }
        exceptions.forEach((stackTrace, occurrences) -> {
            System.out.println("There was " + occurrences + " times this exception:");
            System.out.println(stackTrace);
        });
    }

    /**
     * Netty server used by the reproduction. Every accepted child channel gets a
     * {@link TimeServerHandler} subclass that additionally counts activated connections
     * and records every exception passed to {@code exceptionCaught}.
     */
    public static class NettyServer {
        private final ChannelFuture channelFuture;
        private final EventLoopGroup bossGroup;
        private final EventLoopGroup workerGroup;
        private final AtomicInteger connections = new AtomicInteger(0);
        private final ExceptionCounter exceptionCounter = new ExceptionCounter();

        /**
         * Creates the event loop groups, configures the bootstrap and blocks until the
         * server socket is bound.
         *
         * @param port port to bind to
         * @throws InterruptedException if the thread is interrupted while waiting for the bind
         */
        public NettyServer(int port) throws InterruptedException {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler() {

                                @Override
                                public void channelActive(final ChannelHandlerContext ctx) {
                                    // Count first, then let the base handler send the time.
                                    connections.incrementAndGet();
                                    super.channelActive(ctx);
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                    // Record first, then let the base handler close the channel.
                                    exceptionCounter.countException(cause);
                                    super.exceptionCaught(ctx, cause);
                                }

                            });
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture bound = null;
            try {
                bound = serverBootstrap.bind(port).sync();
            } finally {
                if (bound == null) {
                    // The bind failed or was interrupted: do not leave the event loop
                    // threads running behind a server object that was never constructed.
                    bossGroup.shutdownGracefully();
                    workerGroup.shutdownGracefully();
                }
            }
            channelFuture = bound;
        }

        /**
         * @return the local port of the bound server channel
         */
        public int getPort() {
            return ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
        }

        /**
         * @return number of child channels for which {@code channelActive} has fired so far
         */
        public int connectionsCount() {
            return connections.get();
        }

        /**
         * @return snapshot of the exceptions recorded by the child handlers, with their counts
         */
        public Map<String, Integer> getExceptions() {
            return exceptionCounter.getExceptions();
        }

        /**
         * Requests a graceful shutdown of both event loop groups and waits for them to
         * terminate. If the waiting thread is interrupted, the stack trace is printed and
         * the interrupt status is restored so that callers can still observe it.
         */
        public void stop() {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            try {
                bossGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
                workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Catching InterruptedException clears the flag; set it again.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Netty client used by the reproduction. It only prepares a {@link Bootstrap};
     * callers obtain it through {@link #getBootstrap()} and issue the connects
     * themselves. Every channel gets a {@link TimeClientHandler} subclass that records
     * each exception passed to {@code exceptionCaught}.
     */
    public static class NettyClient {
        private final Bootstrap bootstrap;
        private final EventLoopGroup workerGroup;
        private final ExceptionCounter exceptionCounter = new ExceptionCounter();

        /**
         * Creates the event loop group and configures the bootstrap. No connection is
         * opened here.
         */
        public NettyClient() {
            workerGroup = new NioEventLoopGroup();

            bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler() {
                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                            // Record first, then let the base handler close the channel.
                            exceptionCounter.countException(cause);
                            super.exceptionCaught(ctx, cause);
                        }
                    });
                }
            });
        }

        /**
         * @return the configured bootstrap, shared by all connect attempts
         */
        public Bootstrap getBootstrap() {
            return bootstrap;
        }

        /**
         * Requests a graceful shutdown of the event loop group and waits for it to
         * terminate. If the waiting thread is interrupted, the stack trace is printed and
         * the interrupt status is restored so that callers can still observe it.
         */
        public void stop() {
            workerGroup.shutdownGracefully();
            try {
                workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Catching InterruptedException clears the flag; set it again.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * @return snapshot of the exceptions recorded by the channel handlers, with their counts
         */
        public Map<String, Integer> getExceptions() {
            return exceptionCounter.getExceptions();
        }
    }

    /**
     * Server-side handler: as soon as a channel becomes active it writes one 4-byte
     * timestamp and closes the channel once that write has completed. Any exception
     * reaching the handler closes the channel.
     */
    public static class TimeServerHandler extends ChannelInboundHandlerAdapter {

        /**
         * Sends the current time as a 32-bit integer and closes the channel afterwards.
         */
        @Override
        public void channelActive(final ChannelHandlerContext ctx) {
            // Current time in seconds, shifted by 2208988800 (presumably the offset
            // between the 1900 and 1970 epochs - confirm) and truncated to 32 bits.
            int shiftedSeconds = (int) (System.currentTimeMillis() / 1000L + 2208988800L);

            ByteBuf payload = ctx.alloc().buffer(4);
            payload.writeInt(shiftedSeconds);

            final ChannelFuture writeFuture = ctx.writeAndFlush(payload);
            writeFuture.addListener((ChannelFutureListener) completed -> {
                assert writeFuture == completed;
                ctx.close();
            });
        }

        /**
         * Closes the channel on any exception.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }
    }

    /**
     * Client-side handler: accumulates inbound bytes until a full 4-byte timestamp has
     * arrived, consumes it and closes the channel. Any exception reaching the handler
     * closes the channel.
     *
     * <p>The accumulation buffer is a plain instance field. A new handler instance is
     * created for every channel, so no {@code ThreadLocal} is needed to keep the state
     * separate; a thread-local would also lose the buffer if {@code handlerRemoved} ever
     * ran on a different thread than {@code handlerAdded}.
     */
    public static class TimeClientHandler extends ChannelInboundHandlerAdapter {
        private ByteBuf buf;

        /**
         * Allocates the accumulation buffer for this handler.
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            buf = ctx.alloc().buffer(4);
        }

        /**
         * Releases the accumulation buffer, if one was allocated.
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            if (buf != null) {
                buf.release();
                buf = null;
            }
        }

        /**
         * Appends the received bytes to the accumulation buffer and closes the channel
         * once at least four bytes are available.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf m = (ByteBuf) msg;
            try {
                buf.writeBytes(m);
            } finally {
                // Release the inbound message even if the copy fails.
                m.release();
            }

            if (buf.readableBytes() >= 4) {
                // Consume the timestamp; the value itself is not used by the reproduction.
                buf.readUnsignedInt();
                ctx.close();
            }
        }

        /**
         * Closes the channel on any exception.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }
    }

    /**
     * Counts exceptions grouped by their complete printed stack trace. Two throwables
     * fall into the same bucket only when {@code printStackTrace} produces exactly the
     * same text for both.
     */
    public static class ExceptionCounter {
        private final ConcurrentHashMap<String, AtomicInteger> exceptions = new ConcurrentHashMap<>();

        /**
         * Records one occurrence of the given throwable.
         *
         * @param cause the throwable to count; its printed stack trace is the grouping key
         */
        public void countException(Throwable cause) {
            StringWriter writer = new StringWriter();
            cause.printStackTrace(new PrintWriter(writer));

            // computeIfAbsent creates the counter at most once per key, replacing the
            // manual get / putIfAbsent sequence.
            exceptions.computeIfAbsent(writer.toString(), key -> new AtomicInteger(0)).incrementAndGet();
        }

        /**
         * Returns a snapshot of the current counts.
         *
         * @return unmodifiable map from printed stack trace to number of occurrences;
         *         later calls to {@link #countException(Throwable)} do not change it
         */
        public Map<String, Integer> getExceptions() {
            Map<String, Integer> newMap = exceptions.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
            return Collections.unmodifiableMap(newMap);
        }
    }
}

The output is:

starting server
server started at port: 56069
staring client
client started
performing 131072 connections
client performed 131072 connections
wait a bit to give a chance for server to finish processing incoming requests
shutting down server and client
stopped, server received: 34735 connections
Where do we lost 96337 connections?
srerver exceptions: 
There was no exceptions
client exceptions: 
There was 258 times this exception:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
    at io.netty.buffer.WrappedByteBuf.writeBytes(WrappedByteBuf.java:813)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:745)

There was 30312 times this exception:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:745)

Questions:

Environment:

Upvotes: 0

Views: 1713

Answers (1)

BrianS
BrianS

Reputation: 21

There is a limit of 64k ports per IP address, so you cannot open 2^17 ports. Since each socket uses a file handle, you might be hitting the limit of max open files per process. See "Max open files" for working process.

Upvotes: 1

Related Questions