Mykolas T

Reputation: 33

Spliterator generated by Iterables.partition() doesn't behave as expected?

I've noticed that the spliterator produced by Guava's Iterables.partition(collection, partitionSize).spliterator() behaves strangely.

Executing trySplit() on the resultant spliterator doesn't split, but executing trySplit() on the result of the initial trySplit() finally does.

Furthermore, using StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator(), true) does not parallelize the stream, but StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator().trySplit(), true) does parallelize, and the resultant stream contains all of the partitions.

My goal is: given a collection with size 100k I want to partition it into batches of size 5000 and process those batches in parallel.

Two questions: does the spliterator generated by Iterables.partition behave correctly? Is my approach a good way to achieve my goal?

Upvotes: 3

Views: 1213

Answers (1)

Eugene

Reputation: 120848

The problem here is that the Spliterator comes from an Iterable, which does not have a known size. So on the first split the implementation buffers up to 1024 elements, and it grows that buffer by another 1024 on each subsequent split. Here is what I mean:

    List<Integer> coll = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());
    Iterable<List<Integer>> it = Iterables.partition(coll, 1);
    Spliterator<List<Integer>> sp = it.spliterator();

    Spliterator<List<Integer>> one = sp.trySplit();
    System.out.println(one.getExactSizeIfKnown());

    Spliterator<List<Integer>> two = sp.trySplit();
    System.out.println(two.getExactSizeIfKnown());

    Spliterator<List<Integer>> three = sp.trySplit();
    System.out.println(three.getExactSizeIfKnown());

    Spliterator<List<Integer>> four = sp.trySplit();
    System.out.println(four.getExactSizeIfKnown());

which would print:

    1024
    2048
    3072
    4096
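
That batching also explains what the question observes. With 100,000 elements and a partition size of 5000 there are only 20 partitions, fewer than the first batch of 1024, so the first trySplit() drains every partition into the spliterator it returns and leaves the original empty. Only a second trySplit(), called on that returned spliterator, divides the work in half. Below is a minimal sketch of this. The partition helper is a hypothetical JDK-only stand-in for Iterables.partition, which is assumed here to rely on the default Iterable.spliterator(), as the unknown size above suggests. Elements are counted by traversal rather than by asking the spliterator for its size.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FirstSplitDemo {

    // Hypothetical stand-in for Guava's Iterables.partition: an Iterable of
    // subList views that inherits the default, unsized Iterable.spliterator().
    static <T> Iterable<List<T>> partition(List<T> source, int size) {
        return () -> new Iterator<List<T>>() {
            private int from = 0;

            @Override
            public boolean hasNext() {
                return from < source.size();
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                int to = Math.min(from + size, source.size());
                List<T> batch = source.subList(from, to);
                from = to;
                return batch;
            }
        };
    }

    // Counts the elements a spliterator still holds (this consumes it).
    static long count(Spliterator<?> spliterator) {
        long[] n = {0};
        spliterator.forEachRemaining(x -> n[0]++);
        return n[0];
    }

    // Returns {left in original, first half of prefix, second half of prefix}.
    static long[] splitCounts(int total, int partitionSize) {
        List<Integer> coll = IntStream.range(0, total).boxed().collect(Collectors.toList());
        Spliterator<List<Integer>> sp = partition(coll, partitionSize).spliterator();

        // First split: buffers up to 1024 partitions into an array-backed spliterator.
        Spliterator<List<Integer>> prefix = sp.trySplit();
        long leftInOriginal = count(sp);

        // Second split: the array-backed spliterator is cut in the middle.
        Spliterator<List<Integer>> half = prefix.trySplit();
        return new long[] {leftInOriginal, count(half), count(prefix)};
    }

    public static void main(String[] args) {
        long[] c = splitCounts(100_000, 5000);
        System.out.println("left in original after first trySplit: " + c[0]);
        System.out.println("halves after second trySplit: " + c[1] + " and " + c[2]);
    }
}
```

For 100,000 elements in partitions of 5000 this reports 0 partitions left in the original spliterator, and 10 and 10 in the two halves.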

If you want to process 5000 elements at a time, you need to start with a Spliterator that has a known size. You could put those partitions into an ArrayList first:

    public static void main(String[] args) {

        List<Integer> coll = IntStream.range(0, 15_000).boxed().collect(Collectors.toList());
        Iterable<List<Integer>> it = Iterables.partition(coll, 5000);

        // Copy the partitions into a list, whose spliterator has a known size
        List<List<Integer>> list = new ArrayList<>();
        it.forEach(list::add);

        // Stream the three partitions in parallel and log which thread handles each one
        StreamSupport.stream(list.spliterator(), true)
                .map(x -> {
                    System.out.println(
                            "Thread : " + Thread.currentThread().getName() +
                            " processed elements in the range : " + x.get(0) + " , " + x.get(x.size() - 1)
                    );
                    return x;
                })
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

On my machine, each partition is processed by a different thread:

    Thread : ForkJoinPool.commonPool-worker-5 processed elements in the range : 10000 , 14999
    Thread : ForkJoinPool.commonPool-worker-19 processed elements in the range : 0 , 4999
    Thread : main processed elements in the range : 5000 , 9999
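
If the batches only need to exist as a sized list, they can also be built directly from index ranges. What follows is a minimal sketch using only the JDK, not a definitive implementation. The batches and sumInParallel helpers are hypothetical names, not part of Guava, and summing each batch stands in for whatever the real per-batch work is.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BatchDemo {

    // Hypothetical helper: cuts the list into consecutive subList views of at
    // most batchSize elements and collects them into a sized List.
    static <T> List<List<T>> batches(List<T> source, int batchSize) {
        int count = (source.size() + batchSize - 1) / batchSize; // round up
        return IntStream.range(0, count)
                .mapToObj(i -> source.subList(
                        i * batchSize,
                        Math.min((i + 1) * batchSize, source.size())))
                .collect(Collectors.toList());
    }

    // Placeholder per-batch work: sums each batch, with batches run in parallel.
    static long sumInParallel(List<List<Integer>> batches) {
        return batches.parallelStream()
                .mapToLong(batch -> batch.stream().mapToLong(Integer::longValue).sum())
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> coll = IntStream.range(0, 100_000).boxed().collect(Collectors.toList());
        List<List<Integer>> list = batches(coll, 5000);
        System.out.println(list.size() + " batches, sum = " + sumInParallel(list));
    }
}
```

Because the result is a List, its spliterator reports an exact size, so the stream framework can divide the 20 batches among worker threads without the intermediate copy step.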

Upvotes: 1
