Johann Heinzelreiter

Reputation: 819

Single task typed actors

I often use actors that perform a single computationally expensive task. After completing its task, an actor of this type sends the result to its creator and then terminates itself.

I am not quite sure what the best practice is for implementing such actors. I can imagine the following possibilities:

Variant 1: start task in "constructor"

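import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object SingleTaskBehavior:

  sealed trait Reply
  final case class Result(value: Int) extends Reply

  // performLongRunningTask(arg: Int): Int is assumed to be defined elsewhere.
  def variant1(arg: Int, replyTo: ActorRef[Reply]): Behavior[Nothing] =
    Behaviors.setup[Nothing] { context =>
      // Runs when the actor starts, before any message is processed.
      val result = performLongRunningTask(arg)
      replyTo ! Result(result)
      Behaviors.stopped
    }
  end variant1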

Variant 2: send message to actor itself

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object SingleTaskBehavior:

  sealed trait Command
  private case object Init extends Command

  sealed trait Reply
  final case class Result(value: Int) extends Reply

  def variant2(arg: Int, replyTo: ActorRef[Reply]): Behavior[Command] =

    Behaviors.setup[Command] { context =>

      // Trigger the task through the actor's own mailbox.
      context.self ! Init

      Behaviors
        .receiveMessage[Command] {
          case Init =>
            val result = performLongRunningTask(arg)
            replyTo ! Result(result)
            Behaviors.stopped
        }
    }
  end variant2

Variant 3: pipe pattern

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object SingleTaskBehavior:

  sealed trait Command
  private final case class AdaptedResult(result: Result) extends Command
  private final case class AdaptedFailure(ex: Throwable) extends Command

  sealed trait Reply
  final case class Result(value: Int) extends Reply

  def variant3(arg: Int, replyTo: ActorRef[Reply]): Behavior[Command] =

    Behaviors.setup[Command] { context =>
      // The Future runs on the actor system's default execution context.
      given ExecutionContext = context.system.executionContext
      val futureResult = Future { performLongRunningTask(arg) }
      // Turn the Future's outcome into a message to this actor.
      context.pipeToSelf(futureResult) {
        case Success(r) => AdaptedResult(Result(r))
        case Failure(ex) => AdaptedFailure(ex)
      }

      Behaviors
        .receiveMessage[Command] {
          case AdaptedResult(result) =>
            replyTo ! result
            Behaviors.stopped
          case AdaptedFailure(ex) => throw ex
        }
    }
  end variant3

Variant 1 is of course the simplest. But is it also correct? Is it okay to execute a long-running task in the setup method (in the actor's constructor, so to speak), even though the task may throw unhandled exceptions? Or is it better to perform this kind of operation only when the actor system dispatches a message (variants 2 and 3)?

Upvotes: 0

Views: 53

Answers (1)

Levi Ramsey

Reputation: 20611

Assuming that you're running the single-task actor on a dispatcher which is well-suited to long-running tasks, any of the approaches is viable. For tasks which are long-running because they do blocking I/O and don't consume CPU, that means a dispatcher with a lot of threads; for tasks which are long-running because they do a lot of computation, it means one with very few threads relative to the default dispatcher. Note that the default Akka dispatcher is not particularly well-suited to either of these.
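As a rough sketch of what running the actor on a dedicated dispatcher could look like: context.spawn accepts a DispatcherSelector as its props argument. The object name DispatcherSketch, the parent behavior, the body of performLongRunningTask and the config path task-dispatcher are all assumptions made for illustration; the pool size in the comment is a placeholder, not a recommendation.

```scala
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.actor.typed.scaladsl.Behaviors

object DispatcherSketch:
  sealed trait Reply
  final case class Result(value: Int) extends Reply

  // Hypothetical stand-in for the question's expensive computation.
  def performLongRunningTask(arg: Int): Int = arg * 2

  // Variant 1 from the question.
  def variant1(arg: Int, replyTo: ActorRef[Reply]): Behavior[Nothing] =
    Behaviors.setup[Nothing] { _ =>
      replyTo ! Result(performLongRunningTask(arg))
      Behaviors.stopped
    }

  // Hypothetical parent that spawns the task actor on a dedicated dispatcher.
  def parent(): Behavior[Reply] =
    Behaviors.setup[Reply] { context =>
      // Assumes application.conf defines something like:
      //   task-dispatcher {
      //     type = Dispatcher
      //     executor = "thread-pool-executor"
      //     thread-pool-executor.fixed-pool-size = 2
      //   }
      context.spawn[Nothing](
        variant1(21, context.self),
        "single-task",
        DispatcherSelector.fromConfig("task-dispatcher"))

      Behaviors.receiveMessage[Reply] {
        case Result(value) =>
          context.log.info(s"task finished with $value")
          Behaviors.stopped
      }
    }
```

For blocking work, DispatcherSelector.blocking() is an alternative that selects Akka's default blocking I/O dispatcher instead of a custom config entry.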

Note that Behaviors.setup isn't quite like a constructor: it's more akin to registering a callback that will execute in the actor's "cell" after the actor has started but before it processes any message. For a call like context.spawn(SingleTaskBehavior.variant1(42, context.self)), what happens in the caller is effectively:

  • wrap the closure { context => ... } in a WrappedBehavior
  • spawn an actor with that WrappedBehavior

The ActorSystem then runs that WrappedBehavior, passing in the ActorContext. This almost certainly happens on a different thread; it can only happen on the same thread once the spawning actor has finished processing its current message.
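The callback idea can be modelled in plain Scala without Akka. This is a toy analogy only: Deferred, DeferredSketch and the string standing in for the ActorContext are made-up names, not Akka API. It shows that wrapping the closure does not run its body; the body runs later, when whoever holds the wrapper invokes it.

```scala
import scala.collection.mutable.ListBuffer

object DeferredSketch:
  // Toy stand-in for the wrapped behavior: it only stores the closure.
  final case class Deferred[A](factory: String => A)

  def run(): (List[String], Int) =
    val events = ListBuffer.empty[String]

    // "Caller" side: building the wrapper does not run the closure body.
    val deferred = Deferred { (cell: String) =>
      events += s"task ran in $cell"
      42
    }
    events += "spawn returned"

    // "Actor system" side: the closure runs later, with a context passed in.
    val result = deferred.factory("actor cell")
    (events.toList, result)
```

Calling DeferredSketch.run() records "spawn returned" before "task ran in actor cell", which mirrors why the long-running work in variant1 does not block the spawning actor's call to spawn.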

Because Init is private in variant2, there isn't really a practical difference between variant1 and variant2 (especially since we're not introducing supervision).

That said, an actor which doesn't receive messages is a bit of a smell: the whole benefit of having an actor is to receive messages and change behavior based on the messages received. Such an actor is probably better off as a Future, in which case, in the spawning actor, you'd just write:

context.pipeToSelf(Future { performLongRunningTask(42) }(appropriateDispatcher)) {
  case Success(r) => Result(r)
  case Failure(ex) => NoOp  // a command which this actor ignores
}

NB: If your long-running task is long-running because it both does blocking I/O and consumes CPU, you're going to be best off treating it as subtasks that basically only do one or the other and scheduling those subtasks on the respective appropriate dispatcher.
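A minimal sketch of that split, using plain thread pools as stand-ins for the two dispatchers. The subtasks loadInput and crunch, the object name SplitTaskSketch and the pool sizes are hypothetical; in an Akka application you would more likely look up configured dispatchers (for example via context.system.dispatchers.lookup) than create pools by hand.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object SplitTaskSketch:
  // Many threads: blocking calls park a thread without using CPU.
  private val blockingPool = Executors.newFixedThreadPool(16)
  val blockingEc: ExecutionContext =
    ExecutionContext.fromExecutorService(blockingPool)

  // Few threads: CPU-bound work gains nothing from oversubscription.
  private val cpuPool = Executors.newFixedThreadPool(2)
  val cpuEc: ExecutionContext =
    ExecutionContext.fromExecutorService(cpuPool)

  // Hypothetical blocking subtask (the sleep simulates I/O).
  def loadInput(arg: Int): Seq[Int] =
    Thread.sleep(50)
    1 to arg

  // Hypothetical CPU-bound subtask.
  def crunch(xs: Seq[Int]): Int = xs.map(x => x * x).sum

  // Each subtask is scheduled on the pool suited to it.
  def run(arg: Int): Future[Int] =
    Future(loadInput(arg))(blockingEc)
      .flatMap(xs => Future(crunch(xs))(cpuEc))(cpuEc)

  def shutdown(): Unit =
    blockingPool.shutdown()
    cpuPool.shutdown()
```

The resulting Future can be handed to context.pipeToSelf in the spawning actor in the same way as the single Future in the snippet above.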

Upvotes: 2
