Reputation: 91959
Requirement
- There must be a long-running process (daemon) that runs forever.
- If it throws an exception, it should be restarted, but if it then fails twice more, no further restarts should be attempted.
Problem I face
- The actor is restarted, but the message is not sent to it again.
What I have
Main Class
package com.learner.ahka.runforever

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RaceEvent extends App {
  val config = ConfigFactory.parseString( """
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
  """)
  val system = ActorSystem.create("race", config)
  val coach = system.actorOf(Coach.props(), "coach")
  coach ! GetSetGo
}
Supervisor
package com.learner.ahka.runforever

import akka.actor.SupervisorStrategy.{Escalate, Restart}
import akka.actor._
import akka.event.LoggingReceive
import scala.concurrent.duration._

case object GetSetGo

object Coach {
  def props(): Props = Props[Coach]
}

class Coach() extends Actor with ActorLogging {
  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5.seconds) {
    case _: RuntimeException => Restart
  }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! GoForIt
  }
}
Actor
package com.learner.ahka.runforever

import akka.actor.Status.Failure
import akka.actor.{Actor, ActorLogging, Props}
import akka.event.LoggingReceive
import akka.pattern.pipe

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {
  import context.dispatcher

  override def receive: Receive = LoggingReceive {
    case GoForIt => race.start pipeTo self
    case Failure(throwable) => throw throwable
  }
}
Actual work
package com.learner.ahka.runforever

import scala.concurrent.Future

case object GoForIt

trait Race {
  def start: Future[Any]
}

class Marathon extends Race {
  import scala.concurrent.ExecutionContext.Implicits.global

  override def start: Future[Any] = future

  val future = Future {
    for (i <- 1 to 3) {
      println("I am a Marathon Runner!")
      Thread.sleep(1000)
    }
    throw new RuntimeException("MarathonRunner is tired")
  }
}
Logs
[DEBUG] [05/30/2015 16:03:35.696] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 16:03:35.698] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 16:03:35.704] [race-akka.actor.default-dispatcher-4] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#-1391310385]
[DEBUG] [05/30/2015 16:03:35.706] [race-akka.actor.default-dispatcher-3] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@191ba186)
[DEBUG] [05/30/2015 16:03:35.710] [race-akka.actor.default-dispatcher-2] [akka://race/user] now supervising Actor[akka://race/user/coach#-1161587711]
I am a Marathon Runner!
[DEBUG] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@66f0f319)
[DEBUG] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@72f67980)
[DEBUG] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#755574648]
[DEBUG] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] received handled message GetSetGo
[DEBUG] [05/30/2015 16:03:35.725] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
I am a Marathon Runner!
I am a Marathon Runner!
[DEBUG] [05/30/2015 16:03:38.739] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 16:03:38.752] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 16:03:38.753] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarting
[DEBUG] [05/30/2015 16:03:38.755] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarted
UPDATE
If I don't delegate to a Future, everything works as expected, even after a restart. When the work is delegated to a Future, however, the Future is not executed again after the actor restarts. See here
Upvotes: 2
Views: 1186
Reputation: 1863
It is not well known, but you can access sender() from the supervisorStrategy. This means that you can easily identify the actor that you are going to restart (sender() will point to the ActorRef that is currently being decided on). This is completely safe.
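A minimal sketch of that idea, assuming classic Akka actors: the decider reads sender() to get the failing child and queues the message for it again before returning Restart. FlakyRunner, ResendingCoach and the Attempts counter are hypothetical names used only for illustration; they are not from the question.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Restart

import scala.concurrent.duration._

case object GetSetGo
case object GoForIt

// Hypothetical counter, only here so the number of attempts can be observed.
object Attempts {
  val count = new AtomicInteger(0)
}

// Hypothetical stand-in for the question's Runner: it fails on every attempt.
class FlakyRunner extends Actor with ActorLogging {
  override def receive: Receive = {
    case GoForIt =>
      log.info("Attempt {}", Attempts.count.incrementAndGet())
      throw new RuntimeException("MarathonRunner is tired")
  }
}

class ResendingCoach extends Actor with ActorLogging {
  private val runner = context.actorOf(Props(new FlakyRunner), "runner")

  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5.seconds) {
      case _: RuntimeException =>
        // Inside the decider, sender() is the child whose failure is being handled.
        val failedChild = sender()
        // The child's mailbox is kept across a restart, so the fresh instance
        // will find this message waiting for it.
        failedChild ! GoForIt
        Restart
    }

  override def receive: Receive = {
    case GetSetGo => runner ! GoForIt
  }
}
```

Once the retry limit is exceeded the child is stopped rather than restarted, so the last re-sent GoForIt should simply end up in dead letters.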
Upvotes: 1
Reputation: 15074
You could override the postRestart method to send a message back to the parent to notify it of the restart, then watch for that new message type in the parent and respond accordingly. If context.parent doesn't work well for that purpose (I tend not to rely on it), have the Coach actor pass its self actor reference as a new constructor parameter when instantiating the Runner.
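A rough sketch of this approach, assuming classic Akka actors. The RunnerRestarted message, the NotifyingRunner and NotifiedCoach classes, and the Attempts counter are hypothetical names introduced for illustration; the runner here fails on every attempt so that the restart path is exercised.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Restart

import scala.concurrent.duration._

case object GetSetGo
case object GoForIt
// Hypothetical notification type; the name is not from the original post.
case object RunnerRestarted

// Hypothetical counter, only here so the number of attempts can be observed.
object Attempts {
  val count = new AtomicInteger(0)
}

class NotifyingRunner(coach: ActorRef) extends Actor with ActorLogging {
  // Runs on the fresh instance after a restart: tell the parent about it.
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason) // keeps the default behaviour of calling preStart()
    coach ! RunnerRestarted
  }

  override def receive: Receive = {
    case GoForIt =>
      log.info("Attempt {}", Attempts.count.incrementAndGet())
      throw new RuntimeException("MarathonRunner is tired")
  }
}

class NotifiedCoach extends Actor with ActorLogging {
  // self is evaluated here and handed to the child as a constructor argument.
  private val runner =
    context.actorOf(Props(classOf[NotifyingRunner], self), "runner")

  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5.seconds) {
      case _: RuntimeException => Restart
    }

  override def receive: Receive = {
    case GetSetGo        => runner ! GoForIt
    case RunnerRestarted => runner ! GoForIt // respond to the restart by re-sending the work
  }
}
```

Passing self explicitly keeps the runner independent of context.parent, at the cost of one extra constructor parameter.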
Upvotes: 1