Reputation: 3657
Using reactor.core.publisher.Flux, how can I take n values from a Flux and then wait for a duration before taking the next batch?
onboardService
    .loadRepositories(user) // Flux of values
    .take(30) // Take 30 from the flux
    .delayElements(Duration.ofMinutes(1)) // Wait one minute
    .doOnEach(...) // Process the value
    .??? // How to repeat with the next 30?
Further down the chain, I'm pushing each value from the Flux to a service that has a rate limit of 30 requests per minute.
onboardService
    .loadRepositories(user)
    .limitRate(10)
    .delayElements(Duration.ofSeconds(10))
Combining limitRate with delayElements sounds like what I want, but it doesn't behave as I'd expect. With these parameters it waits 10 seconds between each individual element, whereas I'd expect it to process 10 elements, wait, and then take another 10.
Is there a better way to avoid overloading the end service?
Upvotes: 0
Views: 1144
Reputation: 6255
You can split the flux into windows of 30 and release one window per minute by zipping the windows with an interval:
onboardService.loadRepositories(user)
    .transform(flux ->
        flux
            .window(30)
            .zipWith(Flux.interval(Duration.ZERO, Duration.ofMinutes(1)))
            .flatMap(Tuple2::getT1))
    .doOnEach(...) // Process the value
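To try the idea without waiting a full minute, here is a minimal self-contained sketch of the same window-and-interval pattern. The class name `WindowedRateLimit`, the helper `rateLimit`, and the small batch size and period are made up for illustration; `Flux.range` stands in for `onboardService.loadRepositories(user)`, which is not shown here.

```java
import java.time.Duration;
import java.time.LocalTime;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

// Hypothetical demo class; not part of Reactor or of the original code.
class WindowedRateLimit {

    // Splits the source into windows of batchSize elements and pairs each
    // window with one tick of an interval, so at most one window is
    // released per period. The first tick fires immediately.
    public static <T> Flux<T> rateLimit(Flux<T> source, int batchSize, Duration period) {
        return source
            .window(batchSize)
            .zipWith(Flux.interval(Duration.ZERO, period))
            .flatMap(Tuple2::getT1);
    }

    public static void main(String[] args) {
        // Stand-in for the real source: 7 values, batches of 3, one batch per second.
        rateLimit(Flux.range(1, 7), 3, Duration.ofSeconds(1))
            .doOnNext(value -> System.out.println(LocalTime.now() + " processing " + value))
            .blockLast(); // Block only so the demo does not exit before the ticks fire.
    }
}
```

For the case in the question you would call something like `rateLimit(onboardService.loadRepositories(user), 30, Duration.ofMinutes(1))`. Note that `flatMap` subscribes to each window as soon as its tick arrives, so if processing one window takes longer than the period, consecutive windows can overlap; swapping in `concatMap` keeps them strictly sequential.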
Upvotes: 2