alexwlchan
alexwlchan

Reputation: 6108

How can I run parallel instances of a function that returns a Try?

I have a function that returns a Try, and I want to run multiple instances of it in parallel, but I’m coming up blank on how to do that – I can only seem to run it one after the other.

Context: this function is meant to acquire a lock so that if multiple threads/workers are running in parallel, they don’t read on each other’s toes. In the tests, I want to run five instances simultaneously, and assert that all but one of them was locked out. This was working when the function returned a Future, but I’ve done some refactoring and now it returns a Try, and the test has stopped working.

The behaviour doesn’t seem to be related to the locking code – it seems I just don’t understand concurrency!


I’ve been trying to use Future.fromTry, and execute them in parallel. For example:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Success, Try}

object Main extends App {
  def greet(name: String): Try[Unit] = Try {
    println(s"Hello $name!")
    Thread.sleep(1000)
    println(s"Goodbye $name!")
    ()
  }

  Seq("alice", "bob", "carol", "dave", "eve").map { name =>
    Future.fromTry { greet(name) }
  }
}

I’d expect to see all the “Hello” messages, and then all the “Goodbye” messages – instead, it seems to be executing them one after the other.

Hello alice!
Goodbye alice!
Hello bob!
Goodbye bob!
Hello carol!
Goodbye carol!
Hello dave!
Goodbye dave!
Hello eve!
Goodbye eve!

I looked around, and found suggestions about tweaking the ExecutionContext and adding parallelism – thing is, this environment seems perfectly happy to run Futures in parallel.

On the same machine, with the same global ExecutionContext, if I tweak the function to return a Future, not a Try, I see the output I’d expect, and the functions appear to be running in parallel.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Success, Try}

object Main extends App {
  def greet(name: String): Future[Unit] = Future {
    println(s"Hello $name!")
    Thread.sleep(1000)
    println(s"Goodbye $name!")
    ()
  }

  Seq("faythe", "grace", "heidi", "ivan", "judy").map { name =>
    greet(name)
  }

  Thread.sleep(2000)  // Let the futures finish
}
Hello faythe!
Hello ivan!
Hello grace!
Hello judy!
Hello heidi!
Goodbye ivan!
Goodbye grace!
Goodbye heidi!
Goodbye judy!
Goodbye faythe!

What am I doing wrong with Future.fromTry that means it’s waiting for the Futures to finish? How do I make it match the second example?

Or am I barking up the wrong tree entirely?

Upvotes: 2

Views: 172

Answers (1)

The documentation explicitly states that fromTry will create an already completed Future from the result, thus it first evaluates the function and then lift it inside the Future context. As such, it is completely serial.

You can first create a List[Future[String]] from the names, and then map the list and map the inner Futures to execute your function.
Or, since a Future already represents the possibility failure (and internally uses Try), why not simply use Future in your function (as you said it was before).

Upvotes: 3

Related Questions