vshah1
vshah1

Reputation: 57

Reduce Flux to form Mono

I am bit new to Spring Reactive programming. I am trying to get a Flux from I/O, whi ch returns a list of objects and what to combine to return a Mono from my service.

Flux<Obj1> -> Mono<Obj2>

Obj1
{
"a" : "123",
"combine" : "456"
"combine2" : "789"
}

Flux<Obj1> has multiple objects

Obj2
{
"a" : "123"
"combine" : {
              "456" : "1"
            },
"combine2" : {
               "789" : "2"
             }
}

Mono<Obj2> is a consolidation of flux with the Combiner keys.

To achieve this, my initial approach is to make sure Flux is completed using then and after that manipulate data.

Flux.just(obj1a,obj1b,obj1c)
    .then();

But the above statement returns a void of Mono, not sure how may thenMany could work in this case.

I feel missing something here, how should i get control of Flux objects after completion.

Upvotes: 2

Views: 7895

Answers (1)

Michael Berry
Michael Berry

Reputation: 72254

To achieve this, my initial approach is to make sure Flux is completed using then and after that manipulate data.

This is the wrong way of thinking for reactive programming - you need to modify the data as it's flowing through the flux. The then() methods will ignore the results from the flux entirely, and just output some other, unrelated Mono when complete.

In the case where you want to take a Flux of some element, and want to reduce that down into a Mono of some other element, you most likely want the reduce() method. That will take an initial Obj2 in your case, and then a BiFunction whose purpose is to take an intermediate Obj2, an Obj1 in the Flux, and then produce an updated Obj2. The reduce() operator will then apply this reduction on the entire stream, giving you a Mono<Obj2> at the end.

It's not immediately obvious from your code what you want to achieve specifically, but the following is a related example (lombok used for brevity):

@Data
@AllArgsConstructor
class Obj1 {

    private String a;
    private String combine;
    private String combine2;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class Obj2 {

    private String a;
    private Map<String, Integer> combine = new HashMap<>();
    private Map<String, Integer> combine2 = new HashMap<>();

}

public class NewClass {

    public static void main(String[] args) {
        Flux<Obj1> flux = Flux.just(
                new Obj1("123", "456", "789"),
                new Obj1("123", "456", "789"),
                new Obj1("123", "455", "789"));

        Mono<Obj2> mono = flux.reduce(new Obj2(), (o2, o1) -> {
            Map<String, Integer> combine = new HashMap<>(o2.getCombine());
            combine.put(o1.getCombine(), combine.getOrDefault(o1.getCombine(), 0) + 1);
            Map<String, Integer> combine2 = new HashMap<>(o2.getCombine2());
            combine2.put(o1.getCombine2(), combine2.getOrDefault(o1.getCombine2(), 0) + 1);
            return new Obj2(o1.getA(), combine, combine2);
        });

        mono.subscribe(System.out::println);
    }

}

Upvotes: 4

Related Questions