Reputation: 31
I have a Micronaut application using the declarative Reactor HTTP client, backed by DefaultHttpClient (DefaultHttpClient implements both HttpClient and StreamingHttpClient).
When I subscribe to a request that returns a Flux<T> and call dispose() on the resulting Disposable, any future HTTP calls hang.
Notably, this does not happen if I call the same endpoint returning Mono<List<T>> and then dispose the subscription.
Is this expected behavior, and how can I safely cancel an individual Flux request without shutting down the entire client's resources?
A sample repo demonstrates the issue, with a README that provides steps to reproduce it.
Any insights or tips would be appreciated!
Upvotes: 3
Views: 150
Reputation: 11144
Update
The issue has been narrowed down to logging the upstream SignalType.CANCEL signal:
.log("transactionsAsFlux", Level.INFO, SignalType.CANCEL)
After testing your code, it appears that the .log() call inside transactionsAsFluxThenDispose is the reason the application hangs after invoking the /flux/dispose endpoint.
Removing .log() from all endpoints other than /transactions/flux/dispose also works, so something about .log() needs further investigation.
@Get("/flux/dispose")
Mono<String> transactionsAsFluxThenDispose() {
    // removed .log() from the next line
    Flux<Transaction> flux = mockedTransactionsApi.transactionsAsFlux();
    Disposable disposable = flux.subscribe();
    disposable.dispose();
    return Mono.just("Disposed %s".formatted(disposable.getClass()));
}
I previously mentioned short-circuiting the Flux using .takeWhile in a comment. With this approach, the code works even with .log.
Flux<Transaction> flux = mockedTransactionsApi.transactionsAsFlux().log();
final AtomicBoolean doContinue = new AtomicBoolean(true);
Disposable disposable = flux
    .takeWhile(transaction -> doContinue.get())
    .subscribe();
// stop emitting further elements
doContinue.set(false);
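One thing to keep in mind with the flag approach is that a takeWhile predicate is only evaluated when an element arrives, so clearing the flag takes effect on the next element rather than immediately. The sketch below shows that timing using only the JDK's Stream.takeWhile (Java 9+) as a stand-in. It is an analogy and not Reactor code: Stream is synchronous and pull-based, and the class name, method name, and stopAfter parameter are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class TakeWhileFlagSketch {

    // Consumes an unbounded source until the flag is cleared.
    // stopAfter (hypothetical) is the element after which the flag is flipped.
    static List<Integer> collectUntilFlagCleared(int stopAfter) {
        AtomicBoolean doContinue = new AtomicBoolean(true);
        List<Integer> seen = new ArrayList<>();

        Stream.iterate(1, i -> i + 1)            // unbounded source: 1, 2, 3, ...
              .takeWhile(i -> doContinue.get())  // gate is checked once per element
              .forEach(i -> {
                  seen.add(i);
                  if (i == stopAfter) {
                      // stop emitting further elements
                      doContinue.set(false);
                  }
              });

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectUntilFlagCleared(3)); // prints [1, 2, 3]
    }
}
```

The element that clears the flag is still delivered, and the source ends only when the following element is tested against the predicate. By the same logic, if the Flux in the answer never emits another element after doContinue.set(false), the takeWhile gate would presumably not fire either, so this pattern fits sources that keep emitting.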
Upvotes: 1