ab_732

Reputation: 3897

Need to execute operations sequentially with Akka

I have an actor that orchestrates database updates. I need to ensure that each operation is executed only after the previous one has completed, because operation B reuses the result of operation A.

Here is the code I wrote for the actor.

import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class DbUpdateActor(databaseOperations: DBProvider) extends Actor {

  implicit val ec: ExecutionContext = context.system.dispatcher

  def receive: Receive = {

    case newInfo: UpdateDb =>

      val future = Future {
        // gets the current situation from DB
        val status = databaseOperations.getSituation()
        // do db update
        databaseOperations.save(something)
      }

      future onComplete {
        case Success(result: List[Int]) =>
          // handle the result
        case Failure(err: Throwable) =>
          // handle the error
      }
  }
}

The code works fine for a single operation. If I fire two updates, the second one runs asynchronously, so it starts before the first one has completed.

I was reading about the different mailbox types, but I'm not sure whether a different one would help.

Any suggestions?

Upvotes: 0

Views: 358

Answers (2)

Rob Crawford

Reputation: 556

How about this: start operation A in a child; when the child is complete it sends the parent a message saying it completed. Then you can start operation B, either in the existing or a new child.
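
A minimal sketch of this idea, assuming classic Akka actors. The names DbCoordinator, DbWorker, OperationDone and the run function are hypothetical, and UpdateDb here is only a stand-in for the message type from the question:

```scala
import akka.actor.{Actor, Props}
import scala.collection.immutable.Queue

// Hypothetical messages; UpdateDb stands in for the question's message type.
final case class UpdateDb(payload: String)
case object OperationDone

object DbWorker {
  def props(run: UpdateDb => Unit): Props = Props(new DbWorker(run))
}

// Child: runs one operation, tells the parent it is done, then stops.
class DbWorker(run: UpdateDb => Unit) extends Actor {
  def receive: Receive = {
    case op: UpdateDb =>
      try run(op)
      finally {
        // Report completion even if the operation threw.
        sender() ! OperationDone
        context.stop(self)
      }
  }
}

// Parent: keeps at most one operation in flight and queues the rest.
class DbCoordinator(run: UpdateDb => Unit) extends Actor {
  def receive: Receive = idle

  // Start a fresh child for the operation; the parent is the sender.
  private def start(op: UpdateDb): Unit =
    context.actorOf(DbWorker.props(run)) ! op

  private def idle: Receive = {
    case op: UpdateDb =>
      start(op)
      context.become(busy(Queue.empty))
  }

  private def busy(pending: Queue[UpdateDb]): Receive = {
    case op: UpdateDb =>
      // An operation is still running, so hold this one back.
      context.become(busy(pending.enqueue(op)))
    case OperationDone =>
      pending.dequeueOption match {
        case Some((next, rest)) =>
          start(next)
          context.become(busy(rest))
        case None =>
          context.become(idle)
      }
  }
}
```

The parent never blocks; it only starts the next operation once it has received OperationDone for the previous one. If run blocks, the child would probably still need its own dispatcher so that it does not tie up a thread of the default one.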

Upvotes: 0

cmbaxter

Reputation: 35443

One option you can explore is to remove the Future and let the blocking db code run directly within the actor. Then use a separate dispatcher (perhaps a PinnedDispatcher) to firewall this blocking code off from the main actor system's dispatcher, giving the actor its own thread to run on. Because an actor processes one message at a time, blocking in the body instead of handing the work off to a Future ensures that the mailbox is processed strictly sequentially. A rough sketch of the changes needed to make that work is as follows:

object DbUpdateActor{
  def props(databaseOperations:DBProvider) =
    Props(classOf[DbUpdateActor], databaseOperations).
      withDispatcher("db-update-dispatcher")
}

class DbUpdateActor(databaseOperations: DBProvider) extends Actor {
  def receive: Receive = {

    case newInfo : UpdateDb =>
      val status = databaseOperations.getSituation()
      databaseOperations.save(something)
  }
}

Then, as long as you had the following dispatcher configured in your actor system config:

db-update-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

And you started up the db update actor like so:

val updater = system.actorOf(DbUpdateActor.props(databaseOperations))

With that in place, the actor runs its blocking code on its own dedicated thread, so it won't negatively affect the throughput of the main dispatcher.
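
Since the Future and its onComplete handler are gone, the success and failure cases from the question need a new home. One possible way to do that, as a sketch only, is to wrap the blocking calls in a Try and reply to the sender. The DBProvider trait and the UpdateDb fields below are hypothetical stand-ins, since the question does not show their definitions:

```scala
import akka.actor.{Actor, Status}
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for the types used in the question.
trait DBProvider {
  def getSituation(): List[Int]
  def save(values: List[Int]): List[Int]
}
final case class UpdateDb(values: List[Int])

class DbUpdateActor(databaseOperations: DBProvider) extends Actor {
  def receive: Receive = {
    case newInfo: UpdateDb =>
      // The blocking calls finish before the next message is taken from the mailbox.
      val outcome = Try {
        val status = databaseOperations.getSituation()
        databaseOperations.save(status ++ newInfo.values)
      }
      outcome match {
        case Success(result) => sender() ! result
        case Failure(err)    => sender() ! Status.Failure(err)
      }
  }
}
```

Replying with Status.Failure means that a caller using the ask pattern sees a failed Future rather than a timeout.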

Upvotes: 1
