AndyB

Reputation: 513

What’s the best way to invoke a second Mono without losing the result of the first?

I have code that needs to interact with two reactive dependencies, saving to a database and publishing to a stream, each of which returns a Mono. After this, I need to do some more processing, all using the original object I started with, something like this:

val myObject = Mono.just("thing");
myObject.flatMap(repository::save)
        .flatMap(stream::publish)
        .map(obj -> moreProcessing(obj));

But the problem is, repository.save doesn’t return a Mono of the object I’m interested in and neither does stream.publish.

I can achieve what I want with something like:

myObject.flatMap(obj -> repository.save(obj).then(Mono.just(obj)))
        .flatMap(obj -> stream.publish(obj).then(Mono.just(obj)))
        .map(obj -> moreProcessing(obj));

i.e. mapping to the dependency functions just to get their behaviour and then mapping straight back again, but this seems odd - like I might be missing some better way to use the API which doesn’t give the hint that I’m mapping between types, because that’s not what I’m using flatMap for, really.

I can also go with something like:

myObject.flatMap(obj ->
            repository.save(obj)
                .flatMap(x -> stream.publish(obj))
                .map(x -> moreProcessing(obj)));

but that doesn’t seem like a great way to go either, as it’s going to lead to hard-to-maintain code in even relatively simple cases.

Thoughts?

Upvotes: 2

Views: 4334

Answers (3)

Jerson Viveros

Reputation: 9

One way to avoid losing any of your references could be something like:

Mono<User> monoUser = userRepository.findByUsername(username)
                                .filter(u -> validatePassword(pass, u.getPassword()));

Mono<Clinic> monoClinic = monoUser.flatMap(u -> clinicRepository.findByUserId(u.getId()));
        
return Mono.zip(monoUser, monoClinic, (u,c) -> proccess(u, c)); 

The first Mono returns a user. The second Mono uses the user returned by the first and returns a clinic. The third Mono, the one that is returned, combines the outputs of the first two to create a different object (the result of the proccess method, which takes the user and the clinic).

Upvotes: 0

Gena Batalski

Reputation: 123

Did you try zipWhen? It behaves the way you want, with one restriction: it completes immediately if the result of the rightGenerator is empty (there is no next signal). So you need to either return an Optional or use thenReturn or switchIfEmpty.
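
A minimal sketch of the zipWhen approach, assuming Reactor is on the classpath. The save and publish helpers are hypothetical stand-ins for repository.save and stream.publish, and they are assumed to return Mono<Void>, since the question does not say what the real methods return. thenReturn gives each step a value to emit so that zipWhen does not complete empty, and the combinator keeps the original object:

```java
import reactor.core.publisher.Mono;

public class ZipWhenSketch {

    // Hypothetical stand-in for repository.save: completes without emitting a value.
    static Mono<Void> save(String obj) {
        return Mono.fromRunnable(() -> System.out.println("saved " + obj));
    }

    // Hypothetical stand-in for stream.publish: also completes without emitting a value.
    static Mono<Void> publish(String obj) {
        return Mono.fromRunnable(() -> System.out.println("published " + obj));
    }

    static Mono<String> pipeline(String input) {
        return Mono.just(input)
                // thenReturn(true) makes the right-hand Mono emit a marker value,
                // so zipWhen does not complete empty; the combinator drops the
                // marker and passes the original object downstream.
                .zipWhen(obj -> save(obj).thenReturn(true), (obj, ignored) -> obj)
                .zipWhen(obj -> publish(obj).thenReturn(true), (obj, ignored) -> obj)
                // Stand-in for moreProcessing(obj).
                .map(obj -> obj.toUpperCase());
    }

    public static void main(String[] args) {
        // block() is used only to keep the demo short; avoid it in reactive code.
        System.out.println(pipeline("thing").block());
    }
}
```

The steps still run in order, because each zipWhen subscribes to its right-hand Mono only after the previous step has emitted the object.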

Upvotes: 0

piotr szybicki

Reputation: 1602

I had a similar problem and I solved it using the code below. I think it is important not to insist on method chaining at all costs, as it is very easy to over-engineer and end up with constructions that are hard to read.

@Test
public void mergeMonos(){
    Mono<String> myObject = Mono.just("my object");
    Mono<String> savedToDB = saveToDb(myObject);
    Mono<String> savedToStream = saveToStream(myObject);


    //if you need to wait for the first two to complete
    Flux.concat(savedToDB, savedToStream, myObject)
            .last()
            .subscribe(System.out::println);

    //if the db and stream saves can run concurrently; last() then emits whichever source finishes last
    Flux.merge(savedToDB, savedToStream, myObject)
            .last()
            .subscribe(System.out::println);
}

private Mono<String> saveToStream(Mono<String> myObject) {
    return myObject.map(t-> "saved to stream");
}

private Mono<String> saveToDb(Mono<String> myObject) {
    return myObject.map(t-> "saved to db");
}

Upvotes: 1
