Reputation: 303
I am trying to make asynchronous HTTP calls with Akka Streams.
This is what I tried:
Source(listEndpoints)
.mapAsync(20)(endpoint => Future(Await.result(request(HttpMethods.POST, endpoint, List(authHeader)), timeout)))
.runWith(Sink.seq[HttpResponse])
I am using akka-http within the request method, and it returns Future[HttpResponse]. I think I am abusing Future here. The code above would give me a Future[List[HttpResponse]], and I have to use Await again to get a List[HttpResponse]. Is there a more elegant way to timeout functions within mapAsync?
Upvotes: 0
Views: 228
Reputation: 20611
Assuming your request method at some point calls Http().singleRequest to get a Future[HttpResponse], you can pass a timeout for the request through:
// inside def request(...), will probably need to add a timeout argument here
val request = ??? // Build the HttpRequest
Http().singleRequest(
  request = request,
  settings = ConnectionPoolSettings.default.withMaxConnectionLifetime(timeout)
)
Then your stream would just be
Source(listEndpoints)
.mapAsync(20)(endpoint => request(...))
.runWith(Sink.seq[HttpResponse])
and you'd only need to Await at the "end of the world" for the Future[List[HttpResponse]] to complete.
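Putting the two pieces together, a minimal sketch might look like the following. It assumes Akka 2.6 or later (so the materializer is derived from the implicit ActorSystem) and a recent akka-http. The object name TimedRequests, the fetchAll helper, and the exact signature of request are hypothetical, guessed from the call in the question.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpHeader, HttpMethod, HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

object TimedRequests {

  // Hypothetical stand-in for the question's `request` method, with the
  // timeout added as an extra argument (signature is an assumption).
  def request(
      method: HttpMethod,
      endpoint: String,
      headers: List[HttpHeader],
      timeout: FiniteDuration
  )(implicit system: ActorSystem): Future[HttpResponse] =
    Http().singleRequest(
      request = HttpRequest(method = method, uri = endpoint, headers = headers),
      // Per-request pool settings carrying the max connection lifetime
      settings = ConnectionPoolSettings(system).withMaxConnectionLifetime(timeout)
    )

  // Calls `send` for every endpoint with at most `parallelism` requests in
  // flight; no Await or extra Future wrapper inside mapAsync.
  def fetchAll(endpoints: List[String], parallelism: Int)(
      send: String => Future[HttpResponse]
  )(implicit system: ActorSystem): Future[immutable.Seq[HttpResponse]] =
    Source(endpoints)
      .mapAsync(parallelism)(send)
      .runWith(Sink.seq[HttpResponse])
}
```

With an implicit ActorSystem in scope, the question's stream would then be roughly TimedRequests.fetchAll(listEndpoints, 20)(e => TimedRequests.request(HttpMethods.POST, e, List(authHeader), timeout)), with a single Await.result on the returned Future at the very end. Note that Sink.seq materializes a Future of an immutable Seq rather than a List.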
You can also change the default max connection lifetime with akka.http.host-connection-pool.max-connection-lifetime in application.conf.
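For illustration, the corresponding entry in application.conf could look like this. The 30 s value is an arbitrary assumption, not a recommendation; pick whatever suits your endpoints.

```
# application.conf (the value below is only an example)
akka.http.host-connection-pool {
  max-connection-lifetime = 30 s
}
```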
Upvotes: 2