Reputation: 2040
Assume you have a flux of objects with the following structure:
class Element {
    String key;
    int count;
}
Now imagine those elements flow in a predefined sort order, always in groups of a key, like
{ key = "firstKey", count=123}
{ key = "firstKey", count=1 }
{ key = "secondKey", count=4 }
{ key = "thirdKey", count=98 }
{ key = "thirdKey", count=5 }
.....
What I want to do is create a flux which returns one element for each distinct key, carrying the summed count for that key-group.
So basically a classic reduce for each group. However, the reduce operator does not work here, because it returns only a single element, and I want a flux with one element for each distinct key.
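To make the expected result concrete, here is a minimal, non-reactive sketch of the aggregation over an already sorted list. It is only an illustration of the desired output, not a Reactor solution; the `RunAggregation` class, the `sumRuns` helper and the `Element` constructor are names made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class RunAggregation {

    // Mirrors the Element class above; the constructor is added for illustration only.
    static class Element {
        final String key;
        final int count;

        Element(String key, int count) {
            this.key = key;
            this.count = count;
        }
    }

    // Hypothetical helper: sums the counts of each run of consecutive equal keys.
    // Assumes the input is already grouped by key, as described above.
    static List<Element> sumRuns(List<Element> sorted) {
        List<Element> result = new ArrayList<>();
        String currentKey = null;
        int sum = 0;
        boolean open = false; // true once at least one element has been seen

        for (Element e : sorted) {
            if (open && !e.key.equals(currentKey)) {
                // The key changed, so the previous group is complete.
                result.add(new Element(currentKey, sum));
                sum = 0;
            }
            currentKey = e.key;
            sum += e.count;
            open = true;
        }
        if (open) {
            // Emit the last group once the input ends.
            result.add(new Element(currentKey, sum));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
                new Element("firstKey", 123),
                new Element("firstKey", 1),
                new Element("secondKey", 4),
                new Element("thirdKey", 98),
                new Element("thirdKey", 5));

        for (Element e : sumRuns(input)) {
            System.out.println(e.key + "=" + e.count);
        }
    }
}
```

Only the current key and the running sum are held at any time, which is the property I would like to keep in the reactive version.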
Using bufferUntil might work, but has the drawback that I have to keep state to check whether the key has changed compared to the previous one.
Using groupBy is overkill, as I know that each group has come to an end once a new key is found, so I don't want to keep anything cached after that event.
Is such an aggregation possible using Flux, without keeping state outside of the flow?
Upvotes: 4
Views: 2852
Reputation: 81
That really worked for me! Thanks for that post.
Please note that in the meantime the "compose" method was renamed. You need to use transformDeferred instead.
In my case I have a "Dashboard" object which has an id (stored as UUID) on which I want to group the source flux:
Flux<Dashboard> sourceFlux = ... // could be a DB query. The Flux must be sorted by id.
sourceFlux.transformDeferred(dashboardFlux -> {
    // this stores the last seen dashboardId as the Flux publishes. It is used to decide when to open a new window
    // having this state inside transformDeferred means it will not be shared by multiple subscribers
    AtomicReference<UUID> last = new AtomicReference<>(null);
    return dashboardFlux
        // use "last seen" state to split into windows, much like a `groupBy` but with earlier closing
        .windowUntil(i -> !i.getDashboardId().equals(last.getAndSet(i.getDashboardId())), true)
        // reduce each window
        .flatMap(window -> window.reduce(... /* reduce one window here */));
});
Upvotes: 0
Reputation: 28301
This is currently (as of 3.2.5) not possible without keeping track of state yourself. distinctUntilChanged could have fit the bill with minimal state, but it doesn't emit the state, just the values it considered "distinct" according to said state.
The most minimalistic way of solving this is with windowUntil and compose + an AtomicReference for state-per-subscriber:
Flux<Tuple2<T, Integer>> sourceFlux = ...; // assuming key/count represented as `Tuple2`
Flux<Tuple2<T, Integer>> aggregated = sourceFlux.compose(source -> {
    // having this state inside a compose means it will not be shared by multiple subscribers
    AtomicReference<T> last = new AtomicReference<>(null);
    return source
        // use "last seen" state to split into windows, much like a `groupBy` but with earlier closing
        .windowUntil(i -> !i.getT1().equals(last.getAndSet(i.getT1())), true)
        // reduce each window to a single tuple carrying the key and the summed count
        .flatMap(window -> window.reduce((i1, i2) -> Tuples.of(i1.getT1(), i1.getT2() + i2.getT2())));
});
Upvotes: 2