Reputation: 1131
I have a use case where, after n Akka Streams flows, I have to take the result of one of them and make a request to an HTTP REST API.
The last flow before the HTTP request emits a String:
val stream1:Flow[T,String,NotUsed] = Flow[T].map(_.toString)
Next, the HTTP request has to be specified. I thought about something like:
val stream2: Flow[String,Future[HttpResponse],NotUsed] = Flow[String].map(param => Http().singleRequest(HttpRequest(uri=s"host.com/$param")))
and then combine it:
val stream3 = stream1 via stream2
Is this the best way to do it? Which approaches would you recommend, and why? A couple of best-practice examples in the scope of this use case would be great!
Thanks in advance :)
Upvotes: 3
Views: 1657
Reputation: 17973
Your implementation would create a new connection to "host.com" for each new param. This is unnecessary and prevents Akka from making certain optimizations. Under the hood, Akka actually keeps a connection pool around to reuse open connections, but I think it is better to state your intentions in the code than to rely on the underlying implementation.
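For comparison, here is a minimal sketch of how the question's pooled `singleRequest` approach could be kept while avoiding a stream of `Future` values. It assumes Akka HTTP 10.x; the object name, `toRequest`, `pooledFlow`, the `parallelism` parameter, and the `http://` scheme are illustrative assumptions, not part of the original code.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.Flow

object SingleRequestSketch {
  // Assumption: plain HTTP. singleRequest needs an absolute URI, scheme included.
  def toRequest(param: String): HttpRequest =
    HttpRequest(uri = s"http://host.com/$param")

  // mapAsync unwraps each Future, so downstream stages receive HttpResponse
  // instead of Future[HttpResponse]; at most `parallelism` requests are in flight.
  def pooledFlow(parallelism: Int)(implicit system: ActorSystem): Flow[String, HttpResponse, NotUsed] =
    Flow[String].mapAsync(parallelism)(param => Http().singleRequest(toRequest(param)))
}
```

With plain `map`, as in the question, downstream stages would have to deal with the futures themselves and the stream would not apply backpressure to in-flight requests.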
You can make a single connection as described in the documentation:
val connectionFlow: Flow[HttpRequest, HttpResponse, _] =
Http().outgoingConnection("host.com")
To utilize this connection Flow you'll need to convert your String paths to HttpRequest objects:
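import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.model.Uri.Path
import akka.stream.scaladsl.Flow
// Build a host-relative request; the host comes from the connection itself
def pathToRequest(path: String): HttpRequest = HttpRequest(uri = Uri.Empty.withPath(Path(path)))
val reqFlow = Flow[String].map(pathToRequest)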
And, finally, glue all the flows together:
val stream3 = stream1 via reqFlow via connectionFlow
This is a common pattern for continuously querying the same server with different request objects.
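Put together, the pieces above might look roughly like the following sketch. It assumes Akka 2.6 with Akka HTTP 10.2 (where an implicit ActorSystem also supplies the materializer), fixes the question's `T` to `Int` for illustration, and prefixes each path with "/" so the request target is host-relative. The object name, `run`, and the `Source(1 to 3)` input are hypothetical; "host.com" is the question's placeholder.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.model.Uri.Path
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

object PipelineSketch {
  // Stand-in for the question's stream1, with T fixed to Int.
  val stream1: Flow[Int, String, NotUsed] = Flow[Int].map(_.toString)

  // Assumption: params carry no leading slash, so one is added here.
  def pathToRequest(path: String): HttpRequest =
    HttpRequest(uri = Uri.Empty.withPath(Path("/" + path)))

  val reqFlow: Flow[String, HttpRequest, NotUsed] = Flow[String].map(pathToRequest)

  def run()(implicit system: ActorSystem): Future[Done] = {
    // One outgoing connection, shared by every request in the stream.
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
      Http().outgoingConnection("host.com")

    Source(1 to 3)
      .via(stream1)
      .via(reqFlow)
      .via(connectionFlow)
      .map { response =>
        // Drain the body; an unread entity would stall the connection.
        response.discardEntityBytes()
        response.status
      }
      .runWith(Sink.foreach(status => println(status)))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("pipeline-sketch")
    import system.dispatcher
    // Shut the actor system down once the stream finishes or fails.
    run().onComplete(_ => system.terminate())
  }
}
```

Every response entity has to be consumed or discarded, which is why the sketch calls `discardEntityBytes()` before passing the status code downstream.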
Upvotes: 3