Jan Wodniak
Jan Wodniak

Reputation: 31

Why does disposing of a Flux-based subscription in Micronaut’s Reactor HTTP client hang subsequent requests?

I have a Micronaut application using the declarative Reactor HTTP client, backed by DefaultHttpClient (DefaultHttpClient implements both HttpClient and StreamingHttpClient).

When I subscribe to a request that returns a Flux<T> and call dispose() on the resulting Disposable, any future HTTP calls hang.

Notably, this does not happen if I call the same endpoint returning Mono<List<T>> and then dispose the subscription.

Is this expected behavior, and how can I safely cancel an individual Flux request without shutting down the entire client’s resources?

A sample repo demonstrates the issue, with a README that provides steps to reproduce it.

Any insights or tips would be appreciated!

Upvotes: 3

Views: 150

Answers (1)

Roar S.
Roar S.

Reputation: 11144

Update

The issue has been isolated and narrowed down specifically to the upstream SignalType.CANCEL.

.log("transactionsAsFlux", Level.INFO, SignalType.CANCEL)

After thoroughly testing your code, it appears that the .log() inside transactionsAsFluxThenDispose is the reason the application hangs after invoking the /flux/dispose endpoint.

Removing .log() from all other endpoints than /transactions/flux/dispose also works, so there is something around .log() that needs to be investigated.

    @Get("/flux/dispose")
    Mono<String> transactionsAsFluxThenDispose() {
        // removed .log() from the next line
        Flux<Transaction> flux = mockedTransactionsApi.transactionsAsFlux();
        Disposable disposable = flux.subscribe();
        disposable.dispose();
        return Mono.just("Disposed %s".formatted(disposable.getClass()));
    }

I previously mentioned short-circuiting the Flux using .takeWhile in a comment. With this approach, the code works even with .log.

        Flux<Transaction> flux = mockedTransactionsApi.transactionsAsFlux().log();
        final AtomicBoolean doContinue = new AtomicBoolean(true);

        Disposable disposable = flux
                .takeWhile(transaction -> doContinue.get())
                .subscribe();

        // stop emitting further elements
        doContinue.set(false);

Upvotes: 1

Related Questions