Derrops
Derrops

Reputation: 8127

Stream iterate not using last value

Simplified Example

I have the following code which generates the sum of a series i.e. 1, 1+2, 1+2+3, 1+2+3+4

public static void main(String[] args) {

    Stream<Integer> inputStream = Stream.of(1,2,3,4);
    Iterator<Integer> iterator = inputStream.iterator();
    
    Stream<Integer> outputStream = Stream.iterate(
            iterator.next(),
            i -> iterator.hasNext(),
            next -> {
                return iterator.next() + next;
            }
    );

    List<Integer> outputList = outputStream.collect(Collectors.toList());
    System.out.println(outputList);

}

But this prints: [1, 3, 6], missing the last element.

Working example but needs Atomic Variable

Note this seems to get the correct check I want, but is there a better solution? Looks awful:

public static void main(String[] args) {

    Stream<Integer> inputStream = Stream.of(1,2,3,4);
    Iterator<Integer> iterator = inputStream.iterator();

    AtomicBoolean check = new AtomicBoolean(true);

    Stream<Integer> outputStream = Stream.iterate(
        iterator.next(),
        i -> check.get(),
        next -> {
            check.set(iterator.hasNext());
            return iterator.hasNext() ? iterator.next() + next : next;
        }
    );

    List<Integer> outputList = outputStream.collect(Collectors.toList());
    System.out.println(outputList);

}

Generic Problem description

Here's a generic code illustrating the problem.

public static <O, I> Stream<O> iterate(O seed, Stream<I> stream, BiFunction<I,O,O> function) {
    return iterate(seed, stream.iterator(), function);
}

public static <O, I> Stream<O> iterate(O seed, Iterator<I> iterator, BiFunction<I,O,O> function) {
    AtomicBoolean hasNext = new AtomicBoolean(true);
    return Stream.iterate(
        seed,
        i -> hasNext.get(),
        next -> {
            hasNext.set(iterator.hasNext());
            return iterator.hasNext() ? function.apply(iterator.next(), next) : next;
        }
    );
}

public static void main(String[] args) {
    
    Stream<Integer> inputStream = Stream.of(2,3,4);
    BiFunction<Integer, Integer, Integer> f = Integer::sum;
    Stream<Integer> outputStream = iterate(1, inputStream, f);
    
    List<Integer> outputList = outputStream.collect(Collectors.toList());
    System.out.println(outputList);
    
}

Problem Context

Basically, I want to do this because I am creating a function which produces a forecast of the balance of an interest bearing account.

I want to be able to take a stream of dates and then produce a stream of balances. That way you don't need to know how many elements there will be, or even the distribution of dates, which makes it a more flexible approach.

Also note that the next element of the Stream depends on the previous. This is why I have a seed which represents the first value (does not have a previous value), which would be the opening balance.

Upvotes: 2

Views: 441

Answers (4)

Holger
Holger

Reputation: 298163

The problem is that the three arguments to Stream.iterate(…,…,…) correspond to the three arguments to a for(…;…;…) … statement, where the loop’s body corresponds to the chained stream operations. But this doesn’t match an iterator loop, which looks like

for(Iterator<I> iterator = …; iterator.hasNext(); ) {
    I elements = iterator.next();
    …
}

In other words, the right place of the next() call would be after the hasNext() check, before the remaining loop body, whereas the function passed as third argument to Stream.iterate(…,…,…) is evaluated after the loop body equivalent, before the hasNext() check.

A simple solution would be

public static <O, I> Stream<O> iterate(
    O seed, Iterator<I> iterator, BiFunction<I,O,O> function) {

    return Stream.iterate(seed, Objects::nonNull,
        prev -> iterator.hasNext()? function.apply(iterator.next(), prev): null);
}

which effectively moves the hasNext() check before the next() call. But it requires that null will never occur as a normal result of the function evaluation, which should be the case for all arithmetic operations.

Otherwise, you would have to go one level deeper to implement a general-purpose stream operation:

public static <O, I> Stream<O> iterate(
    O seed, Stream<I> stream, BiFunction<I,O,O> function) {

    boolean parallel = stream.isParallel();
    Spliterator<I> sp = stream.spliterator();

    return StreamSupport.stream(new Spliterators.AbstractSpliterator<O>(
        sp.estimateSize() == Long.MAX_VALUE? Long.MAX_VALUE: sp.estimateSize() + 1,
        sp.characteristics() & Spliterator.SIZED | Spliterator.ORDERED) {

        O value = seed;
        final Consumer<I> c = i -> value = function.apply(i, value);
        boolean end;

        @Override
        public boolean tryAdvance(Consumer<? super O> action) {
            if(end) return false;
            O current = value;
            end = !sp.tryAdvance(c);
            action.accept(current);
            return true;
        }
    }, parallel).onClose(stream::close);
}

