daydreamer

Reputation: 91959

How to send message from Supervisor to Actor once Actor restarts?

Requirement?
- There has to be a long-running process (daemon) that runs forever
- If an exception occurs, it should be restarted, but if it fails twice more, no further restart attempts should be made

Problem I face?
- The actor is restarted, but no message is sent to it again

What I have?

Main Class

package com.learner.ahka.runforever

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RaceEvent extends App {
  val config = ConfigFactory.parseString( """
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
                                          """)
  val system = ActorSystem.create("race", config)
  val coach = system.actorOf(Coach.props(), "coach")
  coach ! GetSetGo
}

Supervisor

package com.learner.ahka.runforever

import akka.actor.SupervisorStrategy.Restart
import akka.actor._
import akka.event.LoggingReceive

import scala.concurrent.duration._

case object GetSetGo

object Coach {
  def props(): Props = Props[Coach]
}

class Coach() extends Actor with ActorLogging {

  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5.seconds) {
    case _: RuntimeException => Restart
  }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! GoForIt
  }
}

Actor

package com.learner.ahka.runforever

import akka.actor.Status.Failure
import akka.actor.{Actor, ActorLogging, Props}
import akka.event.LoggingReceive
import akka.pattern.pipe

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {

  import context.dispatcher

  override def receive: Receive = LoggingReceive {
    case GoForIt => race.start pipeTo self
    case Failure(throwable) => throw throwable
  }
}

Actual work

package com.learner.ahka.runforever

import scala.concurrent.Future

case object GoForIt

trait Race {
  def start: Future[Any]
}

class Marathon extends Race {

  import scala.concurrent.ExecutionContext.Implicits.global

  override def start: Future[Any] = future

  val future = Future {
    for (i <- 1 to 3) {
      println("I am a Marathon Runner!")
      Thread.sleep(1000)
    }
    throw new RuntimeException("MarathonRunner is tired")
  }
}

Logs

[DEBUG] [05/30/2015 16:03:35.696] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 16:03:35.698] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 16:03:35.704] [race-akka.actor.default-dispatcher-4] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#-1391310385]
[DEBUG] [05/30/2015 16:03:35.706] [race-akka.actor.default-dispatcher-3] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@191ba186)
[DEBUG] [05/30/2015 16:03:35.710] [race-akka.actor.default-dispatcher-2] [akka://race/user] now supervising Actor[akka://race/user/coach#-1161587711]
I am a Marathon Runner!
[DEBUG] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@66f0f319)
[DEBUG] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@72f67980)
[DEBUG] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#755574648]
[DEBUG] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] received handled message GetSetGo
[DEBUG] [05/30/2015 16:03:35.725] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
I am a Marathon Runner!
I am a Marathon Runner!
[DEBUG] [05/30/2015 16:03:38.739] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 16:03:38.752] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[DEBUG] [05/30/2015 16:03:38.753] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarting
[DEBUG] [05/30/2015 16:03:38.755] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarted

UPDATE
If I don't delegate to a Future, everything works as expected, even after a restart. When the work is delegated to a Future, however, the Future is not executed again after the actor restarts. See here

Upvotes: 2

Views: 1186

Answers (2)

Endre Varga

Reputation: 1863

It is not well known, but you can access sender() from the supervisorStrategy. This means that you can easily identify the actor that you are going to restart (sender() will point to the ActorRef that is currently being decided on). This is completely safe.
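
As a rough illustration of the point above, here is a minimal sketch of a Coach whose decider re-sends the start message to the failing child through sender(). It assumes classic Akka actors. The Runner is a simplified stand-in that throws directly instead of piping a Future, so it is not the code from the question. A message sent to the child while it is being restarted stays in its mailbox and should be handled by the fresh instance.

```scala
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}

import scala.concurrent.duration._

case object GetSetGo
case object GoForIt

// Simplified stand-in for the question's Runner (assumption): fails on every run.
class Runner extends Actor {
  override def receive: Receive = {
    case GoForIt =>
      println("I am a Marathon Runner!")
      throw new RuntimeException("MarathonRunner is tired")
  }
}

class Coach extends Actor {
  private val runner = context.actorOf(Props[Runner](), "runner")

  // Declared inside the actor (not in a companion object) so the decider
  // can use sender(), which refers to the child that just failed.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5.seconds) {
      case _: RuntimeException =>
        // Queued in the child's mailbox; processed once the restart completes.
        sender() ! GoForIt
        Restart
    }

  override def receive: Receive = {
    case GetSetGo => runner ! GoForIt
  }
}

object RaceEvent extends App {
  // The actor system is never shut down here, so the JVM keeps running.
  val system = ActorSystem("race")
  system.actorOf(Props[Coach](), "coach") ! GetSetGo
}
```

Once the retry limit is exceeded the child is stopped, and the last re-sent GoForIt ends up in dead letters. Note also that in the question's Marathon class, future is a val, so start returns the same already-failed Future on every call. Re-running the work after a restart would presumably also require start to create a new Future each time.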

Upvotes: 1

Shadowlands

Reputation: 15074

You could override the postRestart method to send a message back to the parent to notify it of the restart, then watch for that new message type in the parent and respond accordingly. If context.parent doesn't work well for that purpose (I tend not to rely on it), have the Coach actor pass its own self actor reference as a new constructor parameter when instantiating the Runner.
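
A minimal sketch of this approach, assuming classic Akka actors, might look as follows. The Restarted message is a hypothetical name introduced only for this illustration, and the Runner is a simplified stand-in that throws directly rather than piping a Future. This variant uses context.parent; passing the Coach's self reference through the Runner's constructor would work along the same lines.

```scala
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}

import scala.concurrent.duration._

case object GetSetGo
case object GoForIt
case object Restarted // hypothetical notification, not part of the question's code

// Simplified stand-in for the question's Runner (assumption): fails on every run.
class Runner extends Actor {
  // Runs on the new instance after a restart: tell the parent we are back.
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason) // the default implementation calls preStart()
    context.parent ! Restarted
  }

  override def receive: Receive = {
    case GoForIt =>
      println("I am a Marathon Runner!")
      throw new RuntimeException("MarathonRunner is tired")
  }
}

class Coach extends Actor {
  private val runner = context.actorOf(Props[Runner](), "runner")

  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5.seconds) {
      case _: RuntimeException => Restart
    }

  override def receive: Receive = {
    case GetSetGo  => runner ! GoForIt
    // The restarted child announced itself, so start it again.
    case Restarted => sender() ! GoForIt
  }
}

object RaceEvent extends App {
  // The actor system is never shut down here, so the JVM keeps running.
  val system = ActorSystem("race")
  system.actorOf(Props[Coach](), "coach") ! GetSetGo
}
```

Compared with re-sending from the supervisor strategy, this keeps the restart decision and the "start again" logic separate, at the cost of one extra message type.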

Upvotes: 1
