Reputation: 973
Is it possible to change the Reactor Flux flatMap concurrency during runtime?
The flatMap API allows us to configure the concurrency during the assembly time.
I am using flatMap to subscribe to a WebClient, the WebClient does a GET of REST call to external service. During runtime, I may get into issue such as Rate Limits, Service Not available, Internal server errors, In that case I would like to set the concurrency as low as 1 to reduce the amount of in processing elements. When things are back to normal, I would like to bring the concurrency to optimal level again.
Upvotes: 3
Views: 660
Reputation: 9
You could use a static content api where you get the variable for the concurrency and set it to the "concurrency" value in the flatMap as the flatMap documentation indicates:
"...
You can do:
flatMap(a-> {}, 1)
Where:
flatMap(a-> function: {}, concurrency: 1)
You can also set different concurrencies as variables:
flatMap(a-> {}, concurrency != null ? concurrency : Integer.MAX_VALUE)
By default if the concurrency variable is not added, it overloads and sets the default concurrency variable as Integer.MAX_VALUE.
I prefer using a Boolean and only choose if concurrent or not:
flatMap(a-> {}, isConcurrent ? Integer.MAX_VALUE : 1)
Details from Reactor Documentation:
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.
There are three dimensions to this operator that can be compared with flatMapSequential and concatMap:
Generation of inners and subscription: this operator is eagerly subscribing to its inners.
Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).
The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel. In turn, that argument shows the size of the first Subscription.request(long) to the upstream.
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency)
flatMap with concurrency variable
Discard Support: This operator discards elements internally queued for backpressure upon cancellation or error triggered by a data signal.
Error Mode Support: This operator supports resuming on errors in the mapper Function. Exceptions thrown by the mapper then behave as if it had mapped the value to an empty publisher. If the mapper does map to a scalar publisher (an optimization in which the value can be resolved immediately without subscribing to the publisher, e.g. a Mono.fromCallable(Callable)) but said publisher throws, this can be resumed from in the same manner.
Type Parameters:
V - the merged output sequence type
Parameters:
mapper - the Function to transform input sequence into N sequences Publisher
concurrency - the maximum number of in-flight inner sequences
Returns:
a new Flux
..."
So to expand more on the answer as I got a downvote without any detail on why, this is the only detailed way you might do it:
Upvotes: -1