Reputation: 39
I am trying to do a very simple thing and want to understand the right way of doing it. I need to periodically make some REST API calls to a separate service and then process the results asynchronously. I am using the actor system's default scheduler to schedule the HTTP requests, and I have created a separate thread pool to handle the Future callbacks. Since there is no dependency between the requests and the responses, I thought a separate thread pool for handling the Future callbacks should be fine.
Is there some problem with this approach?
I read the Scala docs and they mention there is some issue with this (though I'm not clear on what it is).
Generally, what is the recommended way of handling these scenarios?
import java.util.concurrent.Executors

import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpResponse

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

implicit val system = ActorSystem("my-actor-system") // define an actor system
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) // create a thread pool

// schedule a task which periodically does some work using the actor system's scheduler
system.scheduler.scheduleWithFixedDelay(5.seconds, 5.seconds)(new Runnable {
  override def run(): Unit = {
    val urls = getUrls() // get list of urls
    val futureResults = Future.sequence(urls.map(entry => getData[MyData](entry))) // get data for each url
    futureResults onComplete {
      case Success(res) => // do something with the results
      case Failure(e)   => // do something with the error
    }
  }
})

def getData[T](url: String): Future[Option[T]] = {
  implicit val ec1 = system.dispatcher
  val responseFuture: Future[HttpResponse] = execute(url)
  responseFuture map { result =>
    // transform the response and return data in format T
  }
}
Upvotes: 0
Views: 235
Reputation: 1940
Whether or not to have a separate thread pool really depends on the use case. If the service integration is critical and expected to consume a lot of resources, then a separate thread pool may make sense; otherwise, just using the default one should be fine. Feel free to refer to Levi's answer below for a more in-depth discussion on this part.
Regarding "job scheduling in an actor system", I think Akka streams are a perfect fit here. I give you an example below. Feel free to refer to the blog post https://blog.colinbreck.com/rethinking-streaming-workloads-with-akka-streams-part-i/ regarding how many things can Akka streams simplify for you.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object Timer {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Timer")

    // default thread pool
    implicit val ec: ExecutionContext = system.dispatcher

    // uncomment the line below (and comment out the default above) if a custom thread pool is needed
    // also make sure you read https://doc.akka.io/docs/akka/current/dispatchers.html#setting-the-dispatcher-for-an-actor
    // to define the custom thread pool
    // implicit val ec: ExecutionContext = system.dispatchers.lookup("my-custom-dispatcher")

    Source
      .tick(5.seconds, 5.seconds, getUrls())
      .mapConcat(identity)
      .mapAsync(1)(url => fetch(url))
      .runWith(Sink.seq)
      .onComplete {
        case Success(responses) =>
          // handle responses
        case Failure(ex) =>
          // handle exceptions
      }
  }

  def getUrls(): Seq[String] = ???

  def fetch(url: String): Future[Response] = ???

  case class Response(body: String)
}
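As a follow-up to the my-custom-dispatcher comment in the code above: such a dispatcher is normally defined in configuration. A minimal sketch, with the config supplied inline for illustration (the fixed-pool-size and throughput values are placeholder assumptions; see the dispatchers documentation linked in the comments for the full set of options):
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.ExecutionContext

// "my-custom-dispatcher" defined inline rather than in application.conf;
// the fixed-pool-size and throughput values are illustrative, not recommendations.
val config = ConfigFactory.parseString(
  """
    |my-custom-dispatcher {
    |  type = Dispatcher
    |  executor = "thread-pool-executor"
    |  thread-pool-executor {
    |    fixed-pool-size = 10
    |  }
    |  throughput = 1
    |}
    |""".stripMargin
).withFallback(ConfigFactory.load())

implicit val system: ActorSystem = ActorSystem("Timer", config)

// looked up by its config path; the resulting dispatcher is also an ExecutionContext
implicit val ec: ExecutionContext = system.dispatchers.lookup("my-custom-dispatcher")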
Upvotes: 2
Reputation: 20611
In addition to Yik San Chan's answer above (especially regarding using Akka Streams), I'd also point out that what exactly you're doing in the .onComplete block is quite relevant to the choice of which ExecutionContext to use for the onComplete callback.
In general, if what you're doing in the callback will be doing blocking I/O, it's probably best to do it in a thread pool which is large relative to the number of cores (note that each thread on the JVM consumes about 1MB or so of memory for its stack, so it's probably not a great idea to use an ExecutionContext that spawns an unbounded number of threads; a fixed pool of about 10x your core count is probably OK).
Otherwise, it's probably OK to use an ExecutionContext with a thread pool roughly equal in size to the number of cores: the default Akka dispatcher is such an ExecutionContext. The only real reason to consider not using the Akka dispatcher, in my experience/opinion, is if the callback is going to occupy the CPU for a long time. The phenomenon known as "thread starvation" can occur in that scenario, with adverse impacts on performance and cluster stability (if using, e.g., Akka Cluster or health checks). In such a scenario, I'd tend to run the callback on a dispatcher with fewer threads than cores and also consider configuring the default dispatcher with fewer threads than its default (while the kernel's scheduler can and will manage more ready-to-run threads than there are cores, there are strong arguments for not letting it do so).
In an onComplete callback (in comparison to the various transformation methods on Future like map/flatMap and friends), since all you can do is side-effect, it's probably more likely than not that you're doing blocking I/O.
Upvotes: 1