Reputation: 4502
Suppose I have multiple tasks that I want to run in parallel.
Each task (a method) has an inner recursive function that fetches data from a database and saves it in some data storage.
[simplified inner recursive function]
def simplifiedSomeTask(): Unit = {
  @scala.annotation.tailrec
  def get(
    stream: Stream[SomeEntity],
    result: Seq[SomeEntity],
  ): Stream[SomeEntity] = result match {
    case Nil =>
      stream
    case _ =>
      val query = // query to fetch data from database
      get(
        stream append result.toStream,
        query.run.value, // get fetched data from database
      )
  }
  val buffer = collection.mutable.Map.empty[String, String]
  get(
    Stream.empty,
    query.run.value
  ).foreach { r =>
    buffer.put(r.loginId, r.userId)
  }
}
When I run A, the Future never completes for some reason.
[A]
val f1 = Future { someTask1() }
val f2 = Future { someTask2() }
val f3 = Future { someTask3() }
val f = for {
  _ <- f1
  _ <- f2
  _ <- f3
} yield ()
Await.result(f, Duration.Inf)
However, B works (though it does not run in parallel).
[B]
val f = for {
  _ <- Future { someTask1() }
  _ <- Future { someTask2() }
  _ <- Future { someTask3() }
} yield ()
Await.result(f, Duration.Inf)
How should I modify A so it runs as expected?
Upvotes: 2
Views: 164
Reputation: 4502
It turned out that some circular references while creating the query objects were causing this problem.
Upvotes: 0
Reputation: 452
The problem does not lie within the for-comprehension, but with your tasks. Potentially there's some kind of deadlock from running them in parallel, but I'd triple-check first that they don't end up in an infinite loop. Looking at your example, that could easily happen if query.run.value never returns an empty result, in which case the recursion continues forever. If any of f1, f2 and f3 doesn't resolve, then f will of course never resolve either.
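To illustrate the termination condition, here is a minimal sketch of a paged fetch that is guaranteed to stop. The names PagedFetch, fetchPage and fetchAll are hypothetical, and an in-memory Vector stands in for the database; the point is only that each call advances the offset, so an empty page is eventually returned.

```scala
import scala.annotation.tailrec

object PagedFetch {
  // Hypothetical stand-in for the database query: returns one page of rows,
  // and an empty Seq once the offset is past the end of the data.
  def fetchPage(data: Vector[Int], offset: Int, pageSize: Int): Seq[Int] =
    data.slice(offset, offset + pageSize)

  // Accumulates pages until an empty page comes back.
  // The offset grows on every call, so the recursion terminates.
  @tailrec
  def fetchAll(data: Vector[Int], offset: Int, pageSize: Int, acc: Vector[Int]): Vector[Int] = {
    val page = fetchPage(data, offset, pageSize)
    if (page.isEmpty) acc
    else fetchAll(data, offset + page.size, pageSize, acc ++ page)
  }
}
```

If the real query were re-run without advancing an offset or cursor, it would keep returning the same non-empty page and the loop would never end.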
Upvotes: 0
Reputation: 22595
I couldn't reproduce your issue, but the reason for weird behaviour might be that your syntax in the first example is not exactly correct. You should write your first for-comprehension like:
val f = for {
  _ <- f1
  _ <- f2
  _ <- f3
} yield ()
But a for-comprehension works sequentially, and the only reason your futures run in parallel in your first example is that Futures start eagerly ("Future starts now").
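A small sketch of the difference, assuming the global execution context and a hypothetical task helper that just sleeps and returns its name. In parallel() the three Futures are created (and therefore started) before the for-comprehension; in sequential() each Future is only created once the previous one has completed.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object EagerFutures {
  // Hypothetical task: sleeps to simulate work, then returns its name.
  def task(name: String): String = { Thread.sleep(200); name }

  // All three Futures are already running when the for-comprehension begins.
  def parallel(): Future[List[String]] = {
    val f1 = Future(task("a"))
    val f2 = Future(task("b"))
    val f3 = Future(task("c"))
    for { a <- f1; b <- f2; c <- f3 } yield List(a, b, c)
  }

  // Each Future is created inside flatMap, so it starts only after
  // the previous one has finished.
  def sequential(): Future[List[String]] =
    for {
      a <- Future(task("a"))
      b <- Future(task("b"))
      c <- Future(task("c"))
    } yield List(a, b, c)
}
```

Both versions produce the same result; only the scheduling differs, and how much actually overlaps in parallel() depends on how many threads the execution context has available.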
If you want to make sure the Futures execute in parallel, use Future.sequence:
val f = Future.sequence(
  List(
    Future { someTask1() },
    Future { someTask2() },
    Future { someTask3() }
  )
)
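For completeness, a self-contained sketch of awaiting the combined Future. The someTask function here is a hypothetical stand-in for someTask1 to someTask3, and the finite timeout is my own suggestion rather than something from the question: with Duration.Inf a stuck task blocks forever, whereas a finite timeout makes Await.result throw a TimeoutException, which is easier to diagnose.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceExample {
  // Hypothetical stand-in for someTask1..3: returns a value instead of Unit
  // so the combined result is visible.
  def someTask(n: Int): Int = n * 2

  def runAll(): List[Int] = {
    // The Futures start as soon as the List is built;
    // Future.sequence only combines them into a single Future[List[Int]].
    val combined: Future[List[Int]] = Future.sequence(
      List(
        Future(someTask(1)),
        Future(someTask(2)),
        Future(someTask(3))
      )
    )
    // A finite timeout (assumed value) instead of Duration.Inf.
    Await.result(combined, 10.seconds)
  }
}
```

The results come back in the order of the input list, regardless of which Future finishes first.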
Upvotes: 3