Reputation: 34014
Suppose I'm trying to create a collector that aggregates data into a resource that has to be closed after usage. Is there any way to implement something similar to a finally block in a Collector? In the successful case this could be done in the finisher method, but there does not seem to be any method invoked in case of exceptions.
The goal would be to implement an operation like the following in a clean way and without having to collect the stream into an in-memory list first.
stream.collect(groupingBy(this::extractFileName, collectToFile()));
Upvotes: 4
Views: 304
Reputation: 34460
The only way I can think of to fulfil your requirement is by means of a close handler supplied to the Stream.onClose method. Suppose you have the following class:
import java.util.ArrayList;
import java.util.List;

class CloseHandler implements Runnable {
    // Child handlers, one per closeable resource
    List<Runnable> children = new ArrayList<>();

    void add(Runnable ch) { children.add(ch); }

    @Override
    public void run() { children.forEach(Runnable::run); }
}
Now, you'd need to use your stream as follows:
CloseHandler closeAll = new CloseHandler();
try (Stream<Something> stream = list.stream().onClose(closeAll)) {
    // Now collect
    stream.collect(Collectors.groupingBy(
        this::extractFileName,
        toFile(closeAll)));
}
This uses the try-with-resources construct, so that the stream is automatically closed either when consumed or if an error occurs. Note that we're passing the closeAll close handler to the Stream.onClose method.
Here's a sketch of your downstream collector, which will collect/write/send elements to the Closeable resource (note that we're also passing the closeAll close handler to it):
static Collector<Something, ?, Void> toFile(CloseHandler closeAll) {
    class Acc {
        SomeResource resource; // this is your closeable resource

        Acc() {
            try {
                resource = new SomeResource(...); // create closeable resource
                closeAll.add(this::close); // this::close is a Runnable
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        void add(Something elem) {
            try {
                // TODO write/send to closeable resource here
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        Acc merge(Acc another) {
            // TODO left as an exercise; only called for parallel streams
            throw new UnsupportedOperationException("parallel streams are not supported");
        }

        // This is the close handler for this particular closeable resource
        private void close() {
            try {
                // Here we close our closeable resource
                if (resource != null) resource.close();
            } catch (IOException ignored) {
                // Failures while closing are deliberately swallowed
            }
        }
    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, a -> null);
}
So, this uses a local class (named Acc) to wrap the closeable resource, and declares methods to add an element of the stream to the closeable resource, and also to merge two Acc instances in case the stream is parallel (left as an exercise, in case it's worth the effort).
Collector.of is used to create a collector based on the Acc class's methods, with a finisher that returns null, as we don't want to put anything in the map created by Collectors.groupingBy.
Finally, there's the close method, which closes the wrapped closeable resource in case it has been created.
When the stream is implicitly closed by means of the try-with-resources construct, the CloseHandler.run method will be automatically executed, and this will in turn execute all the child close handlers previously added when each Acc instance was created.
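To make the failure path concrete, here is a minimal, self-contained sketch of the same idea. It is not the code from the answer above: FakeResource is a hypothetical stand-in for a real closeable resource that only records events in a list, elements starting with "bad" are assumed to fail on write, and grouping is done on the first character instead of a file name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CloseOnFailureDemo {

    /** Runs every registered child handler when the stream is closed. */
    static class CloseHandler implements Runnable {
        private final List<Runnable> children = new ArrayList<>();
        void add(Runnable child) { children.add(child); }
        @Override public void run() { children.forEach(Runnable::run); }
    }

    /** Hypothetical resource: records open/write/close events instead of doing I/O. */
    static class FakeResource implements AutoCloseable {
        private final List<String> log;
        FakeResource(List<String> log) { this.log = log; log.add("open"); }
        void write(String s) {
            // Assumption for the demo: elements starting with "bad" cannot be written
            if (s.startsWith("bad")) throw new IllegalStateException("cannot write " + s);
            log.add("write " + s);
        }
        @Override public void close() { log.add("close"); }
    }

    /** Downstream collector: one resource per group, each registered for closing. */
    static Collector<String, ?, Void> toResource(CloseHandler closeAll, List<String> log) {
        return Collector.<String, FakeResource, Void>of(
            () -> {
                FakeResource r = new FakeResource(log);
                closeAll.add(r::close);
                return r;
            },
            FakeResource::write,
            (a, b) -> { throw new UnsupportedOperationException("sequential streams only"); },
            r -> null);
    }

    /** Collects the names and returns the event log, whether or not collecting failed. */
    static List<String> run(List<String> names) {
        List<String> log = new ArrayList<>();
        CloseHandler closeAll = new CloseHandler();
        try (Stream<String> stream = names.stream().onClose(closeAll)) {
            stream.collect(Collectors.groupingBy(s -> s.substring(0, 1), toResource(closeAll, log)));
        } catch (IllegalStateException e) {
            // The stream has already been closed by the time this catch block runs
            log.add("failed");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a1", "bad", "a2")));
    }
}
```

In this sketch both resources opened before the failure are closed before the exception reaches the catch block, because try-with-resources closes the stream first. The sketch assumes a sequential stream; the ArrayList inside CloseHandler is not safe for concurrent registration.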
Upvotes: 1
Reputation: 2790
OK, I took a look at the Collectors implementation: it uses CollectorImpl to create collectors, but that class is not public. So I implemented a new one based on a copy of it (the last two methods are the ones you might be interested in):
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class CollectorUtils<T, A, R> implements Collector<T, A, R> {
    static final Set<Collector.Characteristics> CH_ID = Collections
            .unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));

    private final Supplier<A> supplier;
    private final BiConsumer<A, T> accumulator;
    private final BinaryOperator<A> combiner;
    private final Function<A, R> finisher;
    private final Set<Characteristics> characteristics;

    CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
            Function<A, R> finisher, Set<Characteristics> characteristics) {
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.combiner = combiner;
        this.finisher = finisher;
        this.characteristics = characteristics;
    }

    CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
            Set<Characteristics> characteristics) {
        this(supplier, accumulator, combiner, castingIdentity(), characteristics);
    }

    @Override
    public BiConsumer<A, T> accumulator() {
        return accumulator;
    }

    @Override
    public Supplier<A> supplier() {
        return supplier;
    }

    @Override
    public BinaryOperator<A> combiner() {
        return combiner;
    }

    @Override
    public Function<A, R> finisher() {
        return finisher;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return characteristics;
    }

    @SuppressWarnings("unchecked")
    private static <I, R> Function<I, R> castingIdentity() {
        return i -> (R) i;
    }

    // Collects file names into a list of File objects
    public static Collector<String, ?, List<File>> toFile() {
        return new CollectorUtils<String, List<File>, List<File>>(ArrayList::new, (c, t) -> {
            c.add(toFile(t));
        }, (r1, r2) -> {
            r1.addAll(r2);
            return r1;
        }, CH_ID);
    }

    // The Closeable is closed as soon as this method returns, once per element
    private static File toFile(String fileName) {
        try (Closeable type = () -> System.out.println("Complete! closing file " + fileName)) {
            // stuff
            System.out.println("Converting " + fileName);
            return new File(fileName);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create file " + fileName, e);
        }
    }
}
Then I call the stream as follows:
public static void main(String[] args) {
    Stream.of("x.txt", "y.txt", "z.txt").collect(CollectorUtils.toFile());
}
Output:
Converting x.txt
Complete! closing file x.txt
Converting y.txt
Complete! closing file y.txt
Converting z.txt
Complete! closing file z.txt
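For comparison, the same list-building collector can probably be written without copying CollectorImpl, since the public Collector.of factory accepts a supplier, an accumulator and a combiner directly. The following is only a sketch under that assumption; the class name CollectorOfDemo and the method name toFiles are made up for illustration, and the per-element Closeable from the code above is left out.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CollectorOfDemo {

    /** Collects file names into a list of File objects using the public factory method. */
    static Collector<String, List<File>, List<File>> toFiles() {
        return Collector.of(
            ArrayList::new,                                   // supplier: a fresh list per container
            (files, name) -> files.add(new File(name)),       // accumulator: convert and append
            (left, right) -> { left.addAll(right); return left; }); // combiner for parallel streams
    }

    public static void main(String[] args) {
        List<File> files = Stream.of("x.txt", "y.txt", "z.txt").collect(toFiles());
        files.forEach(f -> System.out.println(f.getName()));
    }
}
```

Like the hand-written class, this only constructs File objects; nothing is opened, so there is nothing left to close if a later element fails.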
Upvotes: 0