Reputation: 57
I am a bit new to Spring Reactive programming. I am trying to get a Flux from I/O, which returns a list of objects, and I want to combine them to return a Mono from my service.
Flux<Obj1> -> Mono<Obj2>
Obj1
{
"a" : "123",
"combine" : "456",
"combine2" : "789"
}
Flux<Obj1> has multiple objects
Obj2
{
"a" : "123",
"combine" : {
"456" : "1"
},
"combine2" : {
"789" : "2"
}
}
Mono<Obj2> is a consolidation of the Flux, using the combine values as keys.
To achieve this, my initial approach is to make sure Flux is completed using then and after that manipulate data.
Flux.just(obj1a,obj1b,obj1c)
.then();
But the above statement returns a Mono<Void>, and I am not sure how thenMany could work in this case.
I feel I am missing something here. How should I get hold of the Flux elements after completion?
Upvotes: 2
Views: 7895
Reputation: 72254
"To achieve this, my initial approach is to make sure Flux is completed using then and after that manipulate data."
This is the wrong way of thinking for reactive programming: you need to modify the data as it's flowing through the flux. The then() methods will ignore the results from the flux entirely, and just output some other, unrelated Mono when complete.
In the case where you want to take a Flux of some element, and want to reduce that down into a Mono of some other element, you most likely want the reduce() method. That will take an initial Obj2 in your case, and then a BiFunction whose purpose is to take an intermediate Obj2, an Obj1 in the Flux, and then produce an updated Obj2. The reduce() operator will then apply this reduction on the entire stream, giving you a Mono<Obj2> at the end.
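To make the reduction step concrete, here is a minimal sketch of such an accumulator written with the plain JDK only, so it runs without Reactor on the classpath. The names ReduceSketch, ACCUMULATOR and fold are hypothetical, the records are stand-ins for the question's Obj1 and Obj2 (assuming Java 16+), and counting occurrences per key is only a guess at the intended consolidation. The fold method imitates what reduce() does conceptually: start from a seed and apply the BiFunction once per element.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class ReduceSketch {
    // Hypothetical stand-ins for the question's Obj1 and Obj2 (records need Java 16+).
    record Obj1(String a, String combine, String combine2) {}
    record Obj2(String a, Map<String, Integer> combine, Map<String, Integer> combine2) {}

    // The accumulator: takes the intermediate Obj2 and the next Obj1 and
    // returns an updated Obj2 without mutating the previous one.
    static final BiFunction<Obj2, Obj1, Obj2> ACCUMULATOR = (acc, next) -> {
        Map<String, Integer> combine = new HashMap<>(acc.combine());
        combine.merge(next.combine(), 1, Integer::sum);   // count occurrences per key
        Map<String, Integer> combine2 = new HashMap<>(acc.combine2());
        combine2.merge(next.combine2(), 1, Integer::sum);
        return new Obj2(next.a(), combine, combine2);
    };

    // Applies the accumulator left to right, starting from an empty seed.
    static Obj2 fold(List<Obj1> items) {
        Obj2 acc = new Obj2(null, Map.of(), Map.of());
        for (Obj1 item : items) {
            acc = ACCUMULATOR.apply(acc, item);
        }
        return acc;
    }

    public static void main(String[] args) {
        Obj2 result = fold(List.of(
                new Obj1("123", "456", "789"),
                new Obj1("123", "456", "789"),
                new Obj1("123", "455", "789")));
        System.out.println(result);
    }
}
```

With Reactor available, the same function could be handed to the operator directly, for example flux.reduce(new Obj2(null, Map.of(), Map.of()), ReduceSketch.ACCUMULATOR), which would yield a Mono<Obj2> instead of a plain Obj2.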
It's not immediately obvious from your code what you want to achieve specifically, but the following is a related example (Lombok used for brevity):
import java.util.HashMap;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Data
@AllArgsConstructor
class Obj1 {
    private String a;
    private String combine;
    private String combine2;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class Obj2 {
    private String a;
    private Map<String, Integer> combine = new HashMap<>();
    private Map<String, Integer> combine2 = new HashMap<>();
}

public class NewClass {

    public static void main(String[] args) {
        Flux<Obj1> flux = Flux.just(
                new Obj1("123", "456", "789"),
                new Obj1("123", "456", "789"),
                new Obj1("123", "455", "789"));

        // Start from an empty Obj2 and fold each Obj1 into it as it arrives.
        Mono<Obj2> mono = flux.reduce(new Obj2(), (o2, o1) -> {
            // Copy the maps so the previous intermediate Obj2 is left untouched.
            Map<String, Integer> combine = new HashMap<>(o2.getCombine());
            combine.put(o1.getCombine(), combine.getOrDefault(o1.getCombine(), 0) + 1);
            Map<String, Integer> combine2 = new HashMap<>(o2.getCombine2());
            combine2.put(o1.getCombine2(), combine2.getOrDefault(o1.getCombine2(), 0) + 1);
            return new Obj2(o1.getA(), combine, combine2);
        });

        // Nothing runs until the Mono is subscribed to.
        mono.subscribe(System.out::println);
    }
}
Upvotes: 4