Reputation: 11
I want to process these nested futures sequentially for each element that I pass with an iterator.
I chained the futures with flatMap and map. Is this the correct way?
What should I do to get the desired behavior (shown below), both with and without a blocking tool?
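import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main {
  def main(args: Array[String]): Unit =
    // "e1", "e2", "e3" are placeholder elements
    Iterator("e1", "e2", "e3").foreach { e =>
      A.process(e)
    }
}

object A {
  def doOne(e: Any): Future[Any] = Future {
    println("startFirst"); Thread.sleep(3000)
  }
  def doTwo(a: Any): Future[Any] = Future {
    println("startSecond"); Thread.sleep(1000)
  }
  def doThree(b: Any): Future[Any] = Future {
    println("do 3"); Thread.sleep(1000)
  }
  def doFour(e: Any, c: Any): Future[Unit] = Future {
    println(s"do 4&processComplete$e"); Thread.sleep(1000)
  }
  def process(e: Any): Future[Unit] =
    doOne(e)
      .flatMap { a => doTwo(a) }
      .flatMap { b => doThree(b) }
      // flatMap rather than map here: doFour returns a Future,
      // so map would yield a Future[Future[Unit]]
      .flatMap { c => doFour(e, c) }
}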
If I pass 3 elements (e1, e2, e3) to process, I expect the program to print:
startFirst (e1)
startSecond(e1)
startThree (e1)
startFour&processComplete (e1)
startFirst (e2)
startSecond(e2)
startThree (e2)
startFour&processComplete (e2)
startFirst (e3)
startSecond(e3)
startThree (e3)
startFour&processComplete (e3)
Instead of:
startFirst (e1)
startFirst (e2)
startFirst (e3)
startSecond(e1)
startSecond(e2)
startSecond(e3)
startThree (e1)
startThree (e2)
startThree (e3)
startFour&processComplete (e1)
startFour&processComplete (e2)
startFour&processComplete (e3)
Upvotes: 1
Views: 81
Reputation: 22895
You can do something like this:
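import scala.concurrent.{ExecutionContext, Future}

// Starts f on each element only after the previous element's Future has completed.
def sequentialTraverse_[A](col: IterableOnce[A])(f: A => Future[Any])(implicit ec: ExecutionContext): Future[Unit] =
  col.iterator.foldLeft(Future.successful(())) {
    case (accF, a) =>
      accF.flatMap(_ => f(a)).map(_ => ())
  }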
Which you may also convert into an extension method so you can do something like:
List("A", "B", "C").sequentialTraverse_(process)
You can see it working here.
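For reference, here is a rough sketch of what the extension-method form could look like, assuming Scala 2.13 (where IterableOnce is available). The names SequentialTraverseOps, SequentialTraverseDemo, step and run are illustrative and not part of the original answer, and the two-step process is a shortened stand-in for the four-step one in the question. Await.result is used only so the demo can collect its log before returning; the traversal itself does not block.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

object SequentialTraverseDemo {
  // Hypothetical wrapper: adds sequentialTraverse_ to any IterableOnce.
  implicit class SequentialTraverseOps[A](col: IterableOnce[A]) {
    def sequentialTraverse_(f: A => Future[Any])(implicit ec: ExecutionContext): Future[Unit] =
      col.iterator.foldLeft(Future.successful(())) {
        // f(a) is only invoked once the accumulated Future has completed
        case (accF, a) => accF.flatMap(_ => f(a)).map(_ => ())
      }
  }

  // Runs a shortened two-step pipeline per element and returns the recorded order.
  def run(): List[String] = {
    import ExecutionContext.Implicits.global
    val log = new ConcurrentLinkedQueue[String]() // thread-safe event log

    def step(name: String, e: String): Future[String] =
      Future { log.add(s"$name($e)"); e }

    def process(e: String): Future[Unit] =
      step("one", e).flatMap(x => step("two", x)).map(_ => ())

    // Await only at the very edge of the demo, to read the log afterwards
    Await.result(List("e1", "e2", "e3").sequentialTraverse_(process), 10.seconds)
    log.asScala.toList
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

Because each call to process is made inside the flatMap of the previous element's Future, both steps for e1 are logged before anything for e2 starts, and likewise for e3.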
Upvotes: 5