Reputation: 1369
I have a list of integers: {2,4,6,8,9,10,12}
To simplify my problem: my goal is to collect all even integers up to and including the first odd number, so my result should be {2,4,6,8,9}.
I also have an actor that reports whether a number is even or odd (for simplicity).
I have done the following:
CompletionStage<List<Integer>> result = Source.from(integerList)
    .ask(oddEvenActor, OddEvenResponse.class, Timeout.apply(1, TimeUnit.SECONDS))
    .map(oddEvenResult -> {
        if (oddEvenResult.isOdd()) {
            // stop processing further elements
        } else {
            return oddEvenResult.number();
        }
    })
    .runWith(Sink.seq(), materializer);
So how can I stop the processing of further elements as soon as I encounter an odd element?
The CompletionStage "result" should contain 2,4,6,8,9 once the stream is complete.
I looked at statefulMapConcat (https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html). However, the elements after 9 would still be processed, because the actor would still be "asked" for them.
Of course, I can do one of the following:
1. Keep a (global) resultList variable, call resultList.add(oddEvenResult.number()) for each element, and throw an exception once I encounter an odd number. I would have to write a custom exception class to piggyback this global resultList.
2. Use takeWhile as suggested by @Jeffrey Chung, but the OddEvenActor is still "asked" to process elements 10 and 12, which is pointless.
Is there a cleaner way to achieve this?
Upvotes: 0
Views: 566
Reputation: 8539
If you'd like to keep your actor, you can implement it as follows, for example (in Scala):
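import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.Timeout
import scala.concurrent.duration._

class StopOnOdd extends Actor with ActorLogging {
  override def receive: Receive = {
    case x: Int if x % 2 == 0 =>
      log.info(s"Just received an even int: $x")
      sender() ! x
    case x: Int => // odd number (this also covers negative odd values)
      log.info(s"Just received an odd number: $x Stop processing.")
      context.become(dontProcess)
    case _ =>
  }

  // After the first odd number, every further message is dropped without a reply.
  private def dontProcess: Receive = {
    case x =>
      log.info(s"Dropping $x because odd number was received.")
  }
}

object StopOnOddApp {
  implicit val system: ActorSystem = ActorSystem("StopOnOdd")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  def main(args: Array[String]): Unit = {
    val stopOnOdd = system.actorOf(Props[StopOnOdd], "simpleActor")
    val source = Source(List(2, 4, 6, 8, 9, 10, 12))
    implicit val timeout: Timeout = Timeout(2.seconds)
    // parallelism = 1: at most one element is in flight to the actor at a time
    val stopOnOddFlow = Flow[Int].ask[Int](parallelism = 1)(stopOnOdd)
    source.via(stopOnOddFlow).to(Sink.foreach[Int](number => println(s"Got number: $number"))).run()
  }
}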
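The output is as follows. Note that 9 is logged by the actor but never emitted downstream, because the actor does not reply to it; the pending ask then fails when the timeout expires: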
[INFO] [07/29/2020 16:37:14.618] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] Just received an even int: 2
Got number: 2
[INFO] [07/29/2020 16:37:14.625] [StopOnOdd-akka.actor.default-dispatcher-2]
[akka://StopOnOdd/user/simpleActor] Just received an even int: 4
Got number: 4
Got number: 6
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] Just received an even int: 6
Got number: 8
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] Just received an even int: 8
[INFO] [07/29/2020 16:37:14.628] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] Just received an odd number: 9 Stop processing.
Upvotes: 0
Reputation: 19527
Use takeWhile. In Scala, this would be something like the following:
implicit val timeout: akka.util.Timeout = 3.seconds
val result: Future[Seq[Int]] =
  Source(List(2, 4, 6, 8, 9, 10, 12))
    .ask[OddEvenResponse](oddEvenActor)
    .takeWhile(resp => !resp.isOdd, true)
    .map(_.number)
    .runWith(Sink.seq)
Note the use of the inclusive boolean flag in the invocation of takeWhile, which is necessary if you want to keep the first odd number.
The Java equivalent would look similar.
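To make the effect of the inclusive flag concrete, here is a minimal plain-Java sketch of the semantics. It does not use Akka and is only a model, not Akka's implementation: the class `InclusiveTakeWhile` and its `takeWhile` helper are hypothetical names introduced for illustration, and the counting predicate stands in for the odd/even actor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class InclusiveTakeWhile {
    // Hypothetical helper: keeps elements while the predicate holds and,
    // if inclusive is true, also keeps the first element that fails it.
    static <T> List<T> takeWhile(List<T> input, Predicate<T> p, boolean inclusive) {
        List<T> out = new ArrayList<>();
        for (T element : input) {
            if (p.test(element)) {
                out.add(element);
            } else {
                if (inclusive) {
                    out.add(element);
                }
                break; // nothing after the first failing element is examined
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(2, 4, 6, 8, 9, 10, 12);

        // Counts how often the "actor" stand-in is consulted.
        AtomicInteger asked = new AtomicInteger();
        Predicate<Integer> isEven = n -> {
            asked.incrementAndGet();
            return n % 2 == 0;
        };

        System.out.println(takeWhile(numbers, isEven, true));        // [2, 4, 6, 8, 9]
        System.out.println(asked.get());                             // 5
        System.out.println(takeWhile(numbers, n -> n % 2 == 0, false)); // [2, 4, 6, 8]
    }
}
```

In this sequential model the predicate is never consulted for 10 and 12. In the actual stream, how many elements past the first odd one reach the actor depends on the parallelism of the ask stage and on buffering between stages, so this sketch says nothing about that.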
Upvotes: 3