Reputation: 9325
I have two actors:
ProcessManager, which handles some processes in the system (for example, user registration, purchase, etc.)
Notifier, which should notify the user if an error occurs in ProcessManager. I need to catch the failure of the ProcessManager actor (it failed and was stopped for whatever reason, for example because of an ActorInitializationException, or because the maximum number of restarts was reached and the ProcessManager actor was stopped).
class ProcessManager extends Actor {
  override def receive: Receive = {
    ...
  }
}
class Notifier extends Actor {
  override def receive: Receive = {
    PROCESS MANAGER ACTOR FAILED AND STOPPED =>
      // Here I need to catch the failure of the ProcessManager actor
      // (it failed and was stopped for whatever reason, for example
      // because of an ActorInitializationException, or because the maximum
      // number of restarts was reached and the ProcessManager actor was stopped).
      //
      // Then do some stuff, for example, send a message to the client via web socket.
  }
}
class MyController @Inject() (cc: ControllerComponents, actorSystem: ActorSystem)
                             (implicit exec: ExecutionContext) extends AbstractController(cc) {
  // I need to catch failure of processManager in this actor.
  val notifier = actorSystem.actorOf(Props(classOf[Notifier]))

  def registerUser = Action.async {
    // Actor may be stopped because of ActorInitializationException here
    val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))
    ...
    // OR it may be stopped here for any reason.
    processManager ! "some message which will fail and stop pm actor"
    Future.successful(Ok("Thanks."))
  }
}
How can I catch termination (because of failure) of ProcessManager actor inside Notifier actor?
EDIT: Let me explain the context of my problem.
I'm creating the PM actor in a Play controller, sending a message to it (tell), and immediately returning an Ok response to the user. The PM actor creates a child actor, and during that creation an ActorInitializationException is thrown. I need to notify the user (via web socket, using the Notifier actor) that something went wrong.
Upvotes: 1
Views: 396
Reputation: 19497
You can use DeathWatch to register the Notifier actor to receive the Terminated message when the ProcessManager actor permanently stops. Notifier will need a reference to the ProcessManager actor for the DeathWatch, and one way to provide it is to send the ProcessManager's ActorRef to Notifier as a message (this is safe because ActorRef is immutable and serializable).
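import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}

class Notifier extends Actor {
  var processManager: Option[ActorRef] = None

  def receive: Receive = {
    case aRef: ActorRef =>
      if (processManager.isEmpty) {
        processManager = Some(aRef)
        context.watch(aRef) // register to "watch" the process manager
      }
    case Terminated(ref) =>
      // Terminated is a case class, so match an instance, not the companion object.
      // ref is the process manager, which was permanently stopped
    case ...
  }
}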
object Demo extends App {
  val actorSystem = ActorSystem("my-actor-system")
  val notifier = actorSystem.actorOf(Props(classOf[Notifier]))
  val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))
  notifier ! processManager // send processManager's ref to the notifier
  ...
  processManager ! "some message which will fail and stop pm actor"
  ...
}
One caveat: it might not be possible for the DeathWatch registration to happen before an ActorInitializationException is thrown when attempting to create the ProcessManager.
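That said, the Akka documentation describes Terminated as being generated independently of the order in which registration and termination occur, so a watch registered after the actor has already stopped should still be notified. The following is a minimal sketch of that behavior, assuming classic Akka actors and the default supervisor strategy (which stops an actor on ActorInitializationException). FailingProcessManager, LateNotifier, and LateWatchDemo are hypothetical names used only for illustration, and the sleeps are a crude way to order events in a demo.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}

// Hypothetical stand-in for ProcessManager: it throws during startup, which
// Akka reports as an ActorInitializationException. The default supervisor
// strategy of the user guardian then stops the actor.
class FailingProcessManager extends Actor {
  override def preStart(): Unit =
    throw new IllegalStateException("simulated initialization failure")

  def receive: Receive = Actor.emptyBehavior
}

// Hypothetical notifier: watches whatever ActorRef it is sent.
class LateNotifier extends Actor {
  def receive: Receive = {
    case ref: ActorRef =>
      context.watch(ref)
    case Terminated(ref) =>
      // Terminated carries the ref but not the cause of the failure.
      println(s"Watched actor stopped: ${ref.path}")
  }
}

object LateWatchDemo extends App {
  val system = ActorSystem("late-watch-demo")
  val notifier = system.actorOf(Props(classOf[LateNotifier]), "notifier")
  val pm = system.actorOf(Props(classOf[FailingProcessManager]), "pm")

  Thread.sleep(500) // let pm fail and be stopped before it is watched
  notifier ! pm     // watch registered only after the actor is already dead
  Thread.sleep(500) // leave time for Terminated to be delivered
  system.terminate()
}
```

Note that Terminated only says that the actor stopped. If the Notifier needs to know why, the cause has to be reported some other way, for example from a supervisor strategy.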
If you need to send a message to Notifier when a child of ProcessManager throws an exception, then override the supervisor strategy in ProcessManager and send this message as part of the strategy. Something like:
class ProcessManager extends Actor {
  import akka.actor.{ActorInitializationException, OneForOneStrategy}
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ActorInitializationException =>
        // look up the notifier by its actor path
        val notifier = context.actorSelection("/path/to/notifier")
        notifier ! CustomErrorMessage // a message type that you define
        Stop // stop the failing child
      case _: Exception => Escalate
    }

  def receive: Receive = {
    ...
  }
}
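As a variation on the strategy above, the Notifier's ActorRef can be passed to ProcessManager through its constructor instead of being looked up by path. This is only a sketch under assumptions: ChildInitFailed is a hypothetical message standing in for CustomErrorMessage (which the snippet above leaves undefined), the props helper is likewise hypothetical, and receive is left empty for brevity.

```scala
import akka.actor.{Actor, ActorInitializationException, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Stop}
import scala.concurrent.duration._

// Hypothetical error message that carries a description of the failure.
final case class ChildInitFailed(reason: String)

class ProcessManager(notifier: ActorRef) extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case e: ActorInitializationException =>
        // Report the failure to the notifier, then stop the failing child.
        notifier ! ChildInitFailed(e.getMessage)
        Stop
      case _: Exception =>
        Escalate
    }

  // Placeholder behavior; real message handling would go here.
  def receive: Receive = Actor.emptyBehavior
}

object ProcessManager {
  // Hypothetical factory so callers do not build Props by hand.
  def props(notifier: ActorRef): Props = Props(new ProcessManager(notifier))
}
```

With this shape the controller would create the actor with something like actorSystem.actorOf(ProcessManager.props(notifier)), which avoids depending on the notifier's path.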
Upvotes: 3