Reputation: 311
I have one flux with distinct object types inside it that I want group elements by type and then reduce each group.
here is a trivial code to do this.
Flux<Item> item=Flux.just(new Item("item1"), new Item("item2"));
Flux<Item2> item2=Flux.just(new Item2(4.0f), new Item2(2.0f));
Flux<Object> objects=Mono.empty().concatWith(item).concatWith(item2);
Flux<GroupedFlux<Object>> groups=objects.groupBy(value -> value.getClass());
How can I reduce each flow by type?enter code here
Upvotes: 1
Views: 3466
Reputation: 311
I found the solution on the gitter. Thanks to @simonbasle and @osi for reactiviy.
https://gitter.im/reactor/reactor
The key was the method flatMap and use the key to discrimine values.
groups.flatMap(gf -> {
if(gf.key()==Item.class) {
return gf.reduce(new Item(""), (a, b) -> {
return new Item(a.getValue()+((Item)b).getValue());
});
}
else if(gf.key()==Item2.class) {
return gf.reduce(new Item2(0.0f), (a, b) -> {
return new Item2(a.getValueF()+((Item2)b).getValueF());
});
}
throw new IllegalArgumentException("unknown key: "+gf.key());
});
Upvotes: 4