Reputation: 1959
I have an akka Flow[I, O]
that is out of my control because its coming in from some third-party code. I need to react to whenever an input element does not produce an output element (for example, because an exception was thrown in some part of the flow). For that, I need the input element that produced the failure. I do not find any API on the flow or similar that allows me to register a handler or react to it in any way. How can I do that?
Upvotes: 2
Views: 440
Reputation: 1940
You want to Resume
rather than Stop
when the akka streams flow throws an exception. After collecting all successful elements, you can Seq#diff
to tell what elements are dropped due to exception thrown.
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
object Exception {
case class MyException(n: Int) extends RuntimeException
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("Exception")
implicit val ec: ExecutionContext = system.dispatcher
val decider: Supervision.Decider = {
case _: MyException => Supervision.Resume
case _ => Supervision.Stop
}
val flow = Flow[Int]
.map(n =>
if (n % 2 == 1) throw MyException(n)
else n
)
val in = 1 to 10
val outFuture = Source(in)
.via(flow)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
outFuture.onComplete {
case Success(out) =>
println("dropped elements are " + (in.diff(out)))
case Failure(_) =>
println("unknown failure")
}
}
}
The console outputs are:
dropped elements are Vector(1, 3, 5, 7, 9)
Reference: How to get object that caused failure in Akka Streams?
Upvotes: 2