adityagnet

Reputation: 67

How to resume an akka actor that failed while processing a message that it sends itself?

I have the following example actor code.

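import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Restart, Resume}
import scala.concurrent.duration._

object SomeExternalDep {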
  private var flag = true

  // this function will throw an exception once when called with the value 3, then it won't throw another exception
  @throws[RuntimeException]
  def potentiallyThrows(curr: Int): Unit = {
    if (curr == 3 && flag) {
      flag = false
      throw new RuntimeException("Something went wrong in external dependency")
    }
  }
}

class CountingActor(start: Int, end: Int)
  extends Actor
    with ActorLogging {

  var curr: Int = start

  // This method counts for us
  private def doCount(): Unit = {
    // This may throw, which will cause this actor to fail
    SomeExternalDep.potentiallyThrows(curr)

    // Send self a count message. If the call above throws, this line is never reached
    if (curr <= end) {
      self ! CountingActor.Count(curr)
    }
  }

  override def receive: Receive = {
    case CountingActor.Start => doCount()
    case CountingActor.Count(n) =>
      log.info(s"Counting: $n")
      curr += 1
      doCount()

    case x => log.error(s"bad message $x")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(s"CountingActor failed while processing $message")
    self ! CountingActor.Start
  }
}

object CountingActor {
  def props(start: Int, end: Int): Props = Props(new CountingActor(start, end))

  case object Start
  case class Count(n: Int)
}

class SupervisorActor
  extends Actor
    with ActorLogging {

  var countingActor: ActorRef = _

  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      // case _: RuntimeException => Restart
      case _: RuntimeException => Resume
    }

  private def doStart(): Unit = {
    countingActor = context.actorOf(CountingActor.props(0, 5))

    countingActor ! CountingActor.Start
  }

  override def receive: Receive = {
    case SupervisorActor.Init => doStart()
    case _ => log.error(s"Supervisor doesn't process messages")
  }

}

object SupervisorActor {
  // Message referenced in receive above
  case object Init
}

Here, the CountingActor repeatedly sends itself a Count message. On each step it calls an external dependency that may fail, and it also changes its internal state while counting. I also implemented a simple SupervisorActor, which creates the CountingActor as its child.

When the supervision strategy is set to Restart, I get the expected result. The actor counts up, then fails when the external dependency throws at 3. The preRestart hook sends a new Start message to the mailbox, and the actor starts counting again from the beginning.

[INFO] [07/10/2019 15:23:59.895] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:23:59.896] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:23:59.896] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 2
[ERROR] [07/10/2019 15:23:59.905] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Something went wrong in external dependency
java.lang.RuntimeException: Something went wrong in external dependency
    at SomeExternalDep$.potentiallyThrows(ActorSupervisionTest.scala:15)
    at CountingActor.CountingActor$$doCount(ActorSupervisionTest.scala:30)

<Stack Trace omitted>

[ERROR] [07/10/2019 15:23:59.909] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] CountingActor failed while processing Some(Count(2))
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 2
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 3
[INFO] [07/10/2019 15:23:59.913] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 4
[INFO] [07/10/2019 15:23:59.913] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 5

But when I change the supervision strategy to Resume, the actor gets stuck, because it failed before it could send itself the next Count message.

[INFO] [07/10/2019 15:26:01.779] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:26:01.780] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:26:01.780] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 2
[WARN] [07/10/2019 15:26:01.786] [counting-sys-akka.actor.default-dispatcher-4] [akka://counting-sys/user/$a/$a] Something went wrong in external dependency

How do I get around this so that counting resumes from 3 after the external dependency fails?

Upvotes: 1

Views: 681

Answers (1)

AlexITC

Reputation: 1074

Your logic is basically a loop from 1 to N where each iteration sends a message to trigger the next one. The problem is that if an exception is thrown, the message for the next iteration is never sent. This is where the supervisor does its job. Restarting is simple, because the code that starts the loop is executed again. If you resume instead, nothing sends the message for the next iteration, so the loop stops.

A simple workaround is to change the order of operations in the doCount method: first send the message to self, then run the dangerous operation. This should work for the Resume strategy, but I would test some scenarios before actually using this approach. One unknown to me is whether akka discards the mailbox under the Restart strategy. I believe it doesn't, which means that after restarting, the actor would still receive the pending message.
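A minimal sketch of that reordering, reusing the names from the question (SomeExternalDep is copied here only so the snippet stands alone). It assumes the supervisor uses Resume, and it is an illustration rather than a tested fix:

```scala
import akka.actor.{Actor, ActorLogging, Props}

// Copied from the question: throws once for the value 3, then never again.
object SomeExternalDep {
  private var flag = true

  def potentiallyThrows(curr: Int): Unit =
    if (curr == 3 && flag) {
      flag = false
      throw new RuntimeException("Something went wrong in external dependency")
    }
}

class CountingActor(start: Int, end: Int) extends Actor with ActorLogging {

  var curr: Int = start

  private def doCount(): Unit = {
    // Queue the next iteration first, so it is already in the mailbox
    // if the call below throws.
    if (curr <= end) {
      self ! CountingActor.Count(curr)
    }
    // Under Resume the actor keeps its state after a failure here,
    // and the Count message queued above is processed next.
    SomeExternalDep.potentiallyThrows(curr)
  }

  override def receive: Receive = {
    case CountingActor.Start => doCount()
    case CountingActor.Count(n) =>
      log.info(s"Counting: $n")
      curr += 1
      doCount()
  }
}

object CountingActor {
  def props(start: Int, end: Int): Props = Props(new CountingActor(start, end))

  case object Start
  final case class Count(n: Int)
}
```

Note that the failed external call for 3 is not retried here; counting simply continues. If the strategy is switched back to Restart while the question's preRestart hook still sends Start, the already queued Count would most likely sit in the mailbox next to the new Start and drive a second, overlapping loop, so that combination is worth testing before relying on it.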

Another workaround could be to resend the message from the supervisor after resuming the child actor.

Edit: I looked a bit into the akka source and there isn't an obvious way to catch the resume event. There is an internal Resume event, but it is private to akka and is not sent to your actual actor. If you want Resume-like behavior, I think it is easier not to bother with the supervisor and to just catch the possible exceptions inside your actor (which basically emulates the Resume strategy). This should give you the expected behavior without having to deal with possible corner cases.
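One way this could look, as a rough sketch: GuardedCountingActor, its risky parameter, and the attempt helper are made-up names for illustration, not part of the question's code or of akka. The external call is wrapped in scala.util.Try, so a failure is logged and the self-send still happens:

```scala
import akka.actor.{Actor, ActorLogging, Props}
import scala.util.{Failure, Success, Try}

// Hypothetical variant of CountingActor. `risky` stands in for
// SomeExternalDep.potentiallyThrows from the question.
class GuardedCountingActor(start: Int, end: Int, risky: Int => Unit)
  extends Actor
    with ActorLogging {

  var curr: Int = start

  private def doCount(): Unit = {
    // Catch non-fatal exceptions locally instead of failing the actor.
    GuardedCountingActor.attempt(risky, curr) match {
      case Success(_) => ()
      case Failure(e) =>
        log.warning("External call failed at {}: {}", curr, e.getMessage)
    }
    // The loop continues whether or not the call above failed.
    if (curr <= end) {
      self ! GuardedCountingActor.Count(curr)
    }
  }

  override def receive: Receive = {
    case GuardedCountingActor.Start => doCount()
    case GuardedCountingActor.Count(n) =>
      log.info(s"Counting: $n")
      curr += 1
      doCount()
  }
}

object GuardedCountingActor {
  def props(start: Int, end: Int, risky: Int => Unit): Props =
    Props(new GuardedCountingActor(start, end, risky))

  // Hypothetical helper: runs the risky call and captures any non-fatal exception.
  def attempt(risky: Int => Unit, curr: Int): Try[Unit] = Try(risky(curr))

  case object Start
  final case class Count(n: Int)
}
```

With the question's dependency, the child would be created with something like GuardedCountingActor.props(0, 5, SomeExternalDep.potentiallyThrows). Because the exception never leaves the actor, the supervisor strategy is not involved for this failure. Fatal errors are not caught by Try and would still reach the supervisor.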

Upvotes: 2