This is more complicated, but has no constraints of the values produced by the function and is more efficient than going through an Iterator. Note that this solution also cares to retain the current parallel setting and ensures that closing the stream will be delegated to the original stream, which might be important when being backed by a resource, like Files.lines, etc.

Upvotes: 4

Alexander Ivanchenko
Alexander Ivanchenko

Reputation: 28988

Here's a generic code illustrating the problem.

public static <O, I> Stream<O> iterate(O seed, Stream<I> stream,
                                       BiFunction<I,O,O> function) {
    
    return iterate(seed, stream.iterator(), function);
}

I want to be able to take a stream of dates and then produce a stream of balances. That way you don't need to know how many elements there will be, or even the distribution of dates, which makes it a more flexible approach.

I've written the following solution based on the assumption that it's possible to merge the balances produces in the different threads, and the function BiFunction<I,O,O> is associative, as well as a merging function BinaryOperator<O>, that would be responsible for combining resulting values, i.e. balances.

Also, the value of seed should meat the same requirements which are imposed on identity of the Stream.reduce() operation (otherwise parallel stream would yield an incorrect result). I.e.

merger.apply(r, mapper.apply(seed, t)) == mapper.apply(t, r)

Which would not hold true if seed would be 1 and both mapper and merger would be defined as Integer::sum, but i -> i * i and seed 1 the condition would be met.

Note that if at least one of these requirements is not possible to fulfil, streams are not the right tool for this problem.

Here's how it might be implemented using a custom Collector. To define a Collector we can use static method Collector.of().

public static <R, T> Stream<R> iterate(R seed, Stream<T> stream,
                                       BiFunction<T, R, R> mapper,
                                       BinaryOperator<R> merger) {
    return stream
        .collect(sequenceCollector(seed, mapper, merger));
}

public static <R, T> Collector<T, ?, Stream<R>> sequenceCollector(R seed,
                                                                  BiFunction<T, R, R> mapper,
                                                                  BinaryOperator<R> merger) {
    return Collector.of(
        ArrayDeque::new,
        (Deque<R> deque, T next) ->
            deque.add(
                mapper.apply(next, Objects.requireNonNullElse(deque.peekLast(), seed))
            ),
        (left, right) -> {
            R last = left.getLast();
            right.forEach(next -> left.add(merger.apply(next, last)));
            return left;
        },
        Collection::stream
    );
}

main()

public static void main(String[] args) {
    Stream<Integer> input1 = Stream.of(1, 2, 3, 4);
    Stream<Integer> output1 = iterate(0, input1, Integer::sum, Integer::sum);
    
    List<Integer> outputList1 = output1.toList();
    System.out.println("Sequential: " + outputList1);
    
    Stream<Integer> input2 = Stream.of(1, 2, 3, 4).parallel();
    Stream<Integer> output2 = iterate(0, input2, Integer::sum, Integer::sum);
    
    List<Integer> outputList2 = output2.toList();
    System.out.println("Parallel:   " + outputList2);
}

Output:

Sequential: [1, 3, 6, 10]
Parallel:   [1, 3, 6, 10]

Upvotes: 1

user_3380739
user_3380739

Reputation: 1254

Try this:

AtomicInteger sum = new AtomicInteger(0);
List<Integer> result = Stream.of(1, 2, 3, 4).map(it -> sum.addAndGet(it)).toList();
result.forEach(System.out::print);

Upvotes: 1

Arvind Kumar Avinash
Arvind Kumar Avinash

Reputation: 79085

... but is there a better solution?

Yes, an elegant solution can be by using Arrays#parallelPrefix.

public class Main {
    public static void main(String args[]) {
        int[] arr = { 1, 2, 3, 4 };
        Arrays.parallelPrefix(arr, Integer::sum);
        System.out.println(Arrays.toString(arr));
    }
}

Output:

[1, 3, 6, 10]

You can always convert back and forth between Stream<Integer> and int[] as per your requirement.

public class Main {
    public static void main(String args[]) {
        int[] arr = Stream.of(1, 2, 3, 4).mapToInt(Integer::valueOf).toArray();
        Arrays.parallelPrefix(arr, Integer::sum);
        System.out.println(Arrays.toString(arr));

        // In case , you need a Stream<Integer> again
        Stream<Integer> resultStream = Arrays.stream(arr).boxed();

        // Or want the result as a List<Integer>
        List<Integer> resultList = resultStream.toList();
        System.out.println(resultList);
    }
}

Upvotes: 7

Related Questions