Jeriho

Reputation: 7299

Actor that waits for its children to complete all jobs before exiting

I can't figure out how to solve the following problem: I have a few actors (workers) that execute tasks when they receive them (I mean in react). A main actor (the foreman) controls this process and can receive a message telling it to stop work. In that case the main actor must stop creating new tasks, wait until the workers have finished all existing tasks, and only then exit.

import actors.Actor
import actors.Actor._

class Foreman extends Actor{
  val workerA = new WorkerA
  val workerB = new WorkerB
  val workerC = new WorkerC
  self.link(workerA)
  self.link(workerB)
  self.link(workerC)
  def act{
    workerA.start
    workerB.start
    workerC.start

    // adding tasks to workers somehow
    //...
    loop{
      react{
        case ResultOfTask(res) => //...
        case Stop => //workers mustn't stop immediately; they must finish their tasks and then exit
        case ProductionAccident => //...
      }
    }


  }
}

case class Task(activity:String)
case class ResultOfTask(result:String)

trait Worker extends Actor{

  def act{
    loop{
      react{
        case Task(activity) => sender ! processTask(activity)
      }
    }
  }

  def processTask(activity:String):ResultOfTask
}

To solve this I wrote the following code:

def condition = workerA.getState!=State.Suspended  && workerB.getState!=State.Suspended && workerC.getState!=State.Suspended && mailboxSize == 0
case Stop => {
  if(condition) exit("sweet dreams") else continue
}

to check whether the main actor should exit. Another variant is to keep a counter in the "Worker" trait, incrementing it when the worker receives a message and decrementing it when it responds.

trait Worker extends Actor {
  private var count = 0
  def act {
    loop{
      react{
        case Task(activity) => {
          count += 1
          sender ! processTask(activity)
          count -= 1
        }
      }
    }
  }
  def hasDoneAllTasks = count == 0

  def processTask(activity: String): ResultOfTask
}

And the "condition" function in "Foreman" will be:

def condition = workerA.hasDoneAllTasks   && workerB.hasDoneAllTasks  && workerC.hasDoneAllTasks  && mailboxSize == 0

I hope there are better solutions and you will propose them.

Upvotes: 3

Views: 1403

Answers (4)

zentrope

Reputation: 151

One thing you can take advantage of is the trapExit / case Exit / exit system. In your main actor, you can set trapExit to true:

// Foreman

def act() {
  trapExit = true
  link(workerA)
  link(workerB)
  ...
}

This means that your foreman actor will get an Exit message when a linked worker terminates, including a reason:

// Foreman

def act() {
  ....
  loop { react {
    case Exit (worker: Actor, reason: AnyRef) => {
      // decrement counter, or list of workers, and exit if empty
    }
    ...
   }}
}

You can pattern match on the reason parameter. For instance, in your worker class, you might exit using a variety of case classes indicating what the foreman should do:

// Worker:

exit(WorkComplete)
exit(Emergency)

and so on. When your worker throws an exception, it'll terminate and send the linked actor an Exit message containing the exception. Given this, you can end up with something like:

// Foreman

def act() {
  ....
  loop { react {
    // WorkComplete and TasksExhausted are matched as values, since the worker calls exit(WorkComplete)
    case Exit (worker: Actor, WorkComplete) => {
      // decrement counter, or list of workers, and exit if empty
    }

    case Exit (worker: Actor, TasksExhausted) => {
      // log something, or shut down the worker if too many are exhausted
    }

    case Exit (worker: Actor, reason: Exception) => {
      // log the exception, then restart the actor
    }

    ...
   }}
}

It's unclear from your question whether you want the workers to stay alive after finishing their tasks until the foreman tells them to exit. If that's the case, sending the workers a message telling them to "exit when finished" works, and you can tell they've finished by using the trapExit mechanism.
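The bookkeeping the foreman does on each Exit message can be sketched without the actor library. This is only an illustration under assumptions: ExitReason, Crashed, LiveWorkers and its methods are hypothetical names, workers are identified by plain strings, and "restart on crash" is an assumed policy rather than something the actor library does for you.

```scala
// Hypothetical exit reasons, mirroring the WorkComplete idea above.
sealed trait ExitReason
case object WorkComplete extends ExitReason
final case class Crashed(cause: Throwable) extends ExitReason

// Tracks which workers (identified here by name) are still running.
final case class LiveWorkers(names: Set[String]) {
  // Apply one exit notification and return the updated set.
  def onExit(worker: String, reason: ExitReason): LiveWorkers = reason match {
    case WorkComplete => copy(names = names - worker) // finished cleanly
    case Crashed(_)   => this // assumed policy: the foreman restarts it, so it stays live
  }

  // The foreman may exit once no worker is left.
  def allFinished: Boolean = names.isEmpty
}
```

In the foreman's `case Exit(...)` branch you would call something like `onExit` and then exit yourself once `allFinished` is true.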

Hope this spurs an interesting solution!

Upvotes: 1

Rex Kerr

Reputation: 167901

If the foreman always expects an answer from the workers, the solution is easy: the foreman maintains a counter, and each time it sends a message it increments it and each time it receives one from a worker it decrements it. Any time the counter is zero it is free to stop itself (assuming that no-one else sends messages to the workers).

If the foreman does not always expect an answer from the workers, you can make this the case by having a no-content message

case object Done { }

and having the workers reply with that when they're finished. Then, see above.

If the foreman is not the only one talking to the workers, or you want there to be less chatter in the background, then the foreman and the workers will have to negotiate. The foreman can send

case object RequestStop { }

and the workers will do something graceful and reply with Done when they're done. When the foreman receives as many Done messages as it has sent RequestStops, it is free to exit.
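A minimal sketch of this counting scheme, kept independent of the actor library. ForemanState and its method names are hypothetical; it models only the counter and the stop flag, and assumes every task sent eventually produces exactly one reply (a result or Done).

```scala
// Hypothetical model of the foreman's bookkeeping; the actor plumbing is omitted.
final case class ForemanState(outstanding: Int = 0, stopRequested: Boolean = false) {
  // A task was handed to a worker. After a stop request no new work is accepted.
  def taskSent: ForemanState =
    if (stopRequested) this else copy(outstanding = outstanding + 1)

  // A worker replied (with a result or with Done).
  def replyReceived: ForemanState =
    copy(outstanding = math.max(0, outstanding - 1))

  // The foreman was asked to stop.
  def requestStop: ForemanState = copy(stopRequested = true)

  // Safe to exit only when a stop was requested and nothing is outstanding.
  def mayExit: Boolean = stopRequested && outstanding == 0
}
```

Inside the foreman's react block, each branch would update this state and then check `mayExit` before deciding whether to call exit.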

Upvotes: 1

IttayD

Reputation: 29133

My approach is to do all calculations in the Foreman.

You didn't say how the foreman creates tasks, so I'll assume it is in response to a message:

class Foreman extends Actor{

  var numTasks: Int = 0
  var shouldExit = false

  def act = loop {
   react {
     case t: Task =>
       if (!shouldExit)  {
         numTasks += 1
         selectWorker ! t
       } else {
         // send some kind of error status to sender
       }
     case ResultOfTask(rest) =>
       numTasks -= 1
       // ....
       if (numTasks == 0 && shouldExit) exit
     case Stop() =>
       shouldExit = true
       if (numTasks == 0) exit // nothing outstanding, so exit right away
   }
  }
}

An improvement is to prioritize Stop so that it is handled even if there are Task messages ahead of it in the queue:

  def act = loop {
   // a zero timeout checks the mailbox for a pending Stop without waiting
   reactWithin(0) {
     case Stop() =>
       shouldExit = true
       if (numTasks == 0) exit // nothing outstanding, so exit right away
     case TIMEOUT => react {
       case t: Task =>
         if (!shouldExit)  {
           numTasks += 1
           selectWorker ! t
         } else {
           // send some kind of error status to sender
         }
       case ResultOfTask(rest) =>
         numTasks -= 1
         // ....
         if (numTasks == 0 && shouldExit) exit
       case Stop() =>
         shouldExit = true
         if (numTasks == 0) exit
     }
   }
  }

Upvotes: 1

pr1001

Reputation: 21962

Why not include a reference to the Foreman actor when sending jobs to the Workers? Then, when the workers shut down, they can send a notification to the Foreman. Each time the Foreman receives a worker shutdown message, it logs it and checks whether all the workers have completed. If so, it shuts itself down too.

Upvotes: 1
