KAY_YAK
KAY_YAK

Reputation: 303

Is there a way to timeout functions inside mapAsync for akka?

I am trying to do asynchronous http calls with akka streams.

This is what I tried.

Source(listEndpoints)
      .mapAsync(20)(endpoint => Future(Await.result(request(HttpMethods.POST, endpoint, List(authHeader)), timeout)))
      .runWith(Sink.seq[HttpResponse])

I am using akka-http within the request method and it returns Future[HttpResponse]

I think I am abusing Future here. The code above would give me a Future[List[HttpResponse]] and I have to use Await again to get a List[HttpResponse]. Is there a more elegant way to timeout functions within mapAsync?

Upvotes: 0

Views: 228

Answers (1)

Levi Ramsey
Levi Ramsey

Reputation: 20611

Assuming your request method at some point does

Http().singleRequest

to get a Future[HttpResponse], you can pass a timeout for the request through:

// inside def request(...), will probably need to add a timeout argument here
val request = ???  // Build the HttpRequest
Http().singleRequest(
  request = request,
  settings = ConnectionPoolSettings.default.withMaxConnectionLifetime(timeout)

Then your stream would just be

Source(listEndpoints)
  .mapAsync(request(...))
  .runWith(Sink.seq[HttpResponse])

and you'd only need to Await at the "end of the world" for the Future[List[HttpResponse]] to complete.

You can also change the default max connection lifetime with akka.http.host-connection-pool.max-connection-lifetime in application.conf

Upvotes: 2

Related Questions