RBA
RBA

Reputation: 12584

High concurrent client with NIO

I'm writing a tool that will generate a large number of HTTP calls against a web server. At the moment I'm only interested in how many requests I can make per second; I'm not interested in the results of those requests.

I'm measuring the time spent to send 1k requests against google.com and I get 69 milliseconds :

enter image description here

but when I'm sniffing the traffic with WireShark I see that sending all the GET requests is taking almost 4 seconds:

enter image description here

enter image description here

Tool has been run from IntelliJ on Windows 10, I7 1.8 Ghz, 32 GB of RAM.

My question is: why is there this difference? Sending 1k HTTP GET requests should be quick, but it takes almost 4 seconds. What am I doing wrong here?

The code below is only for testing purposes and it's quite ugly, so bear with me. Also, I'm not very familiar with NIO.

import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Command-line entry point that invokes {@link UJPPHighTrafficExecutor#run()}
 * {@link #iterations} times and logs how long that loop of calls took.
 */
public class UJPPHighTrafficClient {
    public static final Logger logger = LoggerFactory.getLogger(UJPPHighTrafficClient.class);

    /** Number of times the executor's {@code run()} method is called. */
    public static final int iterations = 1000;

    public static void main(String[] args) {
        doStartClient();
    }

    /**
     * Creates the executor, calls {@code run()} on it {@link #iterations} times and
     * logs the elapsed time. The stopwatch brackets only the loop of {@code run()} calls.
     */
    private static void doStartClient() {
        logger.info("starting the client");

        final UJPPHighTrafficExecutor trafficExecutor = new UJPPHighTrafficExecutor();
        final StopWatch timer = new StopWatch();

        timer.start();
        int remaining = iterations;
        while (remaining-- > 0) {
            trafficExecutor.run();
        }
        timer.stop();

        final long elapsedMillis = timer.getTime();
        logger.info("Run " + iterations + " executions in " + elapsedMillis + " milliseconds");
    }
}


import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.pool.BasicNIOConnPool;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.nio.protocol.*;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Issues asynchronous HTTP GET requests through an Apache HttpCore NIO reactor
 * and shuts the reactor down once {@link UJPPHighTrafficClient#iterations}
 * requests have completed, failed or been cancelled.
 *
 * <p>The reactor, connection pool, requester, runner thread and completion
 * counter are all static and are (re)assigned by every constructor call, so
 * only one instance should be created per JVM.
 */
public class UJPPHighTrafficExecutor {
    private final Logger logger = LoggerFactory.getLogger("debug");
    public static ConnectingIOReactor requestsReactor = null;
    private static BasicNIOConnPool clientConnectionPool = null;
    public static HttpAsyncRequester clientRequester = null;
    public static Thread runnerThread = null;
    private static AtomicInteger counter = null;

    /** Twice the number of available processors; used as I/O thread count and per-route connection limit. */
    public static final int cores = Runtime.getRuntime().availableProcessors() * 2;

    /**
     * Resets the completion counter to zero and sets up the reactor, the
     * connection pool and the requester.
     */
    public UJPPHighTrafficExecutor() {
        // A fresh AtomicInteger already starts at 0.
        counter = new AtomicInteger();
        initializeConnectionManager();
    }

    /**
     * Creates the connecting I/O reactor with {@link #cores} I/O threads, a
     * connection pool limited to {@link #cores} connections per route and 100
     * in total, and the HTTP requester.
     *
     * <p>If the reactor cannot be created the error is logged and the static
     * fields keep their previous values.
     */
    public void initializeConnectionManager() {

        try {
            requestsReactor =
                    new DefaultConnectingIOReactor(IOReactorConfig.
                            custom().
                            setIoThreadCount(cores).
                            build());

            clientConnectionPool = new BasicNIOConnPool(requestsReactor, ConnectionConfig.DEFAULT);

            clientConnectionPool.setDefaultMaxPerRoute(cores);
            clientConnectionPool.setMaxTotal(100);

            clientRequester = initializeHttpClient(requestsReactor);

        } catch (IOReactorException ex) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error(" initializeConnectionManager " + ex.getMessage(), ex);
        }
    }

    /**
     * Builds the client-side protocol chain and starts a thread named "Client"
     * that runs the given reactor's event loop.
     *
     * @param ioReactor reactor whose {@code execute} loop is run on the new thread
     * @return the requester used to submit requests
     */
    private HttpAsyncRequester initializeHttpClient(final ConnectingIOReactor ioReactor) {
        // Create HTTP protocol processing chain
        HttpProcessor httpproc = HttpProcessorBuilder.create()
                // Use standard client-side protocol interceptors
                .add(new RequestContent(true)).
                        add(new RequestTargetHost()).
                        add(new RequestConnControl())
                .add(new RequestExpectContinue(true)).
                        build();

        // Create HTTP requester
        HttpAsyncRequester requester = new HttpAsyncRequester(httpproc);
        // Create client-side HTTP protocol handler
        HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
        // Create client-side I/O event dispatch
        final IOEventDispatch ioEventDispatch =
                new DefaultHttpClientIODispatch(protocolHandler, ConnectionConfig.DEFAULT);

        // Run the I/O reactor in a separate thread
        runnerThread = new Thread("Client") {
            @Override
            public void run() {
                try {
                    ioReactor.execute(ioEventDispatch);
                } catch (InterruptedIOException ex) {
                    logger.error("Interrupted", ex);
                } catch (IOException e) {
                    logger.error("I/O error", e);
                } catch (Exception e) {
                    // The placeholder consumes the message; the trailing throwable supplies the stack trace.
                    logger.error("Exception encountered in Client {}", e.getMessage(), e);
                }
                logger.info("Client shutdown");
            }
        };
        runnerThread.start();

        return requester;
    }

    /**
     * Submits one asynchronous {@code GET /} request to google.com:80. The call
     * registers a callback and returns; the outcome is reported to the callback,
     * which logs it and counts the request as finished.
     */
    public void run() {
        HttpHost httpHost = new HttpHost("google.com", 80, "http");
        final HttpCoreContext coreContext = HttpCoreContext.create();
        ProtocolVersion ver = new ProtocolVersion("HTTP", 1, 1);
        BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("GET", "/", ver);

        clientRequester.execute(new BasicAsyncRequestProducer(httpHost, request), new BasicAsyncResponseConsumer(),
                clientConnectionPool, coreContext,
                // Handle HTTP response from a callback
                new FutureCallback<HttpResponse>() {

                    @Override
                    public void completed(final HttpResponse response) {
                        logger.info("Completed " + response.toString());
                        checkCounter();
                    }

                    @Override
                    public void failed(final Exception ex) {
                        logger.info("Failed " + ex.getMessage());
                        checkCounter();
                    }

                    @Override
                    public void cancelled() {
                        logger.info("Cancelled ");
                        checkCounter();
                    }
                });
    }

    /**
     * Records one finished request and shuts the reactor down when the total
     * reaches {@link UJPPHighTrafficClient#iterations}.
     *
     * <p>The increment and the comparison use a single atomic
     * {@code incrementAndGet()} so that callbacks running concurrently cannot
     * lose an update, and exactly one caller observes the final count.
     */
    private void checkCounter() {
        if (counter.incrementAndGet() == UJPPHighTrafficClient.iterations) {
            try {
                requestsReactor.shutdown();
            } catch (Exception ex) {
                // Shutdown is best-effort, but a failure should still be visible.
                logger.error("Failed to shut down the I/O reactor", ex);
            }
        }
    }
}

Upvotes: 1

Views: 177

Answers (1)

DuncG
DuncG

Reputation: 15196

Your code is timing how long it takes to set up 1000 iterations of HTTP connections, not the time to complete those connections, many of which are still running 3-4 seconds later. To see a more accurate figure, add an instance field t0 to UJPPHighTrafficExecutor:

public class UJPPHighTrafficExecutor {
    long t0 = System.nanoTime();

...and then checkCounter() can print a time for completing all iterations:

/**
 * Counts one finished request; when the count equals
 * UJPPHighTrafficClient.iterations, shuts the reactor down and prints the
 * milliseconds elapsed since t0.
 */
private void checkCounter() {
    // get() followed by set() is two separate operations, not one atomic increment.
    counter.set(counter.get() + 1);
    if (counter.get() == UJPPHighTrafficClient.iterations) {
        try {
            requestsReactor.shutdown();
        } catch (Exception ex) {
            // Any shutdown failure is ignored here.

        }
        // t0 is the System.nanoTime() value captured in the executor's field initializer.
        long t1 = System.nanoTime();
        System.out.println("ELAPSED MILLIS: ~"+TimeUnit.NANOSECONDS.toMillis(t1-t0));
    }
}

This will print a much larger number for 1000 iterations:

ELAPSED MILLIS: ~xxxx

Note that counter.set(counter.get() + 1) is not a safe way to increment an AtomicInteger; remove that line and increment inside the if statement instead:

if (counter.incrementAndGet() == UJPPHighTrafficClient.iterations)

Upvotes: 1

Related Questions