A5300
A5300

Reputation: 419

Correct way to execute multiple reactive operations

I get from a reactive repository a Mono<FooBar> based on it's value I have to create two other objects save them using reactive repositories, modify FooBar object and save it as well.

As I'm new to reactive programming I landed with the following solution, which is working, but I'm not sure if I'm using correctly the reactive API:

@Test
void createAndSave() {
    Mono<FooBar> fooBarMono = findFooBar()  // returns Mono<FooBar>
            .map(fooBar -> {
        createAndSaveLoremBar(fooBar).subscribe();   // returns Mono<LoremBar>
        createAndSaveDoloremBar(fooBar).subscribe(); // returns Mono<DoloremBar>

        fooBar.setActive(true);

        return saveFooBar(fooBar);          // returns Mono<FooBar>
    }).flatMap(Function.identity());

    StepVerifier.create(fooBarMono)
            .expectNextMatches(Objects::nonNull)
            .expectComplete()
            .verify();

}

from console log:

   saved lorem bar
   saved dolorem bar
   saved foo bar

Upvotes: 5

Views: 5670

Answers (2)

piotr szybicki
piotr szybicki

Reputation: 1602

I think the bellow solution is a little bit more readable. Anyway Alexander is correct you should never ever modify the input. You see we are borrowing a lot of concept form functional programming. For example you call Function.identity() this is called identity functor. Both Fluxand Mono are monads. And there is a concept that is secret for these guys called Referential transparency that imperative update break.

    final Mono<FooBar> fooBarMono1 = findFooBar()
            .zipWhen((fooBar) -> createAndSaveLoremBar(fooBar))
            .map(tuple -> tuple.getT1())
            .zipWhen((fooBar) -> createAndSaveDoloremBar(fooBar))
            .map(tuple -> tuple.getT1())
            .map(fooBar -> new FooBar(true))
            .flatMap(fooBar -> saveFooBar(fooBar));

Or more concise:

    final Mono<FooBar> fooBarMono1 = findFooBar()
            .zipWhen((fooBar) -> createAndSaveLoremBar(fooBar)
                                    .then(createAndSaveDoloremBar(fooBar)))
            .map(tuple -> tuple.getT1())
            .map(fooBar -> new FooBar(true))
            .flatMap(fooBar -> saveFooBar(fooBar));

Upvotes: 7

Alexander Pankin
Alexander Pankin

Reputation: 3955

At first, mutating objects in the asynchronous (reactive) world is not a good idea.

Anyway, in your solution possible errors on lorem and dolorem saving are ignored. You can improve it like so:

Mono<FooBar> fooBarMono = findFooBar()
        .flatMap(fooBar -> Flux.merge(
                createAndSaveLoremBar(fooBar),
                createAndSaveDoloremBar(fooBar)) // asynchronously saving lorem and dolorem
                .then(Mono.fromCallable(() -> {  // if there wasn't errors, mutate and save fooBar
                    fooBar.setActive(true);
                    return fooBar;
                }).flatMap(fooBar1 -> saveFooBar(fooBar1))));

If you could create copy of your fooBar with true active flag, the code could be simpler. For example with lombok.

@Builder(toBuilder = true)
public class FooBar {
...
}

Mono<FooBar> fooBarMono = findFooBar()
        .flatMap(fooBar -> Flux.merge(
                createAndSaveLoremBar(fooBar),
                createAndSaveDoloremBar(fooBar))
                .then(saveFooBar(fooBar.toBuilder().active(true).build())));

And if you are not interested in the result of your saveFooBar(...) but only in the completion signal, you could make all three saves asynchronously:

Flux<Object> flux = findFooBar()
        .flatMapMany(fooBar -> Flux.merge(
                createAndSaveLoremBar(fooBar),
                createAndSaveDoloremBar(fooBar),
                saveFooBar(fooBar.toBuilder().active(true).build())));

Actually, in the last approach you could collect all three results and you should prefer this approach, but I don't have enough information about your classes and requirements for creating full example.

Upvotes: 4

Related Questions