Guillaume B.

Reputation: 19

Akka scheduler: strange behavior in production (messages not firing)

I'm developing a Scala + Akka app as part of a bigger application. The purpose of the app is to call external services and SQL databases (using JDBC), do some processing, and return a parsed result, on a recurring basis. The app uses Akka Cluster so that it can scale horizontally.

How it should work

I'm creating a **singleton actor** on the cluster that is responsible for sending instructions to a pool of instruction handler actors. I'm receiving events from a Redis pub/sub channel that state which data sources should be refreshed and how often. This SourceScheduler actor stores the instructions along with their intervals in an internal Array.
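For reference, the singleton is started with Akka's cluster singleton tooling. This is only a rough sketch of that wiring, not the exact production setup (the actor and proxy names here are illustrative; SourceScheduler is the actor posted below):

import akka.actor.{ActorSystem, PoisonPill, Props}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}

object SchedulerSingleton {
  // Run exactly one SourceScheduler instance across the whole cluster (sketch only)
  def start(system: ActorSystem): Unit =
    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = Props[SourceScheduler],
        terminationMessage = PoisonPill,
        settings = ClusterSingletonManagerSettings(system)
      ),
      name = "sourceScheduler")
}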

Then I'm using Akka's Scheduler to execute a tick function every second. This function filters the array to determine which instructions need to be executed and sends messages to the instruction handler pool. The routees in the pool execute the instructions and emit the results through Redis Pub/Sub.
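The handler side isn't included below, so here is only a simplified sketch of what a routee at /user/instructionHandler looks like. The Instruction case class is assumed to carry just the hash, and runInstruction / publishResult are placeholders for the actual JDBC call and the Redis publish:

import akka.actor.{Actor, ActorLogging}

// Assumed shape of the app's Instruction message (a case class carrying the hash)
case class Instruction(hash: String)

// Illustrative routee: runInstruction and publishResult stand in for the real
// external-service / JDBC call and the Redis Pub/Sub publish, which aren't shown
class InstructionHandler(runInstruction: String => String,
                         publishResult: String => Unit) extends Actor with ActorLogging {

  def receive: Receive = {
    case Instruction(hash) =>
      val result = runInstruction(hash) // execute the datasource call identified by the hash
      publishResult(result)             // emit the parsed result on the Redis channel
  }
}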

The issue

On my machine (Ryzen 7 + 16 GB RAM + Arch Linux) everything runs fine and we easily process 2,500 database calls per second. But once in production, I cannot get it to process more than ~400 requests/s.

The SourceScheduler doesn't tick every second, and messages get stuck in the mailbox. The app also uses more CPU and far more RAM (1.3 GB in production vs ~350 MB on my machine).

The production app runs in a JRE 8 Alpine-based Docker container on Rancher, on a Microsoft Azure server.

I understand that singleton actors on clusters can be a bottleneck, but since it only forwards messages to other actors I don't see how it could block.

What I've tried

Has anyone ever experienced such differences between their local machine and the production server?


EDIT: SourceScheduler.scala

import akka.actor.{Actor, ActorLogging, Timers}
import akka.cluster.routing.{ClusterRouterGroup, ClusterRouterGroupSettings}
import akka.routing.RoundRobinGroup
import akka.pattern.after
import scala.concurrent.Future
import scala.concurrent.duration._

// Instruction, AttachSource, DetachSource and ConnectionPoolManager are defined elsewhere in the app
class SourceScheduler extends Actor with ActorLogging with Timers {
  case object Tick
  case object SchedulerReport
  import context.dispatcher

  val instructionHandlerPool = context.actorOf(
    ClusterRouterGroup(
      RoundRobinGroup(Nil),
      ClusterRouterGroupSettings(
        totalInstances = 10,
        routeesPaths = List("/user/instructionHandler"),
        allowLocalRoutees = true
      )
    ).props(),
    name = "instructionHandlerRouter")

  var ticks: Int = 0
  var refreshedSources: Int = 0
  val maxTicks: Int = Int.MaxValue - 1

  var scheduledSources = Array[(String, Int, String)]()

  override def preStart(): Unit = {
    log.info("Starting Scheduler")
  }

  def refreshSource(hash: String) = {
    instructionHandlerPool ! Instruction(hash)
    refreshedSources += 1
  }

  // Get sources that need to be refreshed on this tick
  def getEligibleSources(sources: Seq[(String, Int, String)], tick: Int) = {
    sources
      .groupBy(_._1)                     // group entries by source hash
      .mapValues(_.toList.minBy(_._2))   // keep the smallest interval per hash
      .values
      .filter(tick * 1000 % _._2 == 0)   // interval is in ms; fire when the tick (in ms) is a multiple of it
      .map(_._1)
  }

  def tick(): Unit = {
    ticks += 1
    log.debug("Scheduler TICK {}", ticks)
    val eligibleSources = getEligibleSources(scheduledSources, ticks)
    val chunks = eligibleSources.grouped(ConnectionPoolManager.connectionPoolSize).zipWithIndex.toList
    log.debug("Scheduling {} sources in {} chunks", eligibleSources.size, chunks.size)
    chunks.foreach({
      case (sources, index) =>
        // Stagger each chunk by ~25 ms. Note: the block passed to `after` is evaluated
        // on the dispatcher after the delay, i.e. outside this actor's message processing.
        after((index * 25 + 5).milliseconds, context.system.scheduler)(Future.successful {
          sources.foreach(refreshSource)
        })
    })
    if(ticks >= maxTicks) ticks = 0
  }
  timers.startPeriodicTimer("schedulerTickTimer", Tick, 990 milliseconds)
  timers.startPeriodicTimer("schedulerReportTimer", SchedulerReport, 10 seconds)

  def receive: Receive = {
    case AttachSource(hash, interval, socketId) =>
      scheduledSources.synchronized {
        scheduledSources = scheduledSources :+ ((hash, interval, socketId))
      }
    case DetachSource(socketId) =>
      scheduledSources.synchronized {
        scheduledSources = scheduledSources.filterNot(_._3 == socketId)
      }
    case SchedulerReport =>
      log.info("{} sources were scheduled since last report", refreshedSources)
      refreshedSources = 0
    case Tick => tick()
    case _ =>
  }
}

Each source is identified by a hash containing all the data required for the execution (the host of the database, for example), the refresh interval, and the unique id of the client that asked for it, so we can stop refreshing when that client disconnects. Each second, we check whether a source needs to be refreshed by applying a modulo with the current value of the ticks counter. We refresh sources in smaller chunks to avoid connection pool starvation. The problem is that under a small load (~300 rq/s) the tick function is no longer executed every second.
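To make the modulo check concrete, here is a small standalone illustration of the eligibility logic, with made-up intervals (in milliseconds, as in the code above):

object EligibilityExample extends App {
  // (hash, intervalMillis, socketId) — sample data, not taken from production
  val sources = Seq(
    ("sourceA", 1000, "client-1"), // refresh every second
    ("sourceA", 5000, "client-2"), // same hash, slower interval -> the 1000 ms entry wins
    ("sourceB", 3000, "client-3")  // refresh every 3 seconds
  )

  def eligible(sources: Seq[(String, Int, String)], tick: Int): Iterable[String] =
    sources
      .groupBy(_._1)
      .mapValues(_.minBy(_._2)) // keep the fastest interval per hash
      .values
      .filter(s => tick * 1000 % s._2 == 0)
      .map(_._1)

  (1 to 6).foreach(t => println(s"tick $t -> ${eligible(sources, t).toList.sorted}"))
  // sourceA fires on every tick, sourceB only on ticks 3 and 6
}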

Upvotes: 0

Views: 418

Answers (2)

Guillaume B.

Reputation: 19

It turns out the issue was with Rancher. We ran several tests and the app worked fine directly on the machine and in Docker, but not when using Rancher as the orchestrator. I'm not sure why, but since it's not related to Akka I'm closing the issue. Thanks everyone for your help.

Upvotes: 1

Maybe the bottleneck is network latency? On your machine all components run side by side and communication has essentially no latency, but in the cluster, if you are making a high number of database calls from one machine to another, the network latency may become noticeable.

Upvotes: -1
