Finkelson
Finkelson

Reputation: 3013

Akka: The order of responses

My demo app is simple. Here is an actor:

class CounterActor extends Actor {
  @volatile private[this] var counter = 0

  def receive: PartialFunction[Any, Unit] = {
    case Count(id)     ⇒ sender ! self ? Increment(id)
    case Increment(id) ⇒ sender ! {
      counter += 1
      println(s"req_id=$id, counter=$counter")
      counter
    }
  }
}

The main app:

sealed trait ActorMessage
case class Count(id: Int = 0) extends ActorMessage
case class Increment(id: Int) extends ActorMessage

object CountingApp extends App {

  // Get incremented counter
  val future0 = counter ? Count(1)
  val future1 = counter ? Count(2)
  val future2 = counter ? Count(3)
  val future3 = counter ? Count(4)
  val future4 = counter ? Count(5)

  // Handle response
  handleResponse(future0)
  handleResponse(future1)
  handleResponse(future2)
  handleResponse(future3)
  handleResponse(future4)

  // Bye!
  exit()
}

My handler:

def handleResponse(future: Future[Any]): Unit = {
  future.onComplete {

    case Success(f) => f.asInstanceOf[Future[Any]].onComplete {
      case x => x match {
        case Success(n) => println(s" -> $n")
        case Failure(t) => println(s" -> ${t.getMessage}")
      }
    }

    case Failure(t) => println(t.getMessage)
  }
}

If I run the app I'll see the next output:

req_id=1, counter=1
req_id=2, counter=2
req_id=3, counter=3
req_id=4, counter=4
req_id=5, counter=5
 -> 4
 -> 1
 -> 5
 -> 3
 -> 2

The order of handled responses is random. Is it normal behaviour? If no, how can I make it ordered?

PS

Do I need volatile var in the actor?

PS2

Also, I'm looking for some more convenient logic for handleResponse, because matching here is very ambiguous...

Upvotes: 1

Views: 145

Answers (2)

normal behavior?

Yes, this is absolutely normal behavior.

Your Actor is receiving the Count increments in the order you sent them but the Futures are being completed via submission to an underlying thread pool. It is that indeterminate ordering of Future-thread binding that is resulting in the out of order println executions.

how can I make it ordered?

If you want ordered execution of Futures then that is synonymous with synchronous programming, i.e. no concurrency at all.

do I need volatile?

The state of an Actor is only accessible within an Actor itself. That is why users of the Actor never get an actual Actor object, they only get an ActorRef, e.g. val actorRef = actorSystem actorOf Props[Actor] . This is partially to ensure that users of Actors never have the ability to change an Actor's state except through messaging. From the docs:

The good news is that Akka actors conceptually each have their own light-weight thread, which is completely shielded from the rest of the system. This means that instead of having to synchronize access using locks you can just write your actor code without worrying about concurrency at all.

Therefore, you don't need volatile.

more convenient logic

For more convenient logic I would recommend Agents, which are a kind of typed Actor with a simpler message framework. From the docs:

import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent

val agent = Agent(5)

val result = agent()

val result = agent.get

agent send 7

agent send (_ + 1)

Reads are synchronous but instantaneous. Writes are asynch. This means any time you do a read you don't have to worry about Futures because the internal value returns immediately. But definitely read the docs because there are more complicated tricks you can play with the queueing logic.

Upvotes: 3

Odomontois
Odomontois

Reputation: 16308

Real trouble in your approach is not asynchronous nature but overcomplicated logic.

And despite pretty answer from Ramon which I +1d, yes there is way to ensure order in some parts of akka. As we can read from the doc there is message ordering per sender–receiver pair guarantee.

It means that for each one-way channel of two actors there is guarantee, that messages will be delivered in order they been sent.

But there is no such guarantee for Future task accomplishment order which you are using to handle answers. And sending Future from ask as message to original sender is way strange.

Thing you can do: redefine your Increment as

case class Increment(id: Int, requester: ActorRef) extends ActorMessage

so handler could know original requester

modify CounterActor's receive as

def receive: Receive = {
    case Count(id) ⇒ self ! Increment(id, sender)
    case Increment(id, snd) ⇒ snd ! {
      counter += 1
      println(s"req_id=$id, counter=$counter")
      counter
    }
  }

simplify your handleResponse to

  def handleResponse(future: Future[Any]): Unit = {
    future.onComplete {
      case Success(n: Int) => println(s" -> $n")

      case Failure(t) => println(t.getMessage)
    }
  }

Now you can probably see that messages are received back in the same order.

I said probably because handling still occures in Future.onComplete so we need another actor to ensure the order.

Lets define additional message

case object StartCounting

And actor itself:

class SenderActor extends Actor {
  val counter = system.actorOf(Props[CounterActor])

  def receive: Actor.Receive = {
    case n: Int => println(s" -> $n")
    case StartCounting =>
      counter ! Count(1)
      counter ! Count(2)
      counter ! Count(3)
      counter ! Count(4)
      counter ! Count(5)
  }
}

In your main you can now just write

val sender = system.actorOf(Props[SenderActor])

sender ! StartCounting

And throw away that handleResponse method.

Now you definitely should see your message handling in the right order.

We've implemented whole logic without single ask, and that's good.

So magic rule is: leave handling responses to actors, get only final results from them via ask.

Note there is also forward method but this creates proxy actor so message ordering will be broken again.

Upvotes: 2

Related Questions