Utku Ufuk

Reputation: 350

Which is faster? Less work in more runnables, or more work in fewer runnables? (ExecutorService)

I'm trying to figure out how I can get the maximum performance from a multithreaded app.
I have a thread pool which I created like this:

ExecutorService executor = Executors.newFixedThreadPool(8); // I have 8 CPU cores.  

My question is, should I divide the work into only 8 runnables/callables, which is the same number as the threads in the thread pool, or should I divide it into say 1000000 runnables/callables?

List<Future<Long>> list = new ArrayList<>();

for (int i = 0; i < 1000000; i++) 
{
    Callable<Long> worker = new MyCallable();  // Each worker does little work.
    Future<Long> submit = executor.submit(worker);
    list.add(submit);  // Keep the future so its result can be collected below.
}

long sum = 0;

for (Future<Long> future : list) 
    sum += future.get();  // Much more overhead from the for loops

OR

List<Future<Long>> list = new ArrayList<>();

for (int i = 0; i < 8; i++) 
{
    Callable<Long> worker = new MyCallable();  // Each worker does much more work.
    Future<Long> submit = executor.submit(worker);
    list.add(submit);  // Keep the future so its result can be collected below.
}

long sum = 0;

for (Future<Long> future : list) 
    sum += future.get();  // Negligible overhead from the for loops

Dividing into 1000000 callables seems slower to me, since there is the overhead of instantiating all these callables and collecting results from them in for loops. On the other hand, if I have 8 callables, this overhead is negligible. And since I have only 8 threads, I can't run 1000000 callables at the same time, so there is no performance gain from there.

Am I right or wrong?

BTW I could test these cases, but the operation is very trivial and I guess the compiler realizes that and makes some optimizations, so the result might be misleading. I want to know which approach is better for something like an image processing app.

Upvotes: 2

Views: 728

Answers (3)

Ioannis Deligiannis

Reputation: 2719

There are two aspects to this question.

First, you have the technical Java stuff. As you have a few answers about this, I'll summarize the basics:

  • if you have N cores, then N threads would give you the best results as long as each task is purely CPU bound (i.e. no I/O involved)
  • each thread should do more work than it costs to create and manage it, i.e. having N threads each counting to 10 would be much slower than a single thread, as the overhead of creating and managing the extra threads is higher than the benefit of counting to 10 in parallel
  • you need to make sure that any synchronization overhead is lower than the work being done, i.e. having N threads calling a synchronized increment method would be much slower
  • threads do take up resources, most commonly memory. The more threads you have, the more difficult it becomes to estimate your memory usage, and they might affect GC timing (rare but I've seen it happen)
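
As a rough illustration of the first three points, here is a minimal sketch that splits a CPU-bound sum into a configurable number of callables. Each callable accumulates into a local variable, so no synchronization is needed until the results are collected. The class name ChunkedSum and the helper parallelSum are made up for this example; they are not from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedSum {

    // Sums the integers in [from, to) using taskCount callables on the given pool.
    // (Hypothetical helper, for illustration only.)
    static long parallelSum(ExecutorService pool, long from, long to, int taskCount)
            throws Exception {
        List<Callable<Long>> tasks = new ArrayList<>();
        long total = to - from;
        for (int t = 0; t < taskCount; t++) {
            // Chunk boundaries; integer division spreads any remainder across chunks.
            final long lo = from + total * t / taskCount;
            final long hi = from + total * (t + 1) / taskCount;
            tasks.add(() -> {
                long local = 0; // per-task accumulator, so no shared state is touched
                for (long i = lo; i < hi; i++) {
                    local += i;
                }
                return local;
            });
        }
        long sum = 0;
        // invokeAll blocks until every task has completed.
        for (Future<Long> f : pool.invokeAll(tasks)) {
            sum += f.get();
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        try {
            long n = 10_000_000L;
            // One task per core, then a moderately finer split; both print the same sum.
            System.out.println(parallelSum(pool, 0, n, cores));
            System.out.println(parallelSum(pool, 0, n, cores * 4));
        } finally {
            pool.shutdown();
        }
    }
}
```

Varying taskCount between the number of cores and something very large is one way to see where the per-task overhead starts to dominate for a given workload, although a naive timing loop like this is easily distorted by JIT warm-up.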

Secondly, you have the scheduling theory. You need to consider what your program is doing:

  • Typically, use threads for blocking I/O operations. You don't want your program to wait for the network or HDD if you could be using your CPU for other tasks
  • There are a few good books on scheduling (can't remember the names) that can help you design efficient programs. In the example you mention, there might be cases where extra threads make sense, e.g. if your tasks don't have a deterministic duration, are skewed, and your average response time is important. Assume you have 2 cores and 4 tasks. Tasks A & B will take 1 minute each but C & D will take 10 minutes each. If you run these against 2 threads with C & D executing first, your total time will be 11 minutes but your average response time will be (10+10+11+11)/4=10.5 minutes. If you execute against 4 threads, then the average response time will be ((1+a)+(1+a)+(10+a)+(10+a))/4=5.5+a, where a is an approximation of the scheduling waiting time. This is very theoretical because there are many variables not explained, but it can help in designing threaded programs. (Also, in the example above, since you are waiting on the Futures, you most likely don't care about average response times.)
  • Care must be taken when using multiple thread pools. Using multiple pools can cause deadlocks (if dependencies are introduced between the pools) and make it hard to optimize (contention can be created among the pools, and getting the sizes right might become impossible)
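
The 2-thread arithmetic above can be checked with a small simulation. This is only a sketch under simplifying assumptions: tasks run to completion without preemption, each worker runs one task at a time, and the time-sliced 4-thread case (the "+a" term) is not modelled. The class name ResponseTime and the method averageCompletion are made up for this example.

```java
import java.util.PriorityQueue;

public class ResponseTime {

    // Average completion time when the tasks (durations in minutes) are started
    // in the given order on `workers` identical workers, one task per worker at a time.
    // (Hypothetical helper, for illustration only.)
    static double averageCompletion(double[] durations, int workers) {
        // Min-heap of the times at which each worker becomes free.
        PriorityQueue<Double> freeAt = new PriorityQueue<>();
        for (int w = 0; w < workers; w++) {
            freeAt.add(0.0);
        }
        double totalCompletion = 0;
        for (double d : durations) {
            double finish = freeAt.poll() + d; // next task goes to the earliest free worker
            totalCompletion += finish;
            freeAt.add(finish);
        }
        return totalCompletion / durations.length;
    }

    public static void main(String[] args) {
        // C and D (10 min each) first, then A and B (1 min each), on 2 workers.
        System.out.println(averageCompletion(new double[] {10, 10, 1, 1}, 2)); // prints 10.5
        // Same tasks, short ones first, on the same 2 workers.
        System.out.println(averageCompletion(new double[] {1, 1, 10, 10}, 2)); // prints 6.0
    }
}
```

The second call shows that submission order alone changes the average response time even with a fixed number of threads, while the total time stays at 11 minutes in both cases.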

--EDIT--

Finally, if it helps, the way I think about performance is that I have 4 primary resources: CPU, RAM, disk & network. I try to find which one is my bottleneck and use non-saturated resources to optimize. For example, if I have lots of idle CPU and low memory, I might compress my in-memory data. If I have lots of disk I/O and large memory, I cache more data. If network resources (not the actual network connection) are slow, I use many threads to parallelize. Once you saturate a resource type on your critical path and can't use other resources to speed it up, you've reached your maximum performance and you need to upgrade your H/W to get faster results.

Upvotes: 4

Ralf H

Reputation: 1474

Maybe this code helps. It will compute Fibonacci numbers using a fork-join pool. With fork-join we can recursively subdivide a problem and combine the results of each recursion level. Theoretically, we could recurse down to fib(0) in the fork-join pool, but this would be inefficient. Therefore, we introduce a recursion limit where we stop subdividing the task and compute the rest in the current task. This code will record the time taken for fib(x) and compute the single-threaded time for each fib(n) for n up to x. For each recursion limit, it will measure how many tasks were created and how long each ran, on average.

Usually, the sweet spot is a task size above 1µs, but then our simple Fibonacci tasks here need almost no memory/cache. For more data-intensive tasks with higher cache pollution, the switch is more expensive and concurrent tasks may pollute shared caches.

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class FibonacciFork extends RecursiveTask<Long> {

    private static final long serialVersionUID = 1L;

    public FibonacciFork( long n) {
        super();
        this.n = n;
    }

    static ForkJoinPool fjp = new ForkJoinPool( Runtime.getRuntime().availableProcessors());

    static long fibonacci0( long n) {
        if ( n < 2) {
            return n;
        }
        return fibonacci0( n - 1) + fibonacci0( n - 2);
    }

    static int  rekLimit = 8;
    private static long stealCount;
    long    n;
    private long forkCount;
    private static AtomicLong forks = new AtomicLong( 0);

    static class Result {
        long    durMS;
        int rekLimit;
    }

    public static void main( String[] args) {

        int fiboArg = 49;
        BenchLogger.sysinfo( "Warmup");
        long    singleNS[] = getSingleThreadNanos( 20, 5e9);
        BenchLogger.sysinfo( "Warmup complete");
        singleNS = getSingleThreadNanos( fiboArg, 1e9);
        BenchLogger.sysinfo( "Single Thread Times complete");
        Result[] results = new Result[ fiboArg + 1];
        for ( int rekLimit = 2;  rekLimit <= fiboArg;  rekLimit++) {
            results[ rekLimit] = new Result();
            runWithRecursionLimit( rekLimit, fiboArg, singleNS[ rekLimit], results[ rekLimit]);
        }
        System.out.println( "CSV results for Fibo " + fiboArg + "\n" + "RekLimit\t" + "Jobs ns\t" + "time ms");
        for ( int rekLimit = 2;  rekLimit <= fiboArg;  rekLimit++) {
            System.out.println( rekLimit + "\t" + singleNS[ rekLimit] + "\t" + results[ rekLimit].durMS);
        }
    }

    private static long[] getSingleThreadNanos( final int n, final double minRuntimeNS) {
        final long timesNS[] = new long[ n + 1];
        ExecutorService es = Executors.newFixedThreadPool( Math.max( 1, Runtime.getRuntime().availableProcessors() / 8));
        for ( int i = 2;  i <= n;  i++) {
            final int arg = i;
            Runnable runner = new Runnable() {
                @Override
                public void run() {
                    long    start = System.nanoTime();
                    long result = fibonacci0( arg);
                    long    end = System.nanoTime();
                    double  durNS = end - start;
                    long        ntimes = 1;
                    double fact = 1;
                    while ( durNS < minRuntimeNS) {
                        long    oldNTimes = ntimes;
                        if ( durNS > 0) {
                            ntimes = Math.max( 1, ( long) ( oldNTimes * fact * minRuntimeNS / durNS));
                        } else {
                            ntimes *= 2;
                        }
                        start = System.nanoTime();
                        for ( long i = 0;  i < ntimes;  i++) {
                            result = fibonacci0( arg);
                        }
                        end = System.nanoTime();
                        durNS = end - start;
                        fact *= 1.1;
                    }
                    timesNS[ arg] = ( long) ( durNS / ntimes);
                    System.out.println( "Single Fib(" + arg + ")=" + result + " in " + ( timesNS[ arg] / 1e6) + "ms (" + ntimes + " loops in " + (durNS / 1e6)
                            + " ms)");
                }
            };
            es.execute( runner);
        }
        es.shutdown();
        try {
            es.awaitTermination( 1, TimeUnit.HOURS);
        } catch ( InterruptedException e) {
            BenchLogger.sysinfo( "Single Timeout");
        }
        return timesNS;
    }

    private static void runWithRecursionLimit( int r, int arg, long singleThreadNanos, Result result) {
        rekLimit = r;
        long    start = System.currentTimeMillis();
        long    fiboResult = fibonacci( arg);
        long    end = System.currentTimeMillis();
        // Count steals
        long    currentSteals = fjp.getStealCount();
        long    newSteals = currentSteals - stealCount;
        stealCount = currentSteals;
        long    forksCount = forks.getAndSet( 0);
        final long durMS = end-start;
        System.out.println( "Fib(" + arg + ")=" + fiboResult + " in " + durMS + "ms, recursion limit: " + r +
                " at " + ( singleThreadNanos / 1e6) + "ms, steals: " + newSteals + " forks " + forksCount);
        result.durMS = durMS;
        result.rekLimit = r;
    }

    static long fibonacci( final long arg) {
        FibonacciFork   task = new FibonacciFork( arg);
        long result = fjp.invoke( task);
        forks.set( task.forkCount);
        return result;
    }

    @Override
    protected Long compute() {
        if ( n <= rekLimit) {
            return fibonacci0( n);
        }
        FibonacciFork   ff1 = new FibonacciFork( n-1);
        FibonacciFork   ff2 = new FibonacciFork( n-2);
        ff1.fork();
        long    r2 = ff2.compute();
        long    r1 = ff1.join();
        forkCount = ff2.forkCount + ff1.forkCount + 1;
        return r1 + r2;
    }
}

Upvotes: 1

Nachiket Kate

Reputation: 8571

There is no straightforward answer to this question because it depends on a lot of things, like your code, application logic, the maximum concurrency possible, hardware, etc.

But while thinking about concurrency, you should consider the following:

  1. Every thread needs its own private stack, so if you create a large number of threads, the memory consumed by thread stacks can exceed what the application itself actually uses.

  2. Threads should perform tasks which are independent and can run in parallel.

    Find the code paths which can actually be executed in parallel without any dependency; otherwise threading will not help much.

  3. What is the hardware configuration?

    The maximum number of threads that can execute concurrently is equal to the total number of CPU cores. If you have few cores and a huge number of threads, the CPU can spend more time on context switching than on the threads' actual work. This can badly hamper performance.

All in all, your second approach looks good to me, but if possible, find more parallelism and you can extend it up to 20-30.

Upvotes: 5
