Chris

Reputation: 3657

Reactor Core: take 30, then wait before taking another 30

Using reactor.core.publisher.Flux, how can I take n values from a Flux and then wait for a given duration before taking the next batch?

onboardService
  .loadRepositories(user)               // Flux of values
  .take(30)                             // Take 30 from the flux
  .delayElements(Duration.ofMinutes(1)) // Wait one minute
  .doOnEach(...)                        // Process the value
  .???                                  // How to repeat with the next 30?

Further down the chain, I push each value from the Flux to a service that has a rate limit of 30 requests per minute.

onboardService
  .loadRepositories(user)   
  .limitRate(10)
  .delayElements(Duration.ofSeconds(10))

limitRate sounds like what I want, but it doesn't behave the way I'd expect. With these parameters it waits 10 seconds between each individual element, whereas I'd expect it to process a batch of 10, then take the next 10.
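A minimal sketch of why the snippet above behaves this way, assuming reactor-core is on the classpath: limitRate only caps how many elements are requested from upstream at a time and adds no delay of its own, while delayElements delays every element individually. The class name DelayElementsDemo, the helper emissionTimes, and the small numbers (4 elements, 100 ms) are hypothetical, chosen only so the demo finishes quickly.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class DelayElementsDemo {

    // Returns, for each element, the milliseconds elapsed since the pipeline was built.
    static List<Long> emissionTimes(int count, int rate, Duration delay) {
        long start = System.nanoTime();
        return Flux.range(1, count)
            .limitRate(rate)       // caps the size of upstream requests; no time delay
            .delayElements(delay)  // delays each element individually, one after another
            .map(i -> (System.nanoTime() - start) / 1_000_000)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // Hypothetical values: 4 elements, request size 2, 100 ms delay per element.
        emissionTimes(4, 2, Duration.ofMillis(100))
            .forEach(ms -> System.out.println("element emitted after ~" + ms + " ms"));
    }
}
```

Each element should arrive roughly one delay after the previous one, regardless of the value passed to limitRate, which matches the one-element-every-10-seconds behavior described above.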

Is there a better way to avoid overloading the downstream service?

Upvotes: 0

Views: 1144

Answers (1)

lkatiforis

Reputation: 6255

You can split the flux into windows of 30 elements and zip the windows with a one-minute interval, so that only one window is released per minute:

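onboardService.loadRepositories(user)
  .transform(flux ->
               flux
                 .window(30)                                                   // Split into windows of 30 elements
                 .zipWith(Flux.interval(Duration.ZERO, Duration.ofMinutes(1))) // Pair each window with a tick: one immediately, then one per minute
                 .flatMap(Tuple2::getT1))                                      // Unwrap the windows back into a flat stream
  .doOnEach(...) // Process the value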
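For anyone who wants to try the approach in isolation, here is a self-contained sketch, assuming reactor-core is on the classpath. It swaps onboardService.loadRepositories(user) for Flux.range and uses a hypothetical batch size of 3 with a 200 ms period so it finishes quickly. The names WindowedBatches, inBatches, and emissionTimes are made up for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

class WindowedBatches {

    // Splits the source into windows of batchSize and pairs each window with one
    // tick of an interval, so one window is released downstream per period.
    static <T> Flux<T> inBatches(Flux<T> source, int batchSize, Duration period) {
        return source
            .window(batchSize)
            .zipWith(Flux.interval(Duration.ZERO, period)) // first tick fires immediately
            .flatMap(Tuple2::getT1);                       // unwrap each released window
    }

    // Returns, for each element, the milliseconds elapsed since the pipeline was built.
    static List<Long> emissionTimes(int count, int batchSize, Duration period) {
        long start = System.nanoTime();
        return inBatches(Flux.range(1, count), batchSize, period)
            .map(i -> (System.nanoTime() - start) / 1_000_000)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // Hypothetical values: 7 elements, batches of 3, one batch every 200 ms.
        emissionTimes(7, 3, Duration.ofMillis(200))
            .forEach(ms -> System.out.println("element emitted after ~" + ms + " ms"));
    }
}
```

With these values the first three elements should arrive almost immediately, the next three after roughly 200 ms, and the last one after roughly 400 ms. For the rate limit in the question, the equivalent call would be inBatches(flux, 30, Duration.ofMinutes(1)).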

Upvotes: 2
