Reputation: 5280
I am implementing some generic scheduling in an Actor with a need to manage a set of ids for each item scheduled. So I came up with the following trait to implement it:
abstract trait SchedulingActor[I <: SchedulingActor.Item[I]] extends Actor {
private val log = LoggerFactory.getLogger(classOf[SchedulingActor[I]])
private var schedule = Map[Ticket, (I, Cancellable)]()
private var ticketMap = Map[Int, Ticket]()
protected def delay(msg: Any, item: I): Cancellable
protected def initiate(item: I): Unit
protected def schedule(item: I) = self ! item
private def currentTickets(item: I) = item.ids.flatMap(ticketMap.get(_))
protected def linkedItems(ids: Seq[Int]) = ids.flatMap(ticketMap.get(_)).map(schedule(_)._1)
protected case class Ticket() {
def collect(item: I) = {
schedule += this -> (item, delay(this, item))
ticketMap ++= item.ids.map(_ -> this)
}
def update(item: I) = {
schedule += this -> (item, schedule(this)._2)
}
def cancel() = {
schedule(this)._1.cancel()
schedule(this)._2.cancel()
drop()
}
def drop() = {
ticketMap = ticketMap.filterKeys(!schedule(this)._1.ids.contains(_))
schedule -= this
}
def item = schedule(this)._1
}
def receive = {
case item: I =>
// Check if we're overwriting any currently scheduled
try {
log.debug("Scheduling item: " + item)
log.debug("Checking for tickets: " + ticketMap)
}
catch {
case ex: Throwable =>
log.error("Strange error", ex)
}
currentTickets(item).foreach { ticket =>
log.debug("Checking current ticket: " + ticket)
ticket.item.purge(item) match {
case upd if upd.ids.nonEmpty =>
log.debug("Updating schedule: " + upd)
ticket.update(upd)
case _ =>
log.debug("Cancelling ticket: " + ticket)
ticket.cancel()
}
}
// Schedule the replacement now
log.debug("Collecting ticket for: " + item)
Ticket().collect(item)
log.debug("Scheduled replacement: " + item)
case ticket: Ticket =>
initiate(ticket.item)
ticket.drop()
}
}
object SchedulingActor {
trait Item[T <: Item[T]] {
val ids: Set[Int]
def cancel(): Unit
def purge(newItem: T): T
}
}
All works fine apart from a bizarre exception I am getting when just debugging the "ticketMap" object in the line "Checking for tickets...". Sometimes I get the following exception:
java.util.NoSuchElementException: key not found: Ticket() at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:101) ~[scala-library-2.12.10.jar:na] at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:99) ~[scala-library-2.12.10.jar:na] at scalaflow.util.akka.SchedulingActor$Ticket.$anonfun$drop$1(SchedulingActor.scala:39) ~[classes/:na] at scala.runtime.java8.JFunction1$mcZI$sp.apply(JFunction1$mcZI$sp.java:23) ~[scala-library-2.12.10.jar:na] at scala.collection.MapLike$FilteredKeys.$anonfun$iterator$1(MapLike.scala:239) ~[scala-library-2.12.10.jar:na] at scala.collection.MapLike$FilteredKeys.$anonfun$iterator$1$adapted(MapLike.scala:239) ~[scala-library-2.12.10.jar:na] at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513) ~[scala-library-2.12.10.jar:na] at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) ~[scala-library-2.12.10.jar:na] at scala.collection.Iterator.foreach(Iterator.scala:941) ~[scala-library-2.12.10.jar:na] at scala.collection.Iterator.foreach$(Iterator.scala:941) ~[scala-library-2.12.10.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) ~[scala-library-2.12.10.jar:na] at scala.collection.TraversableOnce.addString(TraversableOnce.scala:362) ~[scala-library-2.12.10.jar:na] at scala.collection.TraversableOnce.addString$(TraversableOnce.scala:358) ~[scala-library-2.12.10.jar:na] at scala.collection.AbstractIterator.addString(Iterator.scala:1429) ~[scala-library-2.12.10.jar:na] at scala.collection.MapLike.addString(MapLike.scala:364) ~[scala-library-2.12.10.jar:na] at scala.collection.MapLike.addString$(MapLike.scala:363) ~[scala-library-2.12.10.jar:na] at scala.collection.AbstractMap.addString(Map.scala:63) ~[scala-library-2.12.10.jar:na] at scala.collection.TraversableOnce.mkString(TraversableOnce.scala:328) ~[scala-library-2.12.10.jar:na] at scala.collection.TraversableOnce.mkString$(TraversableOnce.scala:327) ~[scala-library-2.12.10.jar:na] at scala.collection.AbstractTraversable.mkString(Traversable.scala:108) ~[scala-library-2.12.10.jar:na] at scala.collection.TraversableLike.toString(TraversableLike.scala:688) ~[scala-library-2.12.10.jar:na] at scala.collection.TraversableLike.toString$(TraversableLike.scala:688) ~[scala-library-2.12.10.jar:na] at scala.collection.MapLike.toString(MapLike.scala:373) ~[scala-library-2.12.10.jar:na] at scala.collection.MapLike.toString$(MapLike.scala:373) ~[scala-library-2.12.10.jar:na] at scala.collection.AbstractMap.toString(Map.scala:63) ~[scala-library-2.12.10.jar:na] at java.lang.String.valueOf(String.java:2994) ~[na:1.8.0_252] at java.lang.StringBuilder.append(StringBuilder.java:131) ~[na:1.8.0_252] at scalaflow.util.akka.SchedulingActor$$anonfun$receive$1.applyOrElse(SchedulingActor.scala:52) ~[classes/:na] at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38) [scala-library-2.12.10.jar:na] at opex.exchange.MarketExchange$$anonfun$receive$1.applyOrElse(MarketExchange.scala:130) [classes/:na] at akka.actor.Actor.aroundReceive(Actor.scala:517) [akka-actor_2.12-2.5.21.jar:2.5.21]
Why on earth would it be trying to call the "drop" method when converting the ticketMap to a String?
Upvotes: 0
Views: 101
Reputation: 5280
From examining the stack trace I was able to infer that the problem stems from the method drop(), specifically the line:
ticketMap = ticketMap.filterKeys(!schedule(this)._1.ids.contains(_))
The problem was resolved by changing this to:
ticketMap = ticketMap.filterNot(kv => schedule(this)._1.ids.contains(kv._1))
I am assuming the implementation of filterKeys uses a similar mechanism to mapKeys which uses lazy evaluation. So the filterKeys is not called until the next time ticketMap is referenced which is well after the message has been handled. This is obviously not the desired behaviour.
I think this feature of scala is a very dangerous pitfall when used in conjunction with Akka
Upvotes: 1