Reputation: 24676
Suppose I have something like this:
Mono.just(a)
.flatMap(...) // init
// set fields on a if error occurs
.flatMap(...) // finalize
.subscribe();
In the last flatMap I'm executing some final logic that depends on the values of fields in a. In particular, I need to know whether an error occurred during init.
So I need an operator that intercepts errors and lets me modify the original object.
If I'm not wrong, all the onError* methods return the original object without allowing it to be updated.
doOnError doesn't seem to solve my problem either.
Is there a way to achieve what I need?
Upvotes: 0
Views: 1384
Reputation: 4642
Mono.just(a)
    .flatMap(...) // init
    // set fields on a if error occurs
    // onErrorResume expects a function from the error to a fallback Mono
    .onErrorResume(e -> getFallbackMono(a))
    .flatMap(...) // finalize
    .subscribe();
...
private Mono<ClassOfA> getFallbackMono(ClassOfA a) {
    return Mono.just(getMutatedA(a));
}
.onErrorResume will return a stream to fall back on. Here, you can create a new stream with the mutated value of a.
.doOnError() is a side-effect operator. I would use it to log the error.
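To make the difference concrete, here is a minimal runnable sketch, assuming reactor-core is on the classpath. The `Item` class, its `initFailed` flag and the `init` helper are hypothetical stand-ins for `a` and the init step from the question; they are not part of Reactor.

```java
import reactor.core.publisher.Mono;

public class OnErrorResumeSketch {

    // Hypothetical mutable payload standing in for `a`.
    static class Item {
        boolean initFailed;
    }

    // Hypothetical init step; errors when `fail` is true.
    static Mono<Item> init(Item item, boolean fail) {
        return fail
                ? Mono.error(new IllegalStateException("init failed"))
                : Mono.just(item);
    }

    static Mono<String> pipeline(Item a, boolean fail) {
        return Mono.just(a)
                .flatMap(item -> init(item, fail))
                // Side effect only: the error keeps propagating after this operator.
                .doOnError(e -> System.err.println("init error: " + e.getMessage()))
                // Swap the error for a Mono that re-emits `a` after flagging it.
                .onErrorResume(e -> {
                    a.initFailed = true;
                    return Mono.just(a);
                })
                // finalize: branch on the flag set above.
                .flatMap(item -> Mono.just(item.initFailed ? "recovered" : "ok"));
    }

    public static void main(String[] args) {
        System.out.println(pipeline(new Item(), false).block()); // prints "ok"
        System.out.println(pipeline(new Item(), true).block());  // prints "recovered"
    }
}
```

The lambda passed to `onErrorResume` closes over `a`, which is why the finalize step can see the mutation.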
Upvotes: 0
Reputation: 28301
Keep in mind this is a chain of discrete and (potentially) asynchronous steps. Once you've entered a step, the only context you get is the previous step's output in the form of the onNext, onComplete and onError signals, plus whatever core Java language features can expose to you. For example, if you're assembling an inner Flux in a flatMap lambda, Java gives you access to that lambda's input parameters.
The latter might be the solution to what you need: a scope in which you can get hold of the original value.
So that first flatMap can act as that enclosing scope and deal with both the nominal and the error cases. Something like:
Mono<B> processed = Mono.just(a)
    // the lambda parameter gets its own name so it does not clash with the outer a
    .flatMap(value -> init(value)
        // value is still in scope here, so the fallback can be built from it
        .onErrorResume(e -> fallback(value))
    )
    .flatMap(...); // finalize
processed.subscribe();
// for illustration purposes, what init and fallback may typically look like:
Mono<B> init(A a) {
    return Mono.zip(
        asyncServiceStuff.process(a.stuff),
        asyncServiceOtherStuff.process(a.otherStuff),
        (bPart1, bPart2) -> new B(bPart1, bPart2)
    );
}
Mono<B> fallback(A original) {
    return Mono.just(new FallbackB(original.otherStuff));
}
That said, it seems that you want to create the fallback from a, which is already accessible within the scope of the method (ideally as a final input parameter or variable).
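The enclosing-scope idea matters most when the value comes from the stream itself rather than from a method variable. A small sketch, assuming reactor-core is available; the `init` helper and the string results are made up for illustration:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class EnclosingScopeSketch {

    // Hypothetical async step: errors for even inputs.
    static Mono<String> init(int n) {
        return n % 2 == 0
                ? Mono.error(new IllegalArgumentException("even: " + n))
                : Mono.just("ok-" + n);
    }

    static Flux<String> process(Flux<Integer> source) {
        return source.flatMap(n ->
                init(n)
                        // `n` is still in scope, so each fallback is built
                        // from the element that actually failed.
                        .onErrorResume(e -> Mono.just("fallback-" + n)));
    }

    public static void main(String[] args) {
        List<String> results = process(Flux.just(1, 2, 3)).collectList().block();
        System.out.println(results);
    }
}
```

Because the error is handled inside the inner Mono, a failure for one element does not terminate the outer sequence.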
Upvotes: 2
Reputation: 121262
You are missing the fact that every reactive operator in the chain builds a new Mono instance for us. So adding one more operator makes no difference to you:
/**
* Simply emit a captured fallback value when any error is observed on this {@link Mono}.
*
* <p>
* <img class="marble" src="doc-files/marbles/onErrorReturnForMono.svg" alt="">
*
* @param fallback the value to emit if an error occurs
*
* @return a new falling back {@link Mono}
*/
public final Mono<T> onErrorReturn(final T fallback) { ... }
Mono.just(a)
.flatMap(...)
.onErrorReturn(fallbackValue)
.flatMap(...)
.subscribe();
You can definitely look into all the other onError* operators for other error-handling variants.
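As one example of such a variant, `onErrorReturn` also has an overload that takes an exception type, so only matching errors are replaced. A minimal sketch, assuming reactor-core is on the classpath; the `withFallback` helper and the string values are placeholders:

```java
import reactor.core.publisher.Mono;

public class OnErrorReturnSketch {

    static Mono<String> withFallback(Mono<String> source) {
        return source
                // Only IllegalStateException is replaced; other errors still propagate.
                .onErrorReturn(IllegalStateException.class, "fallback");
    }

    public static void main(String[] args) {
        System.out.println(withFallback(Mono.just("value")).block()); // prints "value"
        System.out.println(
                withFallback(Mono.error(new IllegalStateException("boom"))).block()); // prints "fallback"
    }
}
```

Note that the fallback value is an ordinary method argument, so it is computed when the chain is assembled, not when the error occurs. If the fallback has to reflect state that changes later, `onErrorResume` with a lambda is the safer choice.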
Upvotes: 0