Reputation: 4375
I am having a Mono object, On which I have subscribed for doOnsuccess, In this method again I am saving the data in DB(CouchBase Using ReactiveCouchbaseRepository). after that, I am not getting any logs for Line1 and line2.
But this is working fine if I do not save this object, means I am getting logs for line 2.
Mono<User> result = context.getPayload(User.class);
result.doOnSuccess( user -> {
System.out.println("############I got the user"+user);
userRepository.save(user).doOnSuccess(user2->{
System.out.println("user saved"); // LINE 1
}).subscribe();
System.out.println("############"+user); // LINE2
}).subscribe();
Upvotes: 2
Views: 1926
Reputation: 59211
Your code snippet is breaking a few rules you should follow closely:
You should not call subscribe
from within a method/lambda that returns a reactive type such as Mono
or Flux
; this will decouple the execution from the main task while they'll both still operate on that shared state. This often ends up on issues because things are trying to read twice the same stream. It's a bit like you're trying to create two separate threads that try reading on the same outputstream.
you should not do I/O operations in doOnXYZ
operators. Those are "side-effects" operators, meaning they are useful for logging, increment counters.
What you should try instead to chain Reactor operators to create a single reactive pipeline and return the reactive type for the final client to subscribe on it. In a Spring WebFlux application, the HTTP clients (through the WebFlux engine) are subscribing.
Your code snippet could look like this:
Mono<User> result = context.getPayload(User.class)
.doOnSuccess(user -> System.out.println("############Received user "+user))
.flatMap(user -> {return userRepository.save(user)})
.doOnSuccess(user -> System.out.println("############ Saved "+user));
return result;
Upvotes: 4