frunza

Reputation: 287

Wait for a Scala Future to complete and continue with the next one

I have a list of 500,000 elements and a queue with 20 consumers. The messages are processed at different speeds (1, 15, 30, or 60 seconds; 3 or 50 minutes; 3 or 16 hours, or more), with a timeout of 24 hours. I need each consumer's response in order to do some processing on the data. I plan to use Scala Futures with the event-based onComplete callback.

To avoid flooding the queue, I want to send the first 30 messages to it: 20 will be picked up by the consumers and 10 will wait in the queue. Whenever one of the Futures completes, I want to send another message to the queue. Can you give me an idea of how to achieve this? Can it be done with Akka Streams?

The code below is wrong; it is only meant to give you an idea of what I want:

private def sendMessage(ids: List[String]): Unit = ids match {
  case Nil => () // nothing left to send
  case id :: rest =>
    val futureResult = Future {
      // send the id, along with a message, to the queue
    }.map { result =>
      // process the response
    }

    // once this message has been handled, send the next one
    futureResult.onComplete { _ =>
      sendMessage(rest)
    }
}

def migrateAll(): Unit = {
  val ids: List[String] = ??? // get IDs from the DB

  sendMessage(ids)
}

Upvotes: 3

Views: 3830

Answers (2)

Jeffrey Chung

Reputation: 19527

Below is a simple example with Akka Streams that models your use case.

Let's define the processing as a method that takes a String and returns a Future[String]:

def process(id: String): Future[String] = ???

Then we create a Source from a List of 500,000 String elements and use mapAsync to feed the elements to the processing method. The level of parallelism is set to 20, meaning that no more than 20 Futures will be running at any point in time. As each Future is completed, we perform additional processing and print the result:

Source((1 to 500000).map(_.toString).toList)
  .mapAsync(parallelism = 20)(process)
  // do something with the result of the Future; here we create a new string
  //   that begins with "Processed: "
  .map(s => s"Processed: $s")
  .runForeach(println)

You can read more about mapAsync in the documentation.

Upvotes: 2

simpadjo

Reputation: 4017

This is the code I have used for such tasks:

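import java.util.concurrent.Semaphore
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

class RateLimiter(semaphore: Semaphore) {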
  def runBlocking[T](action: => Future[T]): Future[T] = {
    semaphore.acquire()
    val started = try {
      action
    }
    catch {
      case NonFatal(th) => {
        semaphore.release()
        throw th
      }
    }

    started.andThen {
      case _ => semaphore.release()
    }(ExecutionContext.Implicits.global)
  }
}

val rateLimiter = new RateLimiter(new Semaphore(20))
val tasks = (1 to 100)
val futures: Seq[Future[Int]] = tasks.map(i => rateLimiter.runBlocking(Future{
    i * 2
  }(ExecutionContext.Implicits.global)))
futures.foreach(f => Await.result(f, Duration.Inf))

It is not perfect, since it blocks in two places (on the semaphore and in Await) and holds all the Futures in memory (which can be avoided).

But it works in production :)
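
The two caveats above (blocking, and keeping every Future in memory) can be avoided with plain scala.concurrent. Below is a minimal sketch rather than a drop-in replacement: BoundedRunner and runBounded are hypothetical names, and it assumes that a failed item should simply be skipped. It starts a fixed number of workers, and each worker picks up the next item only after its previous Future has completed, so at most `parallelism` Futures are in flight at any time.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, not part of any library.
object BoundedRunner {

  // Runs `process` over all items, keeping at most `parallelism`
  // Futures in flight, without blocking any thread.
  def runBounded[A](items: Iterable[A], parallelism: Int)(process: A => Future[Any])(
      implicit ec: ExecutionContext): Future[Unit] = {
    // Thread-safe queue of pending items; only the items are held,
    // not one Future per item.
    val queue = new ConcurrentLinkedQueue[A]()
    items.foreach(item => queue.add(item))

    // One worker: take the next item, process it, and only then
    // move on to the following one.
    def worker(): Future[Unit] =
      Option(queue.poll()) match {
        case None => Future.unit // queue drained, this worker is done
        case Some(item) =>
          Future.unit
            .flatMap(_ => process(item)) // also captures synchronous exceptions
            // Assumption: the outcome (success or failure) is ignored here.
            .transformWith(_ => worker())
      }

    // Start `parallelism` workers; the result completes when all are done.
    Future.sequence(List.fill(parallelism)(worker())).map(_ => ())
  }
}
```

With the numbers from the question, this would be called as BoundedRunner.runBounded(ids, 30)(process), where process (an assumption here) sends one id to the queue and returns a Future that completes once the consumer's response has been handled.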

Upvotes: 2
