Reputation: 17104
I launch several async processes which, in turn, can launch more processes if needed (think of traversing a directory structure or something like that). Each process returns something, and in the end I want to wait for all of them to complete and schedule a function that will do something with the resulting collection.
My solution attempt used a mutable ListBuffer (to which I keep adding the futures that I spawn) and Future.sequence to schedule some function to run on completion of all the futures listed in this buffer.
I've prepared a minimal example that illustrates the issue:
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FuturesTest extends App {
  var queue = ListBuffer[Future[Int]]()
  val f1 = Future {
    Thread.sleep(1000)
    val f3 = Future {
      Thread.sleep(2000)
      Console.println(s"f3: 1+2=3 sec; queue = $queue")
      3
    }
    queue += f3
    Console.println(s"f1: 1 sec; queue = $queue")
    1
  }
  val f2 = Future {
    Thread.sleep(2000)
    Console.println(s"f2: 2 sec; queue = $queue")
    2
  }
  queue += f1
  queue += f2
  Console.println(s"starting; queue = $queue")
  Future.sequence(queue).foreach(
    (all) => Console.println(s"Future.sequence finished with $all")
  )
  Thread.sleep(5000) // simulates app being alive later
}
It schedules the f1 and f2 futures first; f3 is then scheduled from within f1 one second later, and f3 itself resolves 2 seconds after that. Thus, what I expect to get is the following:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2, 3)
However, I actually get:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2)
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
... which is most likely because the list of futures that we wait for is fixed during the initial call to Future.sequence and won't change later.
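That guess can be checked in isolation. The following is a minimal sketch, not part of the original program: SequenceSnapshot and demo are hypothetical names, and Promise is used only to control when each future completes. It calls Future.sequence while the buffer holds one future, appends a second one afterwards, and the combined result contains only the first value.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}

// Hypothetical demo object; not part of the original program.
object SequenceSnapshot {
  def demo()(implicit ec: ExecutionContext): Future[List[Int]] = {
    val p1 = Promise[Int]()
    val p2 = Promise[Int]()
    val queue = ListBuffer[Future[Int]](p1.future)

    // Future.sequence walks the buffer once, right here, at call time.
    val all = Future.sequence(queue)

    // This future is appended too late to become part of `all`.
    queue += p2.future

    p1.success(1)
    p2.success(2)
    all.map(_.toList) // completes with List(1), not List(1, 2)
  }
}
```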
Ultimately, I've made it act as I wanted with this code:
import scala.util.{Failure, Success}

waitForSequence(queue, (all: ListBuffer[Int]) => Console.println(s"finished with $all"))
def waitForSequence[T](queue: ListBuffer[Future[T]], act: (ListBuffer[T] => Unit)): Unit = {
  val seq = Future.sequence(queue)
  seq.onComplete {
    case Success(res) =>
      // The queue may have grown while we were waiting; if so, wait again.
      if (res.size < queue.size) {
        Console.println("... still waiting for tasks")
        waitForSequence(queue, act)
      } else {
        act(res)
      }
    case Failure(exc) =>
      throw exc
  }
}
This works as intended, getting all 3 futures in the end:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
... still waiting for tasks
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
finished with ListBuffer(1, 2, 3)
But it's still very ugly. It just restarts the Future.sequence wait if it sees that, at the time of completion, the queue is longer than the number of results, hoping that the situation will be better the next time it completes. Of course, this is bad because it exhausts the stack, and it might be error-prone if this check triggers in the tiny window between the creation of a future and its being appended to the queue.
Is it possible to do this without rewriting everything with Akka or resorting to Await.result (which I can't actually use because my code is compiled for Scala.js)?
Upvotes: 2
Views: 1070
Reputation: 1422
I would not involve Future.sequence: it waits on futures that are already running in parallel, and you seem to be looking for sequential async execution. Also, you probably don't need the futures to start right away after being defined. The composition should look something like this:
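import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Runs the thunks one at a time: each future is created only after the previous one completes.
def run[T](queue: List[() => Future[T]]): Future[List[T]] =
  queue.foldLeft(Future.successful(List.empty[T])) { (acc, next) =>
    acc.flatMap(results => next().map(result => results :+ result))
  }

def now: Long = System.currentTimeMillis() // `now` was not defined in the original; assumed to be wall-clock millis
val t0 = now
def f(n: Int): () => Future[String] = () => {
  println(s"starting $n")
  Future[String] {
    Thread.sleep(100 * n)
    s"<<$n/${now - t0}>>"
  }
}
println(Await.result(run(f(7) :: f(10) :: f(20) :: f(3) :: Nil), 20.seconds))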
The trick is not to launch the futures prematurely; that's why we have f(n), which won't start until we call it with ().
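To see the deferred start on its own, here is a small sketch under stated assumptions: DeferredStart, demo, and the flag are hypothetical names introduced only for illustration. It records whether the body has run before the thunk is invoked and again after its future completes.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical demo object illustrating a deferred start.
object DeferredStart {
  // Yields (started before the thunk was called, started after its future completed).
  def demo()(implicit ec: ExecutionContext): Future[(Boolean, Boolean)] = {
    val started = new AtomicBoolean(false)

    // Defining the thunk runs nothing; the body is only submitted on `deferred()`.
    val deferred: () => Future[Unit] = () => Future { started.set(true) }

    val startedBeforeCall = started.get() // still false here

    deferred().map(_ => (startedBeforeCall, started.get())) // (false, true)
  }
}
```

A plain val holding Future { ... } would instead be submitted to the execution context as soon as it is defined.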
Upvotes: 1
Reputation: 7591
As Justin mentioned, you shouldn't lose the reference to the futures spawned inside other futures; use map and flatMap to chain them.
val f1 = Future {
  Thread.sleep(1000)
  val f3 = Future {
    Thread.sleep(2000)
    Console.println(s"f3: 1+2=3 sec")
    3
  }
  f3.map { r =>
    Console.println(s"f1: 1 sec;")
    Seq(1, r)
  }
}.flatMap(identity)
val f2 = Future {
  Thread.sleep(2000)
  Console.println(s"f2: 2 sec;")
  Seq(2)
}
val futures = Seq(f1, f2)
Future.sequence(futures).foreach(
  (all) => Console.println(s"Future.sequence finished with ${all.flatten}")
)
Thread.sleep(5000) // simulates app being alive later
This works on the minimal example, but I am not sure it will work for your real use case. The result is:
f2: 2 sec;
f3: 1+2=3 sec
f1: 1 sec;
Future.sequence finished with List(1, 3, 2)
Upvotes: 1
Reputation: 2659
The right way to do this is probably to compose your Futures. Specifically, f1 shouldn't just kick off f3; it should probably flatMap over it -- that is, the Future of f1 doesn't resolve until f3 resolves.
Keep in mind, Future.sequence is kind of a fallback option, to use only when the Futures are all really disconnected. In a case like the one you're describing, where there are real dependencies, those are best represented in the Futures you're actually returning. When using Futures, flatMap is your friend, and should be one of the first tools you reach for. (Often but not always as for comprehensions.)
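As a rough sketch of what that composition could look like for the traversal case in the question: the Node type and the collect function below are hypothetical stand-ins for a directory structure and its async processing, not code from the original program. Each node's future resolves only after the futures of all its descendants have resolved, so no shared mutable queue is needed.

```scala
import scala.concurrent.{ExecutionContext, Future}

object TreeTraversal {
  // Hypothetical tree standing in for the directory structure from the question.
  final case class Node(value: Int, children: List[Node] = Nil)

  // Processes a node asynchronously, then its children; the returned future
  // completes only after every descendant has completed.
  def collect(node: Node)(implicit ec: ExecutionContext): Future[List[Int]] =
    Future(node.value).flatMap { v =>
      // Children are independent of each other, so Future.sequence fits here.
      val childFutures = node.children.map(child => collect(child))
      Future.sequence(childFutures).map(sub => v :: sub.flatten)
    }
}
```

With an ExecutionContext in scope, TreeTraversal.collect(root).foreach(all => println(all)) schedules the final action without Await, which should also suit Scala.js.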
It's probably safe to say that, if you ever want a mutable queue of Futures, the code isn't structured correctly and there's a better way to do it. Specifically in Scala.js (which is where much of my code lies, and which is very Future-heavy), I use for comprehensions over those Futures constantly -- I think it's the only sane way to operate...
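For illustration, the question's three futures might be composed with for comprehensions roughly as follows. This is only a sketch: the names f1, f2, and f3 mirror the question, but their bodies are trivial placeholders, and ComposedFutures and all are hypothetical names.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ComposedFutures {
  def f3(implicit ec: ExecutionContext): Future[Int] = Future(3)

  // f1 does not complete until the f3 it starts has completed.
  def f1(implicit ec: ExecutionContext): Future[List[Int]] =
    for {
      one   <- Future(1)
      three <- f3
    } yield List(one, three)

  def f2(implicit ec: ExecutionContext): Future[List[Int]] = Future(List(2))

  def all(implicit ec: ExecutionContext): Future[List[Int]] = {
    // Start both before the for comprehension so that they run concurrently.
    val a = f1
    val b = f2
    for {
      xs <- a
      ys <- b
    } yield xs ++ ys // List(1, 3, 2)
  }
}
```

Creating the futures before the for comprehension matters: written inline as generators, f2 would only start once f1 had completed.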
Upvotes: 1