ab m
ab m

Reputation: 422

Reactor delayElements within a GroupedFlux delays elements across all groups

I have a use case where I want to create a bunch of GroupedFlux by a PartitionKey and within each group delay elements by 100 milliseconds. However, I want multiple groups to start at the same time. So if there are 3 groups, I expect 3 messages emitted every 100 millisecond. However, with the following code I see only 1 message every 100 milliseconds.

This is the code that I was expecting to work.

final Flux<GroupedFlux<String, TData>> groupedFlux =
        flux.groupBy(Event::getPartitionKey);
groupedFlux.subscribe(g -> g.delayElements(Duration.ofMillis(100))
        .flatMap(this::doWork)
        .doOnError(throwable -> log.error("error: ", throwable))
        .onErrorResume(e -> Mono.empty())
        .subscribe());

This is the log.

21:24:29.318   parallel-5]  : GroupByKey : 2
21:24:29.424   parallel-6]  : GroupByKey : 3
21:24:29.529   parallel-7]  : GroupByKey : 1
21:24:29.634   parallel-8]  : GroupByKey : 2
21:24:29.739   parallel-9]  : GroupByKey : 3
21:24:29.844  parallel-10]  : GroupByKey : 1
21:24:29.953  parallel-11]  : GroupByKey : 2
21:24:30.059  parallel-12]  : GroupByKey : 3
21:24:30.167   parallel-1]  : GroupByKey : 1

(See almost 100 ms difference between each log statement. 1s column is the timestamp.

Upvotes: 1

Views: 356

Answers (1)

ab m
ab m

Reputation: 422

Upon more analysis I found out that it was working fine. My tests had incorrect data for PartitionKey which was resulting in a single GroupedFlux.

Answering my own question in case someone ever doubts that the delayElements works differently on a groupedFlux. It does not.

Upvotes: 2

Related Questions