Reputation: 1322
I'm trying to develop an application using Futures and Akka supervisors but when A future returns a Failure to an actor its supervisor is not getting the exception.
Here's my code.
1) Supervisor actor
class TransaccionActorSupervisor() extends Actor with ActorLogging {
val actor: ActorRef = context.actorOf(Props[TransaccionActor].withRouter(RoundRobinPool(nrOfInstances = 5)), "transaccion-actor")
def receive = {
case msg: Any => actor forward msg
}
override val supervisorStrategy = OneForOneStrategy() {
case exception =>
println("<<<<<<<<<<<<<<<<<<< IN SUPERVISOR >>>>>>>>>>>>>>>>>>>>>>>>>>>>")
Restart
}
}
Supervised actor
Class TransaccionActor() extends Actor with ActorLogging {
implicit val _: ExecutionContext = context.dispatcher
val transaccionAdapter = (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter
def receive = {
case msg: GetTransaccionById =>
val currentSender: ActorRef = sender()
transaccionAdapter.searchTransaction(msg.id).onComplete {
case Success(transaction) => currentSender ! transaction
case Failure(error) => throw error
}
}
What am I doing wrong?
Thank you all very much!
Upvotes: 4
Views: 978
Reputation: 1094
I had the same problem and Ryan's answer did help. But because I'm new to Akka it was not trivial to understand the answer so I would like to provide some details.
First, I think that onComplete
will not work at all. It just registers the callback function which may be called in a completely separate thread and does not return a new Future
. So any exception thrown within onComplete
will be lost.
Instead, it is better to use map
, recover
, recoverWith
or transform
as they return new Future
s. Then you need to the pipe the result to an actor, e.g. self
; the receiving actor has to process the piped result and rethrow the exception.
In other words your supervised actor should look like this:
import akka.pattern.pipe
import akka.actor.Status.Failure
import akka.actor.Actor
class TransaccionActor() extends Actor with ActorLogging {
import context.dispatcher
val transaccionAdapter =
(new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter
def receive = {
case msg: GetTransaccionById =>
val currentSender: ActorRef = sender()
transaccionAdapter searchTransaction msg.id map { transaction =>
currentSender ! transaction
} pipeTo self
case Failure(throwable) => throw throwable
}
}
Upvotes: 4
Reputation: 7257
Exceptions thrown in a future within an actor are not caught by the actor. You'll need to pipe the exception to self
and then re-throw if you want it handled by the supervising actor.
Upvotes: 0