Reputation: 9103
In my actor I have two requests at different addresses like this:
http.singleRequest(HttpRequest(uri = encodedUri).addHeader(Accept(MediaTypes.`application/json`)))
I need that these two requests return both a value. As normal futures I would expect something like this:
val response: Future[SomeData] = for {
r1 <- firstRequest
r2 <- secondRequest
} yield {
// merge the results of these two responses
}
response onComplete {
case Success(body) => sndr ! Something(body)
case Failure(message) => BadRequest(message.toString)
}
In this part of the documentation:
http://doc.akka.io/docs/akka/2.4/scala/http/client-side/request-level.html
It is suggested to use pipeTo
to self
to manage the single request, instead of using native onComplete/map/etc
.
How can I apply that for multiple requests like in my case, where I need to wait for 2 or more to be completed?
Upvotes: 1
Views: 312
Reputation: 14825
Simple and straight foward
val f1 = Future { //request1 }
val f2 = Future { //request2 }
val resultF = f1 zip f2
resultF pipeTo self
Current actor will get the result as a message and message will be a tuple (f1Result, f2Result)
if result resultF
fails current actor gets failure message wrapped in akka.actor.Status.Failure
In the method f1
and f2
are independent futures
In case f2
depends on the value of f1
use flatMap
val resultF = f1.flatMap { f1Result => createF2(f1Result) }
//alternatively we can use for comprehension
resultF pipeTo self
Example
import akka.actor.Actor
import akka.actor.Status.Failure
import scala.concurrent.Future
import akka.pattern.pipe
object ManagerActor {
case object Exec
}
class ManagerActor extends Actor {
import ManagerActor._
implicit val dispather = context.dispatcher
override def receive: Receive = {
case Exec =>
val f1 = Future { 1 }
val f2 = Future { 2 }
val resultF = f1 zip f2
resultF pipeTo self
case result: (Int, Int) => //be careful about type erasure
println(s"""result:$result""")
case Failure(th) =>
println(s"""msg: ${th.getMessage}""")
}
}
to run
object Main {
def main(args: Array[String]): Unit = {
val system = ActorSystem()
val actor = system.actorOf(Props[ManagerActor])
actor ! ManagerActor.Exec
Thread.sleep(1000000)
}
}
We can use Future.sequence
instead to zip
to generalise this to any number of http requests.
Upvotes: 3