Reputation: 31526
I have written a scala stream application. My objective is that if during the stream processing an item encounters an error during processing, then the stream ignores it and continues processing the remaining items.
Towards this goal, I wrote this code
object StreamRestart extends App {
implicit val actorSystem = ActorSystem()
implicit val ec : ExecutionContext = actorSystem.dispatcher
var failed: ListBuffer[Int] = ListBuffer()
val decider : Supervision.Decider = {
case x : Exception => {
println(s"failed with error $x")
failed += x.getMessage.toInt
Supervision.Restart
}
}
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
def source : Source[Int, NotUsed] = {
val iter = (1 to 1000 toStream) map {x => if (x == 600) throw new Exception(x.toString) else x}
Source(iter)
}
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int]{i => println(i)}
val future : Future[Done] = source.runWith(sink)
val f = future.map{_ =>
println("completed")
actorSystem.terminate()
}
Await.result(f, Duration.Inf)
println(s"who failed ${failed.toList}")
}
The above code crashes at 600 even though my decider says "Restart". If I move this exception side a "flow" then the decider works and the stream processes till it reaches 1000. but if the error occurs inside the source function, the application crashes.
Is there a way to make my application foolproof in a way that it always reaches the end. Otherwise how do we recover from errors when it occurs in the source function.
Upvotes: 0
Views: 448
Reputation: 13130
I would strongly recommend reading the documentation about handling errors in akka streams. To recover from failures you can use the operators called ‘recover’ or things like the ‘Restart*’ sources/sinks/flows.
The restart stages are most general and also most powerful really, so give them a look. Supervision also exists, which automatically handles such things but it has to be supported by a given operator. (Most built in operators do, but not all, check their docs)
Upvotes: 1
Reputation: 20080
An error in the source is meant to be an unrecoverable failure on that source instance, which means the stream cannot continue using that source but you need to switch to a different source: it makes sense that if you are listening from a websocket and the http server goes down, you will not be able to listen again on that websocket.
Akka streams offers recoverWithRetries
as described in the documentation to move to another source, or more generically to replace a part of the stream with a stream element with the same shape.
Upvotes: 3