user2837491

Reputation: 61

How to end an infinite Akka stream

I'm new to Akka Streams, but I have a case where I want to use it to look for permutations from an infinite source. A simplified example with a finite source could look like this:

val future = Source(1 to 100)
    .map { i => if (i % 20 == 0) println(i); i }
    .filter(_ == 42)
    .runWith(Sink.fold[Int, Int](0)(Keep.right))

This example outputs:

20
40
60
80
100

I'm fine with the source running somewhat past 42, but I don't want to exhaust the entire stream before I can get the result.

val result: Int = Await.result(future, 1.second)
result should be(42)

The question is: how should I end the stream once I've found what I'm looking for?

Upvotes: 3

Views: 939

Answers (2)

Generalizing to N values, e.g. the first 10 values greater than or equal to 42, you can use grouped and take only the first group with Sink.head:

val N = 10

val future : Future[Seq[Int]] = 
  Source(1 to 100).map { i => if (i % 20 == 0) println(i); i }
                  .filter(_ >= 42)
                  .grouped(N)
                  .runWith(Sink.head)
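As a possible alternative to grouped, the same "first N matches" idea can be sketched with take(N) followed by Sink.seq, here against an infinite source. This is only a sketch under assumptions: it assumes Akka 2.6+ (where an implicit ActorSystem also supplies the materializer), and the names FirstN and firstN are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object FirstN {
  def firstN(n: Int): Seq[Int] = {
    // Assumption: Akka 2.6+, so the ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("first-n")
    try {
      val future: Future[Seq[Int]] =
        Source.fromIterator(() => Iterator.from(1)) // infinite: 1, 2, 3, ...
          .filter(_ >= 42)
          .take(n)           // completes downstream and cancels upstream after n elements
          .runWith(Sink.seq) // collects the emitted elements into a Seq
      Await.result(future, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(firstN(10))
}
```

If the source ends before N matching elements have arrived, both variants return fewer than N values; with grouped plus Sink.head, a source with no matching element at all fails the future instead of returning an empty sequence.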

Upvotes: 2

Viktor Klang

Reputation: 26579

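// Sink.head completes the future with the first element that reaches it
// and then cancels the upstream, so the rest of the source is not consumed.
val future = Source(1 to 100)
    .map { i => if (i % 20 == 0) println(i); i }
    .filter(_ == 42)
    .runWith(Sink.head)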
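To show that this also terminates on a truly infinite source, here is a minimal self-contained sketch. It assumes Akka 2.6+ (an implicit ActorSystem doubles as the materializer); the names FirstMatch and find are hypothetical and only there to make the example runnable.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper, not part of Akka.
object FirstMatch {
  def find(target: Int): Int = {
    // Assumption: Akka 2.6+, so no explicit ActorMaterializer is needed.
    implicit val system: ActorSystem = ActorSystem("first-match")
    try {
      val future =
        Source.fromIterator(() => Iterator.from(1)) // infinite: 1, 2, 3, ...
          .filter(_ == target)
          .runWith(Sink.head) // first match completes the future and cancels upstream
      Await.result(future, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(find(42))
}
```

Note that Sink.head fails the future with a NoSuchElementException if the stream completes without emitting any element; Sink.headOption yields an Option instead, which may be preferable when a match is not guaranteed.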

Upvotes: 4
