Overflowsky

Reputation: 33

Java parallel stream internals

I noticed that, depending on the implementation of the doSth() method (whether the thread sleeps for a constant or a random amount of time), a parallel stream is executed differently.

Example:

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static java.lang.System.out;

public class AtomicInt {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        out.println("Result: " + count());
    }

    public static int count() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(10);

        AtomicInteger counter = new AtomicInteger(0);

        forkJoinPool.submit(() -> IntStream
                .rangeClosed(1, 20)
                .parallel()
                .map(i -> doSth(counter))
                .forEach(i -> out.println(">>>forEach: " + Thread.currentThread().getName() + " value: " + i))
        ).get();

        return counter.get();
    }

    private static int doSth(AtomicInteger counter) {
        try {
            out.println(">>doSth1: " + Thread.currentThread().getName());
            Thread.sleep(100 + new Random().nextInt(1000));
//            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        int counterValue = counter.incrementAndGet();
        out.println(">>doSth2: " + Thread.currentThread().getName() + " value: " + counterValue);

        return counterValue;
    }
}

The numbers are processed in order:

>>doSth1: ForkJoinPool-1-worker-9
>>doSth1: ForkJoinPool-1-worker-8
>>doSth1: ForkJoinPool-1-worker-2
>>doSth1: ForkJoinPool-1-worker-1
>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-4
>>doSth1: ForkJoinPool-1-worker-13
>>doSth1: ForkJoinPool-1-worker-10
>>doSth2: ForkJoinPool-1-worker-8 value: 1
>>>forEach: ForkJoinPool-1-worker-8 value: 1
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-15 value: 2
>>>forEach: ForkJoinPool-1-worker-15 value: 2
>>doSth1: ForkJoinPool-1-worker-15
>>doSth2: ForkJoinPool-1-worker-11 value: 3
>>>forEach: ForkJoinPool-1-worker-11 value: 3
>>doSth1: ForkJoinPool-1-worker-11
>>doSth2: ForkJoinPool-1-worker-2 value: 4
>>>forEach: ForkJoinPool-1-worker-2 value: 4
>>doSth1: ForkJoinPool-1-worker-2
>>doSth2: ForkJoinPool-1-worker-9 value: 5
>>>forEach: ForkJoinPool-1-worker-9 value: 5
>>doSth1: ForkJoinPool-1-worker-9
>>doSth2: ForkJoinPool-1-worker-11 value: 6
>>>forEach: ForkJoinPool-1-worker-11 value: 6
>>doSth1: ForkJoinPool-1-worker-11
>>doSth2: ForkJoinPool-1-worker-1 value: 7
>>>forEach: ForkJoinPool-1-worker-1 value: 7
>>doSth1: ForkJoinPool-1-worker-1
>>doSth2: ForkJoinPool-1-worker-15 value: 8
>>>forEach: ForkJoinPool-1-worker-15 value: 8
>>doSth1: ForkJoinPool-1-worker-15
>>doSth2: ForkJoinPool-1-worker-8 value: 9
>>>forEach: ForkJoinPool-1-worker-8 value: 9
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-13 value: 10
>>>forEach: ForkJoinPool-1-worker-13 value: 10
>>doSth1: ForkJoinPool-1-worker-13
>>doSth2: ForkJoinPool-1-worker-9 value: 11
>>>forEach: ForkJoinPool-1-worker-9 value: 11
>>doSth2: ForkJoinPool-1-worker-15 value: 12
>>>forEach: ForkJoinPool-1-worker-15 value: 12
>>doSth2: ForkJoinPool-1-worker-10 value: 13
>>>forEach: ForkJoinPool-1-worker-10 value: 13
>>doSth2: ForkJoinPool-1-worker-4 value: 14
>>>forEach: ForkJoinPool-1-worker-4 value: 14
>>doSth2: ForkJoinPool-1-worker-6 value: 15
>>>forEach: ForkJoinPool-1-worker-6 value: 15
>>doSth2: ForkJoinPool-1-worker-11 value: 16
>>>forEach: ForkJoinPool-1-worker-11 value: 16
>>doSth2: ForkJoinPool-1-worker-2 value: 17
>>>forEach: ForkJoinPool-1-worker-2 value: 17
>>doSth2: ForkJoinPool-1-worker-13 value: 18
>>>forEach: ForkJoinPool-1-worker-13 value: 18
>>doSth2: ForkJoinPool-1-worker-1 value: 19
>>>forEach: ForkJoinPool-1-worker-1 value: 19
>>doSth2: ForkJoinPool-1-worker-8 value: 20
>>>forEach: ForkJoinPool-1-worker-8 value: 20
Result: 20

When I change the doSth() method to always sleep for 1 second instead of a random time, the results are computed out of sequence:

>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-1
>>doSth1: ForkJoinPool-1-worker-10
>>doSth1: ForkJoinPool-1-worker-2
>>doSth1: ForkJoinPool-1-worker-13
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-8
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-4
>>doSth1: ForkJoinPool-1-worker-9
>>doSth2: ForkJoinPool-1-worker-1 value: 1
>>doSth2: ForkJoinPool-1-worker-10 value: 2
>>doSth2: ForkJoinPool-1-worker-6 value: 3
>>>forEach: ForkJoinPool-1-worker-6 value: 3
>>>forEach: ForkJoinPool-1-worker-10 value: 2
>>>forEach: ForkJoinPool-1-worker-1 value: 1
>>doSth1: ForkJoinPool-1-worker-10
>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-1
>>doSth2: ForkJoinPool-1-worker-15 value: 4
>>doSth2: ForkJoinPool-1-worker-9 value: 10
>>doSth2: ForkJoinPool-1-worker-8 value: 7
>>>forEach: ForkJoinPool-1-worker-8 value: 7
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-4 value: 9
>>>forEach: ForkJoinPool-1-worker-4 value: 9
>>doSth2: ForkJoinPool-1-worker-11 value: 8
>>doSth2: ForkJoinPool-1-worker-13 value: 6
>>doSth2: ForkJoinPool-1-worker-2 value: 5
>>>forEach: ForkJoinPool-1-worker-13 value: 6
>>>forEach: ForkJoinPool-1-worker-11 value: 8
>>doSth1: ForkJoinPool-1-worker-4
>>>forEach: ForkJoinPool-1-worker-9 value: 10
>>>forEach: ForkJoinPool-1-worker-15 value: 4
>>doSth1: ForkJoinPool-1-worker-9
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-13
>>>forEach: ForkJoinPool-1-worker-2 value: 5
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-2
>>doSth2: ForkJoinPool-1-worker-10 value: 12
>>>forEach: ForkJoinPool-1-worker-10 value: 12
>>doSth2: ForkJoinPool-1-worker-6 value: 11
>>doSth2: ForkJoinPool-1-worker-1 value: 13
>>>forEach: ForkJoinPool-1-worker-6 value: 11
>>>forEach: ForkJoinPool-1-worker-1 value: 13
>>doSth2: ForkJoinPool-1-worker-9 value: 15
>>doSth2: ForkJoinPool-1-worker-2 value: 20
>>>forEach: ForkJoinPool-1-worker-2 value: 20
>>doSth2: ForkJoinPool-1-worker-15 value: 19
>>>forEach: ForkJoinPool-1-worker-15 value: 19
>>doSth2: ForkJoinPool-1-worker-8 value: 14
>>doSth2: ForkJoinPool-1-worker-11 value: 17
>>doSth2: ForkJoinPool-1-worker-4 value: 16
>>doSth2: ForkJoinPool-1-worker-13 value: 18
>>>forEach: ForkJoinPool-1-worker-4 value: 16
>>>forEach: ForkJoinPool-1-worker-11 value: 17
>>>forEach: ForkJoinPool-1-worker-8 value: 14
>>>forEach: ForkJoinPool-1-worker-9 value: 15
>>>forEach: ForkJoinPool-1-worker-13 value: 18
Result: 20

Is this a coincidence, or is there an explanation for this behaviour?

Upvotes: 3

Views: 328

Answers (2)

Holger

Reputation: 298153

By the time you are executing the sleep statement, there is no defined order. While the stream created via IntStream.rangeClosed has a defined encounter order, you are turning the operation into an unordered one by ignoring the actual int value.
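
To make the difference between the stream's encounter order and the order in which threads reach counter.incrementAndGet() visible, here is a small sketch (the class EncounterOrderDemo is a made-up name, not part of the question). forEachOrdered delivers the elements in encounter order, one at a time, while the counter value attached to each element only records which thread happened to get there first; apart from being a permutation of 1 to 20, those values are not guaranteed to line up with the elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical demo: pairs each stream element with the counter value it received.
final class EncounterOrderDemo {

    static List<int[]> run() {
        AtomicInteger counter = new AtomicInteger();
        List<int[]> results = new ArrayList<>();
        IntStream.rangeClosed(1, 20)
                .parallel()
                // the counter value depends on which thread gets here first
                .mapToObj(i -> new int[] {i, counter.incrementAndGet()})
                // forEachOrdered processes elements one at a time in encounter order,
                // so an unsynchronized ArrayList is safe here
                .forEachOrdered(results::add);
        return results;
    }

    public static void main(String[] args) {
        for (int[] r : run()) {
            System.out.println("element " + r[0] + " -> counter " + r[1]);
        }
    }
}
```

Replacing forEachOrdered with forEach would give up the encounter-order guarantee for the elements too (and would then require a thread-safe collection).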

The first action that produces a perceivable order is counter.incrementAndGet(). Before that, it doesn't matter which thread reaches that point or which stream element it is associated with; the thread gets its number from the AtomicInteger right at this point. After that, only two more actions use the number: printing the two messages. All that matters for the two different results is whether these three actions (counter.incrementAndGet() and printing the two messages) are interleaved with those of another thread.
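
If the goal were for the messages to come out in counter order, one option, sketched here under the assumption that serializing this part is acceptable, is to perform counter.incrementAndGet() and both outputs while holding a single lock, so that no other thread can interleave between them. The class name, the fixed 100 ms sleep, and the list that stands in for out.println (so that the resulting order can be checked) are illustrative choices, not taken from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical demo: numbering and both "prints" form one critical section.
final class AtomicNumberingDemo {

    static List<String> run() throws InterruptedException, ExecutionException {
        Object lock = new Object();                  // one lock guards all three actions
        AtomicInteger counter = new AtomicInteger();
        List<String> log = new ArrayList<>();        // only accessed while holding 'lock'
        ForkJoinPool pool = new ForkJoinPool(10);
        try {
            pool.submit(() -> IntStream.rangeClosed(1, 20).parallel().forEach(i -> {
                pause(100); // identical sleeps, as in the out-of-sequence case
                synchronized (lock) {
                    int value = counter.incrementAndGet();
                    log.add("doSth2 value: " + value);  // stands in for the first println
                    log.add("forEach value: " + value); // stands in for the second println
                }
            })).get();
        } finally {
            pool.shutdown();
        }
        synchronized (lock) {
            return new ArrayList<>(log);
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        run().forEach(System.out::println);
    }
}
```

The trade-off is that the numbering and printing section now runs strictly one thread at a time; only the sleeps still overlap.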

We can easily strip this scenario down to

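import java.util.Collections;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.System.out;

public class InvokeAllDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService es = Executors.newFixedThreadPool(20);
        es.invokeAll(Collections.nCopies(20, () -> {
            out.println("1st: " + Thread.currentThread().getName());
            Thread.sleep(100 + new Random().nextInt(1000));
//            Thread.sleep(1000);
            // the sequence number is only assigned here, after the sleep
            int counterValue = counter.incrementAndGet();
            out.println("2nd: " + Thread.currentThread().getName() + " value: " + counterValue);
            out.println("3rd: " + Thread.currentThread().getName() + " value: " + counterValue);
            return null;
        }));
        es.shutdown();
    }
}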

Note that invokeAll has no defined ordering at all but, as said, that doesn't matter: the tasks only get their sequence numbers when they call incrementAndGet(). The behavior is the same as in the stream example.


I always emphasize that, because execution times and thread scheduling behavior are unspecified, concurrent does not mean parallel. Still, there is a good chance that short, identical code started at the same time truly runs in parallel when no background activity puts an unpredictable workload on the CPU cores.

When all threads run in parallel, they hit the internal synchronization of out.println at the same time: only one thread can proceed, while the others are put into a queue. Then the unfair nature of synchronized comes into play. An arbitrary thread wins and, afterwards, an arbitrary one of the waiting threads is scheduled again. This causes the numbers to be printed in random order.

When you let the threads sleep for a random amount of time, they no longer run exactly in parallel, which raises the chance that they reach the print statements at different times and can execute them uncontended. Which thread reaches this point first is random, but since the threads get their numbers assigned after the sleep, the first thread to reach this point gets number one, and so on.
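
The effect of spread-out arrival times can be reproduced without randomness. In this sketch (class name and delays are assumptions for illustration), task k sleeps 100 + 50 * k ms before taking a number. With 50 ms between arrivals, the tasks will very likely reach incrementAndGet() one at a time and receive numbers matching their delays, but the scheduler does not guarantee this; the only certain outcome is that each number from 1 to N is handed out exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: distinct, deliberately spaced delays instead of random ones.
final class StaggeredArrivalDemo {

    // Returns the number each task received, indexed by task (and thus by delay).
    static int[] run(int tasks) throws InterruptedException, ExecutionException {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService es = Executors.newFixedThreadPool(tasks);
        try {
            List<Callable<Integer>> jobs = new ArrayList<>();
            for (int k = 0; k < tasks; k++) {
                long delay = 100 + 50L * k; // illustrative spacing of 50 ms
                jobs.add(() -> {
                    Thread.sleep(delay);
                    return counter.incrementAndGet(); // number assigned after the sleep
                });
            }
            List<Future<Integer>> futures = es.invokeAll(jobs);
            int[] numbers = new int[tasks];
            for (int k = 0; k < tasks; k++) {
                numbers[k] = futures.get(k).get();
            }
            return numbers;
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int[] numbers = run(20);
        for (int k = 0; k < numbers.length; k++) {
            System.out.println("task " + k + " slept " + (100 + 50 * k) + " ms, got number " + numbers[k]);
        }
    }
}
```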

Upvotes: 4

gil.fernandes

Reputation: 14611

I suspect the cause of this behaviour lies in the call to nextInt in the java.util.Random class (Java 8):

protected int next(int bits) {
    long oldseed, nextseed;
    AtomicLong seed = this.seed;
    do {
        oldseed = seed.get();
        nextseed = (oldseed * multiplier + addend) & mask;
    } while (!seed.compareAndSet(oldseed, nextseed));
    return (int)(nextseed >>> (48 - bits));
}

If you do not call java.util.Random.nextInt(int) but pre-compute the random values instead, the result is out of sequence again; the same happens if you create your own class derived from java.util.Random and override the method protected int next(int bits) to return a constant integer. I pre-computed the random values like so:

private static final int[] randomIntervals = IntStream.range(0, 20)
            .map(i -> new Random().nextInt(1000) + 100)
            .toArray();

And then used it in the doSth method:

private static int doSth(AtomicInteger counter) {
    try {
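        // counter.intValue() is read before this task increments the counter,
        // so tasks that start at the same time may pick the same interval
        Thread.sleep(randomIntervals[counter.intValue()]);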
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

    int counterValue = counter.incrementAndGet();
    out.println(">>doSth2: " + Thread.currentThread().getName() + " value: " + counterValue);

    return counterValue;
}

The result looks similar to that of the version that simply uses Thread.sleep(1000):

>>doSth2: ForkJoinPool-1-worker-26 value: 8
>>doSth2: ForkJoinPool-1-worker-1 value: 4
>>>forEach: ForkJoinPool-1-worker-1 value: 4
>>doSth2: ForkJoinPool-1-worker-8 value: 7
>>>forEach: ForkJoinPool-1-worker-8 value: 7
>>doSth2: ForkJoinPool-1-worker-25 value: 1
>>>forEach: ForkJoinPool-1-worker-25 value: 1
>>doSth2: ForkJoinPool-1-worker-15 value: 6
>>>forEach: ForkJoinPool-1-worker-15 value: 6
>>doSth2: ForkJoinPool-1-worker-29 value: 5
>>>forEach: ForkJoinPool-1-worker-29 value: 5
>>doSth2: ForkJoinPool-1-worker-4 value: 2
>>>forEach: ForkJoinPool-1-worker-4 value: 2
>>doSth2: ForkJoinPool-1-worker-18 value: 3
>>>forEach: ForkJoinPool-1-worker-18 value: 3
>>>forEach: ForkJoinPool-1-worker-26 value: 8
>>doSth2: ForkJoinPool-1-worker-11 value: 9
>>>forEach: ForkJoinPool-1-worker-11 value: 9
>>doSth2: ForkJoinPool-1-worker-22 value: 10
>>>forEach: ForkJoinPool-1-worker-22 value: 10
>>doSth2: ForkJoinPool-1-worker-1 value: 11
>>doSth2: ForkJoinPool-1-worker-25 value: 12
>>>forEach: ForkJoinPool-1-worker-25 value: 12
>>doSth2: ForkJoinPool-1-worker-8 value: 13
>>>forEach: ForkJoinPool-1-worker-8 value: 13
>>>forEach: ForkJoinPool-1-worker-1 value: 11
>>doSth2: ForkJoinPool-1-worker-29 value: 14
>>>forEach: ForkJoinPool-1-worker-29 value: 14
>>doSth2: ForkJoinPool-1-worker-4 value: 15
>>>forEach: ForkJoinPool-1-worker-4 value: 15
>>doSth2: ForkJoinPool-1-worker-15 value: 16
>>>forEach: ForkJoinPool-1-worker-15 value: 16
>>doSth2: ForkJoinPool-1-worker-18 value: 17
>>>forEach: ForkJoinPool-1-worker-18 value: 17
>>doSth2: ForkJoinPool-1-worker-26 value: 18
>>>forEach: ForkJoinPool-1-worker-26 value: 18
>>doSth2: ForkJoinPool-1-worker-22 value: 19
>>>forEach: ForkJoinPool-1-worker-22 value: 19
>>doSth2: ForkJoinPool-1-worker-11 value: 20
>>>forEach: ForkJoinPool-1-worker-11 value: 20
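
The subclass experiment mentioned above (overriding protected int next(int bits) to return a constant) could look roughly like this; ConstantRandom and the value 42 are hypothetical. Since Random.nextInt(int) is built on next(31), every call to nextInt(1000) then returns the same value, so every thread would sleep for the same time.

```java
import java.util.Random;

// Hypothetical subclass: next(bits) ignores the seed and always returns the same value.
final class ConstantRandom extends Random {

    private final int value;

    ConstantRandom(int value) {
        this.value = value;
    }

    @Override
    protected int next(int bits) {
        // A real generator returns 'bits' random bits; a small non-negative
        // constant keeps nextInt(bound) well-behaved for this experiment.
        return value;
    }

    public static void main(String[] args) {
        Random r = new ConstantRandom(42);
        System.out.println(100 + r.nextInt(1000)); // prints 142 on every call
    }
}
```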

In another experiment I only called the java.util.Random constructor, without calling nextInt. Again, the result is out of sequence:

private static int doSth(AtomicInteger counter) {
    try {
        Random r = new Random();
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

    int counterValue = counter.incrementAndGet();
    out.println(">>doSth2: " + Thread.currentThread().getName() + " value: " + counterValue);

    return counterValue;
}
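
For comparison, a doSth variant that keeps random sleep times but does not construct a new java.util.Random on every call could use ThreadLocalRandom, whose generator state is kept per thread. This is a sketch under that assumption (the class name is made up); it keeps the question's range of 100 to 1099 ms.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of doSth without a new Random instance per call.
final class ThreadLocalRandomSleep {

    // Same range as 100 + new Random().nextInt(1000): 100..1099 ms.
    static long randomDelayMillis() {
        return ThreadLocalRandom.current().nextInt(100, 1100);
    }

    static int doSth(AtomicInteger counter) {
        try {
            Thread.sleep(randomDelayMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        int counterValue = counter.incrementAndGet();
        System.out.println(">>doSth2: " + Thread.currentThread().getName() + " value: " + counterValue);
        return counterValue;
    }

    public static void main(String[] args) {
        System.out.println("Result: " + doSth(new AtomicInteger()));
    }
}
```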

Upvotes: 1
