The Coordinator
The Coordinator

Reputation: 13137

Java Streams — How to perform an intermediate function every nth item

I am looking for an operation on a Stream that enables me to perform a non-terminal (and/or terminal) operation every nth item. Although I use a stream of primes for example, the stream could just as easily be web-requests, user actions, or some other cold data or live feed being produced.

From this:

    Duration start = Duration.ofNanos(System.nanoTime());

    IntStream.iterate(2, n -> n + 1)
            .filter(Findprimes::isPrime)
            .limit(1_000_1000 * 10)
            .forEach(System.out::println);

    System.out.println("Duration: " + Duration.ofNanos(System.nanoTime()).minus(start));

To a stream function like this:

    IntStream.iterate(2, n -> n + 1)
            .filter(Findprimes::isPrime)
            .limit(1_000_1000 * 10)
            .peekEvery(10, System.out::println)
            .forEach( it -> {});

Upvotes: 5

Views: 3071

Answers (3)

Andreas
Andreas

Reputation: 159086

Create a helper method to wrap the peek() consumer:

public static IntConsumer every(int count, IntConsumer consumer) {
    if (count <= 0)
        throw new IllegalArgumentException("Count must be >1: Got " + count);
    return new IntConsumer() {
        private int i;
        @Override
        public void accept(int value) {
            if (++this.i == count) {
                consumer.accept(value);
                this.i = 0;
            }
        }
    };
}

You can now use it almost exactly like you wanted:

IntStream.rangeClosed(1, 20)
         .peek(every(5, System.out::println))
         .count();

Output

5
10
15
20

The helper method can be put in a utility class and statically imported, similar to how the Collectors class is nothing but static helper methods.

As noted by @user140547 in a comment, this code is not thread-safe, so it cannot be used with parallel streams. Besides, the output order would be messed up, so it doesn't really make sense to use it with parallel streams anyway.

Upvotes: 8

user140547
user140547

Reputation: 8200

It is not a good idea to rely on peek() and count() as it is possible that the operation is not invoked at all if count() can be calculated without going over the whole stream. Even if it works now, it does not mean that it is also going to work in future. See the javadoc of Stream.count() in Java 9.

Better use forEach().

For the problem itself: In special cases like a simple iteration, you could just filter your objects like.

Stream.iterate(2, n->n+1)
        .limit(20)
        .filter(n->(n-2)%5==0 && n!=2)
        .forEach(System.out::println);

This of course won't work for other cases, where you might use a stateful IntConsumer. If iterate() is used, it is probably not that useful to use parallel streams anyway.

If you want a generic solution, you could also try to use a "normal" Stream, which may not be as efficient as an IntStream, but should still suffice in many cases:

class Tuple{ // ctor, getter/setter omitted
    int index;
    int value;
}

Then you could do:

Stream.iterate( new Tuple(1,2),t-> new Tuple(t.index+1,t.value*2))
        .limit(30)
        .filter(t->t.index %5 == 0)
        .forEach(System.out::println);

If you have to use peek(), you can also do

.peek(t->{if (t.index %5 == 0) System.out.println(t);})

Or if you add methods

static Tuple initialTuple(int value){
   return new Tuple(1,value);
}

static UnaryOperator<Tuple> createNextTuple(IntUnaryOperator f){
    return current -> new Tuple(current.index+1,f.applyAsInt(current.value));
}
static Consumer<Tuple> every(int n,IntConsumer consumer){
    return tuple -> {if (tuple.index % n == 0) consumer.accept(tuple.value);};
}

you can also do (with static imports):

  Stream.iterate( initialTuple(2), createNextTuple(x->x*2))
        .limit(30)
        .peek(every(5,System.out::println))
        .forEach(System.out::println);

Upvotes: 6

user4910279
user4910279

Reputation:

Try this.

int[] counter = {0};
long result = IntStream.iterate(2, n -> n + 1)
    .filter(Findprimes::isPrime)
    .limit(100)
    .peek(x -> { if (counter[0]++ % 10 == 0) System.out.print(x + " ");} )
    .count();

result:

2 31 73 127 179 233 283 353 419 467 

Upvotes: 1

Related Questions