Reputation: 422
I have a use case where I want to split a Flux into multiple GroupedFlux instances by PartitionKey and, within each group, delay elements by 100 milliseconds. The groups should run concurrently, so with 3 groups I expect 3 messages to be emitted every 100 milliseconds. With the following code, however, I see only 1 message every 100 milliseconds.
This is the code that I expected to work:
final Flux<GroupedFlux<String, TData>> groupedFlux =
        flux.groupBy(Event::getPartitionKey);
groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
        .flatMap(this::doWork)
        .doOnError(throwable -> log.error("error: ", throwable))
        .onErrorResume(e -> Mono.empty())
        .subscribe());
This is the log.
21:24:29.318 parallel-5] : GroupByKey : 2
21:24:29.424 parallel-6] : GroupByKey : 3
21:24:29.529 parallel-7] : GroupByKey : 1
21:24:29.634 parallel-8] : GroupByKey : 2
21:24:29.739 parallel-9] : GroupByKey : 3
21:24:29.844 parallel-10] : GroupByKey : 1
21:24:29.953 parallel-11] : GroupByKey : 2
21:24:30.059 parallel-12] : GroupByKey : 3
21:24:30.167 parallel-1] : GroupByKey : 1
(Note the roughly 100 ms gap between consecutive log statements. The first column is the timestamp.)
Upvotes: 1
Views: 356
Reputation: 422
After further analysis, I found that the code was working fine. My tests had incorrect data for PartitionKey, which resulted in a single GroupedFlux.
I am answering my own question in case anyone else suspects that delayElements behaves differently on a GroupedFlux. It does not.
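Since the root cause was test data that collapsed into one partition key, a quick sanity check on the fixture before wiring it into the reactive pipeline can catch this early. The following is only a minimal sketch using plain Java collections, not Reactor. The class name `PartitionKeyCheck`, the method `countByKey`, and the sample keys are hypothetical names introduced for illustration; the key extractor stands in for something like `Event::getPartitionKey`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PartitionKeyCheck {

    // Hypothetical helper: counts how many items fall under each partition key.
    // The number of entries in the result is the number of groups that a
    // groupBy on the same key extractor would be expected to produce.
    static <T> Map<String, Long> countByKey(List<T> items, Function<T, String> keyFn) {
        return items.stream()
                .collect(Collectors.groupingBy(keyFn, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Assumed sample fixture: each string plays the role of an event's partition key.
        List<String> keys = List.of("1", "2", "3", "1", "2", "3");

        Map<String, Long> counts = countByKey(keys, Function.identity());

        // If this reports a single key, every element lands in one group
        // and the delay is applied to one sequence only.
        System.out.println("distinct keys: " + counts.size());
        System.out.println("per-key counts: " + counts);
    }
}
```

If the distinct key count is lower than the number of groups you expect, the problem is in the data rather than in how delayElements interacts with groupBy.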
Upvotes: 2