Reputation: 3197
I have a supervising Akka actor which uses a router to forward messages to worker actors.
I have a class which wraps the supervisor and when I call a method on that class it "asks" the supervisor to do something and then I use Await.result(theFuture)
to wait for the result (I cannot continue without the result).
If the workers throw an exception, I want to restart the worker which threw the exception, and I want the exception to be caught by the code which calls the wrapper class.
I passed a OneForOneStrategy
to the router constructor, which returns RESTART
in the case of an Exception
. In the postRestart
method of the worker, I log the restart, so I can validate that the worker is actually restarted.
When the worker throws an exception, it gets restarted, but the exception disappears. The Future
which is the result of asking the supervisor, contains an exception, but it is a akka.pattern.AskTimeoutException
, which is thrown after just 5 seconds rather than 20 seconds, which is the implicit timeout that I have lingering around. The exception actually occurs less than a second after the worker starts.
Question 1: how can I get the exception from the worker in the code which calls my wrapper class?
Also, the receive method of the worker is like this:
def receive = {
case r: Request =>
val response = ??? //throws an exception sometimes
sender ! response
}
Something is logging the exception to the console, but it isn't my code. The stack trace is:
[ERROR] [02/11/2013 21:34:20.093] [MySystem-akka.actor.default-dispatcher-9]
[akka://MySystem/user/MySupervisor/MyRouter/$a] Something went wrong!
at myApp.Worker.$$anonfun$receive$1.applyOrElse(Source.scala:169)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:425)
at akka.actor.ActorCell.invoke(ActorCell.scala:386)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:230)
at akka.dispatch.Mailbox.run(Mailbox.scala:212)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:502)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Line 169 of Source.scala
is the line val response = ???
shown in the listing of the receive
method above.
Question 2: who is logging that exception to the console, and how can I stop it?
Upvotes: 2
Views: 4565
Reputation: 3197
1)
try somethingThatCanFail() catch {
case e: Exception => sender ! Status.Failure(e); throw e
}
The "tell failure" causes the caller to get a Failure containing the exception. Throwing "e" causes the oneForOneStrategy to be called which restarts the worker.
2)
It is the actor system itself that logs the failure, and the only way to quiet it down is to filter out things by creating and configuring your own LoggingAdapter as described here http://doc.akka.io/docs/akka/2.1.0/scala/logging.html There is a ticket for changing this https://www.assembla.com/spaces/akka/tickets/2824 but it is targeted for Akka 2.2
Answered by https://groups.google.com/forum/#!topic/akka-user/fenCvYu3HYE
Upvotes: 3
Reputation: 3675
In order to be notified of one of your children failing, you need to
watch
the childTerminated()
when the actor dies with a reference to it.Something like:
class ParentActor extends Actor {
// this is sample of how to watch for death of one of your children
val childActor = context.actorOf(Props[SomeService], "SomeService")
val dyingChild = context.watch(context.actorOf(childActor))
def receive = {
case Terminated(`dyingChild`) =>
println("dyingChild died")
case Terminated(terminatedActor) =>
println(s"This child just died $terminatedActor")
}
}
Hope this helps.
Upvotes: 0