Kim Stebel

Reputation: 42047

scala actors: drop messages if queue is too long?

I would like to drop messages from an actor's mailbox if it becomes too full. For example, if the queue size reaches 1000 messages, the oldest one should be deleted.

Upvotes: 3

Views: 513

Answers (2)

Nikolay Botev

Reputation: 1598

You can also reify the actor's queue on the heap and bound its length with a proxy actor. With that in place you can write something like the following:

// "adder" actor (it replies with double its input) with a bounded queue size of 4
val adder = boundActor(4) {
  loop {
    react {
      case x: Int => reply(x*2)
    }
  }
}

// test the adder
actor {
  for (i <- 1 to 10) {
    adder !! (i, { case answer: Int => println("Computed " + i + " -> " + answer) })
  }
}

Here is the implementation of boundActor. Note that the wrapped actor must always reply to its sender: the proxy forwards the next queued message only when a reply arrives, so without a reply it cannot track the queue size and stops forwarding any further messages.

object ActorProxy extends scala.App {

  import scala.actors._
  import scala.actors.Actor._
  import scala.collection.mutable._

  /**
   * Accepts an actor and a message queue size, and 
   * returns a proxy that drops messages if the queue
   * size of the target actor exceeds the given queue size.
   */
  def boundActorQueue(target: Actor, maxQueueLength: Int) = actor {
    val queue = new Queue[Tuple2[Any, OutputChannel[Any]]]
    var lastMessageSender: Option[OutputChannel[Any]] = None

    def replyHandler(response: Any): Unit = {
      // Relay the target's reply to the sender of the message being processed
      lastMessageSender.foreach(_ ! response)
      if (queue.isEmpty) {
        lastMessageSender = None
      } else {
        val (message, messageSender) = queue.dequeue
        forwardMessage(message, messageSender)
      }
    }

    def forwardMessage(message: Any, messageSender: OutputChannel[Any]) = {
      lastMessageSender = Some(messageSender)
      target !! (message, { case response => replyHandler(response) })
    }

    loop {
      react {
        case message =>
          // Target is idle: forward right away; otherwise queue the message
          if (lastMessageSender.isEmpty) {
            forwardMessage(message, sender)
          } else {
            queue.enqueue((message, sender))
            // Restrict the queue size by dropping the oldest queued message
            if (queue.length > maxQueueLength) {
              val dropped = queue.dequeue
              println("!!!!!!!! Dropped message " + dropped._1)
            }
          }
      }
    }
  }

  // Helper method: starts an actor running `body` and wraps it in a bounded-queue proxy
  def boundActor(maxQueueLength: Int)(body: => Unit): Actor = boundActorQueue(actor(body), maxQueueLength)

}
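
The drop-oldest policy the proxy applies to its queue can also be looked at in isolation. The following is only a minimal sketch that does not depend on scala.actors; DropOldestBuffer, offer and toList are hypothetical names chosen for illustration and are not part of any library.

```scala
import scala.collection.mutable

// Hypothetical helper (not part of scala.actors): a FIFO buffer that
// evicts its oldest element once it holds more than maxLength items,
// mirroring the enqueue-then-dequeue check in the proxy above.
final class DropOldestBuffer[A](maxLength: Int) {
  require(maxLength > 0, "maxLength must be positive")

  private val queue = mutable.Queue.empty[A]

  /** Adds an item; returns the evicted oldest item if the bound was exceeded. */
  def offer(item: A): Option[A] = {
    queue.enqueue(item)
    if (queue.length > maxLength) Some(queue.dequeue()) else None
  }

  /** Snapshot of the buffered items, oldest first. */
  def toList: List[A] = queue.toList
}

object DropOldestDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new DropOldestBuffer[Int](4)
    // Collect every item that gets evicted while offering 1 to 10
    val dropped = (1 to 10).flatMap(i => buffer.offer(i))
    println("Dropped: " + dropped.mkString(", "))
    println("Remaining: " + buffer.toList.mkString(", "))
  }
}
```

With a bound of 4 and the inputs 1 to 10, items 1 through 6 are evicted and 7 through 10 remain in the buffer.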

Upvotes: 3

Vasil Remeniuk

Reputation: 20617

You cannot work with the mailbox directly, but you can implement the Message Expiration pattern on top of the existing library.

Send a creation date with every message:

case class ExpirableMessage(msg: String, createdAt: Long) 

Scan the mailbox with reactWithin(0), and filter out expired messages:

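react {
  case msg: ExpirableMessage =>
    // handle the message
    // then clean the mailbox with a nested reactWithin;
    // INTERVAL is the maximum message age in milliseconds (defined elsewhere)
    reactWithin(0) {
      // an expired message matches here and is discarded without handling
      case ExpirableMessage(_, createdAt) if System.currentTimeMillis - createdAt > INTERVAL =>
      // no expired message is waiting in the mailbox
      case TIMEOUT =>
    }
}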
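
The expiry check itself is plain arithmetic on timestamps, so it can be tried out without any actors. The sketch below is only an illustration under assumptions: the Expiration object and the isExpired and dropExpired helpers are hypothetical names, and the current time is passed in as a parameter so the result does not depend on the clock.

```scala
// Same shape as the message class above.
case class ExpirableMessage(msg: String, createdAt: Long)

// Hypothetical helpers for illustration; not part of scala.actors.
object Expiration {
  /** True when the message is older than maxAgeMillis at time `now`. */
  def isExpired(m: ExpirableMessage, now: Long, maxAgeMillis: Long): Boolean =
    now - m.createdAt > maxAgeMillis

  /** Keeps only the messages that are still fresh, preserving their order. */
  def dropExpired(pending: Seq[ExpirableMessage], now: Long, maxAgeMillis: Long): Seq[ExpirableMessage] =
    pending.filterNot(m => isExpired(m, now, maxAgeMillis))
}
```

For example, with now = 1000 and maxAgeMillis = 500, a message created at 0 is dropped while messages created at 900 and 1000 are kept.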

Upvotes: 6
