Reputation: 43
Let's say we have an Akka actor which maintains its internal state in a var.
class FooActor extends Actor {
private var state: Int = 0
def receive = { ... }
}
Let's say the receive handler invokes an operation that returns a future. We map it using the actor's dispatcher as the execution context, and finally we set an onSuccess callback that alters the actor state.
import context.dispatcher
def receive = {
case "Hello" => requestSomething() // assume Future[String]
.map(_.size)
.onSuccess { case i => state = i }
}
Is it thread-safe to alter the state of the actor from the onSuccess callback, even when using the actor's dispatcher as the execution context?
Upvotes: 4
Views: 446
Reputation: 21605
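No, it's not (see the Akka 2.3.4 documentation). The callback runs on a dispatcher thread outside the actor's message processing, so it can run concurrently with receive.
What you have to do in this case is send a message to self to alter the state. If you need ordering, you can use stash and become. Something like this: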
import akka.actor.{Actor, Stash, Status}
import akka.pattern.pipe
case class StateUpdate(i: Int)
class FooActor extends Actor with Stash {
  import context.dispatcher // execution context for map and pipeTo
  private var state: Int = 0
  def receive = ready
  def ready: Receive = {
    case "Hello" =>
      requestSomething() // assume Future[String]
        .map(s => StateUpdate(s.size))
        .pipeTo(self)
      context.become(busy)
  }
  def busy: Receive = {
    case StateUpdate(i) =>
      state = i
      unstashAll()
      context.become(ready)
    case Status.Failure(t) => // the future failed, resume normal processing
      unstashAll()
      context.become(ready)
    case evt =>
      stash()
  }
}
Of course, this is a simplistic implementation. You will probably want to handle timeouts and failures so that your actor does not get stuck in the busy state.
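As a rough sketch of one way to add such a timeout, the actor could schedule a message to itself when it enters the busy state and leave that state when either the result or the timeout arrives. The names `TimedFooActor`, `RequestTimedOut`, the 5 second delay and the body of `requestSomething()` are made up for illustration; `scheduleOnce` and `Cancellable` are the regular Akka scheduler API.

```scala
import akka.actor.{Actor, Cancellable, Stash, Status}
import akka.pattern.pipe
import scala.concurrent.Future
import scala.concurrent.duration._

case class StateUpdate(i: Int)
case object RequestTimedOut // hypothetical marker message, not part of Akka

class TimedFooActor extends Actor with Stash {
  import context.dispatcher // execution context for the future and the scheduler

  private var state: Int = 0

  // Hypothetical stand-in for the asynchronous call from the question.
  def requestSomething(): Future[String] = Future("hello")

  def receive: Receive = ready

  def ready: Receive = {
    case "Hello" =>
      requestSomething().map(s => StateUpdate(s.length)).pipeTo(self)
      // Deliver RequestTimedOut to this actor if no result shows up in time.
      val timer = context.system.scheduler.scheduleOnce(5.seconds, self, RequestTimedOut)
      context.become(busy(timer))
    case RequestTimedOut => // stale timeout of a request that already finished
  }

  def busy(timer: Cancellable): Receive = {
    case StateUpdate(i) =>
      state = i
      resume(timer)
    case Status.Failure(_) | RequestTimedOut =>
      resume(timer) // give up on this request, keep the old state
    case _ =>
      stash()
  }

  // Cancel the pending timeout, replay stashed messages, go back to ready.
  private def resume(timer: Cancellable): Unit = {
    timer.cancel()
    unstashAll()
    context.become(ready)
  }
}
```

Note that a result arriving after the timeout is not handled here: it would be unhandled in the ready state, or applied to a later request if the actor is busy again. Tagging each request with an id would be one way to tell such late replies apart.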
If you don't need ordering guarantees on your state:
case class StateUpdate(i: Int)
class FooActor extends Actor {
  import context.dispatcher // execution context for map and pipeTo
  private var state: Int = 0
  def receive = {
    case "Hello" =>
      requestSomething() // assume Future[String]
        .map(s => StateUpdate(s.size))
        .pipeTo(self)
    case StateUpdate(i) => state = i
  }
}
But then the actor state may not be the length of the string for the last "Hello" received, since the futures can complete in any order.
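To illustrate the ordering problem without Akka, here is a small sketch using plain Scala futures. The queue plays the role of the actor's mailbox, and the two promises stand for the replies to a first and a second "Hello". `OutOfOrderDemo` and `arrivalOrder` are names invented for this example.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global

object OutOfOrderDemo {
  // Returns the sizes in the order in which they reach the simulated mailbox.
  def arrivalOrder(): List[Int] = {
    val first = Promise[String]()  // reply to the first "Hello"
    val second = Promise[String]() // reply to the second "Hello"
    val mailbox = new LinkedBlockingQueue[Int]()

    // Stand-in for `.map(...).pipeTo(self)`: enqueue the size once the reply is there.
    first.future.map(s => mailbox.put(s.length))
    second.future.map(s => mailbox.put(s.length))

    // The second request happens to finish before the first one.
    second.success("four")
    val firstArrival = mailbox.take() // blocks until the size has been enqueued
    first.success("seventy")
    val secondArrival = mailbox.take()

    List(firstArrival, secondArrival)
  }
}
```

Here the update for the first request is delivered last, so an actor applying the updates in arrival order would end up with the length of "seventy" even though "four" answers the more recent request.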
Upvotes: 10
Reputation: 2631
Just to support Jean's answer, here's the example from the docs:
class MyActor extends Actor {
var state = ...
def receive = {
case _ =>
//Wrongs
// Very bad, shared mutable state,
// will break your application in weird ways
Future {state = NewState}
anotherActor ? message onSuccess {
r => state = r
}
// Very bad, "sender" changes for every message,
// shared mutable state bug
Future {expensiveCalculation(sender())}
//Rights
// Completely safe, "self" is OK to close over
// and it's an ActorRef, which is thread-safe
Future {expensiveCalculation()} onComplete {
f => self ! f.value.get
}
// Completely safe, we close over a fixed value
// and it's an ActorRef, which is thread-safe
val currentSender = sender()
Future {expensiveCalculation(currentSender)}
}
}
Upvotes: 1