Long Thai

Reputation: 817

Execute list of Futures in parallel

I'm very new to Scala's Future and Akka. Currently, I'm trying to implement an application that executes a list of independent tasks and gathers the results together.

For example, I want an application consisting of multiple tasks, each of which receives a number, sleeps for a few seconds and then returns a "Hello <number>" message.

The actor is implemented as follows:

class HelloActor extends Actor {
  def receive = {
    case name:Int => {
      println("%s will sleep for %s seconds".format(name, name % 4))
      Thread.sleep(name % 4 * 1000)
      sender ! "Hello %s".format(name)
    }
  }
}

The main object is implemented as:

object HelloAkka extends App {
  val system = ActorSystem("HelloSystem")

  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")

  implicit val timeout = Timeout(20, TimeUnit.SECONDS)

  val futures = (1 to 10).map(num => {
    helloActor ? num
  })

  val future = Future.sequence(futures)

  val results = Await.result(future, timeout.duration)

  println(results)

  system.shutdown
}

Since each task sleeps for 0, 1, 2 or 3 seconds, I expected the tasks with shorter sleeps to finish first. However, the output is:

1 will sleep for 1 seconds
2 will sleep for 2 seconds
3 will sleep for 3 seconds
4 will sleep for 0 seconds
5 will sleep for 1 seconds
6 will sleep for 2 seconds
7 will sleep for 3 seconds
8 will sleep for 0 seconds
9 will sleep for 1 seconds
10 will sleep for 2 seconds
Vector(Hello 1, Hello 2, Hello 3, Hello 4, Hello 5, Hello 6, Hello 7, Hello 8, Hello 9, Hello 10)

In other words, all tasks are executed in sequence. I wonder if there is any way for me to execute all tasks in parallel instead.

Upvotes: 1

Views: 1596

Answers (3)

Diego Martinoia

Reputation: 4662

An actor processes the messages in its mailbox one at a time, so messages sent from the same actor to the same actor are handled in sequence.

You have two options there.

Either create a new copy of the HelloActor for each message, so that they are all executed in parallel, or modify your HelloActor to look something like the following (written from memory, so double-check the imports):

import akka.pattern.pipe
import scala.concurrent.Future

class HelloActor extends Actor {
  // execution context used by the Future and by pipeTo
  import context.dispatcher

  def receive = {
    case name: Int =>
      println("%s will sleep for %s seconds".format(name, name % 4))
      // run the blocking work outside the actor's message loop
      Future(sleepAndRespond(name)) pipeTo sender
  }

  def sleepAndRespond(name: Int): String = {
    Thread.sleep(name % 4 * 1000)
    "Hello %s".format(name)
  }
}

This way, the only part executed sequentially is creating and piping the Future; the sleeping itself then runs asynchronously for each of the ten messages.
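The effect can be checked without Akka at all. The following is a minimal sketch using only the standard library; the object name ParallelHello, the global execution context and the shortened sleep (100 ms steps instead of seconds) are assumptions made for illustration, not part of the original code. It shows that Futures start running as soon as they are created, and that Future.sequence returns the results in input order regardless of which task finishes first.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelHello {
  // Hypothetical stand-in for the actor's work: sleep, then build the greeting.
  def sleepAndRespond(name: Int): String = {
    // `blocking` tells the global pool that this thread is about to be parked.
    blocking { Thread.sleep(name % 4 * 100L) }
    "Hello %s".format(name)
  }

  def run(): Seq[String] = {
    // Each Future is scheduled as soon as it is created, so the sleeps overlap.
    val futures = (1 to 10).map(num => Future(sleepAndRespond(num)))
    // Future.sequence keeps the input order, not the completion order.
    Await.result(Future.sequence(futures), 20.seconds)
  }
}
```

This is also why the Vector printed in the question is ordered 1 to 10: the ordering of the final result says nothing about the order in which the tasks ran.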

Upvotes: 1

drexin

Reputation: 24413

Instead of starting multiple actors, as suggested in the comments and the other answer, I would suggest executing the actual task in a Future, so that your actor acts more like a coordinator for the tasks. E.g.:

//...    

// import pipe pattern to get access to `pipeTo` method
import akka.pattern.pipe
import scala.concurrent.Future

class TaskCoordinatorActor extends Actor {
  // the `Future`s will be executed on this dispatcher;
  // depending on your needs, you may want to create a
  // dedicated executor for this
  import context.dispatcher

  def receive = {
    case name: Int =>
      Future {
        Thread.sleep(name % 4 * 1000)
        "Hello %s".format(name)
      } pipeTo sender()
  }
}

The above code executes your task in a scala.concurrent.Future and pipes the result to the original sender. This way the actor does not block until the task has finished, but is ready to receive the next message as soon as the Future has been created.

P.S.: Instead of sending plain integers, you should create message types that make explicit what you want the actor to do. In your case it could, for example, be:
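As for the dedicated executor mentioned in the code comment, one possible shape is sketched below using only the standard library. The object name DedicatedPool, the pool size of 4 and the shortened sleep are hypothetical choices for illustration; inside an actor you would typically configure a separate Akka dispatcher rather than build a pool by hand.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DedicatedPool {
  def run(): Seq[String] = {
    // Assumed pool size; tune it to the number of blocking tasks you expect.
    val pool = Executors.newFixedThreadPool(4)
    // Futures created below run on this pool instead of a shared dispatcher.
    implicit val blockingEc: ExecutionContext =
      ExecutionContext.fromExecutorService(pool)
    try {
      val futures = (1 to 8).map { n =>
        Future {
          Thread.sleep(n % 4 * 100L)
          "Hello %s".format(n)
        }
      }
      Await.result(Future.sequence(futures), 20.seconds)
    } finally {
      // Release the threads once all results are in.
      pool.shutdown()
    }
  }
}
```

Keeping blocking calls on their own pool means a burst of sleeping tasks cannot starve the threads that actors need to process messages.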

case class Sleep(duration: Duration)

Upvotes: 1

hicolour

Reputation: 784

As mentioned in the comments, you're sending all the tasks/messages to one actor, and it's guaranteed that all these tasks/messages will be handled in sequence.

To handle the tasks in parallel, you need multiple instances of the handler actor, in your case HelloActor.

Of course you can just create multiple instances of the HelloActor yourself, but this is definitely not good practice.

For this kind of task, you should use the built-in routing feature, which allows you to manage a pool of workers/handlers and interact with them via one router actor, e.g.:

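import akka.routing.RoundRobinPool

// inside an actor; from the main object, use system.actorOf instead
val router: ActorRef =
  context.actorOf(RoundRobinPool(10).props(Props[HelloActor]), "router")

...
router ? num
...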

See the Akka Routing documentation for more details.

Upvotes: 4
