Akka supervisor catch future Failure

I'm trying to develop an application using Futures and Akka supervisors, but when a Future returns a Failure to an actor, its supervisor does not receive the exception.

Here's my code.

1) Supervisor actor

class TransaccionActorSupervisor() extends Actor with ActorLogging {

  val actor: ActorRef = context.actorOf(Props[TransaccionActor].withRouter(RoundRobinPool(nrOfInstances = 5)), "transaccion-actor")

  def receive = {
    case msg: Any => actor forward msg
  }

  override val supervisorStrategy = OneForOneStrategy() {
    case exception =>
      println("<<<<<<<<<<<<<<<<<<< IN SUPERVISOR >>>>>>>>>>>>>>>>>>>>>>>>>>>>")
      Restart
  }

}

2) Supervised actor

class TransaccionActor() extends Actor with ActorLogging {

  implicit val _: ExecutionContext = context.dispatcher
  val transaccionAdapter = (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {

    case msg: GetTransaccionById =>
      val currentSender: ActorRef = sender()
      transaccionAdapter.searchTransaction(msg.id).onComplete {
         case Success(transaction) => currentSender ! transaction
         case Failure(error) => throw error
      }

  }

}

What am I doing wrong?

Thank you all very much!

Upvotes: 4

Views: 978

Answers (2)

ps_ttf

Reputation: 1094

I had the same problem, and Ryan's answer did help. But because I'm new to Akka, it was not trivial to understand, so I would like to provide some details.

First, I think that onComplete will not work at all. It only registers a callback and returns Unit rather than a new Future, and the callback may run on a completely separate thread, outside the actor's message processing. So any exception thrown within onComplete never reaches the actor or its supervisor.
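To make the difference concrete, here is a minimal sketch that uses only the Scala standard library, no Akka. The names OnCompleteDemo, failingSearch, viaOnComplete and viaRecover are made up for illustration, and failingSearch merely stands in for a call like searchTransaction.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object OnCompleteDemo {
  // Hypothetical stand-in for searchTransaction: a future that always fails.
  def failingSearch(): Future[String] =
    Future.failed(new IllegalStateException("boom"))

  // onComplete returns Unit, so a throw inside the callback has no caller to reach.
  // The exception is handed to the ExecutionContext's failure reporter instead.
  def viaOnComplete(): Unit =
    failingSearch().onComplete {
      case Success(_)     => ()
      case Failure(error) => throw error
    }

  // recover returns a new Future, so the failure stays observable as a value.
  def viaRecover(): Future[String] =
    failingSearch().recover {
      case e: IllegalStateException => s"recovered: ${e.getMessage}"
    }

  def main(args: Array[String]): Unit = {
    viaOnComplete() // nothing propagates back to this thread
    println(Await.result(viaRecover(), 2.seconds))
  }
}
```

The same reasoning applies inside an actor: only a Future that is returned, and then piped somewhere, can carry the failure back into message processing.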

Instead, it is better to use map, recover, recoverWith or transform, as they return new Futures. Then you need to pipe the result to an actor, e.g. self; the receiving actor has to process the piped result and rethrow the exception.

In other words your supervised actor should look like this:

import akka.pattern.pipe
import akka.actor.Status.Failure
import akka.actor.{Actor, ActorLogging, ActorRef}

class TransaccionActor() extends Actor with ActorLogging {

  import context.dispatcher

  val transaccionAdapter = 
    (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {

    case msg: GetTransaccionById =>
      val currentSender: ActorRef = sender()
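      // map returns a new Future; if it fails, pipeTo delivers Status.Failure(cause) to self
      transaccionAdapter searchTransaction msg.id map { transaction =>
         currentSender ! transaction
      } pipeTo self

    // Rethrow inside receive so that the supervisor strategy is applied
    case Failure(throwable) => throw throwable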

  }

}

Upvotes: 4

Ryan

Reputation: 7257

Exceptions thrown in a future within an actor are not caught by the actor. You'll need to pipe the exception to self and then re-throw if you want it handled by the supervising actor.
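A self-contained sketch of that idea, including the supervisor side, could look like the following. It assumes classic Akka actors on the classpath; Lookup, Found, Worker, Supervisor, search and the seen promise are hypothetical names introduced only for this illustration, and search always fails so that the failure path is exercised.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.pattern.pipe
import scala.concurrent.{Future, Promise}

// Hypothetical messages, not part of the original code.
final case class Lookup(id: Long)
final case class Found(id: Long, replyTo: ActorRef)

class Worker extends Actor {
  import context.dispatcher // ExecutionContext for map and pipeTo

  // Stand-in for the real repository call; it always fails here.
  private def search(id: Long): Future[String] =
    Future.failed(new IllegalStateException(s"no transaction $id"))

  def receive: Receive = {
    case Lookup(id) =>
      val replyTo = sender() // capture before going asynchronous
      // On failure, pipeTo delivers Status.Failure(cause) to self.
      search(id).map(_ => Found(id, replyTo)).pipeTo(self)

    case Found(id, replyTo) =>
      replyTo ! s"transaction $id"

    case Status.Failure(cause) =>
      // Thrown while processing a message, so the supervisor strategy sees it.
      throw cause
  }
}

class Supervisor(seen: Promise[Throwable]) extends Actor {
  private val worker = context.actorOf(Props[Worker](), "worker")

  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: IllegalStateException =>
      seen.trySuccess(e) // record that the supervisor was reached
      SupervisorStrategy.Restart
  }

  def receive: Receive = { case msg => worker.forward(msg) }
}
```

To try it, create an ActorSystem, start the supervisor with Props(new Supervisor(seen)), send it Lookup(42L), and wait on seen.future; the promise exists only so that the outcome can be observed from outside the actor system.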

Upvotes: 0
