Randomize

Reputation: 9103

Akka: how to manage a double http request?

In my actor I have two requests at different addresses like this:

http.singleRequest(HttpRequest(uri = encodedUri).addHeader(Accept(MediaTypes.`application/json`)))

I need both requests to return a value. With ordinary futures I would expect something like this:

val response: Future[SomeData] = for {
        r1 <- firstRequest
        r2 <- secondRequest
      } yield {
        // merge the results of these two responses
      }

      response onComplete {
        case Success(body) => sndr ! Something(body)
        case Failure(message) => BadRequest(message.toString)
      }

In this part of the documentation:

http://doc.akka.io/docs/akka/2.4/scala/http/client-side/request-level.html

It is suggested to pipe the result of a single request to self with pipeTo, instead of using the native onComplete/map/etc.

How can I apply that for multiple requests like in my case, where I need to wait for 2 or more to be completed?

Upvotes: 1

Views: 312

Answers (1)

Nagarjuna Pamu

Reputation: 14825

Simple and straightforward:

val f1 = Future { /* request1 */ }

val f2 = Future { /* request2 */ }

val resultF = f1 zip f2

// needs import akka.pattern.pipe and an implicit ExecutionContext in scope
resultF pipeTo self

The current actor receives the result as a message, and that message is a tuple (f1Result, f2Result).

If resultF fails, the current actor receives the failure wrapped in akka.actor.Status.Failure.

In the snippet above, f1 and f2 are independent futures.
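The zip behaviour described above can be checked without Akka. This is a minimal sketch using only the standard library; firstRequest, secondRequest and failingRequest are hypothetical stand-ins for the HTTP calls, not part of the original answer.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipSketch {
  // Hypothetical stand-ins for the two HTTP calls.
  def firstRequest(): Future[String] = Future("first body")
  def secondRequest(): Future[Int] = Future(200)
  def failingRequest(): Future[Int] = Future.failed(new RuntimeException("boom"))

  // Both futures are created before zip, so they run concurrently.
  def both(): Future[(String, Int)] = {
    val f1 = firstRequest()
    val f2 = secondRequest()
    f1 zip f2
  }

  // If either side fails, the zipped future fails with that exception.
  def oneFails(): Future[(String, Int)] = firstRequest() zip failingRequest()

  // Blocking is only for this demo; inside an actor you would pipeTo self instead.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

Inside an actor, the successful case would arrive as the tuple message and the failed case as akka.actor.Status.Failure.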

If f2 depends on the value of f1, use flatMap:

 val resultF = f1.flatMap { f1Result => createF2(f1Result) }

 //alternatively we can use for comprehension

 resultF pipeTo self
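As a rough illustration of the dependent case, here is the same chain written with flatMap and with a for comprehension. lookupUserId and lookupScore are hypothetical placeholders for two requests where the second needs the first one's result.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DependentSketch {
  // Hypothetical stand-ins: the second call needs the first call's result.
  def lookupUserId(name: String): Future[Int] = Future(name.length)
  def lookupScore(userId: Int): Future[Int] = Future(userId * 10)

  // flatMap form: the second future is only created once the first has a value.
  def viaFlatMap(name: String): Future[Int] =
    lookupUserId(name).flatMap(id => lookupScore(id))

  // Equivalent for comprehension; the compiler rewrites it to flatMap/map.
  def viaFor(name: String): Future[Int] =
    for {
      id    <- lookupUserId(name)
      score <- lookupScore(id)
    } yield score

  // Blocking is only for this demo; inside an actor you would pipeTo self instead.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

Note that a for comprehension only forces sequential execution when the futures are created inside it. Futures that are already running before the for block (as with firstRequest and secondRequest in the question, if they are vals) still execute concurrently.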

Example

import akka.actor.Actor
import akka.actor.Status.Failure

import scala.concurrent.Future
import akka.pattern.pipe

object ManagerActor {
  case object Exec
}

class ManagerActor extends Actor {

  import ManagerActor._

  // brings the actor's dispatcher into scope as the implicit ExecutionContext
  import context.dispatcher

  override def receive: Receive = {
    case Exec =>
      val f1 = Future { 1 }
      val f2 = Future { 2 }
      // zip completes once both futures have completed
      val resultF = f1 zip f2
      resultF pipeTo self

    // match on the elements; a bare `result: (Int, Int)` type pattern is subject to type erasure
    case (a: Int, b: Int) =>
      println(s"result: ($a, $b)")

    case Failure(th) =>
      println(s"msg: ${th.getMessage}")
  }
}
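The type-erasure caveat on tuple messages can be seen in isolation. This is a small sketch, independent of Akka, comparing a type pattern on the tuple with patterns on its elements; the object and method names are made up for illustration.

```scala
object ErasureSketch {
  // Type pattern: at runtime only "is it a Tuple2?" is checked (the compiler
  // emits an unchecked warning), so a (String, String) matches as well.
  def byTypePattern(msg: Any): String = msg match {
    case _: (Int, Int) => "matched"
    case _             => "ignored"
  }

  // Element patterns: each component is checked individually.
  def byElements(msg: Any): String = msg match {
    case (a: Int, b: Int) => s"ints: $a, $b"
    case _                => "ignored"
  }
}
```

Another option is to map the zipped future into a dedicated case class message before piping it, so the receive block matches on that class rather than on a tuple.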

To run it:

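import akka.actor.{ActorSystem, Props}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem()
    val actor = system.actorOf(Props[ManagerActor])
    actor ! ManagerActor.Exec
    // give the actor time to receive and print the piped result
    Thread.sleep(1000000)
  }
}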

We can use Future.sequence instead of zip to generalise this to any number of HTTP requests.
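A minimal sketch of that generalisation, using only the standard library. fetch is a hypothetical stand-in for one HTTP call; with Akka HTTP it would wrap http.singleRequest and the response handling.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceSketch {
  // Hypothetical stand-in for a single HTTP call.
  def fetch(uri: String): Future[Int] = Future(uri.length)

  // Turns List[Future[Int]] into Future[List[Int]], preserving input order.
  // The combined future fails if any of the individual futures fails.
  def fetchAll(uris: List[String]): Future[List[Int]] =
    Future.sequence(uris.map(fetch))

  // Blocking is only for this demo; inside an actor you would pipeTo self instead.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

Piping the resulting Future[List[...]] to self delivers one message containing all results, which again is best wrapped in a dedicated message type because of type erasure on List.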

Upvotes: 3
