Reputation: 5327
Is there any way to change a Source's throttling parameters dynamically, so as to implement a ramp-up mechanism, e.g. increasing or reducing the throttling rate every second?
Upvotes: 4
Views: 872
Reputation: 41
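Akka's throttle operator has an overload that takes a cost function: each element consumes the number of units the function returns, out of a fixed budget per time interval. If the cost function reads a mutable value, the throttling rate can be changed while the stream is running.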
Example:
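import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.io.StdIn
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("throttle-demo") // assumes Akka 2.6+, where the system provides the materializer
private val weight = new AtomicInteger(1)
Source(1 to 1000)
  // Budget of 1000 cost units per second; each element costs 1000 / weight units.
  .throttle(1000, 1.second, _ => 1000 / weight.get())
  .runWith(Sink.foreach(println))
while (true) {
  // Read a new weight from stdin; keep it between 1 and 1000 (0 would divide by zero).
  val newWeight = StdIn.readLine().toInt
  weight.set(newWeight)
}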
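The initial throttling rate is 1 element per second, because with weight = 1 each element costs 1000 / 1 = 1000 units against a budget of 1000 units per second. Setting weight to n lowers the cost to 1000 / n, so the rate becomes roughly n elements per second. I used a factor of 1000 because the cost function returns an integer; the factor can be increased for finer granularity. Keep weight between 1 and the factor: a weight of 0 throws a division-by-zero error, and a weight above the factor rounds the cost down to 0.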
If you want the throttling to change as time passes, you just need to update weight on a timer, for example once per second.
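As a minimal sketch of that time-based ramp-up, the helper below raises the weight by a fixed step on every tick until it reaches a cap. RampUp and its parameters (budget, start, step, max) are hypothetical names made up for illustration, not part of Akka, and the timer uses the JDK's ScheduledExecutorService rather than any Akka API.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not part of Akka): raises a weight by `step` on every tick
// until it reaches `max`, and converts the current weight into a per-element cost.
final class RampUp(budget: Int, start: Int, step: Int, max: Int) {
  // max <= budget guarantees that budget / weight never rounds down to 0.
  require(start >= 1 && step >= 1 && max >= start && max <= budget)

  private val weight = new AtomicInteger(start)

  // One ramp step; returns the new weight, capped at `max`.
  def tick(): Int = weight.updateAndGet(w => math.min(max, w + step))

  // Cost of one element, in units of `budget`, at the current weight.
  def cost: Int = budget / weight.get()

  // Calls tick() every `periodMillis` on a single background thread.
  // The caller should shut the returned scheduler down once the stream completes.
  def schedule(periodMillis: Long): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(() => { tick(); () }, periodMillis, periodMillis, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

With the stream from the example, this could be wired in by creating val ramp = new RampUp(1000, 1, 1, 100), calling ramp.schedule(1000), and passing _ => ramp.cost as the cost function to throttle(1000, 1.second, ...). Under those assumptions the rate would climb from about 1 to about 100 elements per second, one step per second. For a ramp-down, the same idea applies with a tick that decreases the weight toward a floor of 1.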
Upvotes: 4