Reputation: 3897
I have an actor which is orchestrating database updates. I need to ensure that each operation gets executed only after the previous one gets completed. This because operation B will reuse the result of operation A.
Here the code I wrote for the actor.
class DbUpdateActor(databaseOperations: DBProvider) extends Actor {
implicit val ec:ExecutionContext = context.system.dispatcher
def receive: Receive = {
case newInfo : UpdateDb =>
val future = Future {
// gets the current situation from DB
val status = databaseOperations.getSituation()
// do db update
databaseOperations.save(something)
}
future onComplete {
case Success(result: List[Int]) =>
//
case Failure(err: Throwable) =>
//
}
}
}
The code works fine for a single operation. If I fire two updates then the second one is executed asynchronously so it gets started before the first one has completed.
I was reading about different types of mailbox, not sure if having a different one would help.
Any suggestion?
Upvotes: 0
Views: 358
Reputation: 556
How about this: start operation A in a child; when the child is complete it sends the parent a message saying it completed. Then you can start operation B, either in the existing or a new child.
Upvotes: 0
Reputation: 35443
One option you can explore would be to remove that Future
and allow that blocking db code to be run within the actor. Then, use a separate dispatcher (perhaps a PinnedDispatcher) to fire-wall this blocking code off from the main actor system's dispatcher, giving it its own thread to run on. By blocking in the body and removing that Future
, you will ensure proper sequential execution of the actor's mailbox. A rough sketch of the changes to make that work are as follows:
object DbUpdateActor{
def props(databaseOperations:DBProvider) =
Props(classOf[DbUpdateActor], databaseOperations).
withDispatcher("db-update-dispatcher")
}
class DbUpdateActor(databaseOperations: DBProvider) extends Actor {
def receive: Receive = {
case newInfo : UpdateDb =>
val status = databaseOperations.getSituation()
databaseOperations.save(something)
}
}
Then, as long as you had the following dispatcher configured in your actor system config:
db-update-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
And you started up the db update actor like so:
val updater = system.actorOf(DbUpdateActor.props(databaseOperations))
Then you should be all set setting this actor up to run that blocking code in a way that won't negatively affect the throughput of the main dispatcher.
Upvotes: 1