Scott
Scott

Reputation: 17057

Create thousands of Netty clients without also creating thousands of threads

I have created a fairly straight forward server using Netty 4. I have been able to scale it up to handle several thousand connections and it never climbs above ~40 threads.

number of server threads remains constant (via Java VisualVM)

In order to test it out, I have also created a test client that creates thousands of connections. Unfortunately this creates as many threads as it makes connections. I was hoping to minimize threads for the clients. I have looked at many posts for this. Many examples show single connection setup. This and this say to share NioEventLoopGroup across clients, which I do. I'm getting a limited number of nioEventLoopGroup, but getting a thread per connection elsewhere. I am not purposely creating threads in the pipeline and don't see what could be.

number of client threads grows with number of connections (via Java VisualVM)

threads (via Java VisualVM)

Here is a snippet from the setup of my client code. It seems that it should maintain a fixed thread count based on what I've researched so far. Is there something I'm missing that I should be doing to prevent a thread per client connection?

Main

final EventLoopGroup group = new NioEventLoopGroup();

for (int i=0; i<100; i++)){
    MockClient client = new MockClient(i, group);
    client.connect();
}

MockClient

public class MockClient implements Runnable {

    private final EventLoopGroup group;

    private int identity;

    public MockClient(int identity, final EventLoopGroup group) {
        this.identity = identity;
        this.group = group;
    }

    @Override
    public void run() {
        try {
            connect();
        } catch (Exception e) {}
    }

    public void connect() throws Exception{

        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new MockClientInitializer(identity, this));

        final Runnable that = this;
        // Start the connection attempt
        b.connect(config.getHost(), config.getPort()).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    Channel ch = future.sync().channel();
                } else {
                    //if the server is down, try again in a few seconds
                    future.channel().eventLoop().schedule(that, 15, TimeUnit.SECONDS); 
                }
            }
        });
    }
}

Upvotes: 0

Views: 1356

Answers (1)

Scott
Scott

Reputation: 17057

As has happened to me many times before, explaining the problem in detail made me think about it more and I came across the issue. I wanted to provide it here should anyone else come across the same issue with creating thousands of Netty clients.

I have one path in my pipeline that will create a timeout task to simulate a client connection rebooting. It turns out it was this timer task that was creating the extra threads per connection whenever it received a 'reboot' signal from the server (which happens every so often) up until there was a thread per connection.

Handler

private final HashedWheelTimer timer;

@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {

    Packet packet = reboot();

    ChannelFutureListener closeHandler = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            RebootTimeoutTask timeoutTask = new RebootTimeoutTask(identity, client);
            timer.newTimeout(timeoutTask, SECONDS_FOR_REBOOT, TimeUnit.SECONDS);
        }
    };

    ctx.writeAndFlush(packet).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                future.channel().close().addListener(closeHandler);
            } else {
                future.channel().close();
            }
        }
    });

}

Timeout Task

public class RebootTimeoutTask implements TimerTask {

    public RebootTimeoutTask(...) {...}

    @Override
    public void run(Timeout timeout) throws Exception {
        client.connect(identity);
    }

}

Upvotes: 1

Related Questions