Reputation: 513
I have code that needs to interact with two reactive dependencies, saving to a database and publishing to a stream, each of which returns a Mono instance.
After this, I need to do some more processing, all using the original object I started with, something like this:
val myObject = Mono.just("thing");
myObject.flatMap(repository::save)
.flatMap(stream::publish)
.map(obj -> moreProcessing(obj));
But the problem is, repository.save doesn’t return a Mono of the object I’m interested in, and neither does stream.publish.
I can achieve what I want with something like:
myObject.flatMap(obj -> repository.save(obj).then(Mono.just(obj)))
.flatMap(obj -> stream.publish(obj).then(Mono.just(obj)))
.map(obj -> moreProcessing(obj));
i.e. mapping to the dependency functions just to get their behaviour and then mapping straight back again, but this seems odd. It feels like I might be missing a better way to use the API, one that doesn’t suggest I’m mapping between types, because that’s not really what I’m using flatMap for.
I can also go with something like:
myObject.flatMap(obj ->
repository.save(obj)
.flatMap(x -> stream.publish(obj))
.map(x -> moreProcessing(obj)));
but that doesn’t seem like a great way to go either, as it’s going to lead to hard-to-maintain code in even relatively simple cases.
Thoughts?
Upvotes: 2
Views: 4334
Reputation: 9
One way to avoid losing any of your references could be something like:
Mono<User> monoUser = userRepository.findByUsername(username)
.filter(u -> validatePassword(pass, u.getPassword()));
Mono<Clinic> monoClinic = monoUser.flatMap(u -> clinicRepository.findByUserId(u.getId()));
return Mono.zip(monoUser, monoClinic, (u,c) -> proccess(u, c));
The first Mono returns a user, the second Mono uses that user to look up a clinic, and the returned (third) Mono combines the outputs of the first two into a different object, built by the proccess method from the user and the clinic. Note that, as written, monoUser is subscribed to twice (once by zip directly and once through monoClinic), so the user lookup runs twice unless that Mono is cached, e.g. with cache().
Upvotes: 0
Reputation: 123
Did you try zipWhen? It behaves the way you want, with one restriction: it completes immediately if the Mono returned by the rightGenerator is empty (there is no onNext signal). So you need to either return an Optional or use thenReturn or switchIfEmpty.
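To make that concrete, here is a minimal sketch of both options, assuming reactor-core is on the classpath. The save and publish methods are hypothetical stand-ins for repository.save and stream.publish (here they just complete empty), and the class name is made up for illustration. The first variant uses thenReturn directly; the second uses zipWhen, with thenReturn(true) on the right-hand side so an empty result does not swallow the original object.

```java
import reactor.core.publisher.Mono;

public class KeepOriginalSketch {

    // Hypothetical stand-in for repository.save: completes without emitting the object.
    static Mono<Void> save(String obj) {
        return Mono.empty();
    }

    // Hypothetical stand-in for stream.publish: completes without emitting the object.
    static Mono<Void> publish(String obj) {
        return Mono.empty();
    }

    // Hypothetical follow-up processing on the original object.
    static String moreProcessing(String obj) {
        return obj + " processed";
    }

    // Variant 1: thenReturn waits for the side effect to complete,
    // then emits the original object instead of the side effect's result.
    static Mono<String> withThenReturn(String input) {
        return Mono.just(input)
                .flatMap(obj -> save(obj).thenReturn(obj))
                .flatMap(obj -> publish(obj).thenReturn(obj))
                .map(KeepOriginalSketch::moreProcessing);
    }

    // Variant 2: zipWhen pairs the original object with the side effect's result.
    // thenReturn(true) makes sure the right-hand Mono emits something, otherwise
    // zipWhen would complete empty; the combinator keeps only the original.
    static Mono<String> withZipWhen(String input) {
        return Mono.just(input)
                .zipWhen(obj -> save(obj).thenReturn(true), (obj, ignored) -> obj)
                .zipWhen(obj -> publish(obj).thenReturn(true), (obj, ignored) -> obj)
                .map(KeepOriginalSketch::moreProcessing);
    }
}
```

In both variants the publish step is only subscribed to after the save step has completed, the same ordering as in the flatMap chain from the question.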
Upvotes: 0
Reputation: 1602
I had a similar problem and solved it using the code below. I think it is important not to get lost in method chaining at all costs, as it is very easy to over-engineer and create constructions that are hard to read.
@Test
public void mergeMonos() {
    Mono<String> myObject = Mono.just("my object");
    Mono<String> savedToDB = saveToDb(myObject);
    Mono<String> savedToStream = saveToStream(myObject);

    // if the saves must run one after the other: concat subscribes to each
    // source only after the previous one completes, so last() is myObject
    Flux.concat(savedToDB, savedToStream, myObject)
        .last()
        .subscribe(System.out::println);

    // if the saves may run concurrently: merge subscribes to all sources eagerly;
    // last() still waits for all of them, but yields whichever source emitted last
    Flux.merge(savedToDB, savedToStream, myObject)
        .last()
        .subscribe(System.out::println);
}

// stand-in for the real stream call
private Mono<String> saveToStream(Mono<String> myObject) {
    return myObject.map(t -> "saved to stream");
}

// stand-in for the real database call
private Mono<String> saveToDb(Mono<String> myObject) {
    return myObject.map(t -> "saved to db");
}
Upvotes: 1