Reputation: 7299
I can't figure out how to resolve the following problem: I have a few actors (workers) that execute tasks when they receive them (that is, react to them). A main actor (foreman) controls this process and can receive a request to stop work. In that case the main actor must stop creating new tasks, wait until the workers have finished all existing tasks, and only then exit.
import actors.Actor
import actors.Actor._
class Foreman extends Actor{
val workerA = new WorkerA
val workerB = new WorkerB
val workerC = new WorkerC
self.link(workerA)
self.link(workerB)
self.link(workerC)
def act{
workerA.start
workerB.start
workerC.start
// adding tasks to workers somehow
//...
loop{
react{
case ResultOfTask(res) => //...
case Stop => //workers mustn't stop immediately; they must finish their tasks and then exit
case ProductionAccident => //...
}
}
}
}
case class Task(activity:String)
case class ResultOfTask(result:String)
trait Worker extends Actor{
def act{
loop{
react{
case Task(activity) => sender ! processTask(activity)
}
}
}
def processTask(activity:String):ResultOfTask
}
To solve this I wrote the following code:
def condition = workerA.getState!=State.Suspended && workerB.getState!=State.Suspended && workerC.getState!=State.Suspended && mailboxSize == 0
case Stop => {
if(condition) exit("sweet dreams") // otherwise keep handling messages
}
to check whether the main actor should exit. Another variant is to keep a counter in the "Worker" trait, incrementing it when the worker receives a message and decrementing it when it responds.
trait Worker extends Actor {
private var count = 0
def act {
loop{
react{
case Task(activity) => {
count += 1
sender ! processTask(activity)
count -= 1
}
}
}
}
def hasDoneAllTasks = count == 0
def processTask(activity: String): ResultOfTask
}
And the "condition" function in "Foreman" will be:
def condition = workerA.hasDoneAllTasks && workerB.hasDoneAllTasks && workerC.hasDoneAllTasks && mailboxSize == 0
I hope there are better solutions and you will propose them.
Upvotes: 3
Views: 1403
Reputation: 151
One thing you can take advantage of is the trapExit / case Exit / exit mechanism. In your main actor, you can set trapExit to true:
// Foreman
def act() {
trapExit = true
link(workerA)
link(workerB)
...
}
This means that your foreman actor will get an Exit message when the worker process terminates, including a reason:
// Foreman
def act() {
....
loop { react {
case Exit (worker: Actor, reason: AnyRef) => {
// decrement counter, or list of workers, and exit if empty
}
...
}}
}
You can pattern match on the reason parameter. For instance, in your worker class, you might exit using a variety of case classes indicating what the foreman should do:
// Worker:
exit(WorkComplete)
exit(Emergency)
and so on. When your worker throws an exception, it will terminate and send the linked actor an Exit message containing the exception. Given this, you can end up with something like:
// Foreman
def act() {
....
loop { react {
case Exit (worker: Actor, WorkComplete) => {
// decrement counter, or list of workers, and exit if empty
}
case Exit (worker: Actor, TasksExhausted) => {
// log something, or shut the worker down if too many are exhausted
}
case Exit (worker: Actor, reason: Exception) => {
// log the exception, then restart the actor
}
...
}}
}
It's unclear from your question whether you want the workers to keep running after they finish their tasks until the foreman tells them to exit. If so, sending the workers a message telling them to "exit when finished" works, and you can tell they've finished by using the trapExit mechanism.
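A minimal sketch of that "exit when finished" idea, assuming the legacy scala.actors library (Scala 2.10 or earlier) is on the classpath. The names FinishUp, WorkComplete and EchoWorker are hypothetical and only serve the illustration; the foreman here also takes its workers as a constructor argument, which differs from the question's code.

```scala
import scala.actors.{Actor, Exit}
import scala.actors.Actor._

case class Task(activity: String)
case class ResultOfTask(result: String)
case object FinishUp      // hypothetical "exit when finished" request
case object WorkComplete  // hypothetical exit reason

class EchoWorker extends Actor {
  def act() {
    loop {
      react {
        case Task(activity) => sender ! ResultOfTask("done: " + activity)
        // Messages are taken in arrival order here, so tasks queued
        // before FinishUp are processed before the worker exits.
        case FinishUp => exit(WorkComplete)
      }
    }
  }
}

class Foreman(workers: List[Actor]) extends Actor {
  def act() {
    trapExit = true                        // turn worker exits into Exit messages
    workers.foreach { w => link(w); w.start() }
    workers.foreach(_ ! Task("dig"))
    workers.foreach(_ ! FinishUp)          // no more tasks after this point
    var alive = workers.size
    loop {
      react {
        case ResultOfTask(res) => println(res)
        case Exit(_, WorkComplete) =>
          alive -= 1
          if (alive == 0) exit()           // every worker has finished
      }
    }
  }
}
```

Starting it with new Foreman(List(new EchoWorker, new EchoWorker)).start() should print one result per worker and then let the foreman terminate once both Exit messages have arrived.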
Hope this spurs an interesting solution!
Upvotes: 1
Reputation: 167901
If the foreman always expects an answer from the workers, the solution is easy: the foreman maintains a counter, and each time it sends a message it increments it and each time it receives one from a worker it decrements it. Any time the counter is zero it is free to stop itself (assuming that no-one else sends messages to the workers).
If the foreman does not always expect an answer from the workers, you can make this the case by having a no-content message
case object Done { }
and having the workers reply with that when they're finished. Then, see above.
If the foreman is not the only one talking to the workers, or you want there to be less chatter in the background, then the foreman and the workers will have to negotiate. The foreman can send
case object RequestStop { }
and the workers will do something graceful and reply with Done when they're done. When the foreman receives as many Done messages as it has sent RequestStop messages, it is free to exit.
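The counting described above can be sketched independently of any actor library. The Outstanding class and its method names are hypothetical; the foreman would call sent() whenever it sends a message that expects a reply, answered() whenever a reply (a result or Done) comes back, and requestStop() when it is told to stop.

```scala
// Hypothetical bookkeeping helper, not part of scala.actors.
final class Outstanding {
  private var pending = 0            // messages sent but not yet answered
  private var stopRequested = false

  def sent(): Unit = { pending += 1 }

  def answered(): Unit = {
    require(pending > 0, "received more replies than messages sent")
    pending -= 1
  }

  def requestStop(): Unit = { stopRequested = true }

  // The foreman may exit once a stop was requested and nothing is in flight.
  def mayExit: Boolean = stopRequested && pending == 0
}
```

Because only the foreman touches this object from inside its own message handler, no synchronization is needed, which is the main advantage over polling counters that live in the workers.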
Upvotes: 1
Reputation: 29133
My approach is to do all calculations in the Foreman.
You didn't write how the foreman creates tasks, so I'll assume it is in response to a message:
class Foreman extends Actor{
var numTasks: Int = 0
var shouldExit = false
def act = loop {
react {
case t: Task =>
if (!shouldExit) {
numTasks += 1
selectWorker ! t
} else {
// send some kind of error status to sender
}
case ResultOfTask(rest) =>
numTasks -= 1
// ....
if (numTasks == 0 && shouldExit) exit
case Stop =>
shouldExit = true
if (numTasks == 0) exit() // nothing outstanding, so exit right away
}
}
}
An improvement then is to prioritize Stop so that it is handled even if there are Task messages in the queue:
def act = loop {
reactWithin(0) {
case Stop => shouldExit = true
case TIMEOUT => react {
case t: Task =>
if (!shouldExit) {
numTasks += 1
selectWorker ! t
} else {
// send some kind of error status to sender
}
case ResultOfTask(rest) =>
numTasks -= 1
// ....
if (numTasks == 0 && shouldExit) exit
case Stop =>
shouldExit = true
if (numTasks == 0) exit() // nothing outstanding, so exit right away
}
}
}
Upvotes: 1
Reputation: 21962
Why not include a reference to the Foreman actor when sending jobs to the Workers? Then, when the workers shut down they can send a notification to the Foreman. Each time the Foreman receives a worker shut down message it logs it and sees if all the workers have completed. If so, it shuts itself down too.
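The foreman's side of this can be sketched as a small immutable record of which workers are still running. WorkerStopped and Roster are hypothetical names, and identifying workers by a string name is an assumption made to keep the sketch free of any actor library.

```scala
// Hypothetical notification a worker would send to the foreman on shutdown.
case class WorkerStopped(workerName: String)

// Immutable record of the workers that have not yet reported a shutdown.
case class Roster(running: Set[String]) {
  // Log the shutdown by removing the worker from the running set.
  def handle(msg: WorkerStopped): Roster = Roster(running - msg.workerName)

  // True once every worker has reported in; the foreman can then shut down too.
  def allStopped: Boolean = running.isEmpty
}
```

In the foreman's message handler this would be used by replacing the current roster with roster.handle(msg) on each WorkerStopped message and exiting when allStopped becomes true.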
Upvotes: 1