Tobi

Reputation: 2040

How can I aggregate elements on a flux by group / how to reduce groupwise?

Assume you have a flux of objects with the following structure:

class Element {
  String key;
  int count;
}

Now imagine these elements arrive in a predefined sort order, so that elements with the same key are always adjacent, for example:

{ key = "firstKey",  count=123}
{ key = "firstKey",  count=1  }
{ key = "secondKey", count=4  }
{ key = "thirdKey",  count=98 }
{ key = "thirdKey",  count=5  }
 .....

What I want is a Flux that emits one element per distinct key, carrying the summed count of that key's group. It is basically a classic reduce per group, but the reduce operator does not work here because it returns only a single element, and I want one element for each distinct key.
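To pin down the expected result independently of Reactor, here is a minimal plain-Java sketch of the same aggregation over a list. The names GroupwiseSum and sumConsecutive are made up for illustration, and the nested Element class is only a stand-in for the one above; the sketch assumes the input is already grouped by key.

```java
import java.util.ArrayList;
import java.util.List;

class GroupwiseSum {
    // Hypothetical stand-in for the Element class from the question.
    static final class Element {
        final String key;
        final int count;

        Element(String key, int count) {
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return key + "=" + count;
        }
    }

    // Sums the counts of each run of consecutive elements sharing a key.
    // Only the running total of the current group is kept in memory.
    static List<Element> sumConsecutive(List<Element> grouped) {
        List<Element> result = new ArrayList<>();
        Element current = null;
        for (Element e : grouped) {
            if (current != null && current.key.equals(e.key)) {
                // Same key as before: fold into the running total.
                current = new Element(current.key, current.count + e.count);
            } else {
                // Key changed: the previous group is complete, emit it.
                if (current != null) {
                    result.add(current);
                }
                current = e;
            }
        }
        if (current != null) {
            result.add(current); // emit the last group
        }
        return result;
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
            new Element("firstKey", 123), new Element("firstKey", 1),
            new Element("secondKey", 4),
            new Element("thirdKey", 98), new Element("thirdKey", 5));
        System.out.println(sumConsecutive(input));
        // prints [firstKey=124, secondKey=4, thirdKey=103]
    }
}
```

The question is how to get this behaviour on a Flux, where the "current" variable would have to live somewhere.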

Using bufferUntil might work, but it has the drawback that I have to keep state to check whether the key has changed compared to the previous element.

Using groupBy is overkill: I know that a group has ended as soon as a new key appears, so I don't want to keep anything cached after that point.

Is such an aggregation possible with Flux without keeping state outside of the flow?

Upvotes: 4

Views: 2852

Answers (2)

frochi42

Reputation: 81

That really worked for me, thanks for the post. Note that the "compose" method has since been renamed: you need to use transformDeferred instead. In my case I have a "Dashboard" object with an id (stored as a UUID) by which I want to group the source flux:

import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

Flux<Dashboard> sourceFlux = ... // could be a DB query. The Flux must be sorted by dashboard id.

sourceFlux.transformDeferred(dashboardFlux -> {
    // stores the last dashboard id seen as the Flux publishes; used to decide when to open a new window
    // keeping this state inside transformDeferred means it is not shared between subscribers
    AtomicReference<UUID> last = new AtomicReference<>(null);

    return dashboardFlux
        // use the "last seen" state to split into windows, much like a `groupBy` but with earlier closing
        .windowUntil(i -> !i.getDashboardId().equals(last.getAndSet(i.getDashboardId())), true)
        // reduce each window
        .flatMap(window -> window.reduce(... /* reduce one window here */));
});

Upvotes: 0

Simon Baslé

Reputation: 28301

This is currently (as of 3.2.5) not possible without keeping track of state yourself. distinctUntilChanged could have fit the bill with minimal state, but it doesn't emit that state, only the values it considered "distinct" according to it.

The most minimal way to solve this is windowUntil combined with compose, plus an AtomicReference for per-subscriber state:

import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

Flux<Tuple2<T, Integer>> sourceFlux = ...; // assuming key/count represented as `Tuple2`
Flux<Tuple2<T, Integer>> aggregated = sourceFlux.compose(source -> {
    // having this state inside a compose means it will not be shared by multiple subscribers
    AtomicReference<T> last = new AtomicReference<>(null);

    return source
      // use "last seen" state to split into windows, much like a `groupBy` but with earlier closing
      .windowUntil(i -> !i.getT1().equals(last.getAndSet(i.getT1())), true)
      // reduce each window by summing the counts; the key is the same throughout a window
      .flatMap(window -> window.reduce((i1, i2) -> Tuples.of(i1.getT1(), i1.getT2() + i2.getT2())));
});
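
To see where the windows get cut, the "last seen" predicate can be traced in plain Java without Reactor. This is only a sketch: BoundaryPredicateDemo, keyChanged and trace are made-up names, and String keys stand in for `T`. A `true` marks an element at which the key changed, which is where `windowUntil(..., true)` starts a new window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

class BoundaryPredicateDemo {
    // Builds a fresh stateful predicate on every call, mirroring how the
    // lambda passed to compose creates new state for each subscriber.
    static Predicate<String> keyChanged() {
        AtomicReference<String> last = new AtomicReference<>(null);
        // getAndSet returns the previous key and stores the current one.
        return key -> !key.equals(last.getAndSet(key));
    }

    // Applies one predicate instance to every key, in order.
    static List<Boolean> trace(List<String> keys) {
        Predicate<String> changed = keyChanged();
        List<Boolean> result = new ArrayList<>();
        for (String key : keys) {
            result.add(changed.test(key));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(trace(List.of(
            "firstKey", "firstKey", "secondKey", "thirdKey", "thirdKey")));
        // prints [true, false, true, true, false]
    }
}
```

Note that the predicate also returns true for the very first element, because `last` starts out as null. Calling keyChanged() twice gives two independent predicates, which is the reason for creating the AtomicReference inside the compose lambda rather than outside it.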

Upvotes: 2
