Mikaël Mayer

Reputation: 10701

Akka actor get remaining message list

I have an actor that performs an intensive computation, and ideally only the most recent request should count.

If it receives multiple messages of the same type A(data), I would like only the last one to be handled and the earlier ones to be discarded.

How can I achieve that?

Upvotes: 1

Views: 395

Answers (1)

Odomontois

Reputation: 16308

Custom mailbox

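You can try implementing a custom mailbox that holds at most one message, so that each new message (of any type, not just A) replaces the one still waiting to be processed: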

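import java.util.concurrent.atomic.AtomicReference

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config

// Holds at most one envelope; a newer message overwrites an unprocessed older one.
class SingleMessageQueue extends MessageQueue {
  // enqueue can be called from several threads at once, so the slot is atomic
  private val message = new AtomicReference[Envelope](null)
  def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle)
  // returns null when the queue is empty
  def dequeue(): Envelope = message.getAndSet(null)
  def numberOfMessages: Int = if (message.get == null) 0 else 1
  def hasMessages: Boolean = message.get != null
  // forward a message that was never processed to dead letters
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
    Option(message.getAndSet(null)).foreach(deadLetters.enqueue(owner, _))
}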

final case class SingleMessageMailbox() extends MailboxType with ProducesMessageQueue[SingleMessageQueue] {

  def this(settings: ActorSystem.Settings, config: Config) = this()

  override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new SingleMessageQueue
}

Then enable it for your actor as described in the mailbox section of the Akka docs.
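A minimal sketch of that step, assuming the mailbox class lives in a hypothetical package com.example and the actor is a hypothetical MyActor. The config key single-message-mailbox is an arbitrary name chosen for this example; mailbox-type has to be the fully qualified class name of the MailboxType.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Hypothetical actor standing in for the one doing the intensive computation.
class MyActor extends Actor {
  def receive: Receive = { case msg => println(s"handling $msg") }
}

object MailboxDemo extends App {
  // "com.example" is an assumed package; use the real package of SingleMessageMailbox.
  val config = ConfigFactory.parseString(
    """single-message-mailbox {
      |  mailbox-type = "com.example.SingleMessageMailbox"
      |}""".stripMargin).withFallback(ConfigFactory.load())

  val system = ActorSystem("demo", config)

  // withMailbox selects the mailbox by its config key
  val actor = system.actorOf(Props[MyActor]().withMailbox("single-message-mailbox"))
  actor ! "work"
}
```

The same block can go into application.conf instead of being parsed from a string.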

Split actors

You can introduce a pair of actors:

  • Manager, which receives jobs and forwards the latest one to the Worker whenever the Worker is idle
  • Worker, which does the actual work and notifies its manager when it is done

Example:

import akka.actor.{Actor, ActorRef, Props}

object Worker {
  case class Job()
  case object JobDone
}

import Worker.{Job, JobDone}

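class Worker extends Actor {
  override def receive: Receive = {
    case Job() =>
      // your long job
      context.parent ! JobDone // tell the manager we are ready for more
  }
}

class Manager extends Actor {
  // latest job that arrived while the worker was busy, with its original sender
  var nextJob = Option.empty[(Job, ActorRef)]
  val worker = context.actorOf(Props[Worker])

  // the worker is busy: remember only the most recent job
  def working: Receive = {
    case job: Job => nextJob = Some((job, sender()))
    case JobDone =>
      nextJob match {
        case Some((job, snd)) =>
          nextJob = None // clear the slot so the job is not dispatched again
          worker.tell(job, snd)
        case None => context.become(free)
      }
  }

  // the worker is idle: hand the job over immediately
  def free: Receive = {
    case job: Job =>
      worker.tell(job, sender())
      context.become(working)
  }

  override def receive: Receive = free
}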
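The manager's intended bookkeeping can be checked without Akka. The following is only a plain-Scala model of that logic, not part of the actor code: LatestOnly, submit, done and LatestOnlyDemo are hypothetical names, and jobs are plain Ints for illustration. The pending slot is cleared when a job is handed over, so no job is started twice.

```scala
import scala.collection.mutable.ListBuffer

// Plain-Scala model of the Manager's bookkeeping (hypothetical, no Akka).
final class LatestOnly[A](run: A => Unit) {
  private var busy = false
  private var pending = Option.empty[A]

  // A new job starts at once if the worker is idle; otherwise it
  // replaces whatever job was waiting.
  def submit(job: A): Unit =
    if (busy) pending = Some(job)
    else { busy = true; run(job) }

  // Called when the worker reports completion: start the pending job
  // (clearing the slot), or go back to idle.
  def done(): Unit = pending match {
    case Some(job) =>
      pending = None
      run(job)
    case None =>
      busy = false
  }
}

object LatestOnlyDemo {
  // Returns the jobs that were actually started, in order.
  def started(): List[Int] = {
    val log = ListBuffer.empty[Int]
    val m = new LatestOnly[Int](job => { log += job; () })
    m.submit(1) // idle: starts immediately
    m.submit(2) // busy: stored
    m.submit(3) // busy: replaces 2
    m.done()    // job 1 finished: starts 3
    m.done()    // job 3 finished: nothing pending, back to idle
    m.submit(4) // idle again: starts immediately
    log.toList  // List(1, 3, 4)
  }
}
```

Job 2 never runs because job 3 overwrote it while job 1 was still in progress, which is the behaviour asked for in the question.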

Upvotes: 1
