Reputation: 313
One of the steps in my Akka Streams pipeline is a transformation that throws an exception when it receives an invalid input. I would like to discard these problematic inputs, so I came up with the following solution:
...
.map( input => Try( transformation( input ) ).toOption )
.filter( _.nonEmpty )
.map( _.get )
...
This takes three steps for what is, in effect, just a flatMap.
Is there a more straightforward Akka way of doing this?
Upvotes: 2
Views: 210
Reputation: 22449
If you want to silently discard the exceptions, as in your sample code, here are a couple of ways to reduce the number of steps:
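import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.util.Try
// Assumes Akka 2.6+, where the implicit ActorSystem also provides the materializer
implicit val system: ActorSystem = ActorSystem("example")
// A dummy transformation: throws ArithmeticException for 0
def transformation(i: Int): Int = 100 / i
// #1: Use `collect` to keep only the successful results
Source(List(5, 2, 0, 1)).
  map(input => Try(transformation(input)).toOption).
  collect { case Some(x) => x }.
  runForeach(println)
// Result: 20, 50, 100
// #2: Use `mapConcat` with a list of zero or one element
Source(List(5, 2, 0, 1)).
  mapConcat(input => Try(transformation(input)).toOption.toList).
  runForeach(println)
// Result: 20, 50, 100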
Note that there is no flatMap for Akka Source/Flow, although mapConcat (and flatMapConcat) does function in a somewhat similar fashion.
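For comparison, here is a minimal sketch of the flatMapConcat variant, where each input becomes a small sub-source holding zero or one element. It assumes Akka 2.6+ with akka-stream on the classpath; the names `system`, `viaFlatMapConcat` and the 3-second timeout are only illustrative choices, not anything from the question.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Assumption: Akka 2.6+, so the implicit ActorSystem supplies the materializer
implicit val system: ActorSystem = ActorSystem("flatmapconcat-sketch")

// Same dummy transformation as above: throws for 0
def transformation(i: Int): Int = 100 / i

// Each input is turned into a Source of zero or one element,
// and flatMapConcat concatenates those sub-sources in order
val viaFlatMapConcat: List[Int] = Await.result(
  Source(List(5, 2, 0, 1))
    .flatMapConcat(input => Source(Try(transformation(input)).toOption.toList))
    .runWith(Sink.seq),
  3.seconds
).toList

println(viaFlatMapConcat) // List(20, 50, 100)
system.terminate()
```

For a single optional result per element, mapConcat is likely the lighter choice, since flatMapConcat materializes a sub-stream for every input.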
Upvotes: 1
Reputation: 4133
You can use supervision strategies. Taken from the docs:
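import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Flow
// Resume (drop the failing element) on ArithmeticException, stop on anything else
val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}
val flow = Flow[Int]
  .filter(100 / _ < 50)
  .map(elem => 100 / (5 - elem))
  .withAttributes(ActorAttributes.supervisionStrategy(decider))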
You can configure the decider to do whatever you need. To skip the failing element for every exception, use:
case _: Throwable => Supervision.Resume
Take a look at https://doc.akka.io/docs/akka/current/stream/stream-error.html
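Applied to the pipeline from the question, this could look roughly like the sketch below. It assumes Akka 2.6+ with akka-stream on the classpath, reuses the dummy `transformation` from the other answer as a stand-in for the real one, and uses the predefined `Supervision.resumingDecider`, which resumes on any non-fatal exception. The names `system` and `survivors` and the 3-second timeout are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumption: Akka 2.6+, so the implicit ActorSystem supplies the materializer
implicit val system: ActorSystem = ActorSystem("supervision-sketch")

// Stand-in for the real transformation: throws ArithmeticException for 0
def transformation(i: Int): Int = 100 / i

// The map stage throws on invalid input; the resuming decider drops
// that element and lets the stream continue with the next one
val survivors: List[Int] = Await.result(
  Source(List(5, 2, 0, 1))
    .map(transformation)
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.seq),
  3.seconds
).toList

println(survivors) // List(20, 50, 100)
system.terminate()
```

This keeps the pipeline to a single map step, at the cost of dropping failures silently; a custom decider could log the exception before returning Supervision.Resume.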
Upvotes: 3