Christophe De Troyer
Christophe De Troyer

Reputation: 2922

Akka wait for computation of actor

I would like to create a minimal example of an actor that sends off a message to an actor and then waits for the response of that actor. The reason for this example is that I want to use it in my thesis in context of disussing the usage other language features (e.g., futures) instead of pure actors. So the point here is that it has to be an actor that waits for a message before processing anything else.

The idea I had was to demonstrate an actor that requests a file to be read from disk, then does some long computation and then waits for the read to finish.

What I have so far is the following:

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import scala.io.Source


case class FileContents(content: String)
class WorkerActor extends Actor
{
  def receive =
  {
    case "compute" =>
      println("Computing!")
      // Create actor to read the file
      val reader = context.actorOf(Props[ReadFileActor])
      reader ! ReadFile("/home/christophe/code/thesis-example/src/main/resources/file.txt")

      // Heavy computation
      Thread.sleep(5000)

    case FileContents(content) =>
      println("Got file content:\n" + content)
      // Continue computation.
  }
}

case class ReadFile(path: String)
class ReadFileActor extends Actor
{
  def receive =
  {
    case ReadFile(path) =>
      var contents: String = ""
      for (line <- Source.fromFile(path).getLines())
      {
        contents += line
      }
      sender ! FileContents(contents)
  }
}


object Main extends App
{
  val system = ActorSystem("HelloSystem")

  val worker = system.actorOf(Props[WorkerActor], name = "worker")

  worker ! "compute"
  worker ! "compute"
}

But what happens here is that WorkerActor receives the compute message, and then starts a child actor to read in the file. After the heave computation it receives the second compute message instead. And finally receives the two messages from the ReadFile actor.

What I actually want to happen is that WorkerActor receives the compute message, does the heavy computation and then waits until he receives the FileContents message. Only after that it can receive any other message (i.e., the second compute message). I have read the docs and searched around for examples but i cant seem to find anything on it.

Disclaimer: I am not using Akka except for this small example in context of my thesis.

Upvotes: 0

Views: 1077

Answers (2)

Eugene Platonov
Eugene Platonov

Reputation: 3285

Sounds like what you want is to keep processing this message until you get a response. in other words block

class WorkerActor extends Actor {
  def receive = {
     case "compute" =>
     println("Computing!")
     // Create actor to read the file
     val reader = context.actorOf(Props[ReadFileActor])
     val future: Future[FileContents] = 
       (reader ? ReadFile("file.txt").mapTo[FileContents]

    // Heavy computation
    Thread.sleep(5000)
    // this will block current thread and thus the actor,
    // so it will not process any other messages until 
    // future is completed or time is out
    Await.result(future, timeout)   
  }
}

BUT, this is considered as a very bad thing in actor lands.

Upvotes: 0

dk14
dk14

Reputation: 22374

Just create several (pool of) workers for several compute messages instead of one worker, smthng like:

object Main extends App {

  val system = ActorSystem("HelloSystem")

  val router = system.actorOf(RoundRobinPool(2).props(Props[Worker]), "router")

  router ! "compute"
  router ! "compute"

}

If you want second worker to be launched after first:

def receive = {
  case "compute" => ...
  case FileContents(content) =>
     println("Got file content:\n" + content)
     // Continue computation.
     context.parent ! "compute" //send to the pool 
}

...

//Main:
router ! "compute"

Another option is to remember sender of "compute" (it will be your Main) and send the response back to the top-level:

var main = _

def receive = {
  case "compute" => 
     ...
     main = sender
  case FileContents(content) =>
     ...
     main ! "ack" 
}

//Main:
(router ? "compute") foreach (_ => router ! compute)

If you don't like future here - you may rewrite it with actor:

//Main
class MainActor extends Actor {
   def receive = {
      case "start" => router ! "compute"
      case "ack" => router ! "compute"
   }
}

P.S. Blocking computations inside actor should be managed properly or they may lead to thread starvation.

Upvotes: 1

Related Questions