Pete
Pete

Reputation: 17122

Asynchronously collect and compose responses from list of Akka Actors

I have an Akka Actor called Gate which answers a Status message with a response message of Open or Closed:

"A stateless gate" must {
    "be open" in {
      val parent = TestProbe()
      val gate = parent.childActorOf(
        TestStatelessGate.props(7)
      )
      gate ! 7
      gate ! Gate.Status
      parent.expectMsg(Gate.Open)
    }

What I would like to do is construct a logical AND gate which queries a List of gates, returning Open if they are all open:

"A logical AND gate" must {
    "be open when all children are open" in {
      val parent = TestProbe()
      val parent2 = TestProbe()
      val gate_1 = parent.childActorOf(
        TestStatelessGate.props(7)
      )
      val gate_2 = parent.childActorOf(
        TestStatelessGate.props(5)
      )
      val gate_list = List(gate_1, gate_2)
      val and_gate = parent2.childActorOf(
        LogicalAndGate.props(gate_list)
      )
      gate_1 ! 7
      gate_2 ! 5
      and_gate ! Gate.Status
      parent2.expectMsg(Gate.Open)

The Scala documentation has a nice bit about using a for expression and pipe here. The relevant part of that documentation is:

final case class Result(x: Int, s: String, d: Double)
case object Request

implicit val timeout = Timeout(5 seconds) // needed for `?` below

val f: Future[Result] =
  for {
    x <- ask(actorA, Request).mapTo[Int] // call pattern directly
    s <- actorB.ask(Request).mapTo[String] // call by implicit conversion
    d <- (actorC ? Request).mapTo[Double] // call by symbolic name
  } yield Result(x, s, d)

f.pipeTo(actorD

I'm getting hung up trying to do something like this with a List of ActorRefs (gate_list in the code below):

override def receive: Receive = {
    case Status => {
      val futures: Seq[Future[Any]] =
        for (g <- gate_list)
          yield ask(g, Status)
      val all_open: Future[Boolean] = Future {
        !futures.contains(Closed)
        }
      pipe(all_open) to parent
    }
  }

Of course that doesn't work because futures.contains(Closed) is comparing two different types of things, a Future[Any] and my case object.

Upvotes: 0

Views: 32

Answers (1)

Tim
Tim

Reputation: 27356

I am assuming that Open and Closed are case object values that inherit from some common trait OpenClosed.

Firstly you need to use mapTo to convert the ask result to OpenClosed. I would also use map rather than for:

val futures: Seq[Future[OpenClosed]] =
  gate_list.map(g => ask(g, Status).mapTo[OpenClosed])

Then you need Future.sequence to wait for all these to complete:

Future.sequence(futures).onComplete {
  case Success(res) =>
    parent ! res.forall(_ == Open)
  case Failure(_) =>
    parent ! Closed
}

Upvotes: 1

Related Questions