lex82
lex82

Reputation: 11307

Ensure message order in test when mixing futures with actor messages

I'm testing an actor that uses an asnychronous future-based API. The actor uses the pipe pattern to send a message to itself when a future completes:

import akka.pattern.pipe
// ...

// somewhere in the actor's receive method
futureBasedApi.doSomething().pipeTo(self)

In my test I mock the API so I control future completion via promises. However, this is interleaved with other messages sent directly to the actor:

myActor ! Message("A")
promiseFromApiCall.success(Message("B"))
myActor ! Message("C")

Now I'm wondering how I can guarantee that the actor receives and processes message B between message A and C in my test because message B is actually sent in another thread, so I can't control the order in which the actor's mailbox receives the messages.

I thought about several possible solutions:

I don't really like either of these options but I tend to use the last one. Is there another better way I can enforce a certain message order in the tests?

Clarification: The question is not how to deal with the fact that messages might be received in random order in production. Controlling the order in the test is essential to make sure that the actor can actually deal with different message orders.

Upvotes: 1

Views: 532

Answers (3)

lex82
lex82

Reputation: 11307

After reading a lot more about akka, I finally found a better solution: Replacing the actor mailbox with one I can observe in the tests. This way I can wait until the actor receives a new message after I complete the promise. Only then the next message is sent. The code for this TestingMailbox is given at the end of the post.

Update: In Akka Typed this can be achieved very elegantly with a BehaviorInterceptor. Just wrap the Behavior under test with a custom interceptor that forwards all messages and signals but lets you observe them. The mailbox solution for untyped Akka is given below.


The actor can be configured like this:

actorUnderTest = system.actorOf(Props[MyActor]).withMailbox("testing-mailbox"))

I have to make sure the "testing-mailbox" is known by the actor system by providing a configuration:

class MyTest extends TestKit(ActorSystem("some name",
    ConfigFactory.parseString("""{ 
        testing-mailbox = {
            mailbox-type = "my.package.TestingMailbox" 
        }
    }"""))) 
    with BeforeAndAfterAll // ... and so on

With this being set up, I can change my test like this:

myActor ! Message("A")
val nextMessage = TestingMailbox.nextMessage(actorUnderTest)
promiseFromApiCall.success(Message("B"))
Await.ready(nextMessage, 3.seconds)
myActor ! Message("C")

With a little helper method, I can even write it like this:

myActor ! Message("A")
receiveMessageAfter { promiseFromApiCall.success(Message("B")) }
myActor ! Message("C")

And this is my custom mailbox:

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config 
import scala.concurrent.{Future, Promise}

object TestingMailbox {

  val promisesByReceiver =
    scala.collection.concurrent.TrieMap[ActorRef, Promise[Any]]()

  class MessageQueue extends UnboundedMailbox.MessageQueue {

    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
      super.enqueue(receiver, handle)
      promisesByReceiver.remove(receiver).foreach(_.success(handle.message))
    }

  }

  def nextMessage(receiver: ActorRef): Future[Any] =
    promisesByReceiver.getOrElseUpdate(receiver, Promise[Any]).future

}

class TestingMailbox extends MailboxType
  with ProducesMessageQueue[TestingMailbox.MessageQueue] {

  import TestingMailbox._

  def this(settings: ActorSystem.Settings, config: Config) = this()

  final override def create(owner: Option[ActorRef],
                            system: Option[ActorSystem]) =
      new MessageQueue()

}

Upvotes: 1

lasekio
lasekio

Reputation: 318

If it is so important to order messages you should use ask (?) which returns Future and chain them even if you dont expect any response from an actor.

Upvotes: 0

Jeffrey Chung
Jeffrey Chung

Reputation: 19517

One idea is to define a flag in your actor that indicates whether the actor has received message B. When the actor receives message C, the actor can stash that message C if the flag is false, then unstash it once the actor receives message B. For example:

class MyActor extends Actor with Stash {

  def receiveBlock(seenMsgB: Boolean, seenMsgC: Boolean): Receive = {
    case MakeApiCall =>
      callExternalApi().mapTo[MessageB].pipeTo(self)

    case m: MessageB if seenMsgC => // assume msg C has been stashed
      unstashAll()
      // ...do something with msg B
      become(receiveBlock(true, seenMsgC)) // true, true
    case m: MessageB if !seenMsgC =>
      // ...do something with message B
      become(receiveBlock(true, seenMsgC)) // true, false

    case m: MessageC if seenMsgB =>
      // ...do something with message C
      context.become(receiveBlock(seenMsgB, true)) // true, true
    case m: MessageC if !seenMsgB =>
      stash()
      context.become(receiveBlock(seenMsgB, true)) // false, true

    case ...
  }

  def receive = receiveBlock(false, false)
}

Upvotes: 1

Related Questions