gsimard

Reputation: 643

One-off object pooling with Actor provider

I have an object with heavy initialization cost and memory footprint. Initialization time is human-noticeable but creation frequency is low.

class HeavyClass {
  heavyInit() // human-noticeable initialization cost
}

My solution is to create a Provider actor that would have a single object created ahead of time and provide it instantly on request. The provider would then go on with creating the next object.

import akka.actor.Actor

class HeavyClassProvider extends Actor {

  var hc: Option[HeavyClass] = Some(new HeavyClass())

  override def receive = {
    case "REQUEST" =>
      sender ! { hc getOrElse new HeavyClass() }
      self ! "RESPAWN"
      hc = None

    case "RESPAWN" if (hc == None) => hc = Some(new HeavyClass())
  }

}

And a consumer:

import akka.actor.{Actor, ActorRef}

abstract class HeavyClassConsumer extends Actor {

  import context.dispatcher

  import akka.pattern.ask
  import scala.concurrent.duration._
  import akka.util.Timeout

  implicit val timeout = Timeout(5, SECONDS)

  var provider: ActorRef
  var hc: Option[HeavyClass] = None

  override def receive = {
    case "START" =>
      ((provider ask "REQUEST").mapTo[HeavyClass]
       onSuccess { case h: HeavyClass => hc = Some(h) })
  }

}

Is this a common pattern? The code feels wacky; is there an obvious cleaner way of doing this?

Upvotes: 1

Views: 319

Answers (2)

yǝsʞǝla

Reputation: 16412

The problem with your solution is that when you call new HeavyClass() your actor blocks until that computation finishes. Doing it in a Future or in another Actor avoids that. Here is one way to do it:

import akka.actor.{Actor, Status}
import akka.pattern.pipe
import scala.concurrent.Future
...

class HeavyClassProvider extends Actor {

  import context.dispatcher // ExecutionContext for the Futures

  // start off async computation during init:
  var hc: Future[HeavyClass] = Future(new HeavyClass)

  override def receive = {
    case "REQUEST" =>
      // send result to requester when it's complete or
      // immediately if it's already complete:
      hc pipeTo sender
      // start a new computation and send to self:
      Future(new HeavyClass) pipeTo self
    case result: HeavyClass => // new result is ready
      hc = Future.successful(result) // update with newly computed result
    case Status.Failure(f) => // computation failed
      hc = Future.failed[HeavyClass](f)
      // maybe request a recomputation again
  }
}

(I didn't compile it)

One particularity about my first solution is that it does not restrict how many Futures are computed at the same time. If you receive multiple requests it will compute multiple Futures, which might not be desirable, although there is no race condition in this Actor. To restrict that, simply introduce a Boolean flag in the Actor that tells you whether a computation is already running. Also, all these vars can be replaced with become/unbecome behaviors.
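The become/unbecome rewrite mentioned above could look like this (a sketch in the same not-compiled spirit as the other snippets; the `computing`/`idle` behavior names are illustrative, and both vars become parameters of the behaviors):

```scala
import akka.actor.{Actor, Status}
import akka.pattern.pipe
import scala.concurrent.Future

class HeavyClassProvider extends Actor {

  import context.dispatcher

  // Start the first computation; its result (or failure) comes back to self.
  override def receive = computing(Future(new HeavyClass) pipeTo self)

  // While a computation is in flight: serve the Future, start nothing new.
  def computing(hc: Future[HeavyClass]): Receive = {
    case "REQUEST"          => hc pipeTo sender
    case result: HeavyClass => context become idle(Future.successful(result))
    case Status.Failure(f)  => context become idle(Future.failed(f))
  }

  // Idle: hand out the cached result and start exactly one replacement.
  def idle(hc: Future[HeavyClass]): Receive = {
    case "REQUEST" =>
      hc pipeTo sender
      context become computing(Future(new HeavyClass) pipeTo self)
  }
}
```

This makes the "at most one computation in flight" invariant structural: a stray second Future can't be started in the computing state because that case simply isn't handled there.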

Example of a single concurrent Future computation given multiple requests:

import akka.actor.{Actor, Status}
import akka.pattern.pipe
import scala.concurrent.Future
...

class HeavyClassProvider extends Actor {

  import context.dispatcher // ExecutionContext for the Futures

  // start off async computation during init:
  var hc: Future[HeavyClass] = Future(new HeavyClass) pipeTo self
  var computing: Boolean = true

  override def receive = {
    case "REQUEST" =>
      // send result to requester when it's complete or
      // immediately if it's already complete:
      hc pipeTo sender
      // start a new computation and send to self:
      if (!computing) {
        computing = true // mark the new computation as in flight
        Future(new HeavyClass) pipeTo self
      }
    case result: HeavyClass => // new result is ready
      hc = Future.successful(result) // update with newly computed result
      computing = false
    case Status.Failure(f) => // computation failed
      hc = Future.failed[HeavyClass](f)
      computing = false
      // maybe request a recomputation again
  }
}

EDIT: After discussing requirements further in the comments here is yet another implementation that sends a new object to the sender/client on each request in non-blocking manner:

import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.Future
...

class HeavyClassProvider extends Actor {

  import context.dispatcher // ExecutionContext for the Future

  override def receive = {
    case "REQUEST" =>
      Future(new HeavyClass) pipeTo sender
  }
}

And then it can be simplified to:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SomeFactoryObject {
  def computeLongOp: Future[HeavyClass] = Future(new HeavyClass)
}

In this case no actors are needed. The point of using an Actor in these cases as a synchronization and non-blocking computation mechanism is to let the Actor cache results and provide async computation with more complex logic than a plain Future; otherwise a Future by itself is sufficient.
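For completeness, here is what using the plain-Future factory looks like end to end. This is runnable as-is; the sleeping HeavyClass body is only a stand-in for the real expensive init:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for the real class: the sleep simulates the heavy init.
class HeavyClass {
  Thread.sleep(100)
  val ready = true
}

object SomeFactoryObject {
  def computeLongOp: Future[HeavyClass] = Future(new HeavyClass)
}

// Kick off construction without blocking the caller...
val fut = SomeFactoryObject.computeLongOp
// ...do other work here, and block only when the instance is needed:
val hc = Await.result(fut, 5.seconds)
println(hc.ready) // prints: true
```

In real actor code you would pipe or map the Future instead of calling Await, which is used here only to keep the example self-contained.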

Upvotes: 1

Rob Starling

Reputation: 3908

I suspect it's more often done with a synchronized factory of some sort, but the actor seems as good a synchronization mechanism as any, especially if the calling code is already built on async patterns.
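A minimal sketch of that synchronized-factory alternative, keeping the question's pool depth of one. HeavyClassFactory and acquire are illustrative names, not from the post; the spare is held as a Future so the rebuild never runs under the lock:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class HeavyClass {
  Thread.sleep(100) // simulated expensive initialization
}

object HeavyClassFactory {
  // One pre-built spare, wrapped in a Future so it can be rebuilt off-thread.
  private var spare: Future[HeavyClass] = Future(new HeavyClass)

  def acquire(): HeavyClass = {
    val current = synchronized {
      val c = spare
      spare = Future(new HeavyClass) // start building the replacement
      c
    }
    // Instant if the spare was ready; waits out one init otherwise.
    Await.result(current, 10.seconds)
  }
}

val a = HeavyClassFactory.acquire()
val b = HeavyClassFactory.acquire()
println(a ne b) // prints: true
```

The synchronized block only swaps a reference, so contention on the lock is negligible; the expensive work always happens on the execution context's threads.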

One potential problem with your current implementation is that it can't parallelize creation of multiple HeavyClass objects that are requested "all at once". It might be the case that this is a feature and that parallel-creation of several would bog down the system. If, on the other hand, it's "just slow", you might want to spin off the creation of the "on-demand" instances into its own thread/actor.
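If parallel creation is acceptable, the "all at once" case can be sketched with one plain Future per requested instance (assuming HeavyClass construction is thread-safe; the sleeping class body is again just a stand-in):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class HeavyClass {
  Thread.sleep(100) // simulated expensive initialization
}

// Three instances requested at once, built concurrently (subject to the
// execution context's parallelism) rather than one after another.
val batch: Future[Seq[HeavyClass]] =
  Future.sequence(Seq.fill(3)(Future(new HeavyClass)))

val instances = Await.result(batch, 5.seconds)
println(instances.size) // prints: 3
```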

Upvotes: 1
