Jörn Horstmann

Reputation: 34014

Java Collector with Closeable resource as accumulator

Suppose I'm trying to create a collector that aggregates data into a resource that has to be closed after usage. Is there any way to implement something similar to a finally block in a Collector? In the successful case this could be done in the finisher method, but there does not seem to be any method invoked in case of exceptions.

The goal would be to implement an operation like the following in a clean way and without having to collect the stream into an in-memory list first.

stream.collect(groupingBy(this::extractFileName, collectToFile()));
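To illustrate the problem, here is a minimal sketch of a collector whose accumulator fails partway through. The class name FinisherSkippedDemo and the event strings are made up for the demo; a StringBuilder stands in for the real resource. The recorded events show that the finisher never runs once the accumulator throws, so it cannot act as a finally block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class FinisherSkippedDemo {

    // Runs a sequential collect that fails on the second element and
    // returns the collector callbacks that were invoked, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Collector<String, StringBuilder, String> failing = Collector.of(
            () -> { events.add("supplier"); return new StringBuilder(); },
            (sb, s) -> {
                events.add("accumulate " + s);
                if (s.equals("b")) {
                    throw new IllegalStateException("simulated write failure");
                }
                sb.append(s);
            },
            (left, right) -> left.append(right),
            sb -> { events.add("finisher"); return sb.toString(); });
        try {
            Stream.of("a", "b", "c").collect(failing);
        } catch (IllegalStateException expected) {
            events.add("caught");
        }
        return events;
    }
}
```

The returned list contains "supplier", "accumulate a", "accumulate b" and "caught", but no "finisher" entry.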

Upvotes: 4

Views: 304

Answers (2)

fps

Reputation: 34460

The only way I think you could fulfil your requirement would be by means of a close handler supplied to the Stream.onClose method. Suppose you have the following class:

class CloseHandler implements Runnable {
    List<Runnable> children = new ArrayList<>();

    void add(Runnable ch) { children.add(ch); }

    @Override
    public void run() { children.forEach(Runnable::run); }
}

Now, you'd need to use your stream as follows:

CloseHandler closeAll = new CloseHandler();
try (Stream<Something> stream = list.stream().onClose(closeAll)) {
    // Now collect
    stream.collect(Collectors.groupingBy(
        this::extractFileName, 
        toFile(closeAll)));
}

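This uses the try-with-resources construct, so the stream is closed automatically when the try block exits, whether the collect call completes normally or throws. A terminal operation alone does not close the stream; it is the try-with-resources block that calls close. Note that we're passing the closeAll close handler to the Stream.onClose method.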
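As a quick check of that behaviour, here is a minimal sketch (the class name OnCloseDemo and the log strings are invented for the demo) showing that a handler registered with Stream.onClose runs when the try block is left because of an exception, and that it runs before the catch block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class OnCloseDemo {

    // Returns the order in which the element callbacks, the close handler
    // and the catch block ran.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Stream<String> stream = Stream.of("a", "b").onClose(() -> log.add("closed"))) {
            stream.forEach(s -> {
                log.add("saw " + s);
                if (s.equals("b")) {
                    throw new IllegalStateException("simulated failure");
                }
            });
        } catch (IllegalStateException e) {
            // try-with-resources has already closed the stream at this point
            log.add("caught");
        }
        return log;
    }
}
```

The log ends up as "saw a", "saw b", "closed", "caught".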

Here's a sketch of your downstream collector, which will collect/write/send elements to the Closeable resource (note that we're also passing the closeAll close handler to it):

static Collector<Something, ?, Void> toFile(CloseHandler closeAll) {

    class Acc {

        SomeResource resource; // this is your closeable resource

        Acc() {
            try {
                resource = new SomeResource(...); // create closeable resource
                closeAll.add(this::close);        // this::close is a Runnable
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        void add(Something elem) {
            try {
                // TODO write/send to closeable resource here
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

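        Acc merge(Acc another) {
            // TODO left as an exercise; only invoked for parallel streams
            throw new UnsupportedOperationException("parallel streams are not supported");
        }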

        // This is the close handler for this particular closeable resource
        private void close() {
            try {
                // Here we close our closeable resource
                if (resource != null) resource.close();
            } catch (IOException ignored) {
            }
        }
    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, a -> null);
}

So, this uses a local class (named Acc) to wrap the closeable resource, and declares methods to add an element of the stream to the closeable resource, and also to merge two Acc instances in case the stream is parallel (left as an exercise, in case it's worth the effort).

Collector.of is used to create a collector based on the Acc class' methods, with a finisher that returns null, as we don't want to put anything in the map created by Collectors.groupingBy.

Finally, there's the close method, which closes the wrapped closeable resource in case it has been created.

When the stream is implicitly closed by means of the try-with-resources construct, the CloseHandler.run method will be automatically executed, and this will in turn execute all the child close handlers previously added when each Acc instance was created.
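To see the whole mechanism end to end, here is a self-contained sketch under a few assumptions: TrackedResource is a hypothetical in-memory stand-in for the real closeable resource, elements are plain strings grouped by their first character, any element containing "bad" simulates a write failure, and the combiner simply refuses parallel use. It is not the only way to wire this up, just one runnable variant of the idea above.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CloseOnFailureDemo {

    // Hypothetical stand-in for a file-like resource; remembers whether it was closed.
    static class TrackedResource implements Closeable {
        final List<String> written = new ArrayList<>();
        boolean closed;

        void write(String s) {
            if (s.contains("bad")) {
                throw new IllegalStateException("simulated write failure");
            }
            written.add(s);
        }

        @Override
        public void close() { closed = true; }
    }

    // Same idea as the CloseHandler above: runs every registered child handler.
    static class CloseHandler implements Runnable {
        private final List<Runnable> children = new ArrayList<>();
        void add(Runnable child) { children.add(child); }
        @Override
        public void run() { children.forEach(Runnable::run); }
    }

    // Downstream collector: each group gets its own resource, registered for closing.
    static Collector<String, TrackedResource, Void> toResource(
            CloseHandler closeAll, List<TrackedResource> opened) {
        return Collector.of(
            () -> {
                TrackedResource r = new TrackedResource();
                opened.add(r);            // kept only so the demo can inspect it later
                closeAll.add(r::close);
                return r;
            },
            TrackedResource::write,
            (left, right) -> { throw new UnsupportedOperationException("sequential only"); },
            r -> null);
    }

    // Collects the input and returns every resource that was opened along the way.
    static List<TrackedResource> run(List<String> input) {
        List<TrackedResource> opened = new ArrayList<>();
        CloseHandler closeAll = new CloseHandler();
        try (Stream<String> stream = input.stream().onClose(closeAll)) {
            stream.collect(Collectors.groupingBy(
                s -> s.substring(0, 1), toResource(closeAll, opened)));
        } catch (IllegalStateException e) {
            // swallowed so the demo can inspect the resources afterwards
        }
        return opened;
    }
}
```

Calling CloseOnFailureDemo.run(Arrays.asList("a1", "b1", "a-bad", "c1")) opens two resources before the failure, and both report closed afterwards. Real code would rethrow or handle the exception rather than swallow it.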

Upvotes: 1

HRgiger

Reputation: 2790

OK, I took a look at the Collectors implementation. It builds its collectors with CollectorImpl, but that class is not public, so I implemented my own based on a copy of it (the last two methods are the ones you might be interested in):

public class CollectorUtils<T, A, R> implements Collector<T, A, R> {

    static final Set<Collector.Characteristics> CH_ID = Collections
            .unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));

    private final Supplier<A> supplier;
    private final BiConsumer<A, T> accumulator;
    private final BinaryOperator<A> combiner;
    private final Function<A, R> finisher;
    private final Set<Characteristics> characteristics;

    CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
            Function<A, R> finisher, Set<Characteristics> characteristics) {
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.combiner = combiner;
        this.finisher = finisher;
        this.characteristics = characteristics;
    }

    CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
            Set<Characteristics> characteristics) {
        this(supplier, accumulator, combiner, castingIdentity(), characteristics);
    }

    @Override
    public BiConsumer<A, T> accumulator() {
        return accumulator;
    }

    @Override
    public Supplier<A> supplier() {
        return supplier;
    }

    @Override
    public BinaryOperator<A> combiner() {
        return combiner;
    }

    @Override
    public Function<A, R> finisher() {
        return finisher;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return characteristics;
    }

    @SuppressWarnings("unchecked")
    private static <I, R> Function<I, R> castingIdentity() {
        return i -> (R) i;
    }

    public static <C extends Collection<File>> Collector<String, ?, C> toFile() {
        return new CollectorUtils<>((Supplier<List<File>>) ArrayList::new, (c, t) -> {
            c.add(toFile(t));
        }, (r1, r2) -> {
            r1.addAll(r2);
            return r1;
        }, CH_ID);
    }

    private static File toFile(String fileName) {
        try (Closeable type = () -> System.out.println("Complete! closing file " + fileName);) {
            // stuff
            System.out.println("Converting " + fileName);

            return new File(fileName);
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        throw new RuntimeException("Failed to create file");

    }

}

Then I call the stream as follows:

public static void main(String[] args) {
    Stream.of("x.txt", "y.txt", "z.txt").collect(CollectorUtils.toFile());
}

Output:

Converting x.txt
Complete! closing file x.txt
Converting y.txt
Complete! closing file y.txt
Converting z.txt
Complete! closing file z.txt
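For comparison, the public Collector.of factory (the one the other answer uses) can build a similar collector without copying CollectorImpl. The sketch below is only an approximation of the code above: the names CollectorOfDemo and toFiles are made up, it returns a plain List<File>, and it leaves out the per-element Closeable and the console output.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CollectorOfDemo {

    // Three-argument Collector.of: the accumulation container is also the result.
    static Collector<String, List<File>, List<File>> toFiles() {
        return Collector.of(
            ArrayList::new,
            (files, name) -> files.add(new File(name)),
            (left, right) -> { left.addAll(right); return left; });
    }

    static List<File> run() {
        // new File(name) only builds a path object; nothing is created on disk
        return Stream.of("x.txt", "y.txt", "z.txt").collect(toFiles());
    }
}
```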

Upvotes: 0
