expert
expert

Reputation: 30095

How do you implement throttling of calls to a method that supports an async callback?

Let's say we have a method that returns either a Future[T], a Java CompletableFuture[T], or takes a custom AsyncCompletionHandler[T] from org.asynchttpclient. I want to throttle all calls to such a method.

How would you do that? Currently I use a MergeHub.source-based Sink to funnel all requests through it. The questions I have are:

  1. Is there a better way?
  2. In my log output I see that the time spent on all requests is less than I expect. Why?

Here is the code

import java.time.ZonedDateTime

import akka.actor.ActorSystem
import akka.stream.scaladsl.{MergeHub, Sink, Source}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ThrottleMode}
import org.asynchttpclient.{DefaultAsyncHttpClient, _}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

/** Demo of globally throttling asynchronous HTTP calls.
  *
  * Every call to [[makeRequest]] is funnelled through one shared, throttled
  * stream (a `MergeHub`), so the rate limit applies to the sum of all callers
  * rather than to each call site individually.
  */
object Main {

  import scala.util.control.NonFatal

  private implicit val system: ActorSystem = ActorSystem("root")
  private implicit val executor: scala.concurrent.ExecutionContextExecutor = system.dispatcher
  private implicit val mat: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))

  /** A deferred request: nothing is sent until the thunk is applied. The
    * outcome is carried as a `Try` inside a successful future, so a failed
    * request does not fail the stream the thunk runs in.
    */
  type PendingRequest = () => Future[Try[Response]]

  val asyncHttpClient: DefaultAsyncHttpClient = new DefaultAsyncHttpClient()

  /** Shared entry point for all requests: lets one pending request through
    * every 2 seconds and runs up to 4 of them concurrently.
    */
  private val throttlingSink =
    MergeHub.source[PendingRequest]
      .throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping)
      .mapAsync(4)(_.apply())
      .to(Sink.ignore)
      .run()

  /** Adapts a promise to an AsyncHttpClient completion handler.
    *
    * @param p promise completed with `Success(response)` or `Failure(error)`
    * @return a handler that reports the outcome of the request through `p`
    */
  def wrap(p: Promise[Try[Response]]): AsyncCompletionHandler[Response] = new AsyncCompletionHandler[Response] {
    // trySuccess instead of success: a second callback on an already
    // completed promise must not throw IllegalStateException.
    override def onThrowable(t: Throwable): Unit = {
      p.trySuccess(Failure(t))
      ()
    }

    override def onCompleted(response: Response): Response = {
      p.trySuccess(Success(response))
      response
    }
  }

  /** Schedules a GET request for `url` through the shared throttled stream.
    *
    * @param url address to fetch
    * @return a future holding the response, or failed with the request error
    */
  def makeRequest(url: String): Future[Response] = {
    val p = Promise[Try[Response]]()

    val pending: PendingRequest = () => {
      try {
        asyncHttpClient
          .prepareGet(url)
          .execute(wrap(p))
      } catch {
        // If building or dispatching the request throws synchronously, the
        // handler is never invoked and the promise would stay incomplete
        // forever; report the error through the promise instead of letting
        // the exception escape into the shared stream.
        case NonFatal(ex) => p.trySuccess(Failure(ex))
      }
      p.future
    }

    Source.single(pending).runWith(throttlingSink)

    // Unwrap the Try so callers see an ordinary succeeded or failed future.
    p.future.flatMap(result => Future.fromTry(result))
  }

  def main(args: Array[String]): Unit = {

    val start = ZonedDateTime.now()
    // Whole seconds elapsed since `start`, used as the log prefix.
    def elapsed: Long = ZonedDateTime.now().toEpochSecond - start.toEpochSecond

    println("Start!")
    Source(1 to 20)
      .mapAsync(4) { index =>
        println(s"$elapsed s - Requesting $index")
        makeRequest(s"https://httpbin.org/get?param=$index").map { r =>
          println(s"$elapsed s - Got $index - Code ${r.getStatusCode}")
        }
      }
      .runWith(Sink.ignore)
      .onComplete { outcome =>
        outcome match {
          case Success(_) => println(s"$elapsed Done!")
          case Failure(ex) => ex.printStackTrace()
        }
        // Always terminate the actor system, even if closing the client
        // throws; otherwise the Await below would block forever.
        try asyncHttpClient.close()
        finally system.terminate()
      }

    Await.result(system.whenTerminated, Duration.Inf)
  }
}

In other words, there are multiple places in the code like the body of main, and all of them should be throttled together, i.e. the limit applies to the sum of their calls.

Upvotes: 0

Views: 586

Answers (1)

Stefano Bonetti
Stefano Bonetti

Reputation: 9023

As a general remark, you could probably do without the MergeHub step and streamline your pipeline. See the example below:

/** Demo of throttling asynchronous HTTP calls directly in the stream that
  * issues them: the source is throttled before `mapAsync`, so no separate
  * hub is needed.
  */
object Main {

  private implicit val system: ActorSystem = ActorSystem("root")
  private implicit val executor: scala.concurrent.ExecutionContextExecutor = system.dispatcher
  private implicit val mat: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))

  val asyncHttpClient: DefaultAsyncHttpClient = new DefaultAsyncHttpClient()

  /** Sends a GET request for `url` immediately.
    *
    * @param url address to fetch
    * @return a future holding the response, or failed with the request error
    */
  def makeRequest(url: String): Future[Response] = {
    val promise = Promise[Response]()
    asyncHttpClient.prepareGet(url).execute(new AsyncCompletionHandler[Response] {
      // trySuccess/tryFailure: a second callback on an already completed
      // promise must not throw IllegalStateException.
      override def onCompleted(response: Response): Response = {
        promise.trySuccess(response)
        response
      }

      override def onThrowable(t: Throwable): Unit = {
        promise.tryFailure(t)
        super.onThrowable(t)
      }
    })
    promise.future
  }

  def main(args: Array[String]): Unit = {

    val start = ZonedDateTime.now()
    // Whole seconds elapsed since `start`, used as the log prefix.
    def elapsed: Long = ZonedDateTime.now().toEpochSecond - start.toEpochSecond

    println("Start!")
    Source(1 to 20)
      // Emit one index every 2 seconds; up to 4 requests may be in flight.
      .throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping)
      .mapAsync(4) { index =>
        println(s"$elapsed s - Requesting $index")
        makeRequest(s"http://httpbin.org/get?param=$index").map { r =>
          println(s"$elapsed s - Got $index - Code ${r.getStatusCode}")
        }
      }
      .runWith(Sink.ignore)
      .onComplete { outcome =>
        outcome match {
          case Success(_) => println(s"$elapsed Done!")
          case Failure(ex) => ex.printStackTrace()
        }
        // Always terminate the actor system, even if closing the client
        // throws; otherwise the Await below would block forever.
        try asyncHttpClient.close()
        finally system.terminate()
      }

    Await.result(system.whenTerminated, Duration.Inf)
  }
}

However, in both implementations I see requests throttled correctly - one every 2 seconds, roughly starting from second ~0 to second ~38.

Could you elaborate on what your expectations are here?

Upvotes: 1

Related Questions