seti
seti

Reputation: 341

Reactor Flux subscriber stream stopped when using reduce on flatMap

I want change my code for single subscriber. Now i have

auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120)).subscribe(
        s -> s.groupBy(Auction::getItem).subscribe( longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats )
));

This code is working correctly reduce method is very simple. I tried change my code for single subscriber

    auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120))
        .flatMap(window -> window.groupBy(Auction::getItem))
        .flatMap(longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats))
        .subscribe(itemDumpStatsMono -> log.info(itemDumpStatsMono.toString()));

This is my code, and this code is not working. No errors and no results. After debugging i found code is stuck on second flatMap when i reducing stream. I think problem is on flatMap merging, stucking on Mono resolve. Some one now how to fix this problem and use only single subscriber?

How to replicate, you can use another class or create one. In small size is working but on bigger is dying

List<Auction> auctionList = new ArrayList<>();
for (int i = 0;i<100000;i++){
    Auction a = new Auction((long) i, "test");
    a.setItem((long) (i%50));
    auctionList.add(a);
}

Flux.fromIterable(auctionList).groupBy(Auction::getId).flatMap(longAuctionGroupedFlux ->
        longAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats)).collectList().subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()));

On this approach is instant result but I using 3 subscribers

Flux.fromIterable(auctionList)
        .groupBy(Auction::getId)
        .subscribe(
                auctionIdAuctionGroupedFlux -> auctionIdAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats).subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()
                )
        ));

Upvotes: 0

Views: 2038

Answers (2)

Felipe Moraes
Felipe Moraes

Reputation: 218

I think the behavior you described is related to the interaction between groupBy chained with flatMap. Check groupBy documentation. It states that:

The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

By default, maxConcurrency (flatMap) is set to 256 (i checked the source code of 3.2.2). So, selecting more than 256 groups may cause the execution to hang (particularly when all execution happens on the same thread).

The following code helps in understanding what happens when you chain the operators groupBy and flatMap:

@Test
public void groupAndFlatmapTest() {
    val groupCount = 257;
    val groupSize = 513;
    val list = rangeClosed(1, groupSize * groupCount).boxed().collect(Collectors.toList());
    val source = Flux.fromIterable(list)
            .groupBy(i -> i % groupCount)
            .flatMap(Flux::collectList);
    StepVerifier.create(source).expectNextCount(groupCount).expectComplete().verify();
}

The execution of this code hangs. Changing groupCount to 256 or less makes the test pass (for every value of groupSize).

So, regarding your original problem, it is very possible that you are creating a large amount of groups with your key-selector Auction::getItem.

Upvotes: 2

seti
seti

Reputation: 341

Adding parallel fixed problem, but i looking answer why reduce dramatically slow flatMap.

Upvotes: 0

Related Questions