Reputation: 287
I have a list of 500,000 elements and a queue with 20 consumers. The messages are processed at very different speeds (1, 15, 30, or 60 seconds; 3 or 50 minutes; 3 or 16 hours, or even longer; 24 hours is the timeout). I need the consumer's response in order to do some further processing on the data. I am going to use Scala Future for this, together with its event-based onComplete callback.
In order not to flood the queue, I want to send only the first 30 messages to the queue: 20 will be picked up by the consumers and 10 will be waiting in the queue. When one of the Futures completes, I want to send another message to the queue. Can you give me an idea of how to achieve this? Can it be done with Akka Streams?
This code is wrong; I just want to give you an idea of what I want:
private def sendMessage(ids: List[String]): Unit = {
  if (ids.nonEmpty) {
    val id = ids.head
    val futureResult = Future {
      // send the id within some message to the queue
    }.map { result =>
      // process the response
    }
    futureResult.onComplete { _ =>
      sendMessage(ids.tail)
    }
  }
}

def migrateAll(): Unit = {
  val ids: List[String] = ??? // get the IDs from the DB
  sendMessage(ids)
}
Upvotes: 3
Views: 3830
Reputation: 19527
Below is a simple example with Akka Streams that models your use case.
Let's define the processing as a method that takes a String and returns a Future[String]:
def process(id: String): Future[String] = ???
Then we create a Source from a List of 500,000 String elements and use mapAsync to feed the elements to the processing method. The level of parallelism is set to 20, meaning that no more than 20 Futures will be running at any point in time. As each Future completes, we perform additional processing and print the result:
Source((1 to 500000).map(_.toString).toList)
  .mapAsync(parallelism = 20)(process)
  // do something with the result of the Future; here we create a new string
  // that begins with "Processed: "
  .map(s => s"Processed: $s")
  .runForeach(println)
You can read more about mapAsync in the documentation.
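For reference, a fully runnable version of the snippet could look something like the following. This is only a sketch: it assumes a recent Akka version (2.6 or later), where an implicit ActorSystem is enough to materialize the stream, and the process method is stubbed out since the real queue interaction is application-specific.

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

import scala.concurrent.Future

object MigrateAll extends App {
  implicit val system: ActorSystem = ActorSystem("migration")
  import system.dispatcher

  // Stub: send the id to the queue and eventually complete with the consumer's response.
  def process(id: String): Future[String] = Future(s"response for $id")

  Source((1 to 500000).map(_.toString).toList)
    .mapAsync(parallelism = 20)(process)   // at most 20 Futures in flight at once
    .map(s => s"Processed: $s")
    .runForeach(println)
    .onComplete(_ => system.terminate())   // shut down once the stream finishes
}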
Upvotes: 2
Reputation: 4017
This is code I have used for such tasks:
import java.util.concurrent.Semaphore

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.control.NonFatal

class RateLimiter(semaphore: Semaphore) {
  def runBlocking[T](action: => Future[T]): Future[T] = {
    // Block until a permit is available, then start the action.
    semaphore.acquire()
    val started = try {
      action
    } catch {
      case NonFatal(th) =>
        // The action failed to start, so give the permit back.
        semaphore.release()
        throw th
    }
    // Release the permit when the future completes, whatever the outcome.
    started.andThen {
      case _ => semaphore.release()
    }(ExecutionContext.Implicits.global)
  }
}

val rateLimiter = new RateLimiter(new Semaphore(20))
val tasks = 1 to 100
val futures: Seq[Future[Int]] = tasks.map { i =>
  rateLimiter.runBlocking(Future {
    i * 2
  }(ExecutionContext.Implicits.global))
}
futures.foreach(f => Await.result(f, Duration.Inf))
It is not perfect, since it blocks in two places (in the semaphore and in Await) and holds all the futures in memory (which can be avoided).
But it works in production :)
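As one possible improvement to the Await part (a sketch, not part of the original code), the per-future blocking call can be replaced by combining the futures and registering a single callback; this still keeps all the futures in memory, but no thread is blocked waiting on them:

import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext = ExecutionContext.Implicits.global

// React once when everything is done, instead of blocking on each future in turn.
Future.sequence(futures).onComplete { result =>
  println(s"All tasks finished: $result")
}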
Upvotes: 2