Reputation: 7719
My code merges (flatmap, concatenate) multiple IO streams into a single stream. Does wrapping that stream with a resource-with-try
block properly close the underlying streams?
public ImmutableListMultiMap<Long, Long> find(long customerId, ImmutableList<Long> itemIds) {
Stream<List<Long>> chunks =
Streams.stream(Iterators.partition(itemIds, CHUNK_SIZE));
return Streams.mapWithIndex(
chunk,
(chunk, index) -> {
System.out.println("reading chunk " + index);
return ImmutableList.copyOf(chunk);
})
.flatMap(
chunk ->
findItems(customerId, chunk))
.collect(toImmutableListMultimap(Pair::getFirst, Pair::getSecond));
}
Stream<Pair<Long, Long>> findItems(Long customerId, ImmutableList<Long> itemIds) {
return Stream.concat(findBigItems(customerId, itemIds), findSmallItems(customerId, itemIds));
}
Stream<Pair<Long, Long>> findSmallItems(Long customerId, ImmutableList<Long> itemIds) {
// returns a stream from an IO read
}
Stream<Pair<Long, Long>> findBigItems(Long customerId, ImmutableList<Long> itemIds) {
// returns a stream from an IO read
}
Although there is no try-with-resource
explicit in the code, flatMap
should do the same (i.e. closes the resources automatically).
Upvotes: 0
Views: 136
Reputation: 15156
Yes, and as I hinted in the comments you only need to add a little logging to confirm:
.onClose(() -> System.out.println("onClose was called"))
Here is a contrived example stream:
static Stream<Integer> newStream() {
return IntStream.range(0,2).boxed()
.onClose(() -> System.out.println("onClose newStream"))
.flatMap(i -> Stream.of(i, i+1).onClose(() -> System.out.println("onClose flatMap Stream.of i="+i)));
}
If you run this stream with following, you can see when the "onClose" calls are used - or not:
System.out.println("TEST1 "+newStream().count());
System.out.println("===");
try(Stream<Integer> stream = newStream()) {
System.out.println("TEST2 "+stream.count());
}
System.out.println("===");
try(Stream<Integer> stream = Stream.concat(newStream(), newStream())
.onClose(() -> System.out.println("onClose Stream.concat"))) {
System.out.println("TEST3 "+stream.count());
}
TEST1 isn't inside try-with-resources and misses some of the onClose, but runs the flatMap streams onClose. TEST2/3 print all the onClose calls as expected:
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST1 4
===
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST2 4
onClose newStream
===
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST3 8
onClose newStream
onClose newStream
onClose Stream.concat
Upvotes: 2