Tom
Tom

Reputation: 45104

Transforming a future in Akka streams

I'm trying to use Akka Streams to concurrently send requests to a server, and then trying to relate each request to an original context (an Int in this example). This is the flow I've put together:

val createRequestFlow: Flow[(String, String), (HttpRequest, Int), _] = Flow.fromFunction[(String, String), (HttpRequest, Int)]((mkRequest _).tupled)
val sendRequestFlow: Flow[(HttpRequest, Int), (HttpResponse, Int), _] = Flow[(HttpRequest, Int)].mapAsyncUnordered(32)((sendRequest _).tupled)
val handleResponseFlow: Flow[(HttpResponse, Int), String, _] = Flow[(HttpResponse, Int)].map[String]((getStatusString _).tupled)

val handler =  createRequestFlow via sendRequestFlow via handleResponseFlow

Particularly, I'm trying to find the way to return a Future[(HttpResponse, Int)]. Currently, I'm doing this

def sendRequest(request: HttpRequest, ctx: Int): Future[(HttpResponse, Int)] = {
    Http().singleRequest(request).map(r => (r,ctx))
  }

but I understand that the fact that this requires an Executor indicates that there is another (better) way to do it.

Upvotes: 0

Views: 401

Answers (1)

SergGr
SergGr

Reputation: 23788

I don't think there is a better way. Akka uses standard Scala Futures and they by design require an ExecutionContext to perform almost any operation. If you really-really don't want to use another thread for this simple map, you can create your own sameThreadExecutionContext similar to the one Akka uses inside (see akka.dispatch.ExecutionContexts.sameThreadExecutionContext) so the map will be performed on the same thread that handles main Http response, but don't use it for anything more complicated (see also discussion at GitHub #19043).

Upvotes: 1

Related Questions