Jerald Baker
Jerald Baker

Reputation: 1369

How to stop processing further elements in akka streams?

I have a list of Integers {2,4,6,8,9,10,12}

To simplify my problem, My goal is to get all even integers until I encounter an odd number. So my result should be -> {2,4,6,8, 9}

Also, i have an actor that says if a number is even or odd (for simplicity)

I have done the following :-

CompletionStage<List<Integer>> result = Source.from(integerList)
      .ask(oddEvenActor, OddEvenResponse.class, Timeout.apply(1, TimeUnit.SECONDS))
      .map(oddEvenResult -> if(oddEvenResult.isOdd()){
                                //stop processing further elements
                            }
                            else {
                                return oddEvenResult.number();
                            })
     .runWith(Sink.seq(), materializer)

So how can i stop the proceessing of further elements as soon as i encounter an odd element?

The CompletionStage "result" should contain 2,4,6,8,9 once the stream is complete.

I checked out the statefulMapConcat (https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html) However, this will still process the other elements after 9 as the actor will still be "asked"

Of course I can do the following :-

  1. Have a resultList variable (global) and i do a resultList.add(oddEvenResult.number()) and then Throw an exception once i encounter odd number. I have to write a custom exception class to piggy back this global resultList.

  2. Use takeWhile as suggested by @Jeffrey Chung, but the OddEvenActoor is still "asked" to process elements 10 and 12. This is pointless.

Is there a cleaner way to achieve this?

Upvotes: 0

Views: 566

Answers (2)

Tomer Shetah
Tomer Shetah

Reputation: 8539

If you'd like to keep your actor you can implement it, for example as the following(in Scala):

implicit val system = ActorSystem("StopOnOdd")
implicit val materializer = ActorMaterializer()

class StopOnOdd extends Actor with ActorLogging {
  override def receive: Receive = {
    case x: Int if x % 2 == 0 =>
      log.info(s"Just received an even int: $x")
      sender() ! x
    case x: Int if x % 2 == 1 =>
      log.info(s"Just received an odd number: $x Stop processing.")
      context.become(dontProcess)
    case _ =>
  }

  private def dontProcess: Receive = {
    case x =>
      log.info(s"Dropping $x because odd number was received.")
    }
  }


  def main(args: Array[String]): Unit = {
  val stopOnOdd = system.actorOf(Props[StopOnOdd], "simpleActor")

  val source = Source(List(2,4,6,8,9,10,12))
  implicit val timeout: Timeout = Timeout(2.seconds)
  val stopOnOddFlow = Flow[Int].ask[Int](parallelism = 1)(stopOnOdd)

  source.via(stopOnOddFlow).to(Sink.foreach[Int](number => println(s"Got number: $number"))).run()
}

The output is:

[INFO] [07/29/2020 16:37:14.618] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] Just received an even int: 2
Got number: 2
[INFO] [07/29/2020 16:37:14.625] [StopOnOdd-akka.actor.default-dispatcher-2] 
[akka://StopOnOdd/user/simpleActor] Just received an even int: 4
Got number: 4
Got number: 6
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] Just received an even int: 6
Got number: 8
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] Just received an even int: 8
[INFO] [07/29/2020 16:37:14.628] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] Just received an odd number: 9 Stop processing.

Upvotes: 0

Jeffrey Chung
Jeffrey Chung

Reputation: 19527

Use takeWhile. In Scala, this would be something like the following:

implicit val timeout: akka.util.Timeout = 3.seconds

val result: Future[Seq[Int]] =
  Source(List(2, 4, 6, 8, 9, 10, 12))
    .ask[OddEvenResponse](oddEvenActor)
    .takeWhile(resp => !resp.isOdd, true)
    .map(_.number)
    .runWith(Sink.seq)

Note the use of the inclusive boolean flag in the invocation of takeWhile, which is necessary if you want to keep the first odd number.

The Java equivalent would look similar.

Upvotes: 3

Related Questions