Reputation: 2922
I would like to create a minimal example of an actor that sends a message to another actor and then waits for that actor's response. I want to use this example in my thesis when discussing the use of other language features (e.g., futures) instead of pure actors. So the point is that it has to be an actor that waits for a specific message before processing anything else.
The idea I had was to demonstrate an actor that requests a file to be read from disk, then does some long computation and then waits for the read to finish.
What I have so far is the following:
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import scala.io.Source

case class FileContents(content: String)

class WorkerActor extends Actor
{
  def receive =
  {
    case "compute" =>
      println("Computing!")
      // Create actor to read the file
      val reader = context.actorOf(Props[ReadFileActor])
      reader ! ReadFile("/home/christophe/code/thesis-example/src/main/resources/file.txt")
      // Heavy computation
      Thread.sleep(5000)
    case FileContents(content) =>
      println("Got file content:\n" + content)
      // Continue computation.
  }
}

case class ReadFile(path: String)

class ReadFileActor extends Actor
{
  def receive =
  {
    case ReadFile(path) =>
      var contents: String = ""
      for (line <- Source.fromFile(path).getLines())
      {
        contents += line
      }
      sender ! FileContents(contents)
  }
}

object Main extends App
{
  val system = ActorSystem("HelloSystem")
  val worker = system.actorOf(Props[WorkerActor], name = "worker")
  worker ! "compute"
  worker ! "compute"
}
But what happens here is that WorkerActor receives the compute message and then starts a child actor to read the file. After the heavy computation it receives the second compute message instead, and only after that does it receive the two FileContents messages from the ReadFileActor children.
What I actually want is for WorkerActor to receive the compute message, do the heavy computation, and then wait until it receives the FileContents message. Only after that should it receive any other message (i.e., the second compute message).
I have read the docs and searched around for examples, but I can't seem to find anything on it.
Disclaimer: I am not using Akka except for this small example in the context of my thesis.
Upvotes: 0
Views: 1077
Reputation: 3285
It sounds like what you want is to keep processing the current message until you get a response; in other words, to block:
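import akka.actor.{Actor, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class WorkerActor extends Actor {
  // Upper bound for the ask and for the blocking wait (the value is an assumption)
  implicit val timeout: Timeout = Timeout(10.seconds)

  def receive = {
    case "compute" =>
      println("Computing!")
      // Create actor to read the file
      val reader = context.actorOf(Props[ReadFileActor])
      val future: Future[FileContents] =
        (reader ? ReadFile("file.txt")).mapTo[FileContents]
      // Heavy computation
      Thread.sleep(5000)
      // this will block the current thread and thus the actor,
      // so it will not process any other messages until the
      // future is completed or the timeout expires
      Await.result(future, timeout.duration)
  }
}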
However, blocking like this is considered very bad practice in actor land.
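For comparison, here is a minimal non-blocking sketch that should give the same ordering. It assumes Akka's classic `Stash` trait together with `context.become`: while the worker waits for `FileContents`, every other message is stashed and replayed afterwards. `FakeReader`, `StashingWorker`, the `record` hook and the sleep durations are hypothetical names and values introduced only for illustration; they are not from the question.

```scala
import akka.actor.{Actor, ActorSystem, Props, Stash}

// Message types mirroring the question.
case class ReadFile(path: String)
case class FileContents(content: String)

// Hypothetical stand-in for ReadFileActor: replies at once, no disk access.
class FakeReader extends Actor {
  def receive = {
    case ReadFile(path) => sender() ! FileContents(s"contents of $path")
  }
}

// `record` is a hypothetical hook so the message order can be observed.
class StashingWorker(record: String => Unit) extends Actor with Stash {
  def receive: Receive = idle

  // Normal state: accept a "compute" request.
  def idle: Receive = {
    case "compute" =>
      record("compute started")
      context.actorOf(Props(new FakeReader)) ! ReadFile("file.txt")
      Thread.sleep(200) // stand-in for the heavy computation
      context.become(waitingForFile)
  }

  // Waiting state: only FileContents is handled, everything else is deferred.
  def waitingForFile: Receive = {
    case FileContents(_) =>
      record("got file")
      unstashAll()         // put deferred messages back into the mailbox
      context.become(idle)
    case _ =>
      stash()              // e.g. the second "compute"
  }
}

object StashDemo extends App {
  val system = ActorSystem("StashDemo")
  val worker = system.actorOf(Props(new StashingWorker(println)), "worker")
  worker ! "compute"
  worker ! "compute"
  Thread.sleep(2000) // crude wait so the demo can finish before shutdown
  system.terminate()
}
```

With this sketch the recorded order should be "compute started", "got file", "compute started", "got file": the second "compute" is held back until the first file reply has arrived, and no thread is blocked while waiting for the reply.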
Upvotes: 0
Reputation: 22374
Just create several workers (a pool) for the several compute messages instead of one worker, something like:
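import akka.routing.RoundRobinPool

object Main extends App {
  val system = ActorSystem("HelloSystem")
  // Pool of two WorkerActor routees; messages are distributed round-robin
  val router = system.actorOf(RoundRobinPool(2).props(Props[WorkerActor]), "router")
  router ! "compute"
  router ! "compute"
}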
If you want the second worker to be launched only after the first one has finished:
def receive = {
  case "compute" => ...
  case FileContents(content) =>
    println("Got file content:\n" + content)
    // Continue computation.
    context.parent ! "compute" // send to the pool
}
...
// Main:
router ! "compute"
Another option is to remember the sender of "compute" (it will be your Main) and send the response back to the top level:
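var main: ActorRef = _ // sender of the current "compute"
def receive = {
  case "compute" =>
    ...
    main = sender
  case FileContents(content) =>
    ...
    main ! "ack"
}
// Main (needs akka.pattern.ask, an implicit Timeout and an ExecutionContext in scope):
(router ? "compute") foreach (_ => router ! "compute")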
If you don't like the future here, you can rewrite it with an actor:
// Main
class MainActor extends Actor {
  def receive = {
    case "start" => router ! "compute"
    case "ack" => router ! "compute"
  }
}
P.S. Blocking computations inside an actor should be managed properly, or they may lead to thread starvation.
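To make the starvation risk concrete, here is a small sketch using only plain Scala futures (not Akka). It assumes a deliberately tiny pool of one thread: the outer task blocks that thread while waiting for an inner task that needs the very same thread, so the wait can only end by timing out. `StarvationDemo` and the timeout values are hypothetical and chosen only for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration._

object StarvationDemo {
  // Returns true if the blocking wait starved the pool and timed out.
  def run(): Boolean = {
    val pool = Executors.newFixedThreadPool(1) // a single worker thread
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val outer = Future {
        // The inner task is queued on the same one-thread pool ...
        val inner = Future { 42 }
        // ... but that thread is now blocked here, so inner cannot start.
        Await.result(inner, 500.millis)
      }
      try {
        Await.result(outer, 2.seconds)
        false // the inner future completed, so there was no starvation
      } catch {
        case _: TimeoutException => true // the pool was starved
      }
    } finally {
      pool.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"starved: ${run()}")
}
```

An actor that blocks inside `receive` occupies a dispatcher thread in the same way, which is why blocking work in Akka is usually moved to a separate, dedicated dispatcher.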
Upvotes: 1