gdomo

Reputation: 2052

WebFlux/Reactor: checking conditions before+after Flux execution with doOnComplete

I'm already querying an external resource with Flux.using(). Now I want to implement a kind of optimistic locking: read some state before the query starts executing, then check whether that state has changed once the query has finished. If it has, throw an exception to abort the HTTP request handling.

I've achieved this by using doOnComplete:

final AtomicReference<String> initialState = new AtomicReference<>();

return Flux.just("some", "constant", "data")
    .doOnComplete(() -> initialState.set(getState()))
    .concatWith(Flux.using(...)) //actual data query
    .doOnComplete(() -> {
        if (!initialState.get().equals(getState())) throw new RuntimeException();
    })
    .concatWithValues("another", "constant", "data");

My questions:

  1. Is this correct? Is it guaranteed that the first doOnComplete lambda finishes before Flux.using() starts, and that the second doOnComplete lambda runs strictly after the query has finished?
  2. Is there a more elegant solution?

Upvotes: 1

Views: 575

Answers (1)

gindex

Reputation: 335

The first doOnComplete runs after Flux.just("some", "constant", "data") has emitted all of its elements, and the second runs after the Publisher passed to concatWith completes successfully. This works because both publishers are finite, so each one eventually signals completion, and concatWith subscribes to the next publisher only after the previous one has completed.
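
To make that ordering visible, here is a minimal sketch that records events in a list. It assumes Reactor Core is on the classpath; the class name OrderingDemo, the event labels, and the stand-in `query` Flux (used in place of the real Flux.using(...) query) are hypothetical names chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

class OrderingDemo {

    // Builds the same shape of pipeline as in the question and records what happens when.
    static List<String> trace() {
        List<String> events = new ArrayList<>();

        // Hypothetical stand-in for the real Flux.using(...) data query.
        Flux<String> query = Flux.just("row1", "row2")
                .doOnSubscribe(s -> events.add("query subscribed"));

        Flux.just("some", "constant", "data")
                .doOnComplete(() -> events.add("first doOnComplete"))
                .concatWith(query)
                .doOnComplete(() -> events.add("second doOnComplete"))
                .concatWithValues("another", "constant", "data")
                .doOnNext(events::add) // record every emitted element as well
                .blockLast();          // blocking is acceptable in a demo, not in a WebFlux handler

        return events;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

In the recorded list, the "first doOnComplete" entry should appear before "query subscribed", and "second doOnComplete" should appear after the last query element and before the trailing constants.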

With the proposed approach, however, the pre- and postconditions of a particular operation are handled outside that operation, at a higher level. In other words, the condition check that belongs to the operation leaks into the Flux definition.

Suggestion: push the condition check down into the operation:

var otherElements = Flux.using( // actual data query
        () -> "other",
        x -> {
            var initialState = getState();
            return Flux.just(x).doOnComplete(() ->
                { if (!initialState.equals(getState())) throw new IllegalStateException(); }
            );
        },
        x -> { }
);

Flux.just("some", "constant", "data")
        .concatWith(otherElements)
        .concatWith(Mono.just("another")); // "constant", "data" ...
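
If the same check is needed around several queries, one possible variation is to wrap it in a small reusable helper. The sketch below is not part of the suggestion above; it assumes Reactor Core on the classpath, and the names OptimisticCheck, guarded, and stateReader are hypothetical. It uses Flux.defer so the initial state is read once per subscription, and it signals the failure with Flux.error instead of throwing inside doOnComplete.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;

class OptimisticCheck {

    // Wraps `query` so that the state is read when the Flux is subscribed to and
    // compared again after `query` completes; a mismatch ends the Flux with an error.
    static <T> Flux<T> guarded(Supplier<String> stateReader, Flux<T> query) {
        return Flux.defer(() -> {
            String initialState = stateReader.get(); // precondition snapshot, assumed non-null
            return query.concatWith(Flux.defer(() ->
                    initialState.equals(stateReader.get())
                            ? Flux.<T>empty()
                            : Flux.<T>error(new IllegalStateException("state changed during query"))));
        });
    }

    public static void main(String[] args) {
        AtomicReference<String> state = new AtomicReference<>("v1");

        // State stays the same: the elements pass through unchanged.
        List<String> ok = guarded(state::get, Flux.just("a", "b")).collectList().block();
        System.out.println(ok);

        // State is modified while the query runs: the Flux terminates with an error.
        Flux<String> mutating = Flux.just("a", "b").doOnNext(x -> state.set("v2"));
        try {
            guarded(state::get, mutating).collectList().block();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The guarded Flux can then be placed between the constant parts with concatWith, in the same position as otherElements above.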

Upvotes: 2
