david

Reputation: 313

Akka Streams (Scala): Filtering out exceptions

One of the steps in my Akka Streams pipeline is a transformation that throws an exception when it receives an invalid input. I would like to discard these problematic inputs, so I came up with the following solution:

...
.map( input => Try( transformation( input ) ).toOption )
.filter( _.nonEmpty )
.map( _.get )
...

This takes three steps for what is, in fact, just a flatMap.
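For comparison, on a plain Scala collection the three steps collapse into a single flatMap. A minimal sketch, where `transformation` is only a hypothetical stand-in for the real step and the object and method names are made up for illustration:

```scala
import scala.util.Try

object DiscardFailures {
  // Hypothetical stand-in for the real transformation: throws on 0.
  def transformation(i: Int): Int = 100 / i

  // Try(...).toOption yields None for a failed input, and flatMap
  // drops the Nones while unwrapping the Somes.
  def keepSuccesses(inputs: List[Int]): List[Int] =
    inputs.flatMap(input => Try(transformation(input)).toOption)

  def main(args: Array[String]): Unit =
    println(keepSuccesses(List(5, 2, 0, 1))) // List(20, 50, 100)
}
```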

Is there a more straightforward Akka way of doing this?

Upvotes: 2

Views: 210

Answers (2)

Leo C

Reputation: 22449

If you want to silently discard the exceptions, as in your sample code, here are a couple of ways to reduce the number of steps:

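import akka.stream.scaladsl.Source
import scala.util.Try

// Both snippets assume an implicit ActorSystem (or Materializer) is in scope.

// A dummy transformation that throws ArithmeticException for 0
def transformation(i: Int): Int = 100 / i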

// #1: Use `collect` to keep only the successful results
Source(List(5, 2, 0, 1)).
  map(input => Try(transformation(input)).toOption).
  collect { case Some(x) => x }.
  runForeach(println)
  // Result: 20, 50, 100

// #2: Use `mapConcat`, emitting zero or one element per input
Source(List(5, 2, 0, 1)).
  mapConcat(input => Try(transformation(input)).toOption.toList).
  runForeach(println)
  // Result: 20, 50, 100

Note that there is no flatMap for Akka Source/Flow, although mapConcat (and flatMapConcat) behave in a somewhat similar fashion.

Upvotes: 1

Emiliano Martinez

Reputation: 4133

You can use Supervision Strategies. Taken from the doc:

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _                      => Supervision.Stop
}

val flow = Flow[Int]
  .filter(100 / _ < 50)
  .map(elem => 100 / (5 - elem))
  .withAttributes(ActorAttributes.supervisionStrategy(decider))

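You can configure the Decider to do whatever you need. With Supervision.Resume, the element that caused the exception is dropped and the stream continues. If you need to skip the failing element for all exceptions, use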

 case _: Throwable => Supervision.Resume

Take a look at https://doc.akka.io/docs/akka/current/stream/stream-error.html
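Applied to the question's use case, this could look roughly like the sketch below. It assumes Akka 2.6 or later (where an implicit ActorSystem provides the materializer) and reuses the dummy `transformation` from the other answer; the object name, `safeTransform`, and `run` are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ResumeOnFailure {
  // Hypothetical stand-in for the real transformation: throws on 0.
  def transformation(i: Int): Int = 100 / i

  // Drop the failing element on ArithmeticException, stop on anything else.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  // A single map stage; the supervision attribute handles the failures.
  val safeTransform = Flow[Int]
    .map(transformation)
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

  // Runs the stream to completion and returns the surviving elements.
  def run(inputs: List[Int]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("resume-demo")
    try Await.result(Source(inputs).via(safeTransform).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run(List(5, 2, 0, 1))) // Vector(20, 50, 100)
}
```

Unlike the Try-based variants, this keeps the pipeline to one map step, at the cost of moving the error handling into the stage's attributes.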

Upvotes: 3
