d-_-b

Reputation: 4502

How to prevent Future from never finishing

Suppose I have multiple tasks that I want to run in parallel.

Each task (a method) has an inner recursive function that fetches data from a database and saves it in some data storage.

[simplified inner recursive function]

  def simplifiedSomeTask(): Unit = {
    // Keeps fetching until a fetch returns no rows, accumulating the rows in `stream`.
    @scala.annotation.tailrec
    def get(
        stream: Stream[SomeEntity],
        result: Seq[SomeEntity],
    ): Stream[SomeEntity] = result match {
      case Nil =>
        stream // an empty result ends the recursion
      case _ =>
        val query = ??? // placeholder: query to fetch data from database
        get(
          stream append result.toStream,
          query.run.value, // get fetched data from database
        )
    }

    val buffer = collection.mutable.Map.empty[String, String]

    // `query` here stands for the initial query (elided in this simplified version).
    get(
      Stream.empty,
      query.run.value
    ).foreach { r =>
      buffer.put(r.loginId, r.userId)
    }
  }

When I run A, the Future never completes for some reason.

[A]

val f1 = Future { someTask1() }
val f2 = Future { someTask2() }
val f3 = Future { someTask3() }

val f = for {
  _ <- f1
  _ <- f2
  _ <- f3
} yield ()

Await.result(f, Duration.Inf)

However, B works (though the tasks do not run in parallel).

[B]

val f = for {
  _ <- Future { someTask1() }
  _ <- Future { someTask2() }
  _ <- Future { someTask3() }
} yield ()

Await.result(f, Duration.Inf)

How should I modify A so it runs as expected?

Upvotes: 2

Views: 164

Answers (3)

d-_-b

Reputation: 4502

It turned out that some circular references while creating the query objects were causing this problem.

Upvotes: 0

Henrik

Reputation: 452

The problem does not lie within the for-comprehension, but with your tasks. Potentially there's some kind of deadlock from running them in parallel, but I'd triple check first that they don't end up in an infinite loop. Looking at your example, that could easily happen: if query.run.value never returns an empty result, the recursion will continue forever. If any of f1, f2, or f3 never completes, then f will of course never complete either.
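One way to narrow down which task is stuck is to wait on each future with a finite timeout instead of Duration.Inf. The following is only a sketch: quickTask, slowTask, and findStuck are hypothetical names standing in for the real tasks, and the time limits are arbitrary.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object TimeoutDiagnosis {
  // Hypothetical stand-ins for someTask1..3; slowTask simulates a task that does not finish in time.
  def quickTask(): Unit = ()
  def slowTask(): Unit = Thread.sleep(2000)

  // Returns the names of the futures that did not complete within `limit`.
  // Await.ready throws a TimeoutException when the limit is exceeded, which Try captures.
  // The futures are checked one after another, so the total wait is at most limit * tasks.size.
  def findStuck(tasks: Map[String, Future[Unit]], limit: FiniteDuration): Set[String] =
    tasks.collect {
      case (name, future) if Try(Await.ready(future, limit)).isFailure => name
    }.toSet

  def main(args: Array[String]): Unit = {
    val tasks = Map(
      "task1" -> Future(quickTask()),
      "task2" -> Future(slowTask())
    )
    println(findStuck(tasks, 200.millis))
  }
}
```

A task reported as stuck this way may still be merely slow, so the limit should be chosen well above the expected running time.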

Upvotes: 0

Krzysztof Atłasik

Reputation: 22595

I couldn't reproduce your issue, but the reason for weird behaviour might be that your syntax in the first example is not exactly correct. You should write your first for-comprehension like:

val f = for {
  _ <- f1
  _ <- f2
  _ <- f3
} yield ()

But a for-comprehension works sequentially, and the only reason your futures run in parallel in your first example is that Futures start eagerly ("Future starts now"): they are already running by the time the for-comprehension chains them.
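To illustrate the difference, here is a rough sketch that writes both variants as the flatMap/map chains a for-comprehension desugars to and times them. The names task, timed, startedUpFront, and startedInChain are made up for this example, and the sleeps only simulate work; blocking is used so the global execution context can add threads for the sleeping tasks.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerFutures {
  // Hypothetical task: sleeps to simulate work.
  def task(ms: Long): Unit = blocking(Thread.sleep(ms))

  // Runs `body` and returns the elapsed wall-clock time in milliseconds.
  def timed(body: => Any): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000
  }

  // Like [A]: all three futures are created, and therefore started, up front.
  def startedUpFront(ms: Long): Future[Unit] = {
    val f1 = Future(task(ms))
    val f2 = Future(task(ms))
    val f3 = Future(task(ms))
    f1.flatMap(_ => f2.flatMap(_ => f3.map(_ => ())))
  }

  // Like [B]: each future is only created inside the previous one's flatMap callback.
  def startedInChain(ms: Long): Future[Unit] =
    Future(task(ms)).flatMap(_ => Future(task(ms)).flatMap(_ => Future(task(ms)).map(_ => ())))

  def main(args: Array[String]): Unit = {
    println(s"up front: ${timed(Await.result(startedUpFront(300), 5.seconds))} ms")
    println(s"in chain: ${timed(Await.result(startedInChain(300), 5.seconds))} ms")
  }
}
```

The chained variant should take roughly three times as long as the up-front one, since each task only starts after the previous one has finished.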

If you want to make sure the Futures execute in parallel, use Future.sequence:

val f = Future.sequence(
  List(
    Future { someTask1() },
    Future { someTask2() },
    Future { someTask3() }
  )
)
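For completeness, a self-contained sketch of the same idea, including the wait for the combined result. Here someTask is a hypothetical task that returns a value (the tasks in the question return Unit), and the 10-second timeout is an arbitrary choice.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceExample {
  // Hypothetical task that returns a value.
  def someTask(n: Int): Int = n * 10

  // Future.sequence turns List[Future[Int]] into Future[List[Int]], preserving the list order.
  // All three futures are created (and started) while the list is being built.
  def runAll(): Future[List[Int]] =
    Future.sequence(List(Future(someTask(1)), Future(someTask(2)), Future(someTask(3))))

  def main(args: Array[String]): Unit = {
    // A finite timeout instead of Duration.Inf, so a stuck task raises a TimeoutException.
    val results = Await.result(runAll(), 10.seconds)
    println(results) // List(10, 20, 30)
  }
}
```

Note that the combined future fails if any of the individual futures fails.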

Upvotes: 3
