Teimuraz
Teimuraz

Reputation: 9325

Akka: how can I catch failure of one actor inside another (non child) actor?

I have two actors:

ProcessManager which handles some processes in the system (for example, user registration, purchase, etc)

Notifier - should notify the user if some error occurred in ProcessManager. I need to catch failure of ProcessManager actor (it was failed and stopped for what ever reason, for example, because of ActorInitializationException or max restart time reached and Process manager actor was stopped).

   class ProcessManager extends Actor {
      override def receive: Receive = {
        ...
      }
    }

    class Notifier extends Actor {
      override def receive: Receive = {
        PROCESS MANAGER ACTOR FAILED AND STOPPED =>
          // Here I need to catch failure of ProcessManager actor
          // (it was failed and stopped for what ever
          // reason, for example, because of ActorInitializationException
          // or max restart time reached and Process manager actor was stopped).
          //
          // Then do some stuff, for example, send message to the client via web socket.
      }
    }


    class MyController @Inject() (cc: ControllerComponents, actorSystem: ActorSystem)
      (implicit exec: ExecutionContext) extends AbstractController(cc)  {


      // I need to catch failure of processManager in this actor.
      val notifier = actorSystem.actorOf(Props(classOf[Notifier]))

      def registerUser = Action.async {         

          // Actor may be stopped because of ActorInitializationException here
          val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))
              ...

         // OR it may be stopped here for any reason.
         processManager ! "some message which will fail and stop pm actor"

         Future.successfull(Ok("Thanks."))   
       }
   }

How can I catch termination (because of failure) of ProcessManager actor inside Notifier actor?

EDIT Let me explain the context of my problem.

I'm creating PM actor in Play controller and send the message to it (Tell) and I return Ok response immediately to the user. PM actor creates another child actor and during creation, ActorInitializationException is thrown. I need to notify the user (via web socket, using Notifier actor), that something went wrong.

Upvotes: 1

Views: 396

Answers (1)

Jeffrey Chung
Jeffrey Chung

Reputation: 19497

You can use DeathWatch to register the Notifier actor for reception of the Terminated message when the ProcessManager actor permanently stops. Notifier will need a reference to the ProcessManager actor for the DeathWatch, and one way to do that is to send the reference to ProcessManager as a message (this is safe because ActorRef is immutable and serializable).

class Notifier extends Actor {
  var processManager: Option[ActorRef] = None

  def receive: Receive = {
    case aRef: ActorRef =>
      if (processManager.isEmpty) {
        processManager = Some(aRef)
        context.watch(aRef) // register to "watch" the process manager
      }
    case Terminated =>
      // process manager was permanently stopped
    case ...
  }
}

object Demo extends App {
  val actorSystem = ActorSystem("my-actor-system")

  val notifier = actorSystem.actorOf(Props(classOf[Notifier]))
  val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))

  notifier ! processManager // send processManager's ref to the notifier
  ...
  processManager ! "some message which will fail and stop pm actor"
  ...
}

One caveat: it might not be possible for the DeathWatch registration to happen before an ActorInitializationException is thrown when attempting to create the ProcessManager.


If you need to send a message to Notifier when a child of ProcessManager throws an exception, then override the supervisor strategy in ProcessManager and send this message as part of the strategy. Something like:

class ProcessManager extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ActorInitializationException =>
        val notifier = context.actorSelection("/path/to/notifier")
        notifier ! CustomErrorMessage
        Stop
      case _: Exception => Escalate
   }

   def receive: Receive = {
     ...
   }
}

Upvotes: 3

Related Questions