Reputation: 45104
I'm trying to use Akka Streams to send requests to a server concurrently and then relate each response back to its original context (an Int in this example). This is the flow I've put together:
val createRequestFlow: Flow[(String, String), (HttpRequest, Int), _] = Flow.fromFunction[(String, String), (HttpRequest, Int)]((mkRequest _).tupled)
val sendRequestFlow: Flow[(HttpRequest, Int), (HttpResponse, Int), _] = Flow[(HttpRequest, Int)].mapAsyncUnordered(32)((sendRequest _).tupled)
val handleResponseFlow: Flow[(HttpResponse, Int), String, _] = Flow[(HttpResponse, Int)].map[String]((getStatusString _).tupled)
val handler = createRequestFlow via sendRequestFlow via handleResponseFlow
In particular, I'm looking for the right way to return a Future[(HttpResponse, Int)]. Currently, I'm doing this:
def sendRequest(request: HttpRequest, ctx: Int): Future[(HttpResponse, Int)] = {
Http().singleRequest(request).map(r => (r,ctx))
}
but I understand that the fact that this map requires an ExecutionContext suggests there is another (better) way to do it.
Upvotes: 0
Views: 401
Reputation: 23788
I don't think there is a better way. Akka uses standard Scala Futures, and by design they require an ExecutionContext to perform almost any operation. If you really don't want to use another thread for this simple map, you can create your own sameThreadExecutionContext similar to the one Akka uses internally (see akka.dispatch.ExecutionContexts.sameThreadExecutionContext), so the map will be performed on the same thread that completes the HTTP response. Don't use it for anything more complicated, though (see also the discussion at GitHub #19043).
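A minimal sketch of such a context, assuming you write it yourself. SameThreadExecutionContext, Demo, and fakeSingleRequest are hypothetical names used only for illustration. fakeSingleRequest stands in for Http().singleRequest so the snippet runs without an ActorSystem. On Scala 2.13 and later, the standard library's ExecutionContext.parasitic serves a similar purpose.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, not Akka's public API: runs every task directly
// on the thread that submits it instead of handing it to a pool.
object SameThreadExecutionContext extends ExecutionContext {
  override def execute(runnable: Runnable): Unit = runnable.run()

  override def reportFailure(cause: Throwable): Unit =
    throw new IllegalStateException("failure in SameThreadExecutionContext", cause)
}

object Demo {
  // Stand-in for Http().singleRequest(request); assumed for illustration only.
  def fakeSingleRequest(request: String): Future[String] =
    Future.successful(s"response to $request")

  // Same shape as sendRequest in the question: pair the response with its
  // context, passing the execution context explicitly rather than implicitly.
  def sendRequest(request: String, ctx: Int): Future[(String, Int)] =
    fakeSingleRequest(request).map(r => (r, ctx))(SameThreadExecutionContext)
}
```

Passing the context explicitly to map keeps it from leaking into other Future operations as an implicit, which fits the advice to use it only for trivial, non-blocking transformations.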
Upvotes: 1