Reputation: 13547
val pageDataFutures : Seq[Future[PageData]]= ??? //4 api calls each resulting in a future of PageData
def source : Source[PageData, NotUsed] = Source(
pageDataFutures.flatMap(future => Await.result(future,atMost)).toList
)
source.runForeach(println)
I expected the source to run 'runForEach'
as and when each future is completed. But instead all the 4 api calls are made and then data in the source is all printed at once. Shouldn't it be printing the data as and available? I am using Await
for each future. So it is guaranteed that before the next future is Awaited
upon, the result of the previous future is available and can be used by println
Upvotes: 0
Views: 57
Reputation: 19527
Use mapAsync
:
Pass incoming elements to a function that return a
Future
result. When theFuture
arrives the result is passed downstream. Up ton
elements can be processed concurrently, but regardless of their completion time the incoming order will be kept when results complete.
val pageDataFutures: Seq[Future[PageData]] = ???
Source(pageDataFutures)
.mapAsync(parallelism = 1)(x => x) // or: mapAsync(parallelism = 1)(identity)
.runForeach(println)
Upvotes: 1