Reputation: 301
As a newbie, I am trying to understand how actors work. And, from the documentation, I think I understand that actors are objects which gets executed in sync mode and also that actor execution can contain blocking/sync method calls, e.g. db requests
But, what I don't understand is that if you write an actor that has some blocking invocations inside (like a blocking query execution), it will mess up the whole thread pool (in the sense that cpu utilization will go down, etc.), right ? I mean, from my understanding, there is no way for JVM to understand whether it can switch that thread to someone else, if/when the actor makes a blocking call.
So, given the nature of concurrency, shouldn't it be obvious that Actors should not be doing any blocking calls, ever?
If that is the case, what is the recommended way of doing a non-blocking/async call, let's say a web service call that fetches something and sends a message to another actor when that request is completed? Should we simply use something like within the actor:
future map { response => x ! response.body }
Is this the proper way of handling this?
Would appreciate it if you can clarify this for me.
Upvotes: 30
Views: 16175
Reputation: 2670
Really great intro "The Neophyte's Guide to Scala Part 14: The Actor Approach to Concurrency" http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the-actor-approach-to-concurrency.html.
Actor receives message, wraps blocking code to future, in it's Future.onSuccess method - sends out results using other async messages. But beware that sender variable could change, so close it (make a local reference in the future object).
p.s.: The Neophyte's Guide to Scala - really great book.
Updated: (added sample code)
We have worker and manager. Manager sets work to be done, worker reports "got it" and starts long process ( sleep 1000 ). Meanwhile system pings manager with messages "alive" and manager pings worker with them. When work done - worker notifies manager on it.
NB: execution of sleep 1000 done in imported "default/global" thread pool executor - you can get thread starvation. NB: val commander = sender is needed to "close" a reference to original sender, cause when onSuccess will be executed - current sender within actor could be already set to some other 'sender' ...
Log:
01:35:12:632 Humming ...
01:35:12:633 manager: flush sent
01:35:12:633 worker: got command
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:660 worker: started
01:35:12:662 worker: alive
01:35:12:662 manager: resource allocated
01:35:12:662 worker: alive
01:35:12:662 worker: alive
01:35:13:661 worker: done
01:35:13:663 manager: work is done
01:35:17:633 Shutdown!
Code:
import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import com.typesafe.config.ConfigFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.concurrent._
import ExecutionContext.Implicits.global
object Sample {
private val fmt = new SimpleDateFormat("HH:mm:ss:SSS")
def printWithTime(msg: String) = {
println(fmt.format(new Date()) + " " + msg)
}
class WorkerActor extends Actor {
protected def receive = {
case "now" =>
val commander = sender
printWithTime("worker: got command")
future {
printWithTime("worker: started")
Thread.sleep(1000)
printWithTime("worker: done")
}(ExecutionContext.Implicits.global) onSuccess {
// here commander = original sender who requested the start of the future
case _ => commander ! "done"
}
commander ! "working"
case "alive?" =>
printWithTime("worker: alive")
}
}
class ManagerActor(worker: ActorRef) extends Actor {
protected def receive = {
case "do" =>
worker ! "now"
printWithTime("manager: flush sent")
case "working" =>
printWithTime("manager: resource allocated")
case "done" =>
printWithTime("manager: work is done")
case "alive?" =>
printWithTime("manager alive")
worker ! "alive?"
}
}
def main(args: Array[String]) {
val config = ConfigFactory.parseString("" +
"akka.loglevel=DEBUG\n" +
"akka.debug.lifecycle=on\n" +
"akka.debug.receive=on\n" +
"akka.debug.event-stream=on\n" +
"akka.debug.unhandled=on\n" +
""
)
val system = ActorSystem("mine", config)
val actor1 = system.actorOf(Props[WorkerActor], "worker")
val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager")
actor2 ! "do"
actor2 ! "alive?"
actor2 ! "alive?"
actor2 ! "alive?"
printWithTime("Humming ...")
Thread.sleep(5000)
printWithTime("Shutdown!")
system.shutdown()
}
}
Upvotes: 17
Reputation: 24403
It really depends on the use-case. If the queries do not need to be serialized, then you can execute the query in a future and send the results back to the sender as follows:
import scala.concurrent.{ future, blocking}
import akka.pattern.pipe
val resFut = future {
blocking {
executeQuery()
}
}
resFut pipeTo sender
You could also create a dedicated dispatcher exclusively for the DB calls and use a router for actor creation. This way you can also easily limit the number of concurrent DB requests.
Upvotes: 17
Reputation: 14420
You are right to be thinking about the Thread Pool if you are considering doing blocking calls in Akka. The more blocking you do, the larger the Thread Pool you will need. A completely Non-Blocking system only really needs a pool of threads equal to the number of CPU cores of your machine. The reference configuration uses a pool of 3 times the number of CPU cores on the machine to allow for some blocking:
# The core pool size factor is used to determine thread pool core size
# using the following formula: ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 3.0
But you might want to increase akka.default-dispatcher.fork-join-executor.core-pool-size-factor
to a higher number if you do more blocking, or make a dispatcher other than the default specifically for blocking calls with a higher fork-join-executor.core-pool-size-factor
WRT what is the best way to do blocking calls in Akka. I would recommend scaling out by making multiple instances of the actors that do blocking calls and putting a router infront of them to make them look like a single actor to the rest of your application.
Upvotes: 1