λ Allquantor λ

Reputation: 1131

What is the best way to combine an akka-http flow with an akka-stream flow?

I have a use case where, after n akka-stream flows, I have to take the result of one of them and make a request to an HTTP REST API.

The output type of the last akka-stream flow before the HTTP request is a String:

val stream1:Flow[T,String,NotUsed] = Flow[T].map(_.toString)

Now the HTTP request has to be specified. I thought about something like:

val stream2: Flow[String,Future[HttpResponse],NotUsed] = Flow[String].map(param => Http().singleRequest(HttpRequest(uri=s"http://host.com/$param")))

and then combine it:

val stream3 = stream1 via stream2

Is this the best way to do it? Which approaches would you recommend, and why? A couple of best-practice examples for this use case would be great!

Thanks in advance :)

Upvotes: 3

Views: 1657

Answers (1)

Taken at face value, your implementation opens a new connection to "host.com" for each new param, which is unnecessary and prevents Akka from making certain optimizations. Under the hood, Akka does keep a connection pool around to reuse open connections, but I think it is better to state your intentions in the code than to rely on the underlying implementation.
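If you do stay with singleRequest, note that a plain map leaves a Future[HttpResponse] in the stream. A minimal sketch of unwrapping it with mapAsync follows. The object name, the method name and the parallelism parameter are made up for illustration, and it assumes a recent Akka (2.6) / Akka HTTP (10.2) setup where an implicit ActorSystem is enough to run requests:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.Flow

object SingleRequestSketch {
  // Hypothetical helper: `parallelism` caps how many requests are in flight at once.
  // mapAsync waits for each Future and emits the completed HttpResponse,
  // preserving the order of the incoming params.
  def requestFlow(parallelism: Int)(implicit system: ActorSystem): Flow[String, HttpResponse, NotUsed] =
    Flow[String].mapAsync(parallelism) { param =>
      // singleRequest needs an absolute URI, hence the explicit scheme.
      Http().singleRequest(HttpRequest(uri = s"http://host.com/$param"))
    }
}
```

Downstream stages then see HttpResponse values directly instead of Futures, and back-pressure limits the number of outstanding requests.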

You can make a single connection as described in the documentation:

val connectionFlow: Flow[HttpRequest, HttpResponse, _] = 
  Http().outgoingConnection("host.com")

To utilize this connection Flow you'll need to convert your String paths to HttpRequest objects:

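import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.model.Uri.Path

// Build a request with a relative URI; the connection flow supplies the host.
def pathToRequest(path: String): HttpRequest = HttpRequest(uri = Uri.Empty.withPath(Path(path)))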

val reqFlow = Flow[String] map pathToRequest

And, finally, glue all the flows together:

val stream3 = stream1 via reqFlow via connectionFlow

This is a common pattern for continuously querying the same server with different request objects.
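Putting the pieces together, here is a rough end-to-end sketch. It assumes Akka 2.6 / Akka HTTP 10.2, fixes the question's T to Int purely for illustration, and uses made-up names (ConnectionFlowSketch, the "example" system name). The paths are built with a leading "/" since they are used as request paths:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.model.Uri.Path
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

object ConnectionFlowSketch {
  // Same helper as in the answer: a relative URI carrying only the path.
  def pathToRequest(path: String): HttpRequest =
    HttpRequest(uri = Uri.Empty.withPath(Path(path)))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("example")
    import system.dispatcher

    // Stand-in for the question's stream1 (T = Int is an assumption).
    val stream1: Flow[Int, String, NotUsed] = Flow[Int].map(i => s"/$i")
    val reqFlow: Flow[String, HttpRequest, NotUsed] = Flow[String].map(pathToRequest)

    // One connection to the host, reused for every request in the stream.
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
      Http().outgoingConnection("host.com")

    val stream3 = stream1 via reqFlow via connectionFlow

    Source(1 to 3)
      .via(stream3)
      .map { response =>
        // Each entity has to be consumed or discarded, otherwise the connection stalls.
        response.discardEntityBytes()
        response.status
      }
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate())
  }
}
```

In real code you would read the entity (for example via response.entity.dataBytes) instead of discarding it, and add handling for a failed or closed connection.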

Upvotes: 3
