qiuyang
qiuyang

Reputation: 11

How do you shut down an Actor if it has not received a message within a period of time?

Current code is following:

case object LatestMessageSignal

class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {

  override def receive: Receive = {
    case LatestMessageSignal => awaitLatestMessageSignal()
  }

  private def awaitLatestMessageSignal(): Unit = {
    import scala.concurrent.duration._
    context.system.scheduler.scheduleOnce(30.seconds) {
      context.stop(self)
    }
  }

}

When the actor receives a LatestMessageSignal message, it will call awaitLatestMessageSignal() method that will wait for 30 seconds, and then stop the actor.

Upvotes: 0

Views: 182

Answers (2)

Jack Leow
Jack Leow

Reputation: 22477

It seems like you want to stop the actor after 30 seconds of inactivity? If so you can use ActorContext#setReceiveTimeout(Duration)

For instance:

case object LatestMessageSignal

class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {

  context.setReceiveTimeout(30.seconds)

  override def receive: Receive = {
    case ReceivedTimeout => context.stop(self)
  }

}

Upvotes: 5

Ivan Stanislavciuc
Ivan Stanislavciuc

Reputation: 7275

As far as I understand, you'd like to keep MessageCheckpoint alive and stop it if there are no new messages coming to it for 30 seconds.

This actor will stay alive till you send messages to it and will stop after 30 seconds of inactivity.


case object LatestMessageSignal

class MessageCheckpoint extends Actor with ActorLogging {

  override def postStop(): Unit = {
    super.postStop()
    log.info("Stopping")
  }

  override def receive: Receive = receiveWithTimer(None)

  private def receiveWithTimer(timer: Option[Cancellable]): Receive = {
    case LatestMessageSignal =>
      timer.foreach(_.cancel())
      context.become(receiveWithTimer(Option(initiateTimer())))
  }

  private def initiateTimer(): Cancellable = {
    import context.dispatcher
    log.info("Initiating new poison pill timer")
    context.system.scheduler.scheduleOnce(30.seconds, self, PoisonPill)
  }
}

I would like the actor can discard current processing message when another new message comes, and will process the latest message instead

This is not possible. I think you are assuming that method awaitLatestMessageSignal is blocking the actor. This method is non blocking, it will create a timer and return immediately. The message will be processed quite fast and actor be ready for the next message. Actor is processing messages one at a time and there is no way to cancel message processing.

Upvotes: 2

Related Questions