Ilias Stavrakis

Reputation: 753

Why is this Java Stream operated upon twice?

The Java 8 API says:

Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.

So why does the following code throw:

java.lang.IllegalStateException: stream has already been operated upon or closed

Stream<Integer> stream = Stream.of(1,2,3);
stream.filter( x-> x>1 );
stream.filter( x-> x>2 ).forEach(System.out::print);

According to the API, the first filtering operation is not supposed to operate on the Stream.

Upvotes: 25

Views: 3694

Answers (4)

Tunaki

Reputation: 137084

This happens because you are ignoring the return value of filter.

Stream<Integer> stream = Stream.of(1,2,3);
stream.filter( x-> x>1 ); // <-- ignoring the return value here
stream.filter( x-> x>2 ).forEach(System.out::print);

Stream.filter returns a new Stream consisting of the elements of this stream that match the given predicate. The important point is that it is a new Stream: the old one counts as operated upon as soon as the filter is added to it, whereas the new one has not been operated upon yet.

Quoting from Stream Javadoc:

A stream should be operated on (invoking an intermediate or terminal stream operation) only once.

In this case, filter is the intermediate operation that operated on the old Stream instance.

So this code will work fine:

Stream<Integer> stream = Stream.of(1,2,3);
stream = stream.filter( x-> x>1 ); // <-- NOT ignoring the return value here
stream.filter( x-> x>2 ).forEach(System.out::print);

As noted by Brian Goetz, you would commonly chain those calls together:

Stream.of(1,2,3).filter( x-> x>1 )
                .filter( x-> x>2 )
                .forEach(System.out::print);
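To see both behaviours side by side, here is a minimal sketch. The class and method names (StreamReuseDemo, reuseThrows, chained) are made up for illustration, and it assumes the OpenJDK implementation, since the Javadoc only says an implementation may throw when it detects reuse.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original question.
class StreamReuseDemo {

    // Reproduces the question: the result of the first filter is discarded,
    // so the second filter is attached to a stream that is already linked.
    static boolean reuseThrows() {
        Stream<Integer> stream = Stream.of(1, 2, 3);
        stream.filter(x -> x > 1);         // return value ignored
        try {
            stream.filter(x -> x > 2);     // second operation on the same instance
            return false;
        } catch (IllegalStateException e) {
            return true;                   // what OpenJDK does in practice
        }
    }

    // The fix: always continue from the stream returned by the previous call.
    static List<Integer> chained() {
        return Stream.of(1, 2, 3)
                     .filter(x -> x > 1)
                     .filter(x -> x > 2)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(reuseThrows()); // prints "true"
        System.out.println(chained());     // prints "[3]"
    }
}
```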

Upvotes: 30

Alexis C.

Reputation: 93842

The documentation on Streams says:

"A stream should be operated on (invoking an intermediate or terminal stream operation) only once."

You can actually see this in the source code. When you call filter it returns a new stateless operation, passing the current pipeline instance in the constructor (this):

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                       StreamOpFlag.NOT_SIZED) {
        ....
}

The constructor call ends up calling the AbstractPipeline constructor, which is set up like this:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    ...
}

The first time you call filter on the source (line 2), it sets the boolean value to true. As you don't reuse the return value given by filter, the second call to filter (line 3) detects that the original stream source (line 1) has already been linked (due to the first filter call), which is why you get the exception.
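The flag mechanism can be mimicked with a toy class. This is only a sketch of the idea in the constructor quoted above: Stage, its filter() method and secondLinkFails() are hypothetical names, not the JDK's own classes.

```java
// Hypothetical stand-in for a pipeline stage; NOT the JDK's AbstractPipeline.
class Stage {
    private boolean linkedOrConsumed = false;

    // Creates a source stage with nothing upstream.
    Stage() { }

    // Creates a stage linked to previousStage, mirroring the quoted check.
    Stage(Stage previousStage) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException("stream has already been operated upon or closed");
        previousStage.linkedOrConsumed = true;
    }

    // Stands in for an intermediate operation such as filter.
    Stage filter() {
        return new Stage(this);
    }

    // Links two stages to the same source; the second attempt must fail.
    static boolean secondLinkFails() {
        Stage source = new Stage();
        source.filter();                   // marks source as linked
        try {
            source.filter();               // source is already linked
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondLinkFails());           // prints "true"
        new Stage().filter().filter();                   // chaining links each stage once, no exception
    }
}
```

Note that chaining works because each call links the stage returned by the previous call, so no stage is ever linked twice.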

Upvotes: 1

Prim

Reputation: 2968

The filter() method uses the stream and returns another Stream instance, which you ignore in your example.

filter is an intermediate operation, but you can't call filter twice on the same stream instance.

Your code should be written as follows:

Stream<Integer> stream = Stream.of(1,2,3)
                               .filter( x-> x>1 )
                               .filter( x-> x>2 );
stream.forEach(System.out::print);

As filter is an intermediate operation, "nothing" is done when calling these methods. All the work is actually done when the forEach() method is called.
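The laziness can be made visible by logging from inside the predicate. A small sketch, assuming a sequential stream; the class name LazyFilterDemo and the log messages are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class showing when the predicate actually runs.
class LazyFilterDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Building the pipeline: the predicate is registered but not invoked.
        Stream<Integer> stream = Stream.of(1, 2, 3)
                .filter(x -> { log.add("test " + x); return x > 1; });

        log.add("pipeline built");

        // The terminal operation starts the traversal.
        stream.forEach(x -> log.add("consume " + x));
        return log;
    }

    public static void main(String[] args) {
        // "pipeline built" is the first entry: no element was tested before forEach.
        run().forEach(System.out::println);
    }
}
```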

Upvotes: 4

Peter Lawrey

Reputation: 533500

This is a misuse of the stream, which is detected when you attach more than one .filter() to it.

It doesn't say it has been traversed more than once, only that it has "already been operated upon".
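One way to check that no traversal happens is to count how many elements the source hands out. This is a rough sketch with made-up names (NoTraversalDemo, elementsPulledBeforeFailure), again assuming OpenJDK's behaviour of throwing on the second filter.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical demo class: counts elements requested from the source.
class NoTraversalDemo {

    static int elementsPulledBeforeFailure() {
        AtomicInteger pulled = new AtomicInteger();

        // The supplier runs only when the stream is actually traversed.
        Stream<Integer> stream = Stream.generate(() -> pulled.incrementAndGet());

        stream.filter(x -> x > 1);         // links the source, traverses nothing
        try {
            stream.filter(x -> x > 2);     // rejected: source already linked
        } catch (IllegalStateException e) {
            // expected on OpenJDK; the failure happens while linking, not while traversing
        }
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println(elementsPulledBeforeFailure()); // prints "0"
    }
}
```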

Upvotes: 0
