Sergey Volkov
Sergey Volkov

Reputation: 35

Asynchronous message handling with Akka's Actors

In my project I'm using Akka's Actors. By definition Actors are thread-safe, which means that in the Actor's receive method

def receive = {
    case msg =>
        // some logic here
}

only one thread at a time processes the commented piece of code. However, things are starting to get more complicated when this code is asynchronous:

def receive = {
    case msg =>
        Future {
            // some logic here
        }
}

If I understand this correctly, in this case only the Future construct will be synchronized, so to speak, and not the logic inside the Future.

Of course I may block the Future:

def receive = {
    case msg =>
        val future = Future {
            // some logic here
        }
        Await.result(future, 10.seconds)
}

which solves the problem, but I think we all should agree that this is hardly an acceptable solution.

So this is my question: how can I retain the thread-safe nature of actors in case of asynchronous computing without blocking Scala's Futures?

Upvotes: 3

Views: 783

Answers (4)

Kyros
Kyros

Reputation: 104

I have came across this situation before and the solution that I came up with is to store the previous message's future result as a state within the actor and each new incoming message will chain onto it. This way, each message will be processed only after the previous message was successfully terminated (you could recover failed message to keep message processing going).

class MyActor extends Actor {
  var previousMessage: Future[String] = Future.successful("inital result")

  override def receive: Receive = {
    case message => {
      previousMessage = previousMessage
        .map(Some(_))
        .recover{case _ => Option.empty}
        .flatMap(previousMessageResult /* Option[String] */ => {
          // process message
          Future.successful("Result")
        })
    }
  }
}

I can't tell if there are any downsides of this.

Upvotes: 0

Alexander Oh
Alexander Oh

Reputation: 25661

the simplest solution here is to turn the actor into a state machine (use AkkaFSM) and do the following:

  • dispatch a future for the mongoDB request.
  • use the reference to your own actor to commuincate with your actor
  • tell the message back from the future.

depending on context you might have to do some more to get a proper response.

But this has the advantage that you process the message with the actor state and you can mutate the actor state as you please as you own the thread.

Upvotes: 0

acidghost
acidghost

Reputation: 484

I think you need to "resolve" the db query first and then use the result to return a new Future. If the db query returns a Future[A], then you can use flatMap to operate over A and return a new Future. Something in the lines of

def receive = {
    case msg =>
        val futureResult: Future[Result] = ...
        futureResult.flatMap { result: Result =>
            // ....
            // return a new Future
        }
}

Upvotes: 0

Yuval Itzchakov
Yuval Itzchakov

Reputation: 149636

How can I retain the thread-safe nature of actors in case of asynchronous computing without block Scalas Future?

This assumption is only true if you modify the internal state of the actor inside the Future which seems to be a design smell in the first place. Use the future for computation only by creating a copy of the data and pipe to result of the computation to the actor using pipeTo. Once the actor receives the result of the computation you can safely operate on it:

import akka.pattern.pipe

case class ComputationResult(s: String)

def receive = {
  case ComputationResult(s) => // modify internal state here
  case msg =>
    Future {
       // Compute here, don't modify state
       ComputationResult("finished computing")
    }.pipeTo(self)
}

Upvotes: 6

Related Questions