GPI

Reputation: 9328

Unexpected closing of a Stream while concatenating them

Using Java 8 (if that matters), I have a behavior I struggle to understand.

Let's say I have an Entry class like this:

static class Entry {
    String key;
    List<String> values;
    public Entry(String key, String... values) {
        this.key = key;
        this.values = Arrays.asList(values);
    }
}

And a list of instances:

List<Entry> entries = Arrays.asList(
    new Entry("a", "a1"),
    new Entry("b", "b1"),
    new Entry("a", "a2")
);

Now I want to collect all entries having the same key (and keep distinct values), and I stumbled upon an "IllegalStateException: stream has already been operated upon or closed".

The minimal code for producing it is:

entries.stream().collect(
  Collectors.groupingBy(
    e -> e.key,
    Collectors.mapping(
        e -> e.values.stream(), 
        Collectors.reducing(Stream.<String>empty(), Stream::concat))
  )
);

(I'd add a collectingAndThen to meet my requirement, but it's not the point of my question)

I fail to see which part of the code consumes or acts on the streams. Furthermore, if I change the code to the following, it works:

entries.stream().collect(
  Collectors.groupingBy(
    e -> e.key,
    Collectors.mapping(
        e -> e.values.stream(), 
        Collectors.reducing(Stream::concat))
  )
);

I'd rather use the former code, because the latter gives me a Map<K, Optional<V>> while the former gives a Map<K, V>.

But the question is: what difference does using a neutral element make in the reduction, such that (at least) one of the streams ends up being consumed?

Upvotes: 1

Views: 394

Answers (2)

ernest_k

Reputation: 45329

The main problem can be reduced to this similar example:

Stream<String> identity = Stream.empty();
Stream<String> stream1 = Stream.of("1");
Stream<String> stream2 = Stream.of("2");
Stream.concat(identity, stream1); //works
Stream.concat(identity, stream2); //java.lang.IllegalStateException

In other words,

Collectors.reducing(Stream.<String>empty(), Stream::concat)

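creates a single stream object with Stream.<String>empty() and reuses that same object as the identity value for every group in your multi-level reduction. Stream.concat consumes both of its arguments, so the second group that starts from the identity fails. Fortunately, you already have a workaround.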
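
To see the reuse in isolation, here is a minimal sketch (the class and method names are made up for illustration). It assumes the OpenJDK behavior where Stream.concat takes the spliterator of each argument, which marks that argument as consumed, so passing the same identity object a second time should throw:

```java
import java.util.stream.Stream;

class ConcatIdentityDemo {

    // Returns true if passing the same "identity" stream to Stream.concat
    // a second time throws IllegalStateException.
    static boolean secondConcatFails() {
        Stream<String> identity = Stream.empty();
        Stream.concat(identity, Stream.of("1")); // first use: identity is now consumed
        try {
            Stream.concat(identity, Stream.of("2")); // second use of the same object
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondConcatFails());
    }
}
```

This mirrors what happens with the two keys "a" and "b" in the question: each group begins its reduction from the one shared identity stream.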


As warned against in the docs, and also pointed out in comments, repeated stream concatenation is discouraged:

Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException.

One alternative approach I can think of is to flatten the stream before grouping:

// Requires: import java.util.AbstractMap.SimpleEntry;
// This yields a Map<String, List<String>>
entries.stream()
    .flatMap(v -> v.values.stream().map(val -> new SimpleEntry<>(v.key, val)))
    .collect(Collectors.groupingBy(
        Map.Entry::getKey, 
        Collectors.mapping(Map.Entry::getValue, 
                           Collectors.toList())));

Upvotes: 4

Brother

Reputation: 2210

The main problem is that you cannot use a stream as the identity element, because streams cannot be reused. When the reduction tries to reuse it, an exception is thrown saying the stream has already been operated upon or closed.

This is an alternative approach (returning a List instead of an Optional). Note that Collectors.flatMapping requires Java 9 or later:

Map<String, List<String>> collect = entries.stream().collect(
    Collectors.groupingBy(
        e -> e.key,
        Collectors.flatMapping(e -> e.values.stream(), Collectors.toList())));
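
Since the question mentions Java 8, where flatMapping is not available, a similar result can probably be obtained with Collector.of. This is only a sketch: GroupValuesJava8 and groupValues are hypothetical names, and Entry is copied from the question. The point it illustrates is that the supplier is called once per group, so no accumulator object is shared the way the identity stream was:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class GroupValuesJava8 {

    // Same shape as the Entry class from the question.
    static class Entry {
        String key;
        List<String> values;
        Entry(String key, String... values) {
            this.key = key;
            this.values = Arrays.asList(values);
        }
    }

    // Hypothetical helper: collects all values per key without intermediate streams.
    static Map<String, List<String>> groupValues(List<Entry> entries) {
        return entries.stream().collect(
            Collectors.groupingBy(
                e -> e.key,
                Collector.<Entry, List<String>>of(
                    ArrayList::new,                   // supplier: a new list for every group
                    (acc, e) -> acc.addAll(e.values), // accumulator: append this entry's values
                    (left, right) -> {                // combiner: only needed for parallel streams
                        left.addAll(right);
                        return left;
                    })));
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
            new Entry("a", "a1"),
            new Entry("b", "b1"),
            new Entry("a", "a2"));
        System.out.println(groupValues(entries));
    }
}
```

To keep only distinct values, as the question asks, the list could be swapped for a set (for example a LinkedHashSet supplier with a matching return type).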

Upvotes: 1
