smeeb
smeeb

Reputation: 29567

Non-Blocking Timer Tasks and Akka Actors

Please note: Although I'd prefer a solution using Akka's Java API (which is what I'm using), I'm happy with any working solution and can probably figure out how to translate Scala-based answers into Java-land.


I have an Akka app that has many actors, two of which are Fizz and Buzz. The Fizz actor can accept 2 types of messages:

The Buzz actor simply accepts a DoItNow message. The message flow between these actors and the rest of the system is as follows:

  1. Anything (other actors, even event-driven components outside the actor system) can send StartNewTimerTask messages to the Fizz actor at any time
  2. Each time the Fizz actor receives a StartNewTimerTask message, it creates and starts a new asynchronous/non-blocking timer that attempts to run for, say, 8 seconds. If the timer gets to the end (8 seconds) then a DoItNow message is sent to the Buzz actor
  3. The Fizz actor can accept any number of concurrent StartNewTimerTask messages, and as such, can be "managing" potentially multiple timers at the same time, with each one counting towards that 8-second magical number. Hence if 20 other actors send StartNewTimerTask messages to the Fizz actor all within a few seconds of each other, then the Fizz actor would be "managing" 20 non-blocking, independent timers at the same time. And when each of those 20 timers reaches their respective 8-second duration, they send 20 independent DoItNow messages to the Buzz actor
  4. When the Fizz actor receives a ResetAllTimerTasks message, any timers that are currently "in progress" will be interrupted/cancelled (so that they stop counting down to the 8-second duration, thus preventing them from sending a DoItNow message to Buzz). Hence, borrowing from our example above, if between times t=1 and t=3 the Fizz actor received 20 StartNewTimerTask messages, then at t=10 perhaps 14 of their respective timers would have elapsed and fired DoItNow messages, and perhaps 6 would still be in progress. If at that exact moment Fizz received a ResetAllTimerTasks message, it would stop those 6 timers from elapsing and firing messages, and so in this example Buzz would only receive 14 DoItNow messages

I know that the Java 8 API (sans Akka) advocates extending TimerTask and submitting these tasks to the Timer#scheduleAtFixedRate method, but I'm not sure if that conflicts with Akka at all or if there is a better way to implement this functionality with the Akka API. My best attempt thus far:

// Groovy pseudo-code
class MyTimerTask extends TimerTask {
  @Inject
  ActorRef buzz

  @Override
  void run() {
    // No op!
  }

  void completeTask() {
    buzz.tell(new DoItNow(), null)
  }
}

class Fizz extends UntypedAbstractActor {
  @Inject
  Timer timer

  @Override
  void onReceive(Object message) {
    if(message in StartNewTimerTask) {
      timer.scheduleAtFixedRate(new MyTimerTask(), 0, 8 * 1000)
    } else if(message in ResetAllTimerTasks) {
      timer.cancel()
    }
  }
}

class Buzz extends UntypedAbstractActor {
  @Override
  void onReceive(Object message) {
    if(message in DoItNow) {
      // Do something super cool now...
    }
  }
}

However I don't think I'm managing timers correctly or tapping into the full potential of the Akka scheduler/timer API. Any thoughts?

Upvotes: 0

Views: 2493

Answers (1)

Jeffrey Chung
Jeffrey Chung

Reputation: 19527

Consider eschewing the Java Timer API in favor of the new actor timer functionality that was just released with Akka 2.5.4. Actor timers allow an actor to schedule periodic messages to itself with one or more internal timers that are tied to its lifecycle. To access this feature in Java, simply change the Fizz actor to extend AbstractActorWithTimers.

The below example is in Scala (in Scala, mix in the Timers trait):

object Fizz {
  private case object SendToBuzz
}

class Fizz(buzz: ActorRef) extends Actor with Timers {
  import Fizz._

  def receive = {
    case StartNewTimerTask =>
      val uuid = java.util.UUID.randomUUID
      timers.startPeriodicTimer(uuid, SendToBuzz, 8.seconds)
    case ResetAllTimerTasks =>
      timers.cancelAll()
    case SendToBuzz =>
      buzz ! DoItNow
  }
}
  • When the Fizz actor processes a StartNewTimerTask message, it kicks off a new timer that will send a SendToBuzz message to self (that is, the Fizz actor) every eight seconds.
  • When Fizz processes a SendToBuzz message, it sends a DoItNow message to the Buzz actor.
  • When Fizz processes a ResetAllTimerTasks message, it cancels all the timers.
  • If Fizz is restarted or stopped, all of its timers are automatically canceled.

Upvotes: 1

Related Questions