Reputation: 14550
I would like to create a method that performs some complex operations on a stream (e.g. replace the 7th element, remove the last element, remove adjacent duplicates, etc.) without caching the whole stream.
But what Stream API lets me plug such a method in? Do I have to create my own collector that emits items to some other stream while collecting? But that would change the data-flow direction from pull to push, right?
What's the possible signature of such a method?
Stream<T> process(Stream<T> in)
is probably impossible (in single-threaded code) because the result could only be returned after collecting the whole input stream.
Another idea:
void process(Stream<T> in, Stream<T> out)
also seems a bit flawed, because Java doesn't allow inserting items into an existing stream (provided as the out parameter).
So how can I do some complex stream processing in Java?
Upvotes: 1
Views: 3139
Reputation: 6689
Building on tobias_k's answer and ideas expressed in this question/update 2, we may just return proper Predicate and mapping functions which capture their local variables. (As a result these functions are stateful, which is not ideal for streams, but the distinct() method in the Stream API is stateful too.)
Here is the modified code:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class Foo {
    public static void run() {
        List<String> lst = Arrays.asList("foo", "bar", "bar", "bar", "blub", "foo");
        lst.stream()
           .filter(Foo.filterAdjacentDupes())
           .map(Foo.replaceNthElement(3, "BAR"))
           .forEach(System.out::println);
        // Output: foo bar BAR foo
    }

    // Predicate that drops each element equal to the previously seen one
    public static <T> Predicate<T> filterAdjacentDupes() {
        final AtomicReference<T> last = new AtomicReference<>();
        return t -> !t.equals(last.getAndSet(t));
    }

    // Operator that replaces the n-th (1-based) element passing through with repl
    public static <T> UnaryOperator<T> replaceNthElement(int n, T repl) {
        final AtomicInteger count = new AtomicInteger();
        return t -> count.incrementAndGet() == n ? repl : t;
    }
}
Upvotes: 0
Reputation: 27986
The complex operations you use as examples all follow the pattern of an operation on one element in the stream depending on other elements in the stream. Java streams are specifically designed to not allow these types of operations without a collection or reduction. Stream operations do not allow direct access to other elements and, in general, non-terminal operations with side effects are a bad idea.
Note the following from the Stream javadoc:
Collections and streams, while bearing some superficial similarities, have different goals. Collections are primarily concerned with the efficient management of, and access to, their elements. By contrast, streams do not provide a means to directly access or manipulate their elements, and are instead concerned with declaratively describing their source and the computational operations which will be performed in aggregate on that source.
More specifically:
Most stream operations accept parameters that describe user-specified behavior ... To preserve correct behavior, these behavioral parameters:
- must be non-interfering (they do not modify the stream source); and
- in most cases must be stateless (their result should not depend on any state that might change during execution of the stream pipeline).
and
Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline
All the complexities of intermediate and terminal, stateless and stateful operations are well described at https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html and http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
This approach has both advantages and disadvantages. A significant advantage is that it allows parallel processing of streams. A significant disadvantage is that operations that are easy in some other languages (such as skipping every third element in the stream) are difficult in Java.
Note that you will see a lot of code (including accepted answers on SO) that ignores the advice that behavioural parameters of stream operations should be stateless. To work, this code relies on behaviour of an implementation of Java that is not defined by the language specification: namely, that streams are processed in order. There is nothing in the specification stopping an implementation of Java from processing elements in reverse or random order. Such an implementation would make any stateful stream operations immediately behave differently, while stateless operations would continue to behave exactly the same. So, to summarise, stateful operations rely on details of the implementation of Java rather than on the specification.
Also note that it is possible to have safe stateful intermediate operations. They need to be designed so that they specifically do not rely on the order in which elements are processed. Stream.distinct and Stream.sorted are good examples of this: they need to maintain state to work, but they are designed to work irrespective of the order in which elements are processed.
So to answer your question, these types of operations are possible in Java, but they are neither simple, nor safe (for the reason given in the previous paragraph), nor a natural fit for the language design. I suggest using reduction or collection, or (see Tagir Valeev's answer) a spliterator to create a new stream. Alternatively, use traditional iteration.
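For instance, here is a rough sketch (my own illustration, not part of the original answer) of "remove adjacent duplicates" done as a collection instead of a stateful intermediate operation. The helper name dropAdjacentDupes is made up; the combiner checks the boundary between chunks, so the result does not depend on how a parallel stream is split:
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class AdjacentDupes {
    // Collector that drops adjacent duplicates while accumulating into a list
    static <T> Collector<T, ?, List<T>> dropAdjacentDupes() {
        return Collector.of(
            ArrayList::new,
            (list, t) -> {
                // accumulator: skip an element equal to the previous one in this chunk
                if (list.isEmpty() || !list.get(list.size() - 1).equals(t)) {
                    list.add(t);
                }
            },
            (left, right) -> {
                // combiner: drop a duplicate sitting exactly at the chunk boundary
                if (!left.isEmpty() && !right.isEmpty()
                        && left.get(left.size() - 1).equals(right.get(0))) {
                    left.addAll(right.subList(1, right.size()));
                } else {
                    left.addAll(right);
                }
                return left;
            });
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("foo", "bar", "bar", "bar", "blub", "foo")
                .parallel()
                .collect(dropAdjacentDupes());
        System.out.println(result); // [foo, bar, blub, foo]
    }
}
Because the work happens in the terminal operation, this stays correct even for parallel, ordered streams.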
Upvotes: 5
Reputation: 100309
The correct (though not very easy) way to do this is to write your own Spliterator. The common algorithm is the following:
1. Get the spliterator of the source stream via stream.spliterator().
2. Wrap it in your own spliterator which performs the desired operation.
3. Create a new stream from the wrapping spliterator via StreamSupport.stream(spliterator, stream.isParallel()).
4. Delegate the close() call to the original stream, like .onClose(stream::close).
Writing a good spliterator which parallelizes well is often a very non-trivial task. However, if you don't care about parallelization, you may subclass AbstractSpliterator, which is simpler. Here's an example of how to write a new Stream operation which removes the element at a given position:
public static <T> Stream<T> removeAt(Stream<T> src, int idx) {
    Spliterator<T> spltr = src.spliterator();
    Spliterator<T> res = new Spliterators.AbstractSpliterator<T>(
            Math.max(0, spltr.estimateSize() - 1), spltr.characteristics()) {
        long cnt = 0;

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // when the element to remove is reached, consume and discard it
            if (cnt++ == idx && !spltr.tryAdvance(x -> {}))
                return false;
            return spltr.tryAdvance(action);
        }
    };
    return StreamSupport.stream(res, src.isParallel()).onClose(src::close);
}
This is a minimal implementation; it can be improved for better performance and parallelism.
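A quick usage sketch (my own example, not from the original answer):
// remove the element at index 2 ("c"); prints a, b, d
removeAt(Stream.of("a", "b", "c", "d"), 2)
        .forEach(System.out::println);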
In my StreamEx library I tried to simplify the addition of such custom stream operations via headTail. Here's how to do the same using StreamEx:
public static <T> StreamEx<T> removeAt(StreamEx<T> src, int idx) {
    // head is the first stream element
    // tail is the stream of the remaining elements
    // want to remove the first element? ok, just return the tail (dropping the head)
    // otherwise call itself with a decremented idx and prepend the head element to the result
    return src.headTail(
        (head, tail) -> idx == 0 ? tail : removeAt(tail, idx - 1).prepend(head));
}
You can even support chaining with the chain() method:
public static <T> Function<StreamEx<T>, StreamEx<T>> removeAt(int idx) {
    return s -> removeAt(s, idx);
}
Usage example:
StreamEx.of("Java 8", "Stream", "API", "is", "not", "great")
.chain(removeAt(4)).forEach(System.out::println);
Finally, note that even without headTail there are some ways to solve your problems using StreamEx. To remove the element at a specific index you may zip with increasing numbers, then filter and drop the indices like this:
StreamEx.of(stream)
        .zipWith(IntStreamEx.ints().boxed())
        .removeValues(pos -> pos == idx)
        .keys();
To collapse adjacent repeats there's a dedicated collapse method (it even parallelizes quite well!):
StreamEx.of(stream).collapse(Object::equals);
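For example (my own illustration, reusing the sample data from the other answers):
StreamEx.of("foo", "bar", "bar", "bar", "blub", "foo")
        .collapse(Object::equals)
        .forEach(System.out::println); // foo bar blub foo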
Upvotes: 1
Reputation: 82939
You could just call and return any of the standard stream operations, such as filter, map, reduce, etc., and have them perform some complex operation, e.g. one that requires external data. For example, filterAdjacentDupes and replaceNthElement could be implemented like this:
public static <T> Stream<T> filterAdjacentDupes(Stream<T> stream) {
    AtomicReference<T> last = new AtomicReference<>();
    // drop elements equal to the previously seen one
    return stream.filter(t -> !t.equals(last.getAndSet(t)));
}

public static <T> Stream<T> replaceNthElement(Stream<T> stream, int n, T repl) {
    AtomicInteger count = new AtomicInteger();
    // replace the n-th (1-based) element passing through with repl
    return stream.map(t -> count.incrementAndGet() == n ? repl : t);
}
Example usage:
List<String> lst = Arrays.asList("foo", "bar", "bar", "bar", "blub", "foo");
replaceNthElement(filterAdjacentDupes(lst.stream()), 3, "BAR").forEach(System.out::println);
// Output: foo bar BAR foo
However, as noted in the comments, this is not really how the Stream API is supposed to be used. In particular, operations such as these two will fail when given parallel streams.
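A sketch of that failure mode (my own illustration): feeding the same helpers a parallel stream lets the stateful lambdas see elements in an unspecified order, so the output becomes nondeterministic.
List<String> lst = Arrays.asList("foo", "bar", "bar", "bar", "blub", "foo");
// with a parallel stream, duplicates may survive and the "3rd" element is not
// well defined; the printed result can vary from run to run
replaceNthElement(filterAdjacentDupes(lst.parallelStream()), 3, "BAR")
        .forEachOrdered(System.out::println);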
Upvotes: 1