Arian

Reputation: 7719

Does `onClose` propagate to underlying streams when they are merged?

My code merges (flatMap, concat) multiple IO streams into a single stream. Does wrapping that stream in a try-with-resources block properly close the underlying streams?

public ImmutableListMultimap<Long, Long> find(long customerId, ImmutableList<Long> itemIds) {
    Stream<List<Long>> chunks =
        Streams.stream(Iterators.partition(itemIds, CHUNK_SIZE));
    return Streams.mapWithIndex(
            chunks,
            (chunk, index) -> {
              System.out.println("reading chunk " + index);
              return ImmutableList.copyOf(chunk);
            })
        .flatMap(
            chunk ->
                findItems(customerId, chunk))
        .collect(toImmutableListMultimap(Pair::getFirst, Pair::getSecond));
}

Stream<Pair<Long, Long>> findItems(Long customerId, ImmutableList<Long> itemIds) {
 return Stream.concat(findBigItems(customerId, itemIds), findSmallItems(customerId, itemIds));
}

Stream<Pair<Long, Long>> findSmallItems(Long customerId, ImmutableList<Long> itemIds) {
 // returns a stream from an IO read
}

Stream<Pair<Long, Long>> findBigItems(Long customerId, ImmutableList<Long> itemIds) {
 // returns a stream from an IO read
}

Although the code contains no explicit try-with-resources, flatMap should do the same (i.e. close the resources automatically).

Upvotes: 0

Views: 136

Answers (1)

DuncG

Reputation: 15156

Yes, and as I hinted in the comments you only need to add a little logging to confirm:

.onClose(() -> System.out.println("onClose was called"))

Here is a contrived example stream:

static Stream<Integer> newStream() {
    return IntStream.range(0,2).boxed()
            .onClose(() -> System.out.println("onClose newStream"))
            .flatMap(i -> Stream.of(i, i+1).onClose(() -> System.out.println("onClose flatMap Stream.of i="+i)));
}

If you run this stream with the following, you can see when the onClose handlers are called, or not:

System.out.println("TEST1 "+newStream().count());
System.out.println("===");

try(Stream<Integer> stream = newStream()) {
    System.out.println("TEST2 "+stream.count());
}
System.out.println("===");

try(Stream<Integer> stream = Stream.concat(newStream(), newStream())
            .onClose(() -> System.out.println("onClose Stream.concat"))) {
    System.out.println("TEST3 "+stream.count());
}

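TEST1 isn't inside try-with-resources, so the outer stream's onClose handler ("onClose newStream") never runs. The inner flatMap streams are still closed, because flatMap closes each mapped stream once its contents have been consumed. TEST2 and TEST3 print all the onClose calls as expected: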

onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST1 4
===
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST2 4
onClose newStream
===
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST3 8
onClose newStream
onClose newStream
onClose Stream.concat
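
To relate this back to the shape of the code in the question, here is a minimal sketch in which each flatMap'ed stream is itself a Stream.concat of two streams with close handlers. The class name, the `ioStream` helper and the `run` method are hypothetical stand-ins for illustration (the real `findBigItems`/`findSmallItems` are not shown in the question), and the handlers just record into a list instead of releasing real IO resources:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapCloseDemo {
    // Hypothetical stand-in for an IO-backed stream: records its close in `log`.
    static Stream<String> ioStream(String name, List<String> log) {
        return Stream.of(name + "-a", name + "-b")
                .onClose(() -> log.add("closed " + name));
    }

    // Mirrors the shape of findItems in the question: two streams concatenated.
    static Stream<String> findItems(int chunk, List<String> log) {
        return Stream.concat(ioStream("big" + chunk, log), ioStream("small" + chunk, log));
    }

    // Runs the pipeline WITHOUT try-with-resources and returns the close log.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<String> items = Stream.of(0, 1)
                .flatMap(chunk -> findItems(chunk, log))
                .collect(Collectors.toList());
        // Appended after the terminal operation has finished.
        log.add("collected " + items.size());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch all four "closed ..." entries are logged before the "collected 8" entry, even though nothing is wrapped in try-with-resources: flatMap closes each concatenated stream, and closing a Stream.concat result invokes the close handlers of both inputs. A try-with-resources block would only be needed if the outermost stream held a resource of its own.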

Upvotes: 2
