Philipp Lengauer

Reputation: 1959

How to react to an exception (upstream failure) in an akka flow

I have an Akka Flow[I, O] that is out of my control because it comes from third-party code. I need to react whenever an input element does not produce an output element (for example, because an exception was thrown somewhere in the flow). For that, I need the input element that caused the failure. I cannot find any API on the flow, or anywhere else, that lets me register a handler or react to the failure in any way. How can I do that?

Upvotes: 2

Views: 440

Answers (1)

yiksanchan

Reputation: 1940

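You want the supervision strategy to Resume rather than Stop when the Akka Streams flow throws an exception. With Resume, the failing element is dropped and the stream keeps running. After collecting all successful elements, you can use Seq#diff against the inputs to tell which elements were dropped because of an exception. Note that this relies on the outputs being comparable with the inputs, as in the example below, where the flow passes successful elements through unchanged.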

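import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}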

object Exception {

  case class MyException(n: Int) extends RuntimeException

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Exception")
    implicit val ec: ExecutionContext = system.dispatcher

    // Drop the failing element and continue on MyException; stop on anything else.
    val decider: Supervision.Decider = {
      case _: MyException => Supervision.Resume
      case _              => Supervision.Stop
    }
    val flow = Flow[Int]
      .map(n =>
        if (n % 2 == 1) throw MyException(n)
        else n
      )
    val in = 1 to 10
    val outFuture = Source(in)
      .via(flow)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
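    outFuture.onComplete { result =>
      result match {
        case Success(out) =>
          // Inputs without a matching output were dropped by Supervision.Resume.
          println("dropped elements are " + in.diff(out))
        case Failure(_) =>
          println("unknown failure")
      }
      // Shut down the actor system so the JVM can exit.
      system.terminate()
    }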
  }
}

The console output is:

dropped elements are Vector(1, 3, 5, 7, 9)
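
One detail of the Seq#diff step above is worth illustrating: diff is a multiset difference, so it removes one occurrence from the inputs for each matching output instead of removing every equal value. The sketch below uses plain Scala collections only (no Akka); the object DiffSketch and the helper dropped are hypothetical names introduced for illustration, and it assumes the outputs can be compared with the inputs by equality.

```scala
object DiffSketch {

  // Hypothetical helper: the inputs that have no matching output.
  // Seq#diff removes one occurrence per matching output, so duplicate
  // inputs are accounted for individually.
  def dropped[A](in: Seq[A], out: Seq[A]): Seq[A] = in.diff(out)

  def main(args: Array[String]): Unit = {
    val in  = Seq(1, 2, 2, 3, 3) // inputs, including duplicates
    val out = Seq(2, 3)          // only one 2 and one 3 made it through
    println(dropped(in, out))    // prints List(1, 2, 3)
  }
}
```

If the flow transforms elements, so that the outputs no longer equal the inputs, this comparison does not apply as is and the inputs would have to be correlated with the outputs in some other way.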

Reference: How to get object that caused failure in Akka Streams?

Upvotes: 2
