davioooh

Reputation: 24676

Update original Mono when error occurs in chain

Suppose I have something like this:

Mono.just(a)
    .flatMap(...) // init
    // set fields on a if error occurs
    .flatMap(...) // finalize
    .subscribe();

In the last flatMap I run some final logic that depends on the values of fields in a. In particular, I need to know whether an error occurred during init.

So I need an operator that intercepts errors and lets me modify the original object.

If I'm not mistaken, none of the onError* operators give me a way to update the original object.

Also doOnError doesn't seem to solve my problem.

Is there a way to achieve what I need?

Upvotes: 0

Views: 1384

Answers (3)

Prashant Pandey

Reputation: 4642

Mono.just(a)
    .flatMap(...) // init
    // set fields on a if error occurs
    .onErrorResume(e -> getFallbackMono(a)) // onErrorResume takes a Function<Throwable, Mono>
    .flatMap(...) // finalize
    .subscribe();

...

private Mono<ClassOfA> getFallbackMono(ClassOfA a) {
    return Mono.just(getMutatedA(a));
}

.onErrorResume takes a function from the error to a fallback stream and switches to that stream when an error occurs. Here, you can create a new stream with the mutated value of a.

.doOnError() is a side-effect operator. I would use it to log the error.
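
To make the difference concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The Job class, its initFailed and summary fields, and the init/finish steps are hypothetical stand-ins for the asker's object and flatMap bodies. doOnError only observes the error, while onErrorResume sets the flag on the captured object and replaces the error with a value, so the finalize step still runs.

```java
import reactor.core.publisher.Mono;

class ResumeDemo {
    // Hypothetical stand-in for the asker's object "a".
    static final class Job {
        boolean initFailed;   // set when init errors
        String summary = "";  // filled in by the finalize step
    }

    // Hypothetical init step: fails on demand.
    static Mono<Job> init(Job job, boolean fail) {
        return fail ? Mono.error(new IllegalStateException("init failed")) : Mono.just(job);
    }

    // Hypothetical finalize step: reads the flag set during error handling.
    static Mono<Job> finish(Job job) {
        job.summary = job.initFailed ? "finalized after error" : "finalized normally";
        return Mono.just(job);
    }

    static Job run(boolean fail) {
        Job a = new Job();
        return Mono.just(a)
            .flatMap(job -> init(job, fail))
            // Side effect only: the error keeps propagating downstream.
            .doOnError(e -> System.err.println("init error: " + e.getMessage()))
            // Recovery: mutate the captured object, then resume with it.
            .onErrorResume(e -> {
                a.initFailed = true;
                return Mono.just(a);
            })
            .flatMap(ResumeDemo::finish)
            .block(); // blocking only to keep the demo short
    }

    public static void main(String[] args) {
        System.out.println(run(false).summary);
        System.out.println(run(true).summary);
    }
}
```

Without the onErrorResume line, the error would skip the finalize flatMap entirely, which is why doOnError alone does not solve the problem.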

Upvotes: 0

Simon Baslé

Reputation: 28301

Keep in mind this is a chain of discrete and (potentially) asynchronous steps. Once you've entered a step, the only context you get is the previous step's output in the form of the onNext, onComplete and onError signals, plus whatever the core Java language features can expose to you. For example, if you're assembling an inner Flux in a flatMap lambda, Java gives you access to that lambda's input parameters.

The latter might be the solution to what you need: a scope in which you can get hold of the original value.

So that first flatMap can act as that enclosing scope and deal with both nominal and error cases. Something like:

Mono<B> processed = Mono.just(a)
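    .flatMap(value -> init(value) // renamed so the lambda parameter does not clash with the local a
        .onErrorResume(e -> fallback(value)) // onErrorResume expects a Function<Throwable, Mono>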
    )
    .flatMap(...); // finalize
processed.subscribe();

// for illustration purposes, what init and fallback may typically look like:
Mono<B> init(A a) {
    return Mono.zip(
        asyncServiceStuff.process(a.stuff),
        asyncServiceOtherStuff.process(a.otherStuff),
        (bPart1, bPart2) -> new B(bPart1, bPart2)
    );
}

Mono<B> fallback(A original) {
    return Mono.just(new FallbackB(original.otherStuff));
}

That said, it seems that you want to create the fallback from a, which is already accessible within the scope of the method (ideally as a final input parameter or variable).
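
A self-contained sketch of the same scoping idea, assuming reactor-core is on the classpath. The Parsed type and the parsing-based init step are made up for illustration and are not part of the question. The point is that the inner onErrorResume sits inside the flatMap lambda, so the lambda parameter is still in lexical scope when the fallback is built.

```java
import reactor.core.publisher.Mono;

class ScopedFallbackDemo {
    // Hypothetical result type: remembers the input and whether it is a fallback.
    static final class Parsed {
        final String input;
        final int value;
        final boolean fallback;

        Parsed(String input, int value, boolean fallback) {
            this.input = input;
            this.value = value;
            this.fallback = fallback;
        }
    }

    // Hypothetical init step: signals NumberFormatException on bad input.
    static Mono<Parsed> init(String input) {
        return Mono.fromCallable(() -> new Parsed(input, Integer.parseInt(input), false));
    }

    static Mono<String> describe(String raw) {
        return Mono.just(raw)
            .flatMap(input -> init(input)
                // "input" is visible here, so the fallback can be derived from it.
                .onErrorResume(NumberFormatException.class,
                    e -> Mono.just(new Parsed(input, -1, true))))
            // Finalize step: branches on whether init fell back.
            .flatMap(p -> Mono.just(p.input + (p.fallback ? " -> fallback" : " -> " + p.value)));
    }

    public static void main(String[] args) {
        System.out.println(describe("42").block());
        System.out.println(describe("x").block());
    }
}
```

Building a new fallback value instead of mutating the input keeps the steps free of shared mutable state, which tends to be easier to reason about once the steps become asynchronous.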

Upvotes: 2

Artem Bilan

Reputation: 121262

You are missing the fact that each reactive operator builds a new Mono instance for us. So it does not matter if you add one more operator:

/**
 * Simply emit a captured fallback value when any error is observed on this {@link Mono}.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/onErrorReturnForMono.svg" alt="">
 *
 * @param fallback the value to emit if an error occurs
 *
 * @return a new falling back {@link Mono}
 */
public final Mono<T> onErrorReturn(final T fallback) {

Mono.just(a)
    .flatMap(...)
    .onErrorReturn(fallbackValue)
    .flatMap(...)
    .subscribe();

You can also look into the other onError* operators for other error-handling variants.
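
A small sketch of the "new instance" point, assuming reactor-core is on the classpath. The failing() source and the method names are hypothetical. It checks that onErrorReturn hands back a different Mono, that the new Mono yields the fallback, and that the original source still fails when subscribed to directly.

```java
import reactor.core.publisher.Mono;

class OnErrorReturnDemo {
    // Hypothetical source that always fails.
    static Mono<String> failing() {
        return Mono.error(new IllegalStateException("boom"));
    }

    // The operator returns a different Mono that wraps the source.
    static boolean returnsNewInstance() {
        Mono<String> source = failing();
        Mono<String> recovered = source.onErrorReturn("fallback");
        return recovered != source;
    }

    // Subscribing to the wrapped Mono yields the fallback value.
    static String recoveredValue() {
        return failing().onErrorReturn("fallback").block();
    }

    // Subscribing to the untouched source still fails.
    static boolean sourceStillFails() {
        Mono<String> source = failing();
        source.onErrorReturn("fallback"); // result discarded, source is unchanged
        try {
            source.block();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(returnsNewInstance());
        System.out.println(recoveredValue());
        System.out.println(sourceStillFails());
    }
}
```

Note that onErrorReturn captures its fallback value when the chain is assembled, so it suits a fixed default. If the fallback has to depend on the error or on state known only at subscription time, onErrorResume with a lambda is the more flexible choice.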

Upvotes: 0
