sbeliakov

Reputation: 2189

Is it possible to await a second response in Scala?

Let's say I have an actor A. When A receives a message, it sends two messages back.

class A extends Actor {
  def receive: Receive = {
    case M1 =>
      context.sender ! M2
      context.sender ! M3
  }
}

From another place I want to send a message to A and then await two responses. I know that's easy for one response, like this:

val task = A ? M1
Await.result(task, timeout)

but I'm not sure whether it is possible with two sequential messages.

The two messages must stay separate, because elsewhere I need to await only the first of them.

Upvotes: 1

Views: 243

Answers (2)

Akos Krivachy

Reputation: 4966

You can solve this by introducing an intermediate actor for the cases where you do need to wait for both messages.

This actor would look something like this:

class AggregationActor(aActor: ActorRef) extends Actor {

  var awaitForM2: Option[M2] = None
  var awaitForM3: Option[M3] = None
  var originalSender: Option[ActorRef] = None

  def receive: Receive = {
    case M1 =>
      // We save the sender
      originalSender = Some(sender())
      // Proxy the message
      aActor ! M1
    case M2 =>
      awaitForM2 = Some(M2)
      checkIfBothMessagesHaveArrived()
    case M3 =>
      awaitForM3 = Some(M3)
      checkIfBothMessagesHaveArrived()
  }

  private def checkIfBothMessagesHaveArrived() = {
    for {
      m2 <- awaitForM2
      m3 <- awaitForM3
      s <- originalSender
    } {
      // Send as a tuple
      s ! (m2, m3)
      // Shutdown, our task is done
      context.stop(self)
    }
  }

}

Essentially, it has internal state and keeps track of whether the M2 and M3 responses have arrived.

You could use this like:

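// Requires `import akka.pattern.ask` and an implicit akka.util.Timeout in scope for `?`
def awaitBothMessages(input: M1, underlyingAActor: ActorRef, system: ActorSystem): Future[(M2, M3)] = {
  // One short-lived aggregator per request; it stops itself after replying
  val aggregationActor = system.actorOf(Props(new AggregationActor(underlyingAActor)))
  (aggregationActor ? input).mapTo[(M2, M3)]
}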

val system = ActorSystem("test")
val aActor = system.actorOf(Props(new A), name = "aActor")

// Awaiting the first message only:
val firstMessage = aActor ? M1
val first = Await.result(firstMessage, Duration.Inf)

// Awaiting both messages:
val bothMessages: Future[(M2, M3)] = awaitBothMessages(M1, aActor, system)
val both = Await.result(bothMessages, Duration.Inf)
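
For reference, here is a minimal end-to-end sketch of the aggregation approach above. It rests on assumptions not stated in the question: M1, M2 and M3 are modelled as case objects (the question never shows their definitions), classic Akka actors are used, and the AggregationDemo object and the 5-second timeout are purely illustrative.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Assumed message definitions; the question does not show them.
case object M1
case object M2
case object M3

// The actor from the question: answers M1 with two separate messages.
class A extends Actor {
  def receive: Receive = {
    case M1 =>
      sender() ! M2
      sender() ! M3
  }
}

// Proxies M1 to A, collects both replies, and forwards them as one tuple.
class AggregationActor(aActor: ActorRef) extends Actor {
  private var m2: Option[M2.type] = None
  private var m3: Option[M3.type] = None
  private var originalSender: Option[ActorRef] = None

  def receive: Receive = {
    case M1 =>
      originalSender = Some(sender())
      aActor ! M1 // A replies to this actor, not to the original sender
    case M2 =>
      m2 = Some(M2)
      replyIfComplete()
    case M3 =>
      m3 = Some(M3)
      replyIfComplete()
  }

  // Replies only once both messages and the original sender are known.
  private def replyIfComplete(): Unit =
    for (a <- m2; b <- m3; s <- originalSender) {
      s ! ((a, b))
      context.stop(self)
    }
}

object AggregationDemo extends App {
  // `?` needs an implicit Timeout; 5 seconds is an arbitrary choice.
  implicit val timeout: Timeout = Timeout(5.seconds)

  val system = ActorSystem("test")
  val aActor = system.actorOf(Props(new A), "aActor")
  val aggregator = system.actorOf(Props(new AggregationActor(aActor)))

  val both: Future[(M2.type, M3.type)] =
    (aggregator ? M1).mapTo[(M2.type, M3.type)]
  println(Await.result(both, timeout.duration))

  system.terminate()
}
```

A fresh aggregator is created per request because it stops itself after replying. Code that only needs the first message can keep asking aActor directly, as shown above.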

Upvotes: 1

Thiago Pereira

Reputation: 1712

How about returning a tuple containing M2 and M3 to the sender?

import akka.pattern.ask
import akka.actor.{Props, ActorSystem, Actor}
import akka.util.Timeout
import com.test.A.{M1, M2, M3}

import scala.concurrent.Await
import scala.concurrent.duration._

object Test extends App {

  // Dot notation avoids the postfix-operator language feature
  implicit val timeout: Timeout = Timeout(5.seconds)

  val system = ActorSystem("test-system")
  val actor = system.actorOf(Props[A], name = "a-actor")
  val future = actor ? M1
  val await = Await.result(future, Duration.Inf)
  println(await)

}

class A extends Actor {
  override def receive: Receive = {
    case M1 => sender() ! (M2, M3)
  }
}

object A {
  case object M1
  case object M2
  case object M3
}

Running this will result in:

(M2,M3)

Upvotes: 1
