pnndesh

Reputation: 39

Right way of handling multiple future callbacks using threadpool in Scala

I am trying to do a very simple thing and want to understand the right way of doing it. I need to periodically make some REST API calls to a separate service and then process the results asynchronously. I am using the actor system's default scheduler to schedule the HTTP requests and have created a separate thread pool to handle the Future callbacks. Since there is no dependency between the requests and the responses, I thought a separate thread pool for handling the Future callbacks should be fine.

Is there some problem with this approach?

I read the Scala docs and they say there is some issue here (though I am not clear on what it is).

Generally, what is the recommended way of handling these scenarios?


import java.util.concurrent.Executors

import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpResponse

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

implicit val system = ActorSystem("my-actor-system") // define an actor system

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) // create a separate thread pool

// define a task which periodically does some work using the actor system's scheduler
system.scheduler.scheduleWithFixedDelay(5.seconds, 5.seconds)(new Runnable {

  override def run(): Unit = {

    val urls = getUrls() // get list of urls

    val futureResults = urls.map(entry => getData[MyData](entry)) // get data for each url

    Future.sequence(futureResults) onComplete {
      case Success(res) => // do something with the results
      case Failure(e)   => // do something with the error
    }
  }
})

def getData[T](url: String): Future[T] = {

  implicit val ec1 = system.dispatcher

  val responseFuture: Future[HttpResponse] = execute(url)
  responseFuture map { result =>
    // transform the response and return data of type T
  }
}

Upvotes: 0

Views: 235

Answers (2)

yiksanchan

Reputation: 1940

Whether or not to have a separate thread pool really depends on the use case. If the service integration is very critical and is designed to take a lot of resources, then a separate thread pool may make sense; otherwise, just using the default one should be fine. Feel free to refer to Levi's answer for a more in-depth discussion on this part.

Regarding "job scheduling in an actor system", I think Akka Streams are a perfect fit here. I give you an example below. Feel free to refer to the blog post https://blog.colinbreck.com/rethinking-streaming-workloads-with-akka-streams-part-i/ regarding how many things Akka Streams can simplify for you.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object Timer {

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Timer")
    // default thread pool
    implicit val ec: ExecutionContext = system.dispatcher
    
    // if a custom thread pool is needed, comment out the default above and uncomment below
    // also make sure you read https://doc.akka.io/docs/akka/current/dispatchers.html#setting-the-dispatcher-for-an-actor
    // to define the custom dispatcher
    // implicit val ec: ExecutionContext = system.dispatchers.lookup("my-custom-dispatcher")

    Source
      .tick(5.seconds, 5.seconds, getUrls())
      .mapConcat(identity)
      .mapAsync(1)(url => fetch(url))
      .runWith(Sink.seq)
      .onComplete {
        case Success(responses) =>
          // handle responses
        case Failure(ex)        =>
          // handle exceptions
      }
  }

  def getUrls(): Seq[String] = ???

  def fetch(url: String): Future[Response] = ???

  case class Response(body: String)
}

Upvotes: 2

Levi Ramsey

Reputation: 20611

In addition to Yik San Chan's answer above (especially regarding using Akka Streams), I'd also point out that what exactly you're doing in the .onComplete block is quite relevant to the choice of which ExecutionContext to use for the onComplete callback.

In general, if what you're doing in the callback will be doing blocking I/O, it's probably best to do it in a threadpool which is large relative to the number of cores (note that each thread on the JVM consumes about 1MB or so of heap, so it's probably not a great idea to use an ExecutionContext that spawns an unbounded number of threads; a fixed pool of about 10x your core count is probably OK).
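As a rough sketch of that sizing heuristic (the 10x multiplier is just the ballpark mentioned above, not a fixed rule, and `blockingIoEc` is a made-up name):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Size a dedicated, bounded pool for blocking-I/O callbacks relative to core count,
// rather than using an unbounded executor (each JVM thread costs ~1MB of memory).
val cores = Runtime.getRuntime.availableProcessors()
val blockingIoEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(cores * 10))
```

A fixed pool bounds the memory cost up front, which is the point of the heuristic: you accept some queueing under load instead of unbounded thread growth.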

Otherwise, it's probably OK to use an ExecutionContext with a threadpool roughly equal in size to the number of cores: the default Akka dispatcher is such an ExecutionContext. The only real reason to consider not using the Akka dispatcher, in my experience/opinion, is if the callback is going to occupy the CPU for a long time. The phenomenon known as "thread starvation" can occur in that scenario, with adverse impacts on performance and cluster stability (if using, e.g. Akka Cluster or health-checks). In such a scenario, I'd tend to use a dispatcher with fewer threads than cores and consider configuring the default dispatcher with fewer threads than the default (while the kernel's scheduler can and will manage more threads ready-to-run than cores, there are strong arguments for not letting it do so).
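If you do decide to carve out a dedicated dispatcher for such work, a minimal configuration sketch might look like the following (the dispatcher name and pool size here are placeholders I've chosen for illustration; the structure follows the Akka dispatchers documentation linked in the other answer):

```hocon
# application.conf -- hypothetical dedicated dispatcher for blocking callbacks
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16   # tune relative to core count, per the discussion above
  }
  throughput = 1
}
```

You would then obtain it with something like `system.dispatchers.lookup("blocking-io-dispatcher")` and pass it as the implicit ExecutionContext for the callback, keeping the default dispatcher free for Akka's own work.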

In an onComplete callback (in comparison to the various transformation methods on Future like map/flatMap and friends), since all you can do is side-effect, it's probably more likely than not that you're doing blocking I/O.

Upvotes: 1
