Reputation: 30095
Let's say we have method that returns either Future[T]
or java CompletableFuture[T]
or custom AsyncCompletionHandler[T]
from org.asynchttpclient
. I want to throttle all calls to such method.
How would you do that ? Currently I use MergeHub.source
-based Sink
to funnel all requests through it. Question I have
Here is the code
import java.time.ZonedDateTime
import akka.actor.ActorSystem
import akka.stream.scaladsl.{MergeHub, Sink, Source}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ThrottleMode}
import org.asynchttpclient.{DefaultAsyncHttpClient, _}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
object Main {
private implicit val system = ActorSystem("root")
private implicit val executor = system.dispatcher
private implicit val mat = ActorMaterializer(ActorMaterializerSettings(system))
type PendingRequest = () => Future[Try[Response]]
private val throttlingSink =
MergeHub.source[PendingRequest]
.throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping)
.mapAsync(4)(_.apply())
.to(Sink.ignore)
.run()
def wrap(p: Promise[Try[Response]]): AsyncCompletionHandler[Response] = new AsyncCompletionHandler[Response] {
override def onThrowable(t: Throwable): Unit =
p.success(Failure(t))
override def onCompleted(response: Response): Response = {
p.success(Success(response))
response
}
}
def makeRequest(url: String): Future[Response] = {
val p = Promise[Try[Response]]
Source.single[PendingRequest](() => {
asyncHttpClient
.prepareGet(url)
.execute(wrap(p))
p.future
})
.runWith(throttlingSink)
p.future.flatMap {
case Success(r) => Future.successful(r)
case Failure(ex) => Future.failed(ex)
}
}
val asyncHttpClient = new DefaultAsyncHttpClient()
def main(args: Array[String]): Unit = {
val start = ZonedDateTime.now()
println("Start!")
Source(1 to 20)
.mapAsync(4) { index =>
println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Requesting $index")
makeRequest(s"https://httpbin.org/get?param=$index").map { r =>
println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Got $index - Code ${r.getStatusCode}")
}
}
.runWith(Sink.ignore)
.onComplete {
case Success(_) =>
println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} Done!")
asyncHttpClient.close()
system.terminate()
case Failure(ex) =>
ex.printStackTrace()
asyncHttpClient.close()
system.terminate()
}
Await.result(system.whenTerminated, Duration.Inf)
}
}
In other words there are multiple places like content of main. And all of them should be throttles as sum of calls.
Upvotes: 0
Views: 586
Reputation: 9023
As a general remark, you could probably do without the MergeHub
step and streamline your pipeline. See example below
object Main {
private implicit val system = ActorSystem("root")
private implicit val executor = system.dispatcher
private implicit val mat = ActorMaterializer(ActorMaterializerSettings(system))
def makeRequest(url: String): Future[Response] = {
val promise = Promise[Response]()
asyncHttpClient.prepareGet(url).execute(new AsyncCompletionHandler[Response] {
def onCompleted(response: Response) = {
promise.success(response)
response
}
override def onThrowable(t: Throwable) {
promise.failure(t)
super.onThrowable(t)
}
})
promise.future
}
val asyncHttpClient = new DefaultAsyncHttpClient()
def main(args: Array[String]): Unit = {
val start = ZonedDateTime.now()
println("Start!")
Source(1 to 20)
.throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping)
.mapAsync(4) { index =>
println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Requesting $index")
makeRequest(s"http://httpbin.org/get?param=$index").map { r =>
println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Got $index - Code ${r.getStatusCode}")
}
}
.runWith(Sink.ignore)
.onComplete {
case Success(_) =>
println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} Done!")
asyncHttpClient.close()
system.terminate()
case Failure(ex) =>
ex.printStackTrace()
asyncHttpClient.close()
system.terminate()
}
Await.result(system.whenTerminated, Duration.Inf)
}
}
However, in both implementations I see requests throttled correctly - one every 2 seconds, roughly starting from second ~0 to second ~38.
Could you elaborate on what are you expectations here?
Upvotes: 1