Reputation: 163282
Is it possible to implement a Collector that stops processing of the stream as soon as an answer is available?
For example, if the Collector is computing an average, and one of the values is NaN, I know the answer is going to be NaN without seeing any more values, so further computation is pointless.
Upvotes: 8
Views: 560
Reputation: 2001
Instead of using a Collector, you could use Stream.allMatch(..) to terminate the Stream early and use utility classes like LongSummaryStatistics directly. If all values (and at least one) are present, you return the statistics, e.g.:
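Optional<LongSummaryStatistics> toLongStats(Stream<OptionalLong> stream) {
    LongSummaryStatistics stat = new LongSummaryStatistics();
    // allMatch stops pulling elements at the first empty OptionalLong.
    // Use a sequential stream: LongSummaryStatistics is not thread-safe.
    boolean allPresent = stream.allMatch(opt -> {
        if (opt.isEmpty()) return false; // OptionalLong.isEmpty() requires Java 11+
        stat.accept(opt.getAsLong());
        return true;
    });
    return allPresent && stat.getCount() > 0 ? Optional.of(stat) : Optional.empty();
}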
Instead of a Stream<OptionalLong>, you might use a DoubleStream and check for your NaN case.
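A minimal sketch of that DoubleStream variant for the question's average-with-NaN case, assuming a sequential stream: allMatch stops at the first NaN, and DoubleSummaryStatistics does the averaging. The class and method names (NaNShortCircuit, averageOrNaN) are hypothetical.

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

public class NaNShortCircuit {

    // Averages the stream, giving NaN as soon as a NaN is seen.
    // Assumes a sequential stream: DoubleSummaryStatistics is not thread-safe.
    static double averageOrNaN(DoubleStream stream) {
        DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
        // allMatch is a short-circuiting terminal operation: it stops
        // pulling elements the first time the lambda returns false.
        boolean noNaN = stream.allMatch(d -> {
            if (Double.isNaN(d)) {
                return false;
            }
            stats.accept(d);
            return true;
        });
        return noNaN ? stats.getAverage() : Double.NaN;
    }

    public static void main(String[] args) {
        System.out.println(averageOrNaN(DoubleStream.of(1, 2, 3, 6)));       // 3.0
        System.out.println(averageOrNaN(DoubleStream.of(1, Double.NaN, 3))); // NaN
        // Terminates even on an infinite stream, because allMatch stops at 5.
        System.out.println(averageOrNaN(
                DoubleStream.iterate(1, d -> d + 1).map(d -> d == 5 ? Double.NaN : d))); // NaN
    }
}
```

Because allMatch, unlike collect, is a short-circuiting terminal operation, the infinite-stream call returns. Note that DoubleSummaryStatistics.getAverage() returns 0.0 when no values were recorded, so an empty stream yields 0.0 here rather than NaN.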
Upvotes: 1
Reputation: 163282
Thanks for the responses. The comments pointed the way to a solution, which I will describe here. It's very much inspired by StreamEx, but adapted to my particular situation.
Firstly, I define an implementation of Stream called XdmStream, which in general delegates all methods to the underlying Stream that it wraps.
This immediately gives me the opportunity to define new methods, so for example my users can write stream.last() instead of stream.reduce((first, second) -> second), which is a useful convenience.
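A much-reduced sketch of the wrapping idea, assuming a hypothetical WrappedStream class that does not implement the full Stream interface (a real wrapper such as XdmStream would implement Stream<T> and forward every method to the wrapped stream). It shows why forwarding methods return the wrapper type: the added last() stays available after filter and map.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

// Hypothetical, much-reduced stand-in for a wrapper like XdmStream.
// Only two forwarding methods are shown.
public class WrappedStream<T> {
    private final Stream<T> base;

    public WrappedStream(Stream<T> base) {
        this.base = base;
    }

    // Forwarding methods re-wrap the result, so the convenience
    // methods remain available further down the pipeline.
    public WrappedStream<T> filter(Predicate<? super T> predicate) {
        return new WrappedStream<>(base.filter(predicate));
    }

    public <R> WrappedStream<R> map(Function<? super T, ? extends R> mapper) {
        return new WrappedStream<>(base.map(mapper));
    }

    // Added convenience: the last element, if any. Like reduce, this is
    // a terminal operation and consumes the stream.
    public Optional<T> last() {
        return base.reduce((first, second) -> second);
    }

    public static void main(String[] args) {
        WrappedStream<Integer> s = new WrappedStream<>(Stream.of(1, 2, 3, 4, 5));
        System.out.println(s.filter(n -> n % 2 == 0).map(n -> n * 10).last()); // Optional[40]
    }
}
```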
As an example of a short-circuiting method, I have implemented XdmStream.untilFirst(Predicate) as follows (base is the wrapped Stream). The idea of this method is to return a stream that delivers the same results as the original stream, except that once an item satisfies the predicate, no further results are delivered after it.
public XdmStream<T> untilFirst(Predicate<? super XdmItem> predicate) {
    Stream<T> stoppable = base.peek(item -> {
        if (predicate.test(item)) {
            base.close();
        }
    });
    return new XdmStream<T>(stoppable);
}
When I first create the base Stream, I call its onClose() method so that a call on close() triggers the supplier of data to stop supplying data.
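A self-contained sketch of how such an onClose() hook can stop a data supplier, assuming the supplier is a custom Spliterator that checks a flag before producing each item. The names StoppableSourceDemo and stoppableCounter are hypothetical, and untilFirst here is a plain static-method version of the peek/close approach above. It leans on the same under-documented behavior: a sequential pipeline that pulls one element at a time, which is what the JDK does here but which the Stream specification does not promise.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StoppableSourceDemo {

    // Hypothetical data supplier: counts from 'start' up to 'limit' (exclusive),
    // but stops producing as soon as the stream's close() has been called.
    static Stream<Integer> stoppableCounter(int start, int limit) {
        AtomicBoolean stopped = new AtomicBoolean(false);
        Spliterator<Integer> source = new Spliterators.AbstractSpliterator<Integer>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            private int next = start;

            @Override
            public boolean tryAdvance(Consumer<? super Integer> action) {
                if (stopped.get() || next >= limit) {
                    return false; // closed or exhausted: no more data
                }
                action.accept(next++);
                return true;
            }
        };
        // onClose registers the handler that close() will run.
        return StreamSupport.stream(source, false).onClose(() -> stopped.set(true));
    }

    // Close the base stream from inside peek once the predicate matches.
    // The matching item itself is still delivered downstream.
    static <T> Stream<T> untilFirst(Stream<T> base, Predicate<? super T> predicate) {
        return base.peek(item -> {
            if (predicate.test(item)) {
                base.close();
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> seen = untilFirst(stoppableCounter(0, 1_000), n -> n == 3)
                .collect(Collectors.toList());
        System.out.println(seen); // [0, 1, 2, 3]
    }
}
```

Since peek never filters anything, getting only four items out of a source that could supply 1,000 shows that the close handler really turned the supplier off.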
The close() mechanism doesn't seem particularly well documented (it relies on the concept of a "stream pipeline", and it's not entirely clear when a new stream returned by some method is part of the same pipeline as the original stream), but it's working for me. I should probably ensure that this is only an optimization, so that the results will still be correct even if the flow of data isn't immediately turned off (e.g. if there is any buffering in the stream).
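One way to make the stopping point part of the result, rather than relying on the supplier reacting to close(), is to wrap the stream's spliterator so that it reports exhaustion right after the first matching item. This is a sketch under that assumption, not the XdmStream implementation; UntilFirstDemo is a hypothetical name. Since Java 9, base.takeWhile(predicate.negate()) gives a similar effect, but it excludes the matching item, whereas untilFirst delivers it.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class UntilFirstDemo {

    // Delivers items from 'base' up to and including the first one that
    // satisfies 'predicate', then reports exhaustion. Correctness does not
    // depend on the source reacting to close().
    static <T> Stream<T> untilFirst(Stream<T> base, Predicate<? super T> predicate) {
        Spliterator<T> source = base.spliterator();
        Spliterator<T> stopping = new Spliterators.AbstractSpliterator<T>(
                source.estimateSize(), source.characteristics() & Spliterator.ORDERED) {
            private boolean done = false;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (done) {
                    return false; // predicate already matched: stop here
                }
                return source.tryAdvance(item -> {
                    if (predicate.test(item)) {
                        done = true;
                    }
                    action.accept(item);
                });
            }
        };
        // Keep the original close handlers (e.g. one registered with onClose).
        return StreamSupport.stream(stopping, false).onClose(base::close);
    }

    public static void main(String[] args) {
        List<Integer> firstFew = untilFirst(Stream.iterate(0, n -> n + 1), n -> n >= 5)
                .collect(Collectors.toList());
        System.out.println(firstFew); // [0, 1, 2, 3, 4, 5]
    }
}
```

The infinite Stream.iterate source shows that traversal really stops: the wrapped spliterator simply never asks the source for another item once the predicate has matched.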
Upvotes: 2
Reputation: 96
Stream<String> s = Stream.of("1", "2", "ABC", "3");
try
{
    double result = s.collect(Collectors.averagingInt(n -> Integer.parseInt(n)));
    System.err.println("Average: " + result);
}
catch (NumberFormatException e)
{
    // Integer.parseInt throws when it reaches "ABC", so the collector never gets to "3"
    e.printStackTrace();
}
Upvotes: 0
Reputation: 6107
For the case of NaN, it might be acceptable to consider this an exceptional outcome, and so throw a custom NaNAverageException, short-circuiting the collection operation. Normally, using exceptions for control flow is bad practice; however, it may be justified in this case.
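A minimal sketch of the exception approach, assuming an unchecked NaNAverageException thrown from the accumulator of an averaging collector built with Collector.of and caught around collect(). The class NaNAverageDemo and the methods averagingOrThrowOnNaN and average are hypothetical names.

```java
import java.util.stream.Collector;
import java.util.stream.DoubleStream;

public class NaNAverageDemo {

    // Unchecked exception used only as a signal to abandon the collection.
    static class NaNAverageException extends RuntimeException {
        NaNAverageException() {
            // No message, cause or stack trace: it is a control-flow signal.
            super(null, null, false, false);
        }
    }

    static Collector<Double, double[], Double> averagingOrThrowOnNaN() {
        return Collector.of(
                () -> new double[2], // [0] = sum, [1] = count
                (acc, value) -> {
                    if (value.isNaN()) {
                        throw new NaNAverageException(); // ends the traversal
                    }
                    acc[0] += value;
                    acc[1]++;
                },
                (left, right) -> {
                    left[0] += right[0];
                    left[1] += right[1];
                    return left;
                },
                acc -> acc[0] / acc[1]);
    }

    static double average(DoubleStream stream) {
        try {
            return stream.boxed().collect(averagingOrThrowOnNaN());
        } catch (NaNAverageException e) {
            return Double.NaN; // the answer is known without seeing the rest
        }
    }

    public static void main(String[] args) {
        System.out.println(average(DoubleStream.of(2, 4)));             // 3.0
        System.out.println(average(DoubleStream.of(1, Double.NaN, 3))); // NaN
    }
}
```

Suppressing the stack trace keeps the exception cheap, which matters if NaN inputs are common. An empty stream gives 0.0 / 0.0, which is also NaN.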
Upvotes: 0
Reputation: 29680
In addition to Federico's comment, it is possible to emulate a short-circuiting Collector by ceasing accumulation once a certain condition has been met. However, the stream still delivers every remaining element to the accumulator, so this only pays off if accumulation itself is expensive. Here's an example, but keep in mind that this implementation has flaws:
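import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class AveragingCollector implements Collector<Double, double[], Double> {
    // Shared by every collection that uses this instance: one of the flaws.
    private final AtomicBoolean hasFoundNaN = new AtomicBoolean();

    @Override
    public Supplier<double[]> supplier() {
        return () -> new double[2]; // [0] = sum, [1] = count
    }

    @Override
    public BiConsumer<double[], Double> accumulator() {
        return (a, b) -> {
            if (hasFoundNaN.get()) {
                return; // answer already known: skip the work
            }
            if (b.isNaN()) {
                hasFoundNaN.set(true);
                return;
            }
            a[0] += b;
            a[1]++;
        };
    }

    @Override
    public BinaryOperator<double[]> combiner() {
        return (a, b) -> {
            a[0] += b[0];
            a[1] += b[1];
            return a;
        };
    }

    @Override
    public Function<double[], Double> finisher() {
        // Without this check, the values seen before the NaN would be averaged.
        return average -> hasFoundNaN.get() ? Double.NaN : average[0] / average[1];
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}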
The following use case returns Double.NaN, as expected:
// needs: import java.util.stream.DoubleStream;
public static void main(String[] args) {
    double result = DoubleStream.of(1, 2, 3, 4, 5, 6, 7, Double.NaN)
            .boxed()
            .collect(new AveragingCollector());
    System.out.println(result);
}
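One of the flaws is that the AtomicBoolean belongs to the collector instance rather than to each accumulation, so once one collection has seen a NaN, every later collection using the same instance is affected. A sketch that keeps the flag inside the accumulation container instead, built with Collector.of; the names NaNAwareAverage and averagingStopOnNaN are hypothetical. Like the version above, it still receives every element and only skips the arithmetic.

```java
import java.util.stream.Collector;
import java.util.stream.DoubleStream;

public class NaNAwareAverage {

    // Container layout (an assumption of this sketch):
    // [0] = running sum, [1] = count, [2] = 1.0 once a NaN has been seen
    static Collector<Double, double[], Double> averagingStopOnNaN() {
        return Collector.of(
                () -> new double[3],
                (acc, value) -> {
                    if (acc[2] != 0) {
                        return; // NaN already seen in this container: skip the work
                    }
                    if (value.isNaN()) {
                        acc[2] = 1;
                        return;
                    }
                    acc[0] += value;
                    acc[1]++;
                },
                (left, right) -> {
                    left[0] += right[0];
                    left[1] += right[1];
                    left[2] = Math.max(left[2], right[2]); // NaN in either half wins
                    return left;
                },
                acc -> acc[2] != 0 ? Double.NaN : acc[0] / acc[1]);
    }

    public static void main(String[] args) {
        double withNaN = DoubleStream.of(1, 2, 3, Double.NaN, 4).boxed().collect(averagingStopOnNaN());
        double plain = DoubleStream.of(1, 2, 3, 4).boxed().collect(averagingStopOnNaN());
        System.out.println(withNaN + " " + plain); // NaN 2.5
    }
}
```

Because each container carries its own flag and the combiner merges the flags, the same collector can be reused and also behaves correctly on a parallel stream.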
Upvotes: 1