Gleeb
Gleeb

Reputation: 11299

How to ensure message consistency when using futures in Akka

I would like to understand how to work with a stateful actor when I have async calls within the action.

Consider the following actor:

@Singleton
class MyActor @Inject() () extends Actor with LazyLogging  {
  import context.dispatcher
  override def receive: Receive = {
    case Test(id: String) =>
      Future { logger.debug(s"id [$id]") }
    }
}

and a call to this actor:

Stream.range(1, 100).foreach { i =>
  MyActor ! Test(i.toString)
}

This will give me an inconsistent printing of the series.

How am I supposed to use futures inside an actor without loosing the entire "one message after another" functionality?

Upvotes: 1

Views: 213

Answers (4)

Ion Cojocaru
Ion Cojocaru

Reputation: 2583

Another way would be to send itself a message on Future.onComplete assuming that there are no restrictions on the order of processing

    //in receive  
    val future = Future { logger.debug(s"id [$id]") }
    f.onComplete {
        case Success(value) => self ! TestResult(s"Got the callback, meaning = $value")
        case Failure(e) => self ! TestError(e)
      }

Upvotes: -1

Jeffrey Chung
Jeffrey Chung

Reputation: 19527

What you're observing is not a violation of Akka's "message ordering per sender–receiver pair" guarantee; what you're observing is the nondeterministic nature of Futures, as @almendar mentions in his or her answer.

The Test messages are being sent to MyActor sequentially, from Test("1") to Test("100"), and MyActor is processing each message in its receive block in that same order. However, you're logging each message inside a Future, and the order in which those Futures are completed is nondeterministic. This is why you see the "inconsistent printing of the series."

To get the desired behavior of sequential logging of the messages, don't wrap the logging in a Future. If you must use a Future inside an actor's receive block, then @almendar's approach of using a var inside the actor is safe.

Upvotes: 2

bartholomaios
bartholomaios

Reputation: 123

You can use context.become and stash messages, wait for the end of the future and process another message. More about how to use stash with example you can find in documentation http://doc.akka.io/api/akka/current/akka/actor/Stash.html

Remember - messages ordering is guarantee only if messages are sent from the same machine because of network characteristic.

Upvotes: 1

almendar
almendar

Reputation: 1813

You should store that Future in a var then on every next message you should make a flatMap call.

if(storedFut == null) storedFut = Future { logger.debug(s"id [$id]") }
else storedFut = storedFut.flatMap(_ => Future { logger.debug(s"id [$id]") })

flatMap is exactly for ordering of Futures.

Sidenote

If you want thing to happen in parallel you're in the nondeterministic zone, where you cannot impose ordering

Upvotes: 2

Related Questions