Eumcoz
Eumcoz

Reputation: 2458

Processing unknown amount of actors results on single timeout

I am looking to expand the following code to work for an unknown amount of actor ask requests.

implicit val timeout = Timeout(100 millis)
val sendRequestActor = context.actorOf(Props(new SendRequest(request)), "Send_Request_".concat(getActorNumber))
val sendRequestActor2 = context.actorOf(Props(new SendRequest(request)), "Send_Request_".concat(getActorNumber))
val a1 = ask(sendRequestActor, Request).fallbackTo(Future.successful(RequestTimeout))
val a2 = ask(sendRequestActor2, Request).fallbackTo(Future.successful(RequestTimeout))
val result = for {
  r1 <- a1
  r2 <- a2
} yield(r1, r2)

val r = Await.result(result, 100 millis)
r match {
  case (b: SuccessResponse, b2: SuccessResponse) => {
    //Process Results
  }
  case (b: SuccessResponse, b2: RequestTimeout) => {
    //Process Results
  }
  case (b: RequestTimeout, b2: SuccessResponse) => {
    //Process Results
  }
  case (b: RequestTimeout, b2: RequestTimeout) => {
    //Process Results
  }
  case _ => {}
}

I am trying to send out requests to a List of recipients(gotten from a previous database call). The number of recipients will vary each time this function is called. Recipients have a maximum of 100 milliseconds to respond before I want to time out their requests and record a RequestTimeout. The SendRequest actor will reply with SuccessResponse if the recipients respond. I am assuming I will have to change the val result for-loop to process a list, but I am unsure of how to structure everything so that I will wait the minimum amount of time(either when all actors return or when the timeout hits, whichever is lower). I do not need everything in a single return value like the example, I am fine with a list of results and matching type on each iteration.

Any help would be appreciated, please let me know if I can provide any other information.

Thank you

Edit:

Calling Class:

case object GetResponses

def main(args: Array[String]) {

val route = {
  get {
    complete {
      //stuff
      val req_list = List(req1,req2,req3)
      val createRequestActor = system.actorOf(Props(new SendAll(req_list)), "Get_Response_Actor_" + getActorNumber)
      val request_future = ask(createRequestActor, GetResponses).mapTo[List[Any]]
      Thread.sleep(1000)
      println(request_future)
      //more stuff
    }
  }
}


Http().bindAndHandle(route, "localhost", 8080)
}

Updated Sending Class:

class SendAll(requests: List[Request]) extends Actor {
  import context.{become,dispatcher}
  var numProcessed = 0
  var results: List[Any] = List()
  requests.foreach(self ! _)

  implicit val timeout = Timeout(100 millis)
  def receive = {

    case r: RequestMsg =>
      val sendRequestActor = context.actorOf(Props(new SendRequest(r)), "Send_Request_".concat(getActorNumber))
      (sendRequestActor ? Request).pipeTo(self)

    case s: SuccessResponse =>
      println("Got Success")
      results = results :+ s
      println(results.size + " == " + requests.size)
      if(results.size == requests.size) {
        println("Before done")
        become(done)
      }

    case akka.actor.Status.Failure(f) =>
      println("Got Failed")
      results = results :+ RequestTimeout
      if(results.size == requests.size) {
        become(done)
      }

    case m => 
      println("Got Other")

  }

  def done: Receive = {
    case GetResponses =>
      println("Done")
      sender ! results
    case _ => {
      println("Done as well")
    }
  }
}

Output

Got Success
1 == 3
Got Success
2 == 3
Got Success
3 == 3
Before done
Future(<not completed>)

Upvotes: 0

Views: 69

Answers (2)

Alexandr Dorokhin
Alexandr Dorokhin

Reputation: 850

The simplest solution is put all your actor refs into the List map it to List[Future] and use Future.sequence to obtain Future[List].

val route = {
  get {
    val listActorRefs = List(actorRef1, actorRef2, ...)
    val futureListResponses = Future.sequence(listActorRefs.map(_ ? Request))
    onComplete(futureListResponses) {
      case Success(listResponse) => ...
        complete(...)
      case Failure(exception) => ...
    }
  }
}

A better solution is avoid a lot of actor' asks, prepare some ResponseCollector actor which will send all your message (I suggest to look at BroadcastPool) and schedule one message for itself to stop waiting and return result.

Upvotes: 0

Jeffrey Chung
Jeffrey Chung

Reputation: 19527

I would pass the list of requests to the actor, then pipe the responses from the child actors to self instead of using Await.result. For example:

class Handler(requests: List[RequestMsg]) extends Actor {
  import context.{become, dispatcher}
  var numProcessed = 0
  var results: List[Any] = List()
  requests.foreach(self ! _)

  implicit val timeout = Timeout(100.millis)

  def receive = {
    case r: RequestMsg =>
      val sendRequestActor = context.actorOf(Props(new SendRequest(r)), "Send_Request".concat(getActorNumber))
      (sendRequestActor ? Request).pipeTo(self)

    case s: SuccessResponse =>
      println(s"response: $s")
      results = results :+ s
      if (results.size == requests.size)
        become(done)

    case akka.actor.Status.Failure(f) =>
      println("a request failed or timed out")
      results = results :+ RequestTimeout
      if (results.size == requests.size)
        become(done)

    case m =>
      println(s"Unhandled message received while processing requests: $m")
      sender ! NotDone
  }

  def done: Receive = {
    case GetResponses =>
      println("sending responses")
      sender ! results
  }
}

You would instantiate an actor for every list of requests:

val requests1 = List(RequestMsg("one"), RequestMsg("two"), RequestMsg("three"))
val handler1 = system.actorOf(Props(new Handler(requests1)))

In this example--following the principle that an actor should have a distinct, limited sphere of responsibility--the actor simply coordinates requests and responses; it doesn't perform any processing on the collected responses. The idea is that another actor would send this actor a GetResponses messages in order to get the responses and process them (or this actor would proactively send the results to a processing actor).

Upvotes: 1

Related Questions