I would like to use a simple Flow to gather some extra data from an HTTP service and enrich my data object with the results. The following illustrates the idea:
val httpClient = Http().superPool[User]()
val cityRequest = Flow[User].map { user =>
  // superPool expects (request, context) pairs; the user is passed along as context
  (HttpRequest(uri = Uri(config.getString("cityRequestEndpoint"))), user)
}
val cityResponse = Flow[(Try[HttpResponse], User)].map {
  case (Failure(ex), user) => user
  case (Success(resp), user) => {
    // << What to do here to get the value >> //
    val responseData = processResponseSomehowToGetAValue?
    val enhancedUser = new EnhancedUser(user.data, responseData)
    enhancedUser
  }
}
val processEnhancedUser = Flow[EnhancedUser].map { enhancedUser =>
  // e.g.: Asynchronously save user to a database
  enhancedUser
}
val useEnhancementGraph = userSource
  .via(cityRequest)
  .via(httpClient)
  .via(cityResponse)
  .via(processEnhancedUser)
  .to(Sink.foreach(println))
I have trouble understanding the mechanics of, and the difference between, the streaming nature of a Flow and materialization / Futures inside the Flow.
The following ideas did not explain it to me:
How do I get the value from the response into the new user object, so that I can handle that object in the following steps?
Thanks for any help.
Update:
I evaluated the code against a remote Akka HTTP server that answers requests anywhere between immediately and after 10 seconds, using the code below for parsing. As a result, some "EnhancedUser" instances showed up at the end, but the ones whose requests took too long to answer were missing their values.
At some point I added .async to the end of the cityResponse parser; after that the output took longer to appear, but was correct.
What is the reason for this behaviour, and how does it fit together with the accepted answer?
val cityResponse = Flow[(Try[HttpResponse], User)].map {
  case (Failure(ex), member) => member
  case (Success(response), member) =>
    Unmarshal(response.entity).to[String] onComplete {
      case Success(s)  => member.city = Some(s)
      case Failure(ex) => member.city = None
    }
    member
}.async // <<-- This changed the behavior to be correct, why?
There are two different strategies you could use depending on the nature of the entity you are getting from "cityRequestEndpoint":
Stream Based
The typical way to handle this situation is to always assume that the entity coming from the source endpoint can contain N pieces of data, where N is not known in advance. This is usually the pattern to follow because it is the most generic and therefore "safest" in the real world.
The first step is to convert the HttpResponse coming from the endpoint into a Source of data:
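// Takes the whole tuple as one argument, which is what flatMapConcat expects
val convertResponseToByteStrSource: ((Try[HttpResponse], User)) => Source[(Option[ByteString], User), _] = {
  // A failed request yields a single element carrying no data
  case (Failure(_), user) => Source.single(None -> user)
  // A successful response yields one element per ByteString chunk of the entity
  case (Success(r), user) => r.entity.dataBytes.map(byteStr => Some(byteStr) -> user)
}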
The above code is where we avoid assuming anything about N: r.entity.dataBytes could be a Source of zero ByteString values, or of a potentially infinite number of values. But our logic doesn't care!
Now we need to combine the data coming from these Sources. This is a good use case for Flow.flatMapConcat, which maps each element to a Source and concatenates the resulting Sources into a single stream of values (similar to flatMap for Iterables):
val cityByteStrFlow : Flow[(Try[HttpResponse], User), (Option[ByteString], User), _] =
Flow[(Try[HttpResponse], User)] flatMapConcat convertResponseToByteStrSource
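To make the flatMapConcat step concrete without any Akka machinery, here is a collection-level analogy: a List stands in for a Source, and List.flatMap plays the role of flatMapConcat. It is only a sketch; Response, User and the String chunks are hypothetical simplifications of HttpResponse, the question's User and ByteString.

```scala
import scala.util.{Failure, Success, Try}

// Collection-level analogy only (no Akka): List stands in for Source,
// and List.flatMap plays the role of Flow.flatMapConcat.
object FlatMapConcatSketch {
  // Hypothetical simplified stand-ins for the question's types
  final case class User(data: String)
  final case class Response(chunks: List[String]) // models entity.dataBytes

  // One-argument function of the whole tuple, as flatMap / flatMapConcat expect
  val toChunks: ((Try[Response], User)) => List[(Option[String], User)] = {
    case (Failure(_), user) => List(None -> user)                   // like Source.single
    case (Success(r), user) => r.chunks.map(c => Some(c) -> user)   // zero to N elements
  }

  def main(args: Array[String]): Unit = {
    val responses: List[(Try[Response], User)] = List(
      Success(Response(List("Ber", "lin"))) -> User("alice"),
      Failure(new RuntimeException("timeout")) -> User("bob"),
      Success(Response(Nil)) -> User("carol")
    )
    // Each tuple expands into its own sub-sequence; the results are concatenated in order
    responses.flatMap(toChunks).foreach(println)
  }
}
```

The analogy also exposes a consequence of this design: a response split into several chunks produces several elements for the same user, and a response with no chunks produces none. flatMapConcat behaves the same way, so if exactly one EnhancedUser per user is wanted, the chunks of one response would have to be combined before the conversion step.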
All that is left to do is convert the (Option[ByteString], User) tuples into EnhancedUser. Note: I am assuming below that User is a subclass of EnhancedUser, which is inferred from the question's logic:
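// Takes the whole tuple as one argument so it can be passed to Flow.map directly
val convertByteStringToUser: ((Option[ByteString], User)) => EnhancedUser = {
  case (byteStr, user) =>
    byteStr
      .map(s => new EnhancedUser(user.data, s))
      .getOrElse(user) // relies on User being a subclass of EnhancedUser
}
val cityUserFlow: Flow[(Option[ByteString], User), EnhancedUser, _] =
  Flow[(Option[ByteString], User)] map convertByteStringToUser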
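The conversion step can be checked in isolation with plain Scala. In the sketch below the class hierarchy is hypothetical, chosen to match the assumption that User is a subclass of EnhancedUser, and String replaces ByteString. It also shows that map (on a Flow as on a List) expects a one-argument function of the whole tuple, so a two-argument function value has to be adapted with .tupled first.

```scala
// Plain-Scala sketch of the conversion step; no Akka involved.
object ConvertSketch {
  // Hypothetical hierarchy matching the assumption that User <: EnhancedUser;
  // field names and types are placeholders.
  class EnhancedUser(val data: String, val city: Option[String])
  class User(userData: String) extends EnhancedUser(userData, None)

  // Pattern-matching function literal: a Function1 over the whole tuple
  val convert: ((Option[String], User)) => EnhancedUser = {
    case (maybeCity, user) =>
      maybeCity
        .map(c => new EnhancedUser(user.data, Some(c)))
        .getOrElse(user) // no data: the plain User is already an EnhancedUser
  }

  // A two-argument function value is a Function2; map cannot take it as-is
  val convert2: (Option[String], User) => EnhancedUser =
    (maybeCity, user) => convert((maybeCity, user))

  def main(args: Array[String]): Unit = {
    val inputs: List[(Option[String], User)] =
      List(Some("Berlin") -> new User("alice"), None -> new User("bob"))
    val viaTupleFunction = inputs.map(convert)
    val viaTupled = inputs.map(convert2.tupled) // .tupled adapts Function2 to Function1
    viaTupleFunction.foreach(u => println(s"${u.data}: ${u.city}"))
    viaTupled.foreach(u => println(s"${u.data}: ${u.city}"))
  }
}
```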
These components can now be combined:
val useEnhancementGraph =
userSource
.via(cityRequest)
.via(httpClient)
.via(cityByteStrFlow)
.via(cityUserFlow)
.via(processEnhancedUser)
.to(Sink foreach println)
Future Based
We can use Futures to solve the problem, similar to the Stack Overflow question you referenced in your original question. I don't recommend this approach, for two reasons:
EnhancedUser
.Async.await (which should almost always be avoided).

To use the Future-based approach, the only big change to your original code is to use Flow.mapAsync instead of Flow.map, to handle the fact that a Future is being created in the function:
// Assumes an implicit Materializer (needed by toStrict) and an implicit
// ExecutionContext (needed by Future.map) are in scope.
val parallelism = 10
val timeout: FiniteDuration = ??? // you need to specify the timeout limit
val convertResponseToFutureByteStr: ((Try[HttpResponse], User)) => Future[EnhancedUser] = {
  case (Failure(ex), user) =>
    Future.successful(user) // pass the original user through unchanged
  case (Success(resp), user) =>
    resp
      .entity
      .toStrict(timeout) // buffers the whole entity in memory
      .map(strict => new EnhancedUser(user.data, strict.data))
}
val cityResponse: Flow[(Try[HttpResponse], User), EnhancedUser, _] =
  Flow[(Try[HttpResponse], User)].mapAsync(parallelism)(convertResponseToFutureByteStr)
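The difference between returning a Future (as with mapAsync) and mutating inside a callback is also what the question's update runs into. In that code, onComplete only registers a callback, and map emits member immediately, so downstream stages can see the element before the callback has set city. Adding .async most likely just changed the timing (elements now cross an asynchronous boundary with a buffer between stages) rather than removing the race. The stdlib-only sketch below reproduces the effect deterministically, using a Promise as a stand-in for a slow response; MutableMember, Member and both helper functions are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical stand-in for the mutable `member` in the question's update
final class MutableMember(val name: String) {
  @volatile var city: Option[String] = None
}

// Hypothetical immutable alternative
final case class Member(name: String, city: Option[String])

object RaceSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Update-style: register a callback and return the member right away.
  // Whoever reads `city` next may run before the callback does.
  def enrichByMutation(m: MutableMember, cityF: Future[String]): MutableMember = {
    cityF.onComplete {
      case Success(s) => m.city = Some(s)
      case Failure(_) => m.city = None
    }
    m
  }

  // mapAsync-style: return a Future that completes only once the city is known
  def enrichByComposition(m: Member, cityF: Future[String]): Future[Member] =
    cityF.map(s => m.copy(city = Some(s)))

  def main(args: Array[String]): Unit = {
    // The Promise plays the role of a slow HTTP response completed by hand
    val slowResponse = Promise[String]()

    val mutated = enrichByMutation(new MutableMember("alice"), slowResponse.future)
    // The "downstream" reads the element before the response has arrived
    val seenByDownstream = mutated.city

    val composed = enrichByComposition(Member("alice", None), slowResponse.future)

    slowResponse.success("Berlin")
    val result = Await.result(composed, 5.seconds)

    println(s"mutation: downstream saw $seenByDownstream")
    println(s"composition: downstream saw ${result.city}")
  }
}
```

enrichByComposition has the shape of the function you would pass to mapAsync: the stream does not emit the element until the returned Future completes, so no stage can observe a half-filled value.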