user12340167

threads in Java and computation

I am new to Java, and I am trying to write a program that takes two parameters:

  1. the number until which we have to sum prime numbers
  2. the number of threads in which we have to do this

So I use a method based on the sieve of Eratosthenes: it stores an array of booleans, marks a number true if it is prime, and then marks all the multiples of that number false.

I try to divide my array into sub-arrays, one per thread, run the operation in each sub-array, and at the end sum the results of all the sub-arrays.

But I don't know what I am doing wrong: sometimes the program doesn't give the correct result.

So here is my code:

SumPrimes.java

import java.util.*;
import java.util.concurrent.*;

public class SumPrimes {

    private boolean array[];
    private int numberOfWorkers;
    private Semaphore allFinished;

    public SumPrimes(int num, int threads){
        array = new boolean[num];
        numberOfWorkers = threads;
        for (int i = 2; i < num; i++)
            array[i] = true;
    }

    private class SumParallel extends Thread {
        int min;
        int max;
        long sum;

        SumParallel(int min, int max){
            this.min = min;
            this.max = max;
            sum = 0;
        }

        public void run() {
            for (int i = min; i < max; i++) {
                if (array[i]) {
                    for (int j = min; j*i < array.length; j++) {
                        array[i*j] = false;
                    }
                    sum += i;
                }
            }
            allFinished.release();
        }

        public long getSum() {
            return sum;
        }
    }

    public void SumInParallel() {
        allFinished = new Semaphore(0);

        List<SumParallel> workers = new ArrayList<SumParallel>();
        int lengthOfOneWorker = array.length / numberOfWorkers;
        for (int i = 0; i < numberOfWorkers; i++) {
            int start = i * lengthOfOneWorker;
            int end = (i+1) * lengthOfOneWorker;

            if (i == numberOfWorkers - 1)
                end = array.length;
            SumParallel worker = new SumParallel(start, end);
            workers.add(worker);
            worker.start();
        }

        try {
            allFinished.acquire(numberOfWorkers);
        } catch (InterruptedException ignored) {}

        int sum = 0;
        for (SumParallel w : workers){
            sum += w.getSum();
        }

        System.out.println("The sum of prime numbers is: " + sum);
    }

    public static void main(String[] args) {
        int limitNum = Integer.parseInt(args[0]);
        int threadNum = Integer.parseInt(args[1]);
        SumPrimes sum_primes = new SumPrimes(limitNum, threadNum);
        sum_primes.SumInParallel();
    }
}

You can run the program like this:

java SumPrimes 1000 3

I am open to any suggestions for improving my code.

Upvotes: 1

Views: 1543

Answers (3)

Andreas

Reputation: 159114

You need to entirely re-think the logic of your thread.

The various threads cannot access the same range of the array, e.g. if a thread has min = 100 and max = 150, then only elements in range 100 to 149 (inclusive) may be used and/or changed.

Your code:

for (int i = min; i < max; i++) {
    if (array[i]) {
        for (int j = min; j*i < array.length; j++) {
            array[i*j] = false;

starts with i = 100, j = 100 which makes i*j = 10000. If array was really that big, it means you access array[10000], but that is not allowed. Of course, the array isn't that big, so the code does nothing.

Ahh, you say, the first thread has min = 0 and max = 50, so it will change values from index 0 (0*0) up to 2401 (49*49), and since array is smaller than that, it will update the entire array, but that is not allowed.

Now, think about it again.

If the range is min = 100, max = 150, then you need to start by clearing all even numbers in that range, then all numbers divisible by 3, then all ... and so on, but only for that range.

I'll leave you to re-think the logic.


UPDATE

To apply Sieve of Eratosthenes to some range, we need the prime numbers up to the square root of the max of that range.

If the range is min = 150, max = 200, then maxPrime = sqrt(200) = 14, so we need the primes from 2 to 14 (inclusive), then we can update range 150-199.

Assuming we first update array to find all the primes in range 2-14, we can use that to iterate the multiples of those primes in the target range (150-199). For that we need to start at the lowest multiple of the prime that is >= min, so we need to round up min to the next multiple of prime.

With integer math, to round up to next multiple, we calculate:

lower = (min + prime - 1) / prime * prime
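As a quick check of this formula, here is a minimal sketch. The class and method names (RoundUpDemo, firstMultipleAtLeast) are made up for illustration, and it assumes min and prime are positive and small enough that min + prime - 1 does not overflow an int.

```java
class RoundUpDemo {
    // Smallest multiple of prime that is >= min.
    // Integer division truncates, so adding (prime - 1) first rounds up.
    static int firstMultipleAtLeast(int min, int prime) {
        return (min + prime - 1) / prime * prime;
    }

    public static void main(String[] args) {
        System.out.println(firstMultipleAtLeast(150, 2));  // 150 (already a multiple)
        System.out.println(firstMultipleAtLeast(150, 7));  // 154
        System.out.println(firstMultipleAtLeast(150, 13)); // 156
    }
}
```

So for the range 150-199, marking multiples of 7 would start at 154 and continue with 161, 168, and so on.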

This gives us the main logic:

int maxPrime = (int) Math.sqrt(max);
for (int prime = 2; prime <= maxPrime; prime++) {
    if (array[prime]) {
        // first multiple of prime that is >= min
        int lower = (min + prime - 1) / prime * prime;
        for (int i = lower; i < max; i += prime)
            array[i] = false;
    }
}

We should also make each thread responsible for first setting all the booleans in the range, so that part becomes multi-threaded too.

The master logic now has to first find the primes in range 2-sqrt(N) in the main thread, then split the remaining range between the threads.

Here is my attempt:

import java.util.concurrent.atomic.AtomicLong; // at the top of the file, needed for totalSum

public static long sumPrimes(int n, int threadCount) {
    // Find and sum the "seed" primes needed by the threads
    int maxSeedPrime = (int) Math.sqrt(n + 2); // extra to be sure no "float errors" occur
    boolean[] seedPrime = new boolean[maxSeedPrime + 1];
    AtomicLong totalSum = new AtomicLong(sumPrimes(seedPrime, seedPrime, 0, maxSeedPrime));

    // Split remaining into ranges and start threads to calculate sums
    Thread[] threads = new Thread[threadCount];
    for (int t = 0, rangeMin = maxSeedPrime + 1; t < threadCount; t++) {
        int min = rangeMin;
        int max = min + (n - min + 1) / (threadCount - t) - 1;
        threads[t] = new Thread(() ->
            totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
        );
        threads[t].start();
        rangeMin = max + 1;
    }

    // Wait for threads to end
    for (int t = 0; t < threadCount; t++) {
        try {
            threads[t].join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Return the calculated sum
    return totalSum.get();
}
private static long sumPrimes(boolean[] seedPrime, boolean[] rangePrime, int min, int max/*inclusive*/) {
    // Initialize range
    for (int i = Math.max(min, 2); i <= max; i++) {
        rangePrime[i - min] = true;
    }

    // Mark non-primes in range
    int maxPrime = (int) Math.sqrt(max + 1); // extra to be sure no "float errors" occur
    for (int prime = 2; prime <= maxPrime; prime++) {
        if (seedPrime[prime]) {
            int minMultiple = (min + prime - 1) / prime * prime;
            if (minMultiple <= prime)
                minMultiple = prime * 2;
            for (int multiple = minMultiple; multiple <= max ; multiple += prime) {
                rangePrime[multiple - min] = false;
            }
        }
    }

    // Sum the primes
    long sum = 0;
    for (int prime = min; prime <= max; prime++) {
        if (rangePrime[prime - min]) {
            sum += prime;
        }
    }
    return sum;
}

Test

public static void main(String[] args) {
    test(1000, 3);
    test(100000000, 4);
}
public static void test(int n, int threadCount) {
    long start = System.nanoTime();
    long sum = sumPrimes(n, threadCount);
    long end = System.nanoTime();
    System.out.printf("sumPrimes(%,d, %d) = %,d (%.9f seconds)%n",
                      n, threadCount, sum, (end - start) / 1e9);
}

Output

sumPrimes(1,000, 3) = 76,127 (0.005595600 seconds)
sumPrimes(100,000,000, 4) = 279,209,790,387,276 (0.686881000 seconds)

UPDATE 2

The code above is using a lambda expression:

threads[t] = new Thread(() ->
    totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
);

If you don't want to use a lambda expression, e.g. so it will run on Java 7, you can use an anonymous class instead:

threads[t] = new Thread() {
    @Override
    public void run() {
        totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max));
    }
};

Upvotes: 1

tevemadar

Reputation: 13195

Multi-threading usually also means that you want to make something faster. So first it may be worth revisiting your initial design and making it faster on a single thread; that then becomes the goal to beat. Also, to compare run times without writing refined benchmarks, you need a run time of "visible" length.
On my machine, with this setup

int max = 1_000_000_000;
boolean sieve[] = new boolean[max];
long sum = 0; // will be 24739512092254535 at the end

your original code,

for(int i=2;i<max;i++)
    if(!sieve[i]) {
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;
        sum+=i;
    }

runs for 24-28 seconds. As discussed in the comments below @Andreas's post, and later inside it (now I see it has been accepted and most of the discussion is gone), the inner loop does a lot of extra work: its comparison is evaluated for every prime i, even when the loop will not actually start. So the outer loop can be split into two parts: first sieving and summing up to the square root of max (every composite below max has a divisor no larger than that), and then just summing for the rest:

int maxunique=(int)Math.sqrt(max);
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;
        sum+=i;
    }
for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

This one runs for 14-16 seconds on my machine. Significant gain and no threads involved yet.
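For anyone who wants to try the two-loop variant on small inputs, here is a self-contained sketch of it. The class and method names (TwoPhaseSieve, sumPrimesBelow) are my own, and the inner loop uses a long counter as an extra precaution against int overflow for very large max, which the snippet above does not need at 1_000_000_000.

```java
class TwoPhaseSieve {
    // Sums all primes below max. sieve[i] == true means "i is composite".
    static long sumPrimesBelow(int max) {
        if (max < 3) return 0; // there are no primes below 2
        boolean[] sieve = new boolean[max];
        int maxunique = (int) Math.sqrt(max);
        long sum = 0;
        // Phase 1: sieve and sum up to sqrt(max), inclusive.
        // The <= matters: with max = 10, 9 = 3 * 3 is only crossed out by i = 3.
        for (int i = 2; i <= maxunique; i++) {
            if (!sieve[i]) {
                for (long j = 2L * i; j < max; j += i) {
                    sieve[(int) j] = true;
                }
                sum += i;
            }
        }
        // Phase 2: everything above sqrt(max) is already fully sieved, so just sum.
        for (int i = maxunique + 1; i < max; i++) {
            if (!sieve[i]) {
                sum += i;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumPrimesBelow(10));   // 17 = 2 + 3 + 5 + 7
        System.out.println(sumPrimesBelow(1000)); // 76127
    }
}
```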

Then come the threads, and the issue with if(!sieve[i]): when calculating the sum, this check must not happen until the inner loops for all primes lower than i have passed i, because only then does sieve[i] really tell whether i is prime. For example, if one thread is running for(int i=4;i<10001;i+=2)sieve[i]=true; and another thread checks sieve[10000] at the same time, it may still be false, and 10000 will be mistaken for a prime number.
A first attempt could be to sieve on one thread (its outer loop "only" goes up to the square root of max anyway) and sum in parallel:

for(int i=2;i<=maxunique;i++)
    if(!sieve[i])
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;

int numt=4;
Thread sumt[]=new Thread[numt];
long sums[]=new long[numt];
for(int i=0;i<numt;i++) {
    long ii=i;
    Thread t=sumt[i]=new Thread(new Runnable() {
        public void run() {
            int from=(int)Math.max(ii*max/numt,2);
            int to=(int)Math.min((ii+1)*max/numt,max);
            long sum=0;
            for(int i=from;i<to;i++)
                if(!sieve[i])
                    sum+=i;
            sums[(int)ii]=sum;
        }
    });
    t.start();
}

for(int i=0;i<sumt.length;i++) {
    sumt[i].join();
    sum+=sums[i];
}

This is kinda neat, all threads (I have 4 cores) check the same number of candidates, and the result is faster. Sometimes by almost a second, but mostly by around half a second (~0.4 ... ~0.8 seconds). So this one is not really worth the effort, the sieving loops are the real time-consuming parts here.
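If the manual thread bookkeeping feels heavy for such a small gain, the same "sieve on one thread, sum in parallel" idea can probably be written with a parallel IntStream. This is only a sketch of that alternative, not the code that was timed above, and the names ParallelSumDemo and sumPrimesBelow are made up for the example.

```java
import java.util.stream.IntStream;

class ParallelSumDemo {
    // Sieves on the calling thread, then sums the survivors in parallel.
    // Assumes max >= 0. sieve[i] == true means "i is composite".
    static long sumPrimesBelow(int max) {
        boolean[] sieve = new boolean[max];
        int maxunique = (int) Math.sqrt(max);
        for (int i = 2; i <= maxunique; i++)
            if (!sieve[i])
                for (long j = 2L * i; j < max; j += i)
                    sieve[(int) j] = true;

        // The sieve is complete and only read from here on,
        // so the parallel workers cannot observe a half-sieved state.
        return IntStream.range(2, max)
                        .parallel()
                        .filter(i -> !sieve[i])
                        .asLongStream()
                        .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumPrimesBelow(1000)); // 76127
    }
}
```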

One can decide to allow redundant work, and start a thread for each prime-ish number encountered in the sieve, even if it is not an actual prime, just has not been ticked out yet:

List<Thread> threads=new ArrayList<>();
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        int ii=i;
        Thread t=new Thread(new Runnable() {
            public void run() {
                for(int j=ii*2;j<max;j+=ii)
                    sieve[j]=true;
            }
        });
        t.start();
        threads.add(t);
    }
//System.out.println(threads.size());
for(int i=0;i<threads.size();i++)
    threads.get(i).join();

for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

The commented println() would tell (on my machine) that 3500-3700 threads were created (while if someone puts a counter inside the original loops, it turns out that 3401 is the minimum, since that many primes are encountered in the single-threaded sieve loop). While the overshoot is not catastrophic, the number of threads is pretty high, and the gain is not too stellar, though it is more visible than in the previous attempt: run time is 10-11 seconds (which of course could be lowered by another half second by using the parallel sum loop).
One may avoid a bit of the redundant work by shutting down loops when they turn out to be filtering on a non-prime number:

for(int j=ii*2;j<max && !sieve[ii];j+=ii)

This one actually has some effect, resulting in 8.6-10.1 seconds run time for me.

As creating 3401 threads is not much less insane than creating 3700 of them, it may be a good idea to limit their number, and this is the point where it is easier to wave goodbye to raw Threads. While it is technically possible to count them by hand, there is built-in infrastructure that does it for us.
Executors can limit the number of threads to a fixed amount (newFixedThreadPool()) or, even better, to the number of CPUs available (newWorkStealingPool()):

ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<Object>(es);

int count=0;

for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        int ii=i;
        count++;
        ecs.submit(new Callable<Object>() {
            public Object call() throws Exception {
                // if(!sieve[ii])
                for(int j=ii*2;j<max /**/ && !sieve[ii] /**/;j+=ii)
                    sieve[j]=true;
                return null;
            }
        });
    }
System.out.println(count);
while(count-->0)
    ecs.take();
es.shutdown();
long sum=0;

for(int i=2;i<max;i++)
    if(!sieve[i])
        sum+=i;

This way it produces similar results to the previous one (8.6-10.5s). But for a low CPU count (4 cores), swapping the conditions results in some speedup (uncomment the if and comment out the same condition in the loop, between /**/), because the tasks run in their submission order, and thus most redundant loops can exit at the very beginning, making the repeated checks a waste of time. Then it is 8.5-9.3s for me, beating both the best and worst times of the direct threading attempt. However, if you have a high CPU count (I also ran it on a supercomputing node with 32 cores available according to Runtime.getRuntime().availableProcessors()), the tasks will overlap more, and the non-tricked version (the one always doing the check) is going to be faster.

And if you want a minor speedup, with rather good readability, you can parallelize the inner loop (which is possible with Threads too, just very tedious), using streams:

long sum=0;
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        sum+=i;
        int ii=i;
        IntStream.range(1, (max-1)/i).parallel().forEach(
            j -> sieve[ii+j*ii]=true);
    }

for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

This one is very much like the original optimized loop-pair, and is still reasonably fast: 9.4-10.0 seconds for me. So it is slower than the others (by ~10% or so), but it is far simpler.


Update:

  1. I fixed a series of off-by-one errors: the xy<maxunique loops are xy<=maxunique now. While it un/fortunately did not affect the huge result, it did fail on a case as simple as max=10 (maxunique=3, and with an xy<3 loop, 9 would remain prime, making the sum 26 instead of 17). Meh. I fixed the few continuation loops too (so they continue from maxunique+1 now).

  2. Creating an unbounded number of sub-tasks bothered me, and luckily I found an inverted design. Instead of checking whether we have reached sqrt(max) (which is maxunique), we use the fact that once we have finished sieving with all numbers below a certain limit, whatever is still unmarked in the range (limit ... limit*limit) is really a prime, so we can continue with numbers up to limit*limit (keeping in mind that this upper bound is itself capped by maxunique). And thus those can be sieved in parallel.

Base algorithm, just for checking (single threaded):

int limit=2;
do {
    int upper=Math.min(maxunique+1,limit*limit);
    for(int i=limit;i<upper;i++)
        if(!sieve[i]) {
            sum+=i;
            for(int j=i*2;j<max;j+=i)
                sieve[j]=true;
        }
    limit=upper;
} while(limit<=maxunique);

for(int i=limit;i<max;i++)
    if(!sieve[i])
        sum+=i;

For some reason it is marginally slower than the original two-looped variant (13.8-14.5 seconds vs 13.7-14.0 seconds, min/max of 20 runs), but I was interested in parallelization anyway.
Probably because of the uneven distribution of prime numbers, using a parallel stream did not work well (I think it just pre-divides the work into seemingly equal pieces), but an Executor-based approach works well:

ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<>(es);

int limit=2;
int count=0;
do {
    int upper=Math.min(maxunique+1,limit*limit);
    for(int i=limit;i<upper;i++)
        if(!sieve[i]) {
            sum+=i;
            int ii=i;
            count++;
            ecs.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    for(int j=ii*2;j<max;j+=ii)
                        sieve[j]=true;
                    return null;
                }
            });
        }
    while(count>0) {
        count--;
        ecs.take();
    }
    limit=upper;
} while(limit<=maxunique);

es.shutdown();

for(int i=limit;i<max;i++)
    if(!sieve[i])
        sum+=i;

For a low CPU-count environment, this is the fastest one so far (7.4-9.0 seconds vs. the 8.7-9.9 seconds of the "infinite number of threads" and the 8.5-9.2 seconds of the other Executor-based one). However, at the beginning it runs a low number of parallel tasks (when limit=2, it starts only two parallel loops, for 2 and 3), and on top of that, those are the longest running loops (with the smallest steps). Because of that, in a high CPU-count environment it takes only second place behind the original Executor-based one (2.9-3.6 seconds vs. 2.7-3.2 seconds).
One can of course implement a separate ramp-up for the start, explicitly collecting the necessary number of primes to saturate the available cores, and later switch to this limit-based approach, and then the result may beat the other ones regardless of the number of cores. However I think I can resist the temptation for now.

Upvotes: 1

Joseph Larson

Reputation: 9058

I think your problem is this code:

   public void run() {
        for (int i = min; i < max; i++) {
            if (array[i]) {
                for (int j = min; j*i < array.length; j++) {
                    array[i*j] = false;
                }
                sum += i;
            }
        }
        allFinished.release();
    }

Imagine one of your later threads, working near the end of the list. The first item is NOT prime, but the work to identify it as not prime hasn't happened yet -- it's done by a different thread, and that thread has barely started. So you believe the value is prime (it's not marked not-prime yet) and work accordingly.
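One way to see this without depending on thread timing is to run a single worker's loop alone on a freshly initialized array, as if that thread had been scheduled before all the others. The sketch below copies the loop from run() into a plain static method; the class and helper names (EarlyWorkerDemo, workerSum, freshArray) are made up for the demonstration.

```java
class EarlyWorkerDemo {
    // Same loop as run() in the question, for one worker's range [min, max).
    static long workerSum(boolean[] array, int min, int max) {
        long sum = 0;
        for (int i = min; i < max; i++) {
            if (array[i]) {
                for (int j = min; j * i < array.length; j++) {
                    array[i * j] = false;
                }
                sum += i;
            }
        }
        return sum;
    }

    // Same initialization as the SumPrimes constructor: everything from 2 up is "prime".
    static boolean[] freshArray(int num) {
        boolean[] array = new boolean[num];
        for (int i = 2; i < num; i++) {
            array[i] = true;
        }
        return array;
    }

    public static void main(String[] args) {
        // Pretend the worker for [20, 30) runs before any other worker has marked anything.
        long sum = workerSum(freshArray(30), 20, 30);
        System.out.println(sum); // 245 = 20 + 21 + ... + 29, while the primes 23 + 29 give 52
    }
}
```

In the real program the outcome depends on how far the other threads have gotten, which would explain why the result is only wrong sometimes.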

If you provide an example that produces bad results, we can test the theory easily enough.

Upvotes: 0
