Reputation: 35
In my project I'm using Akka's Actors. By definition Actors are thread-safe, which means that in the Actor's receive method
def receive = {
  case msg =>
    // some logic here
}
only one thread at a time processes the commented piece of code. However, things get more complicated when this code is asynchronous:
def receive = {
  case msg =>
    Future {
      // some logic here
    }
}
If I understand this correctly, in this case only the Future construct will be synchronized, so to speak, and not the logic inside the Future.
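To make that concern concrete, here is a minimal sketch using only the Scala standard library (no Akka). The `EscapeDemo` object and its `handle` method are hypothetical stand-ins for an actor and its receive method, and the global ExecutionContext is an assumption. It only shows that the body of the Future runs on a different thread than the code that created it:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EscapeDemo {
  // Assumption: the global pool stands in for whatever ExecutionContext the actor uses
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for receive: returns the calling thread
  // and the thread that actually executed the Future body.
  def handle(): (Thread, Thread) = {
    val caller = Thread.currentThread()
    val body = Future { Thread.currentThread() }
    // Await is used here only to inspect the result in the demo
    (caller, Await.result(body, 5.seconds))
  }
}
```

Calling `EscapeDemo.handle()` from the main thread returns two different threads, which is why any actor state touched inside the Future is no longer protected by the actor's one-message-at-a-time guarantee.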
Of course I may block the Future:
def receive = {
  case msg =>
    val future = Future {
      // some logic here
    }
    Await.result(future, 10.seconds)
}
which solves the problem, but I think we can all agree that this is hardly an acceptable solution.
So this is my question: how can I retain the thread-safe nature of actors with asynchronous computation, without blocking on Scala's Futures?
Upvotes: 3
Views: 783
Reputation: 104
I have come across this situation before, and the solution I came up with is to store the previous message's future result as state within the actor and chain each new incoming message onto it. This way, each message is processed only after the previous message's future has completed (you can recover a failed message to keep message processing going).
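import akka.actor.Actor
import scala.concurrent.Future

class MyActor extends Actor {
  import context.dispatcher // implicit ExecutionContext for map/recover/flatMap

  // Result of the previous message; each new message chains onto it
  var previousMessage: Future[String] = Future.successful("initial result")

  override def receive: Receive = {
    case message =>
      previousMessage = previousMessage
        .map(Some(_))
        .recover { case _ => Option.empty }
        .flatMap { previousMessageResult /* Option[String] */ =>
          // process message
          Future.successful("Result")
        }
  }
}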
I can't tell if there are any downsides to this.
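The chaining idea can be tried out without Akka. The following is a minimal sketch using only the standard library. `ChainDemo`, `Serializer`, `submit` and `log` are hypothetical names, and `tail` plays the role of `previousMessage`. It assumes `submit` is only ever called from one thread, as an actor's receive would be:

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ChainDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for the actor
  final class Serializer {
    private var tail: Future[Unit] = Future.successful(())
    val log = ListBuffer.empty[Int]

    // Chains the work for `n` onto the previous task, so tasks run one at a time
    def submit(n: Int): Unit = {
      tail = tail
        .recover { case _ => () } // a failed task must not stop the chain
        .flatMap { _ =>
          Future {
            if (n == 2) throw new RuntimeException("boom") // simulated failure
            log += n
            ()
          }
        }
    }

    // Blocks until the last submitted task has finished (demo only)
    def awaitAll(): Unit = Await.ready(tail, 5.seconds)
  }

  def run(): List[Int] = {
    val s = new Serializer
    (1 to 5).foreach(s.submit)
    s.awaitAll()
    s.log.toList // List(1, 3, 4, 5): in order, with the failed task skipped
  }
}
```

One thing to keep in mind with this approach: the chained bodies run one after another, but still on ExecutionContext threads rather than on the actor's own thread, so they can overlap with whatever receive itself does to the actor's state.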
Upvotes: 0
Reputation: 25661
The simplest solution here is to turn the actor into a state machine (use Akka FSM) and do the following:
Depending on context, you might have to do some more to get a proper response.
But this has the advantage that you process the message with the actor state, and you can mutate the actor state as you please because you own the thread.
Upvotes: 0
Reputation: 484
I think you need to "resolve" the db query first and then use the result to return a new Future. If the db query returns a Future[A], then you can use flatMap to operate over A and return a new Future. Something along the lines of
def receive = {
  case msg =>
    val futureResult: Future[Result] = ...
    futureResult.flatMap { result: Result =>
      // ....
      // return a new Future
    }
}
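As a self-contained illustration of that flatMap step, here is a minimal sketch using only the standard library. `queryDb` and `process` are hypothetical stand-ins for the db query and the follow-up computation. They are not part of any real API:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FlatMapDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical db query returning a Future[String]
  def queryDb(id: Int): Future[String] = Future(s"row-$id")

  // Hypothetical follow-up step that itself returns a Future
  def process(row: String): Future[Int] = Future(row.length)

  // flatMap runs `process` only once the query result is available,
  // and flattens Future[Future[Int]] into Future[Int]
  def handle(id: Int): Future[Int] =
    queryDb(id).flatMap(row => process(row))
}
```

For example, `Await.result(FlatMapDemo.handle(42), 5.seconds)` yields 6, the length of "row-42". Note that this only sequences the two asynchronous steps. On its own it does not make mutating actor state inside them safe.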
Upvotes: 0
Reputation: 149636
How can I retain the thread-safe nature of actors in case of asynchronous computing without blocking Scala's Future?
This is only a problem if you modify the internal state of the actor inside the Future, which seems to be a design smell in the first place. Use the Future for computation only, working on a copy of the data, and pipe the result of the computation back to the actor using pipeTo. Once the actor receives the result of the computation, you can safely operate on it:
import akka.pattern.pipe
import scala.concurrent.Future

case class ComputationResult(s: String)

// Inside the actor; `import context.dispatcher` supplies the implicit ExecutionContext
def receive = {
  case ComputationResult(s) => // modify internal state here
  case msg =>
    Future {
      // Compute here, don't modify state
      ComputationResult("finished computing")
    }.pipeTo(self)
}
Upvotes: 6