Reputation: 2052
I'm already querying some external resource with Flux.using(). Now I want to implement a kind of optimistic locking: read some state before the query starts executing and check whether it was updated after the query has finished. If it was, throw an exception to abort the HTTP request handling.
I've achieved this by using doOnComplete:
final AtomicReference<String> initialState = new AtomicReference<>();
return Flux.just("some", "constant", "data")
    // capture the state once the leading constants have been emitted
    .doOnComplete(() -> initialState.set(getState()))
    .concatWith(Flux.using(...)) // actual data query
    // re-read the state after the query completes and fail if it changed
    .doOnComplete(() -> {
        if (!initialState.get().equals(getState())) {
            throw new RuntimeException();
        }
    })
    .concatWithValues("another", "constant", "data");
My questions:
Is it guaranteed that the first doOnComplete lambda finishes before Flux.using() starts, and is it guaranteed that the second doOnComplete lambda is executed strictly after the query has finished?
Upvotes: 1
Views: 575
Reputation: 335
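The first doOnComplete is executed after Flux.just("some", "constant", "data") has emitted all of its elements, and the second one after the Publisher passed to concatWith completes successfully. concatWith subscribes to the next publisher only once the previous one has completed, so the first lambda runs before Flux.using() is subscribed. This works because both publishers emit a finite number of elements.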
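The ordering described above can be checked with a small experiment. This is only a sketch: the class name, the event labels, and the placeholder values "resource" and "query-row" are made up for illustration, and the Flux.using() arguments stand in for the real query, which the question does not show.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatOrderingDemo {

    // Runs the pipeline and returns the callbacks in the order they fired.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stand-in for the real query: the resource supplier runs on subscription.
        Flux<String> query = Flux.using(
                () -> { events.add("using: resource acquired"); return "resource"; },
                resource -> Flux.just("query-row"),
                resource -> { });

        Flux.just("some", "constant", "data")
                .doOnComplete(() -> events.add("first doOnComplete"))
                .concatWith(query)
                .doOnComplete(() -> events.add("second doOnComplete"))
                .blockLast(); // subscribe and wait until the whole sequence terminates

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the query is subscribed only after the first source completes, the recorded events should come out as "first doOnComplete", then "using: resource acquired", then "second doOnComplete".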
With the proposed approach, however, the pre- and postconditions of a particular operation are handled outside that operation, at a higher level. In other words, the condition check that belongs to the operation leaks into the flux definition.
Suggestion: push the condition check down into the operation:
var otherElements = Flux.using( // actual data query
    () -> "other",
    x -> {
        var initialState = getState();
        return Flux.just(x).doOnComplete(() -> {
            if (!initialState.equals(getState())) throw new IllegalStateException();
        });
    },
    x -> { }
);
Flux.just("some", "constant", "data")
    .concatWith(otherElements)
    .concatWith(Mono.just("another")); // "constant", "data" ...
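To see how the pushed-down check behaves when the state really does change, here is a self-contained sketch. Everything beyond the Reactor operators is an assumption made for illustration: the STATE holder, the getState() stand-in, the guardedQuery and pipeline helpers, and the mutateDuringQuery flag that simulates a concurrent update.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

public class GuardedQueryDemo {

    // Hypothetical stand-in for whatever getState() reads in the real application.
    static final AtomicReference<String> STATE = new AtomicReference<>("v1");

    static String getState() {
        return STATE.get();
    }

    // Reads the state when the query is subscribed and re-checks it on completion.
    static Flux<String> guardedQuery(boolean mutateDuringQuery) {
        return Flux.using(
                () -> "other",
                resource -> {
                    String initialState = getState();
                    return Flux.just(resource)
                            // simulate a concurrent update while the query is running
                            .doOnNext(row -> { if (mutateDuringQuery) STATE.set("v2"); })
                            .doOnComplete(() -> {
                                if (!initialState.equals(getState())) {
                                    throw new IllegalStateException("state changed during query");
                                }
                            });
                },
                resource -> { });
    }

    static Flux<String> pipeline(boolean mutateDuringQuery) {
        return Flux.just("some", "constant", "data")
                .concatWith(guardedQuery(mutateDuringQuery))
                .concatWithValues("another", "constant", "data");
    }

    public static void main(String[] args) {
        System.out.println(pipeline(false).collectList().block());
        try {
            pipeline(true).blockLast();
        } catch (IllegalStateException e) {
            System.out.println("aborted: " + e.getMessage());
        }
    }
}
```

An exception thrown inside doOnComplete is delivered downstream as an error signal, so the trailing constants are never emitted in the failing case and a blocking caller sees the IllegalStateException.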
Upvotes: 2