Alvaro Polo
Alvaro Polo

Reputation: 43

Akka, futures and critical sections

Let's say we have an Akka actor, which maintains an internal state in terms of a var.

class FooActor extends Actor {
  private var state: Int = 0

  def receive = { ... }
}

Let's say the reception handler invokes an operation that returns a future, we map it using the dispatcher as context executor and finally we set a onSuccess callback that alters the actor state.

import context.dispatcher
def receive = {
  case "Hello" => requestSomething() // asume Future[String]
    .map(_.size)
    .onSuccess { case i => state = i }
}

Is it thread-safe to alter the state of the actor from the onSuccess callback, even using the actor dispatcher as execution context?

Upvotes: 4

Views: 446

Answers (2)

Jean
Jean

Reputation: 21605

No it's not (akka 2.3.4 documentation).

What you have to do in this case is send a message to self to alter the state. If you need ordering you can use stash and become. Something like this

import akka.actor.{Stash,Actor}
import akka.pattern.pipe
case class StateUpdate(i:int)
class FooActor extends Actor with Stash{
  private var state: Int = 0
  def receive = ready
  def ready  = {
    case "Hello" => requestSomething() // asume Future[String]
      .map(StateUpdate(_.size)) pipeTo self
      become(busy)
  } 
  def busy {
     case StateUpdate(i) => 
       state=i
       unstashAll()
       become(ready)
     case State.Failure(t:Throwable) => // the future failed
     case evt =>
       stash()   
  }
}

Of course this is a simplistic implementation you will probably want to handle timeout and stuff to avoid having your actor stuck.

if you don't need ordering guarantees on your state :

case class StateUpdate(i:int)
class FooActor extends Actor with Stash{
  private var state: Int = 0
  def receive = {
    case "Hello" => requestSomething() // asume Future[String]
      .map(StateUpdate(_.size)) pipeTo self
    case StateUpdate(i) => state=i
  } 

but then the actor state may not be the length of the last string received

Upvotes: 10

LMeyer
LMeyer

Reputation: 2631

Just to support Jean's answer here's the example from the docs :

class MyActor extends Actor {
    var state = ...
    def receive = {
        case _ =>
        //Wrongs

        // Very bad, shared mutable state,
        // will break your application in weird ways
        Future {state = NewState}
        anotherActor ? message onSuccess {
            r => state = r
        }

        // Very bad, "sender" changes for every message,
        // shared mutable state bug
        Future {expensiveCalculation(sender())}

        //Rights

        // Completely safe, "self" is OK to close over
        // and it's an ActorRef, which is thread-safe
        Future {expensiveCalculation()} onComplete {
            f => self ! f.value.get
        }

        // Completely safe, we close over a fixed value
        // and it's an ActorRef, which is thread-safe
        val currentSender = sender()
        Future {expensiveCalculation(currentSender)}
    }
}

Upvotes: 1

Related Questions