underflow

Reputation: 103

What's the easiest way to wait for Mono completion in the background?

We are given a Mono that performs some action (say, a database update) and returns a value.
We want to add that Mono, suitably transformed, to a special list of actions that must be completed, for example, during shutdown.
The Mono may be subscribed eagerly after it is added to the list, so that processing starts right away, or .subscribe() may never be called, in which case it is subscribed only during shutdown.

During shutdown we can iterate on the list in the following way:

for (Mono<?> mono : specialList) {
  Object value = mono.block(); // (do something with value)
}

How can the original Mono be transformed so that, when the shutdown code runs and the Mono has already been subscribed, the action is not triggered again, and the shutdown code instead either waits for it to complete or replays its stored return value?

Upvotes: 1

Views: 1332

Answers (1)

underflow

Reputation: 103

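OK, it looks like it is as simple as calling mono.cache(), which subscribes to the source at most once and replays the result (value, completion, or error) to every later subscriber. This is how I used it in practice: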

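    public Mono<Void> addShutdownMono(Mono<Void> mono) {
        // Use a separate variable: a lambda can only capture an effectively final local
        Mono<Void> cached = mono.cache();
        // Remove the entry from the list once the returned Mono terminates
        Mono<Void> newMono = cached.doFinally(signal -> shutdownMonos.remove(cached));
        shutdownMonos.add(cached);

        return newMono;
    }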

    public Function<Mono<Void>, Mono<Void>> asShutdownAwaitable() {
        return mono -> addShutdownMono(mono);
    }

    database.doSomeAction()
        .as(asShutdownAwaitable())
        .subscribe(); // Or don't subscribe at all, deferring until shutdown
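
To see why cache() matters here, a minimal sketch (the class name CacheDemo and the counter are made up for illustration, and Mono.fromCallable stands in for the database action) counts how often the underlying action runs when a Mono is first subscribed eagerly and then blocked on, as the shutdown loop would do:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class CacheDemo {
    // Returns how many times the underlying action ran after one eager
    // subscribe() followed by one block(), with or without cache().
    static int runTwice(boolean cached) {
        AtomicInteger runs = new AtomicInteger();
        // Hypothetical stand-in for the real action (e.g. a database update)
        Mono<String> action = Mono.fromCallable(() -> {
            runs.incrementAndGet();
            return "done";
        });
        Mono<String> tracked = cached ? action.cache() : action;
        tracked.subscribe(); // eager subscription
        tracked.block();     // what the shutdown code does later
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runTwice(false)); // prints 2: the action ran again
        System.out.println(runTwice(true));  // prints 1: the result was replayed
    }
}
```

Without cache(), every subscription (including the one made by block()) re-runs the action; with it, later subscribers only receive the stored result.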

Here is the actual shutdown code. It was also important to me that the Monos execute in the order they were added if the user chose not to subscribe to them eagerly, which is why I use Flux.concat instead of Flux.merge.

    public void shutdown() {
        // Snapshot the list, then wait for each Mono in insertion order;
        // errors are logged and otherwise ignored
        Flux.concat(Lists.transform(new ArrayList<>(shutdownMonos), mono -> mono.onErrorResume(err -> {
            logger.error("Async exception during shutdown, ignoring", err);
            return Mono.empty();
        }))).blockLast();
    }
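
The ordering and error handling can be checked in isolation. The following is only a sketch under assumptions: ConcatOrderDemo, the log list, and the three sample Monos are invented for the example, and a plain loop replaces Guava's Lists.transform. Flux.concat subscribes to each Mono only after the previous one has completed, so Monos that were never subscribed run in insertion order:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatOrderDemo {
    static List<String> drain() {
        List<String> log = new ArrayList<>();
        // Hypothetical pending actions, none of them subscribed yet
        List<Mono<Void>> pending = new ArrayList<>();
        pending.add(Mono.<Void>fromRunnable(() -> log.add("first")).cache());
        pending.add(Mono.<Void>error(new IllegalStateException("boom")).cache());
        pending.add(Mono.<Void>fromRunnable(() -> log.add("third")).cache());

        // Swallow errors so that one failed action does not abort the rest
        List<Mono<Void>> safe = new ArrayList<>();
        for (Mono<Void> mono : pending) {
            safe.add(mono.onErrorResume(err -> {
                log.add("error: " + err.getMessage());
                return Mono.empty();
            }));
        }
        // concat subscribes to the next Mono only after the previous one completes
        Flux.concat(safe).blockLast();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(drain()); // prints [first, error: boom, third]
    }
}
```

The failing Mono is logged and skipped, and the third action still runs after it.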

Upvotes: 2
