Koguro
Koguro

Reputation: 917

How to trigger Mono execution after another Mono terminates

I have a problem when I try to execute a Mono inside doFinally clause. This is my code.

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> stage.get().doFinally(ignored -> doUnlock(lock)));
}

The problem is that doUnlock(lock) inside doFinally() returns a mono that no one is subscribed for because doFinally is not chaining. So the async code part in doUnlock is never actually executed.

Is there any way to avoid this using Mono or Flux helpers?

Upvotes: 4

Views: 15338

Answers (1)

Oleh Dokuka
Oleh Dokuka

Reputation: 12184

Use Mono#then.

Unfortunately, you cannot avoid the usage of Mono/Flux, once your API is built on top of it, however, you may HACK that problem in the following way.

To chain several independent executions which should be subscribed one after the other and the result of the first will be returned after the first has been finished, there is a Mono#then operator which allows writing following (promise-like) code:

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> 
                    stage.get()
                         .flatMap(value -> 
                            doUnlock(lock)
                            .then(Mono.just(value))
                         )
                );
    }
}

Here, to chain execution and then release the lock and then return staged value, we use flatMap to map value as the releasing of the lock and then returning of the staged value again. (admit, sounds awkward)

Note, in the case of error terminal signal, then will be ignored. Thus, to achieve try-finally behaviors, it might be required providing additional orErrorResume operator, as depicted in the following example:

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> 
                    stage.get()
                         .flatMap(value -> 
                            doUnlock(lock)
                            .then(Mono.just(value))
                         )
                         .onErrorResume(t -> 
                            doUnlock(lock)
                            .then(Mono.error(t))
                         )
                );
    }
}

Upvotes: 11

Related Questions