eguneys
eguneys

Reputation: 6396

How to throttle Futures with one-second delay with Akka

I have list of URIs, each of which I want to request with a one-second delay in between. How can I do that?

val uris: List[String] = List()
// How to make these URIs resolve 1 second apart?
val responses: List[Future[Response]] = uris.map(httpRequest(_))

Upvotes: 3

Views: 613

Answers (3)

Serhii Shynkarenko
Serhii Shynkarenko

Reputation: 726

akka streams has it out of the box with the throttle function (taking into account that you are using akka-http and added tag for akka streams)

Upvotes: 0

Jeffrey Chung
Jeffrey Chung

Reputation: 19507

You could create an Akka Streams Source from the list of URIs, then throttle the conversion of each URI to a Future[Response]:

def httpRequest(uri: String): Future[Response] = ???

val uris: List[String] = ???

val responses: Future[Seq[Response]] =
  Source(uris)
    .throttle(1, 1 second)
    .mapAsync(parallelism = 1)(httpRequest)
    .runWith(Sink.seq[Response])

Upvotes: 6

Dima
Dima

Reputation: 40500

Something like this perahps:

  @tailrec
  def withDelay(
    uris: Seq[String], 
    delay: Duration = 1 second, 
    result: List[Future[Response]] = Nil,
  ): Seq[Future[Response]] = uris match {
     case Seq() => result.reversed
     case (head, tail@_*) => 
        val v = result.headOption.getOrElse(Future.successful(null))
          .flatMap { _ => 
            akka.pattern.after(delay, context.system.scheduler)(httpRequest(head))
          }
        withDelay(tail, delay, v :: result)
   }

this has a delay before the first execution as well, but I hope, it's clear enough how to get rid of it if necessary ... Another caveat is that this assumes that all futures succeed. As soon as one fails, all subsequent processing is aborted. If you need a different behavior, you may want to replace the .flatMap with .transform or add a .recover etc.

You can also write the same with .foldLeft if preferred:

  uris.foldLeft(List.empty[Future[Response]]) { case (results, next) => 
    results.headOption.getOrElse(Future.successful(null))
      .flatMap { _ => 
        akka.pattern.after(delay, context.system.scheduler)(httpRequest(next))
      } :: results
  }.reversed

Upvotes: 2

Related Questions