Reputation: 8127
I have the following code which generates the sum of a series i.e. 1, 1+2, 1+2+3, 1+2+3+4
public static void main(String[] args) {
Stream<Integer> inputStream = Stream.of(1,2,3,4);
Iterator<Integer> iterator = inputStream.iterator();
Stream<Integer> outputStream = Stream.iterate(
iterator.next(),
i -> iterator.hasNext(),
next -> {
return iterator.next() + next;
}
);
List<Integer> outputList = outputStream.collect(Collectors.toList());
System.out.println(outputList);
}
But this prints: [1, 3, 6]
, missing the last element.
Note this seems to get the correct check I want, but is there a better solution? Looks awful:
public static void main(String[] args) {
Stream<Integer> inputStream = Stream.of(1,2,3,4);
Iterator<Integer> iterator = inputStream.iterator();
AtomicBoolean check = new AtomicBoolean(true);
Stream<Integer> outputStream = Stream.iterate(
iterator.next(),
i -> check.get(),
next -> {
check.set(iterator.hasNext());
return iterator.hasNext() ? iterator.next() + next : next;
}
);
List<Integer> outputList = outputStream.collect(Collectors.toList());
System.out.println(outputList);
}
Here's a generic code illustrating the problem.
public static <O, I> Stream<O> iterate(O seed, Stream<I> stream, BiFunction<I,O,O> function) {
return iterate(seed, stream.iterator(), function);
}
public static <O, I> Stream<O> iterate(O seed, Iterator<I> iterator, BiFunction<I,O,O> function) {
AtomicBoolean hasNext = new AtomicBoolean(true);
return Stream.iterate(
seed,
i -> hasNext.get(),
next -> {
hasNext.set(iterator.hasNext());
return iterator.hasNext() ? function.apply(iterator.next(), next) : next;
}
);
}
public static void main(String[] args) {
Stream<Integer> inputStream = Stream.of(2,3,4);
BiFunction<Integer, Integer, Integer> f = Integer::sum;
Stream<Integer> outputStream = iterate(1, inputStream, f);
List<Integer> outputList = outputStream.collect(Collectors.toList());
System.out.println(outputList);
}
Basically, I want to do this because I am creating a function which produces a forecast of the balance of an interest bearing account.
I want to be able to take a stream of dates and then produce a stream of balances. That way you don't need to know how many elements there will be, or even the distribution of dates, which makes it a more flexible approach.
Also note that the next element of the Stream
depends on the previous. This is why I have a seed
which represents the first value (does not have a previous value), which would be the opening balance.
Upvotes: 2
Views: 441
Reputation: 298163
The problem is that the three arguments to Stream.iterate(…,…,…)
correspond to the three arguments to a for(…;…;…) …
statement, where the loop’s body corresponds to the chained stream operations. But this doesn’t match an iterator loop, which looks like
for(Iterator<I> iterator = …; iterator.hasNext(); ) {
I elements = iterator.next();
…
}
In other words, the right place of the next()
call would be after the hasNext()
check, before the remaining loop body, whereas the function passed as third argument to Stream.iterate(…,…,…)
is evaluated after the loop body equivalent, before the hasNext()
check.
A simple solution would be
public static <O, I> Stream<O> iterate(
O seed, Iterator<I> iterator, BiFunction<I,O,O> function) {
return Stream.iterate(seed, Objects::nonNull,
prev -> iterator.hasNext()? function.apply(iterator.next(), prev): null);
}
which effectively moves the hasNext()
check before the next()
call. But it requires that null
will never occur as a normal result of the function evaluation, which should be the case for all arithmetic operations.
Otherwise, you would have to go one level deeper to implement a general-purpose stream operation:
public static <O, I> Stream<O> iterate(
O seed, Stream<I> stream, BiFunction<I,O,O> function) {
boolean parallel = stream.isParallel();
Spliterator<I> sp = stream.spliterator();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<O>(
sp.estimateSize() == Long.MAX_VALUE? Long.MAX_VALUE: sp.estimateSize() + 1,
sp.characteristics() & Spliterator.SIZED | Spliterator.ORDERED) {
O value = seed;
final Consumer<I> c = i -> value = function.apply(i, value);
boolean end;
@Override
public boolean tryAdvance(Consumer<? super O> action) {
if(end) return false;
O current = value;
end = !sp.tryAdvance(c);
action.accept(current);
return true;
}
}, parallel).onClose(stream::close);
}
This is more complicated, but has no constraints of the values produced by the function and is more efficient than going through an Iterator
. Note that this solution also cares to retain the current parallel setting and ensures that closing the stream will be delegated to the original stream, which might be important when being backed by a resource, like Files.lines
, etc.
Upvotes: 4
Reputation: 28988
Here's a generic code illustrating the problem.
public static <O, I> Stream<O> iterate(O seed, Stream<I> stream, BiFunction<I,O,O> function) { return iterate(seed, stream.iterator(), function); }
I want to be able to take a stream of dates and then produce a stream of balances. That way you don't need to know how many elements there will be, or even the distribution of dates, which makes it a more flexible approach.
I've written the following solution based on the assumption that it's possible to merge the balances produces in the different threads, and the function BiFunction<I,O,O>
is associative, as well as a merging function BinaryOperator<O>
, that would be responsible for combining resulting values, i.e. balances.
Also, the value of seed
should meat the same requirements which are imposed on identity
of the Stream.reduce()
operation (otherwise parallel stream would yield an incorrect result). I.e.
merger.apply(r, mapper.apply(seed, t)) == mapper.apply(t, r)
Which would not hold true if seed
would be 1
and both mapper
and merger
would be defined as Integer::sum
, but i -> i * i
and seed
1
the condition would be met.
Note that if at least one of these requirements is not possible to fulfil, streams are not the right tool for this problem.
Here's how it might be implemented using a custom Collector. To define a Collector we can use static method Collector.of()
.
public static <R, T> Stream<R> iterate(R seed, Stream<T> stream,
BiFunction<T, R, R> mapper,
BinaryOperator<R> merger) {
return stream
.collect(sequenceCollector(seed, mapper, merger));
}
public static <R, T> Collector<T, ?, Stream<R>> sequenceCollector(R seed,
BiFunction<T, R, R> mapper,
BinaryOperator<R> merger) {
return Collector.of(
ArrayDeque::new,
(Deque<R> deque, T next) ->
deque.add(
mapper.apply(next, Objects.requireNonNullElse(deque.peekLast(), seed))
),
(left, right) -> {
R last = left.getLast();
right.forEach(next -> left.add(merger.apply(next, last)));
return left;
},
Collection::stream
);
}
main()
public static void main(String[] args) {
Stream<Integer> input1 = Stream.of(1, 2, 3, 4);
Stream<Integer> output1 = iterate(0, input1, Integer::sum, Integer::sum);
List<Integer> outputList1 = output1.toList();
System.out.println("Sequential: " + outputList1);
Stream<Integer> input2 = Stream.of(1, 2, 3, 4).parallel();
Stream<Integer> output2 = iterate(0, input2, Integer::sum, Integer::sum);
List<Integer> outputList2 = output2.toList();
System.out.println("Parallel: " + outputList2);
}
Output:
Sequential: [1, 3, 6, 10]
Parallel: [1, 3, 6, 10]
Upvotes: 1
Reputation: 1254
Try this:
AtomicInteger sum = new AtomicInteger(0);
List<Integer> result = Stream.of(1, 2, 3, 4).map(it -> sum.addAndGet(it)).toList();
result.forEach(System.out::print);
Upvotes: 1
Reputation: 79085
... but is there a better solution?
Yes, an elegant solution can be by using Arrays#parallelPrefix
.
public class Main {
public static void main(String args[]) {
int[] arr = { 1, 2, 3, 4 };
Arrays.parallelPrefix(arr, Integer::sum);
System.out.println(Arrays.toString(arr));
}
}
Output:
[1, 3, 6, 10]
You can always convert back and forth between Stream<Integer>
and int[]
as per your requirement.
public class Main {
public static void main(String args[]) {
int[] arr = Stream.of(1, 2, 3, 4).mapToInt(Integer::valueOf).toArray();
Arrays.parallelPrefix(arr, Integer::sum);
System.out.println(Arrays.toString(arr));
// In case , you need a Stream<Integer> again
Stream<Integer> resultStream = Arrays.stream(arr).boxed();
// Or want the result as a List<Integer>
List<Integer> resultList = resultStream.toList();
System.out.println(resultList);
}
}
Upvotes: 7