Martin
Martin

Reputation: 677

Why do I see performance drops on Netty server bootstrap accepting new channels?

I am still using Netty 3.10. I wrote an unit test to check on performance of Netty boss thread. I use a simple Netty server bootstrap within the unit test main thread and spawn 100 Java sync-IO clients within a cached thread pool. I noticed performance drops that I think are strange. Every client opens a socket, writes data and closes, logging duration (ms) after close. My unit test is attached. Typical output of my unit test is, in given order:

  1. 43 x Client done. Duration: 0
  2. 26 x Client done. Duration: 16
  3. 16 x Client done. Duration: 0
  4. 3 x Client done. Duration: 517
  5. 11 x Client done. Duration: 3003
  6. 1 x Client done. Duration: 6036

So there was 1 client that had to wait for 6 seconds to get an open TCP/IP channel and 11 clients that had to wait for 3 seconds. I also checked where the time is spent/lost. It is always new Socket(String,int) at the client side. At server side the time is already gone when pipeline factory gets triggered.

Is the threading model of my unit test the cause of this or really Netty bootstrap/boss?

import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class NettyServerBossTest {

    private static final String SRV_HOST = "localhost";
    private static final int SRV_PORT = 8080;
    private static final byte[] MSG = "Hello world!".getBytes(Charset.forName("UTF-8"));
    private static final int WAIT_MAX_MILLIS = 10 * 1000;

    private final ChannelGroup channels = new DefaultChannelGroup();

    private final int expected = 100;
    private final AtomicInteger actual = new AtomicInteger();

    private volatile boolean failed;
    private ExecutorService clientThreads;
    private Throwable cause;
    private ServerBootstrap bootstrap;

    @Test
    public void test() {
        createServer();
        createClients();
        awaitClients();
        verifyFailure();
    }

    private void awaitClients() {
        final long startMillis = System.currentTimeMillis();
        final long maxMillis = startMillis + WAIT_MAX_MILLIS;
        while ((this.actual.get() < this.expected) && !isFailed() && (System.currentTimeMillis() < maxMillis)) {
            try {
                Thread.sleep(250L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Total duration: " + (System.currentTimeMillis() - startMillis));
        Assert.assertEquals(this.expected, this.actual.get());
    }

    private void createClients() {
        this.clientThreads = Executors.newCachedThreadPool();
        for (int i = 0; i < this.expected; i++) {
            this.clientThreads.execute(new PlainSocketClient());
        }
    }

    private void closeChannels() {
        try {
            this.channels.close().await(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void createServer() {
        final ExecutorService bosses = Executors.newCachedThreadPool();
        final ExecutorService workers = Executors.newCachedThreadPool();
        final ChannelFactory factory = new NioServerSocketChannelFactory(bosses, workers);
        this.bootstrap = new ServerBootstrap(factory);
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new DiscardServerHandler());
            }
        });
        this.bootstrap.setOption("child.tcpNoDelay", Boolean.TRUE);
        this.bootstrap.setOption("child.keepAlive", Boolean.TRUE);
        this.bootstrap.bind(new InetSocketAddress(SRV_HOST, SRV_PORT));
    }

    /**
     * Fail unit test
     *
     * @param cause
     *            cause of failure
     */
    public synchronized void setCause(Throwable cause) {
        if (!this.failed && (cause == null)) {
            this.failed = true;
            this.cause = cause;
        }
    }

    @After
    public void after() {
        closeChannels();
        if (this.clientThreads != null) {
            this.clientThreads.shutdownNow();
        }
        if (this.bootstrap != null) {
            this.bootstrap.releaseExternalResources();
        }
    }

    /**
     * Check if unit test has failed
     *
     * @return <code>true</code> if failed, <code>false</code> if still OK
     */
    public boolean isFailed() {
        return this.failed;
    }

    /**
     * Get cause of failure
     *
     * @return cause or <code>null</code>
     */
    public synchronized Throwable getCause() {
        return this.cause;
    }

    /**
     * Make sure test has not failed with exception
     */
    public void verifyFailure() {
        if (this.failed) {
            throw new IllegalStateException("test failed", getCause());
        }
    }

public abstract class TestRunnable implements Runnable {

    @Override
    public final void run() {
        try {
            execute();
        } catch (Exception e) {
            handleException(e);
        }
    }

    protected abstract void handleException(Throwable e);

    protected abstract void execute() throws Exception;

}

public abstract class AsyncThreadsTestRunnable extends TestRunnable {

    @Override
    protected final void handleException(Throwable e) {
        setCause(e);
    }

}

public class PlainSocketClient extends AsyncThreadsTestRunnable {

    @Override
    protected void execute() throws Exception {
        final long startMillis = System.currentTimeMillis();
        try (Socket sock = new Socket(SRV_HOST, SRV_PORT)) {
            sock.getOutputStream().write(MSG);
        }
        NettyServerBossTest.this.actual.incrementAndGet();
        System.out.println("Client done. Duration: " + (System.currentTimeMillis() - startMillis));
    }

}

public class DiscardServerHandler extends SimpleChannelHandler {

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
        NettyServerBossTest.this.channels.add(e.getChannel());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        // discard
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();

        Channel ch = e.getChannel();
        ch.close();
    }
}

}

Upvotes: 4

Views: 758

Answers (1)

andy
andy

Reputation: 1356

I think the time you logged does not all spend by open socket, it spend by thread switching because when Thread A open a socket then CPU may switching to Thread B, then when socket open is done, CPU maybe does not switch to Thread A at once but after executing many other threads. I have change your PlainSocketClient to add an synchronized to make sure there are less thread switching affect:

public class PlainSocketClient extends AsyncThreadsTestRunnable {
    private static final String LOCK = "LOCK";

    @Override
    protected void execute() throws Exception {
        synchronized (LOCK) {
            final long startMillis = System.currentTimeMillis();
            try (Socket sock = new Socket(SRV_HOST, SRV_PORT)) {
                sock.getOutputStream().write(MSG);
            }
            NettyServerBossTest.this.actual.incrementAndGet();
            System.out.println("Client done. Duration: " + (System.currentTimeMillis() - startMillis));
        }
    }
}

then they almost only output 0 or 1. You can have a testing yourself. It just proof the time consuming at thread switching, not mean you need to add an synchronized in your code.

Upvotes: 1

Related Questions