Reputation: 917
I have a problem when I try to execute a Mono inside doFinally
clause.
This is my code.
public interface Locks {
Mono<ReactiveDistributedLock> doLock(LockParams params);
Mono<Boolean> doUnlock(ReactiveDistributedLock lock);
default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
return doLock(params)
.flatMap(lock -> stage.get().doFinally(ignored -> doUnlock(lock)));
}
The problem is that doUnlock(lock)
inside doFinally()
returns a mono that no one is subscribed for because doFinally
is not chaining. So the async code part in doUnlock
is never actually executed.
Is there any way to avoid this using Mono
or Flux
helpers?
Upvotes: 4
Views: 15338
Reputation: 12184
Mono#then.
Unfortunately, you cannot avoid the usage of Mono/Flux, once your API is built on top of it, however, you may HACK that problem in the following way.
To chain several independent executions which should be subscribed one after the other and the result of the first will be returned after the first has been finished, there is a Mono#then
operator which allows writing following (promise-like) code:
public interface Locks {
Mono<ReactiveDistributedLock> doLock(LockParams params);
Mono<Boolean> doUnlock(ReactiveDistributedLock lock);
default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
return doLock(params)
.flatMap(lock ->
stage.get()
.flatMap(value ->
doUnlock(lock)
.then(Mono.just(value))
)
);
}
}
Here, to chain execution and then release the lock and then return staged value, we use flatMap
to map value as the releasing of the lock and then
returning of the staged value again. (admit, sounds awkward)
Note, in the case of error terminal signal, then
will be ignored. Thus, to achieve try-finally behaviors, it might be required providing additional orErrorResume
operator, as depicted in the following example:
public interface Locks {
Mono<ReactiveDistributedLock> doLock(LockParams params);
Mono<Boolean> doUnlock(ReactiveDistributedLock lock);
default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
return doLock(params)
.flatMap(lock ->
stage.get()
.flatMap(value ->
doUnlock(lock)
.then(Mono.just(value))
)
.onErrorResume(t ->
doUnlock(lock)
.then(Mono.error(t))
)
);
}
}
Upvotes: 11