hiroprotagonist

Reputation: 902

How to watch multiple Akka actors for termination

I have an Akka system that consists of two producer actors sending messages to one consumer actor. In simplified form, it looks something like this:

class ProducerA(consumer: ActorRef) extends Actor {
    def receive = {
        // consumer is the Consumer actor's ActorRef
        case Produce => consumer ! generateMessageA()
    }

    // ... more code ...
}

class ProducerB(consumer: ActorRef) extends Actor {
    def receive = {
        case Produce => consumer ! generateMessageB()
    }

    // ... more code ...
}

class Consumer extends Actor {
    def receive = {
        // Match on the message type and bind the received value
        case a: A => handleMessageA(a)
        case b: B => handleMessageB(b)
    }

    // ... more code ...
}

All three actors are siblings in the same Akka system.

I am trying to figure out how to terminate this system gracefully. On shutdown, I want ProducerA and ProducerB to stop immediately, and then I want Consumer to finish processing whatever messages are left in its message queue before shutting down.

It seems like what I want is for the Consumer actor to be able to watch for the termination of both ProducerA and ProducerB. Or generally, it seems like what I want is to be able to send a PoisonPill message to the Consumer after both the producers are stopped.

https://alvinalexander.com/scala/how-to-monitor-akka-actor-death-with-watch-method

The above tutorial has a pretty good explanation of how one actor can watch for the termination of one other actor, but I am not sure how an actor could watch for the termination of multiple actors.

Upvotes: 0

Views: 2049

Answers (3)

hiroprotagonist

Reputation: 902

The solution I ended up going with was inspired by Derek Wyatt's terminator pattern:

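import akka.pattern.gracefulStop
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global // or the actor system's dispatcher
import scala.concurrent.duration._

// Stop both producers first; gracefulStop sends a PoisonPill by default
val shutdownFut = Future.sequence(
  Seq(
    gracefulStop(producerA, ProducerShutdownWaitSeconds.seconds),
    gracefulStop(producerB, ProducerShutdownWaitSeconds.seconds)
  )
).flatMap(_ => gracefulStop(consumer, ConsumerShutdownWaitSeconds.seconds)) // then stop the consumer

// Block until the whole shutdown sequence completes or times out
Await.result(shutdownFut, ProducerShutdownWaitSeconds.seconds + ConsumerShutdownWaitSeconds.seconds)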

This was more or less exactly what I wanted. The consumer's shutdown starts only after the futures for the producers' shutdown have completed. The whole shutdown is itself a future that you can wait on, which keeps the thread alive long enough for everything to clean up properly.
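The ordering guarantee comes from the future composition rather than from anything Akka-specific, so it can be tried out with plain scala.concurrent futures. The sketch below is only an illustration: fakeStop is a hypothetical stand-in for gracefulStop, the names and delays are arbitrary, and no actors are involved.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ShutdownOrdering {
  // Order in which the stand-in "actors" finished stopping
  private var stopped = Vector.empty[String]

  private def record(name: String): Unit = synchronized { stopped :+= name }

  def stopOrder: Vector[String] = synchronized(stopped)

  // Hypothetical stand-in for akka.pattern.gracefulStop:
  // completes with true once the named "actor" has stopped.
  def fakeStop(name: String, delayMillis: Long): Future[Boolean] = Future {
    Thread.sleep(delayMillis)
    record(name)
    true
  }

  // Same shape as the shutdown code above: both producers stop
  // (in either order), and only then is the consumer stop started.
  def shutdown(): Future[Boolean] =
    Future
      .sequence(Seq(fakeStop("producerA", 50), fakeStop("producerB", 20)))
      .flatMap(_ => fakeStop("consumer", 0))
}
```

Because the consumer's stop is created inside flatMap, it cannot begin until both producer futures have completed, regardless of which producer finishes first.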

Upvotes: 0

Jeffrey Chung

Reputation: 19507

An actor can watch multiple actors simply via multiple invocations of context.watch, passing in a different ActorRef with each call. For example, your Consumer actor could watch the termination of the Producer actors in the following way:

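import akka.actor.{Actor, ActorRef, PoisonPill, Terminated}

case class WatchMe(ref: ActorRef)

class Consumer extends Actor {
  // Producers that are currently being watched
  var watched = Set[ActorRef]()

  def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched = watched + ref
    case Terminated(ref) =>
      watched = watched - ref
      // Once the last watched producer is gone, queue a PoisonPill behind any pending messages
      if (watched.isEmpty) self ! PoisonPill
    // case ...
  }
}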

Both Producer actors would send their respective references to Consumer, which would then monitor the Producer actors for termination. When the Producer actors are both terminated, the Consumer sends a PoisonPill to itself. Because a PoisonPill is treated like a normal message in an actor's mailbox, the Consumer will process any messages that are already enqueued before handling the PoisonPill and shutting itself down.
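The producer side of this registration is not shown above. A minimal sketch of it might look like the following, assuming each producer receives the consumer's ActorRef as a constructor argument and registers itself in preStart. The Producer class, the Produce message definition, and the placeholder payload are hypothetical, not taken from the question's actual code.

```scala
import akka.actor.{Actor, ActorRef}

// Same message type as in the Consumer code above;
// repeated here only so that this block compiles on its own.
case class WatchMe(ref: ActorRef)

// Hypothetical definition; the question uses Produce but does not show it.
case object Produce

// Hypothetical producer that is handed the consumer's ActorRef (an assumption).
class Producer(consumer: ActorRef) extends Actor {
  // Register with the consumer as soon as this actor starts,
  // so the consumer begins watching it before any work is sent.
  override def preStart(): Unit = consumer ! WatchMe(self)

  def receive = {
    case Produce => consumer ! "message" // placeholder payload
  }
}
```

One caveat with this approach: if one producer registers and terminates before the other producer's WatchMe has arrived, the watched set becomes empty early and the Consumer sends itself the PoisonPill too soon. Creating all producers before any of them is stopped avoids this in practice.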

A similar pattern is described in Derek Wyatt's "Shutdown Patterns in Akka 2" blog post, which is mentioned in the Akka documentation.

Upvotes: 2

Shankar Shastri

Reputation: 1154

import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration.DurationInt

class Producer extends Actor {
  def receive = {
    case _ => println("Producer received a message")
  }
}

case object KillConsumer

class Consumer extends Actor {

  def receive = {
    case KillConsumer =>
      println("Stopping Consumer After All Producers")
      context.stop(self)
    case _ => println("Parent received a message")
  }

  override def postStop(): Unit = {
    println("Post Stop Consumer")
    super.postStop()
  }
}

class ProducerWatchers(producerListRef: List[ActorRef], consumerRef: ActorRef) extends Actor {
  producerListRef.foreach(x => context.watch(x))
  context.watch(consumerRef)
  var producerActorCount = producerListRef.length
  implicit val timeout: Timeout = Timeout(5.seconds)
  override def receive: Receive = {
    case Terminated(x) if producerActorCount == 1 && producerListRef.contains(x) =>
      consumerRef ! KillConsumer

    case Terminated(x) if producerListRef.contains(x) =>
      producerActorCount -= 1

    case Terminated(`consumerRef`) =>
      println("Killing ProducerWatchers On Consumer End")
      context.stop(self)

    case _ => println("Dropping Message")
  }

  override def postStop(): Unit = {
    println("Post Stop ProducerWatchers")
    super.postStop()
  }
}

object ProducerWatchers {
  def apply(producerListRef: List[ActorRef], consumerRef: ActorRef) : Props = Props(new ProducerWatchers(producerListRef, consumerRef))
}

object AkkaActorKill {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("AkkaActorKill")
    implicit val timeout: Timeout = Timeout(10.seconds)

    val consumerRef = actorSystem.actorOf(Props[Consumer], "Consumer")
    val producer1 = actorSystem.actorOf(Props[Producer], name = "Producer1")
    val producer2 = actorSystem.actorOf(Props[Producer], name = "Producer2")
    val producer3 = actorSystem.actorOf(Props[Producer], name = "Producer3")

    val producerWatchers = actorSystem.actorOf(ProducerWatchers(List[ActorRef](producer1, producer2, producer3), consumerRef),"ProducerWatchers")

    producer1 ! PoisonPill
    producer2 ! PoisonPill
    producer3 ! PoisonPill

    Thread.sleep(5000)
    actorSystem.terminate()
  }
}

This can be implemented with a ProducerWatchers actor that tracks which producers have terminated. Once all the producers have terminated, it tells the Consumer actor to stop, and it stops itself when the Consumer terminates.

Upvotes: 1
