Reputation: 11
The current code is as follows:
case object LatestMessageSignal
class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {
  override def receive: Receive = {
    case LatestMessageSignal => awaitLatestMessageSignal()
  }
  private def awaitLatestMessageSignal(): Unit = {
    import scala.concurrent.duration._
    context.system.scheduler.scheduleOnce(30.seconds) {
      context.stop(self)
    }
  }
}
When the actor receives a LatestMessageSignal message, it calls the awaitLatestMessageSignal() method, which waits for 30 seconds and then stops the actor.
Upvotes: 0
Views: 182
Reputation: 22477
It seems you want to stop the actor after 30 seconds of inactivity. If so, you can use ActorContext#setReceiveTimeout(Duration): the actor is sent a ReceiveTimeout message when no message arrives within that duration.
For instance:
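import akka.actor.{Actor, ActorLogging, ReceiveTimeout, Timers}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

case object LatestMessageSignal
class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {
  // The timeout restarts whenever a message is received.
  context.setReceiveTimeout(30.seconds)
  override def receive: Receive = {
    case LatestMessageSignal => // nothing to do: receiving the message already reset the timeout
    case ReceiveTimeout => context.stop(self) // 30 seconds passed with no messages
  }
}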
Upvotes: 5
Reputation: 7275
As far as I understand, you'd like to keep MessageCheckpoint alive and stop it if no new messages arrive for 30 seconds.
The actor below stays alive as long as you keep sending messages to it and stops after 30 seconds of inactivity.
import akka.actor.{Actor, ActorLogging, Cancellable, PoisonPill}
import scala.concurrent.duration._

case object LatestMessageSignal
class MessageCheckpoint extends Actor with ActorLogging {
  override def postStop(): Unit = {
    super.postStop()
    log.info("Stopping")
  }
  // No timer exists until the first LatestMessageSignal arrives.
  override def receive: Receive = receiveWithTimer(None)
  private def receiveWithTimer(timer: Option[Cancellable]): Receive = {
    case LatestMessageSignal =>
      // Cancel the pending timer and start a fresh 30-second countdown.
      timer.foreach(_.cancel())
      context.become(receiveWithTimer(Option(initiateTimer())))
  }
  private def initiateTimer(): Cancellable = {
    import context.dispatcher
    log.info("Initiating new poison pill timer")
    context.system.scheduler.scheduleOnce(30.seconds, self, PoisonPill)
  }
}
You wrote: "I would like the actor to discard the message it is currently processing when a new message arrives, and process the latest message instead."
This is not possible. I think you are assuming that the method awaitLatestMessageSignal blocks the actor. It does not: it schedules a timer and returns immediately. The message is processed quickly, and the actor is then ready for the next one. An actor processes messages one at a time, and there is no way to cancel the processing of a message.
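What you can cancel is a pending timer, which gives a "latest signal wins" effect. Since the actor in the question already mixes in Timers, here is a minimal sketch of that idea, assuming Akka classic 2.5 or later. The names IdleTimerKey, IdleTimeout, props and the idle parameter are hypothetical and introduced only for this illustration.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props, Timers}
import scala.concurrent.duration._

case object LatestMessageSignal

object MessageCheckpoint {
  // Hypothetical helper names, not part of the original question.
  private case object IdleTimerKey
  private case object IdleTimeout
  def props(idle: FiniteDuration): Props = Props(new MessageCheckpoint(idle))
}

class MessageCheckpoint(idle: FiniteDuration) extends Actor with ActorLogging with Timers {
  import MessageCheckpoint._

  override def receive: Receive = {
    case LatestMessageSignal =>
      // Starting a timer under a key that is already in use cancels the
      // previous timer, so only the latest signal's countdown survives.
      timers.startSingleTimer(IdleTimerKey, IdleTimeout, idle)
    case IdleTimeout =>
      log.info("No new signal within {}, stopping", idle)
      context.stop(self)
  }
}
```

Usage would look something like `val checkpoint = ActorSystem("demo").actorOf(MessageCheckpoint.props(30.seconds))` followed by `checkpoint ! LatestMessageSignal`. Timers started this way belong to the actor and are cancelled automatically when it stops, so there is no Cancellable to carry around by hand.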
Upvotes: 2