Michael Kay

Reputation: 163282

Is it possible to write a Java Collector that does early exit when it has a result?

Is it possible to implement a Collector that stops processing of the stream as soon as an answer is available?

For example, if the Collector is computing an average, and one of the values is NaN, I know the answer is going to be NaN without seeing any more values, so further computation is pointless.

Upvotes: 8

Views: 560

Answers (5)

benez

Reputation: 2001

Instead of using a Collector, you could use Stream.allMatch(..), which is short-circuiting, to terminate the stream early, and feed the values directly to a utility class such as LongSummaryStatistics. If all values were present (and there was at least one), return the statistics; otherwise return an empty Optional, e.g.:

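import java.util.LongSummaryStatistics;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Stream;

Optional<LongSummaryStatistics> toLongStats(Stream<OptionalLong> stream) {
    LongSummaryStatistics stat = new LongSummaryStatistics();
    // allMatch stops pulling elements as soon as the lambda returns false.
    // The lambda mutates shared state, so this assumes a sequential stream.
    boolean allPresent = stream.allMatch(opt -> {
        if (opt.isEmpty()) return false;   // OptionalLong.isEmpty() needs Java 11+
        stat.accept(opt.getAsLong());
        return true;
    });
    return allPresent && stat.getCount() > 0 ? Optional.of(stat) : Optional.empty();
}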

Instead of a Stream<OptionalLong> you might use a DoubleStream and check for your NaN case.
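A minimal sketch of that DoubleStream variant, assuming a sequential stream and assuming that NaN should be reported as the result while an empty input maps to OptionalDouble.empty(). The class and method names (NaNAwareAverage, averageUnlessNaN) are hypothetical:

```java
import java.util.DoubleSummaryStatistics;
import java.util.OptionalDouble;
import java.util.stream.DoubleStream;

public class NaNAwareAverage {
    // Returns the average; NaN as soon as a NaN is seen (without reading
    // any further elements); or empty for an empty stream.
    // Assumes a sequential stream: the lambda mutates shared state.
    public static OptionalDouble averageUnlessNaN(DoubleStream stream) {
        DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
        boolean noNaN = stream.allMatch(d -> {
            if (Double.isNaN(d)) {
                return false; // allMatch stops pulling elements here
            }
            stats.accept(d);
            return true;
        });
        if (!noNaN) {
            return OptionalDouble.of(Double.NaN);
        }
        return stats.getCount() > 0
                ? OptionalDouble.of(stats.getAverage())
                : OptionalDouble.empty();
    }

    public static void main(String[] args) {
        System.out.println(averageUnlessNaN(DoubleStream.of(1, 2, 3)));          // OptionalDouble[2.0]
        System.out.println(averageUnlessNaN(DoubleStream.of(1, Double.NaN, 3))); // OptionalDouble[NaN]
    }
}
```

Because allMatch is a short-circuiting terminal operation, the element after the NaN (3 in the second call) is never read from the source.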

Upvotes: 1

Michael Kay

Reputation: 163282

Thanks for the responses. The comments pointed the way to a solution, which I will describe here. It's very much inspired by StreamEx, but adapted to my particular situation.

First, I define an implementation of Stream called XdmStream, which in general delegates each method to the underlying Stream that it wraps.

This immediately gives me the opportunity to define new methods, so for example my users can do stream.last() instead of stream.reduce((first,second)->second), which is a useful convenience.
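A stripped-down sketch of the wrapping idea. It assumes a hypothetical class LastableStream that does not implement the full Stream interface (a real wrapper like XdmStream would delegate every Stream method, which is too long to show here); the point is that delegating methods re-wrap their result, so the extra last() method stays available down the chain:

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

// Hypothetical, minimal wrapper: only a few methods are shown.
public class LastableStream<T> {
    private final Stream<T> base;

    public LastableStream(Stream<T> base) {
        this.base = base;
    }

    // Delegating methods re-wrap the result so last() remains reachable.
    public LastableStream<T> filter(Predicate<? super T> predicate) {
        return new LastableStream<>(base.filter(predicate));
    }

    public <R> LastableStream<R> map(Function<? super T, ? extends R> mapper) {
        return new LastableStream<>(base.map(mapper));
    }

    // Convenience method: the last element, if any.
    public Optional<T> last() {
        return base.reduce((first, second) -> second);
    }

    public Stream<T> unwrap() {
        return base;
    }

    public static void main(String[] args) {
        System.out.println(new LastableStream<>(Stream.of(1, 2, 3, 4))
                .filter(n -> n % 2 == 1)
                .last()); // Optional[3]
    }
}
```

The reducer (first, second) -> second is associative, so last() also gives the right answer on an ordered parallel stream.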

As an example of a short-circuiting method I have implemented XdmStream.untilFirst(Predicate) as follows (base is the wrapped Stream). The idea of this method is to return a stream that delivers the same results as the original stream, except that when a predicate is satisfied, no more results are delivered.

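public XdmStream<T> untilFirst(Predicate<? super XdmItem> predicate) {
    Stream<T> stoppable = base.peek(item -> {
        if (predicate.test(item)) {
            // Runs the onClose() handler registered on the source,
            // which tells the data supplier to stop producing items.
            // The matching item itself is still passed downstream.
            base.close();
        }
    });
    return new XdmStream<T>(stoppable);
}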
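For comparison, since Java 9 Stream.takeWhile is a built-in short-circuiting intermediate operation. It drops the element that fails its test, so an inclusive "until first" needs a small flag. This is a swapped-in technique, not the close()-based approach described here, and it assumes a sequential, ordered stream; UntilFirst and untilFirstInclusive are hypothetical names:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UntilFirst {
    // Delivers elements up to and including the first one that satisfies
    // the predicate, then stops pulling from the source.
    // Assumes a sequential, ordered stream (the flag is shared state).
    public static <T> Stream<T> untilFirstInclusive(Stream<T> source, Predicate<? super T> predicate) {
        AtomicBoolean matched = new AtomicBoolean(false);
        return source.takeWhile(item -> {
            if (matched.get()) {
                return false; // the element after the match ends the stream
            }
            if (predicate.test(item)) {
                matched.set(true); // keep the matching element itself
            }
            return true;
        });
    }

    public static void main(String[] args) {
        // Works on an infinite source because takeWhile short-circuits.
        List<Integer> result = untilFirstInclusive(Stream.iterate(1, n -> n + 1), n -> n == 5)
                .collect(Collectors.toList());
        System.out.println(result); // [1, 2, 3, 4, 5]
    }
}
```

One trade-off: the element after the match (6 above) is still pulled from the source before being rejected, whereas a close() hook can switch the source off as soon as the match is seen.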

When I first create the base Stream, I call its onClose() method to register a handler, so that a call to close() causes the supplier of data to stop supplying data.

The close() mechanism doesn't seem particularly well documented: it relies on the concept of a "stream pipeline", and it's not entirely clear when a new stream returned by some method is part of the same pipeline as the original stream. But it's working for me. I should probably ensure that this is only an optimization, so that the results are still correct even if the flow of data isn't turned off immediately (e.g. if there is any buffering in the stream).
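A self-contained sketch of the close()-based mechanism. It assumes a hypothetical source, stoppableNaturals, that produces 1, 2, 3, ... from an Iterator until a flag set by its onClose() handler tells it to stop; untilFirst is rewritten as a static helper with the same shape as the method above. Calling close() while a terminal operation is still running is not something the Stream documentation describes, so, as the caveat above says, the source's flag check is what actually stops the data, and the whole thing is best treated as an optimization:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CloseToStop {
    // Hypothetical source: 1, 2, 3, ... until close() is called on the pipeline.
    // 'produced' counts how many items the source actually handed out.
    public static Stream<Integer> stoppableNaturals(AtomicInteger produced) {
        AtomicBoolean stopped = new AtomicBoolean(false);
        Iterator<Integer> it = new Iterator<Integer>() {
            private int next = 1;

            @Override
            public boolean hasNext() {
                return !stopped.get();
            }

            @Override
            public Integer next() {
                if (stopped.get()) {
                    throw new NoSuchElementException();
                }
                produced.incrementAndGet();
                return next++;
            }
        };
        return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false)
                .onClose(() -> stopped.set(true)); // close() now switches the source off
    }

    // Same shape as untilFirst above, written as a static helper.
    public static <T> Stream<T> untilFirst(Stream<T> base, Predicate<? super T> predicate) {
        return base.peek(item -> {
            if (predicate.test(item)) {
                base.close(); // runs the onClose handler registered on the source
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger produced = new AtomicInteger();
        List<Integer> result = untilFirst(stoppableNaturals(produced), n -> n == 5)
                .collect(Collectors.toList());
        System.out.println(result + " produced=" + produced.get()); // [1, 2, 3, 4, 5] produced=5
    }
}
```

Without the onClose() hook the source would be infinite and collect() would never return, which makes the effect of close() easy to see.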

Upvotes: 2

ddinde

Reputation: 96

import java.util.stream.Collectors;
import java.util.stream.Stream;

Stream<String> s = Stream.of("1", "2", "ABC", "3");
try
{
    double result = s.collect(Collectors.averagingInt(n -> Integer.parseInt(n)));
    System.err.println("Average: " + result);
}
catch (NumberFormatException e)
{
    // parseInt throws when it reaches "ABC", so the collector never processes "3"
    e.printStackTrace();
}

Upvotes: 0

Gonen I

Reputation: 6107

For the case of NaN, it might be acceptable to treat this as an exceptional outcome, and so throw a custom NaNAverageException, short-circuiting the collection operation. Using exceptions for normal control flow is usually bad practice; however, it may be justified in this case.
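A sketch of that idea, assuming a private exception type named NaNAverageException (hypothetical, as in the suggestion) and a collector built with Collector.of. The accumulator throws as soon as it sees NaN, which abandons the stream, and the caller turns the exception back into a NaN result:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class NaNShortCircuit {
    // Hypothetical exception type, used only to abort the collection early.
    static final class NaNAverageException extends RuntimeException {
        NaNAverageException() {
            super(null, null, false, false); // no stack trace: cheap to throw
        }
    }

    public static Collector<Double, double[], Double> averagingOrFailOnNaN() {
        return Collector.of(
                () -> new double[2],                       // [0] = sum, [1] = count
                (acc, d) -> {
                    if (Double.isNaN(d)) {
                        throw new NaNAverageException();   // stop the stream here
                    }
                    acc[0] += d;
                    acc[1]++;
                },
                (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
                // Mirrors Collectors.averagingDouble, which returns 0 for no elements.
                acc -> acc[1] == 0 ? 0.0 : acc[0] / acc[1]);
    }

    public static double average(Stream<Double> values) {
        try {
            return values.collect(averagingOrFailOnNaN());
        } catch (NaNAverageException e) {
            return Double.NaN;
        }
    }

    public static void main(String[] args) {
        AtomicInteger seen = new AtomicInteger();
        double avg = average(Stream.of(1.0, 2.0, Double.NaN, 4.0)
                .peek(d -> seen.incrementAndGet()));
        System.out.println(avg + " after " + seen.get() + " elements"); // NaN after 3 elements
    }
}
```

Creating the exception through the four-argument RuntimeException constructor with writableStackTrace set to false keeps each throw cheap, which matters if NaN inputs are common.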

Upvotes: 0

Jacob G.

Reputation: 29680

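In addition to Federico's comment, it is possible to emulate a short-circuiting Collector by ceasing accumulation once a certain condition has been met. However, this only helps if accumulation is expensive, because the stream still passes every remaining element to the accumulator; only the per-element work is skipped. Here's an example, but keep in mind that this implementation has flaws (for instance, the NaN flag lives in the collector instance and is never reset, so an instance must not be reused):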

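import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class AveragingCollector implements Collector<Double, double[], Double> {
    // Shared across all containers; this makes an instance single-use.
    private final AtomicBoolean hasFoundNaN = new AtomicBoolean();

    @Override
    public Supplier<double[]> supplier() {
        return () -> new double[2]; // [0] = sum, [1] = count
    }

    @Override
    public BiConsumer<double[], Double> accumulator() {
        return (a, b) -> {
            if (hasFoundNaN.get()) {
                return; // skip the work; the stream still delivers elements
            }

            if (Double.isNaN(b)) {
                hasFoundNaN.set(true);
                return;
            }

            a[0] += b;
            a[1]++;
        };
    }

    @Override
    public BinaryOperator<double[]> combiner() {
        return (a, b) -> {
            a[0] += b[0];
            a[1] += b[1];

            return a;
        };
    }

    @Override
    public Function<double[], Double> finisher() {
        // Without this check the NaN would simply be skipped and the
        // average of the preceding values returned instead.
        return average -> hasFoundNaN.get() ? Double.NaN : average[0] / average[1];
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}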

The following use case returns Double.NaN, as expected:

import java.util.stream.DoubleStream;

public static void main(String[] args) {
    double result = DoubleStream.of(1, 2, 3, 4, 5, 6, 7, Double.NaN)
                                .boxed()
                                .collect(new AveragingCollector());
    System.out.println(result);
}

Upvotes: 1
