Reputation: 63
I've been reading through the Reactor documentation, but I wasn't able to find a proper pattern for the following problem. I have a method that is supposed to do something asynchronously. It returns the responses in the form of a Flux that the consumer can subscribe to.
The method has the following definition:
Flux<ResultMessage> sendRequest(RequestMessage message);
The returned Flux is hot: results can arrive asynchronously at any time.
A potential consumer could use it in the following manner:
sendRequest(message).subscribe(response -> doSomethingWithResponse(response));
An implementation could look like this:
Flux<ResultMessage> sendRequest(RequestMessage message) {
    Flux<ResultMessage> result = incomingMessageStream
        .filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
        .take( 2 );
    // The message sending is done here...
    return result;
}
Here, incomingMessageStream is a Flux of all messages going through this channel.
The problem with this implementation is that the consumer subscribes after the result messages have started arriving, so it can miss some of them.
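The effect can be reproduced in isolation. The following is a minimal sketch, assuming reactor-core 3.4 or later (for the Sinks API) on the classpath; the sink named channel is a hypothetical stand-in for incomingMessageStream, and plain strings stand in for ResultMessage.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class LateSubscriberDemo {

    /** Emits one item before subscribing and one after; returns what the subscriber saw. */
    static List<String> receivedByLateSubscriber() {
        // Hypothetical stand-in for incomingMessageStream: a hot source without any buffering.
        Sinks.Many<String> channel = Sinks.many().multicast().directBestEffort();
        Flux<String> incoming = channel.asFlux();

        // Nobody is subscribed yet, so this emission is not delivered to anyone.
        channel.tryEmitNext("early");

        List<String> received = new ArrayList<>();
        incoming.subscribe(received::add);

        // The subscriber is attached now, so this emission is delivered.
        channel.tryEmitNext("late");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receivedByLateSubscriber());
    }
}
```

Only the item emitted after subscribe() reaches the consumer, which is the situation sendRequest(...) runs into when the response arrives before the caller subscribes.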
So, what I am looking for is a solution that does not depend on when the consumer subscribes. A potential consumer may not even be required to subscribe to the resulting Flux at all. I am looking for a general solution, but if that is not possible, you can assume that the number of resulting messages is not greater than 2.
Upvotes: 0
Views: 1529
Reputation: 63
After some time I created a solution that seems to work:
Flux<ResultMessage> sendRequest(RequestMessage message) {
    final int maxResponsesCount = 2;
    final Duration responseTimeout = Duration.ofSeconds( 10 );
    final Duration subscriptionTimeout = Duration.ofSeconds( 5 );
    // (1)
    ConnectableFlux<ResultMessage> result = incomingMessageStream
        .ofType( ResultMessage.class )
        .filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
        .take( maxResponsesCount )
        .timeout( responseTimeout )
        .replay( maxResponsesCount );
    Disposable connectionDisposable = result.connect();
    // (2)
    AtomicReference<Subscription> subscriptionForCancelSubscription = new AtomicReference<>();
    Mono.delay( subscriptionTimeout )
        .doOnSubscribe( subscriptionForCancelSubscription::set )
        .subscribe( x -> connectionDisposable.dispose() );
    // The message sending is done here...
    // (3)
    return result
        .doOnSubscribe( s -> subscriptionForCancelSubscription.get().cancel() )
        .doFinally( signalType -> connectionDisposable.dispose() );
}
I am using a ConnectableFlux that connects to the stream immediately, without waiting for a subscriber, and uses the replay() method to store the messages, so that any subscriber arriving at a later point does not miss response messages (1).
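The replay-and-connect part of (1) can be tried on its own. This is a minimal sketch rather than the full solution, assuming reactor-core 3.4 or later on the classpath; the sink named channel is a hypothetical stand-in for incomingMessageStream, strings stand in for ResultMessage, and the timeout and cleanup steps are left out.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Sinks;

class ReplayDemo {

    /** Emits two items before anyone subscribes; returns what a late subscriber still receives. */
    static List<String> receivedAfterReplay() {
        // Hypothetical stand-in for incomingMessageStream: a hot source without any buffering.
        Sinks.Many<String> channel = Sinks.many().multicast().directBestEffort();

        // replay(2) keeps the last two items; connect() subscribes to the source right away,
        // even though no downstream subscriber exists yet.
        ConnectableFlux<String> replayed = channel.asFlux().take(2).replay(2);
        replayed.connect();

        // Both responses arrive before the consumer subscribes and are captured by the replay buffer.
        // take(2) completes the replayed sequence after the second item.
        channel.tryEmitNext("first");
        channel.tryEmitNext("second");

        // The late subscriber receives the buffered items followed by completion.
        List<String> received = new ArrayList<>();
        replayed.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receivedAfterReplay());
    }
}
```

Compared with subscribing to the hot stream directly, the consumer here sees both items even though it subscribed after they were emitted.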
There are a few paths this can take:
Method is called and the Flux is subscribed to
2.1. No message has been returned ( .timeout( responseTimeout ) ). After that, .doFinally(..) cleans up the resources (1)(3).
2.2. Some of the response messages have been returned
2.3. All response messages have been returned: doFinally() is executed because the maximum number of elements has been reached ( .take( maxResponsesCount ) ) (1)(3)
I am yet to perform some serious testing on this; if something goes wrong, I'll add the correction to this answer.
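Path 2.1 can be observed in isolation with a source that never emits. The sketch below assumes reactor-core on the classpath; Flux.never() is a hypothetical stand-in for a channel on which no response arrives, the 100 ms duration is a shortened stand-in for responseTimeout, and the onErrorResume fallback is only there to make the outcome visible, it is not part of the solution above.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Flux;

class ResponseTimeoutDemo {

    /** Returns a marker string describing how the sequence ends when no response arrives. */
    static String outcomeWhenNoResponseArrives() {
        return Flux.<String>never()                  // no response is ever delivered
                .take(2)                             // same role as take( maxResponsesCount )
                .timeout(Duration.ofMillis(100))     // signals a TimeoutException after 100 ms of silence
                // Illustration only: turn the timeout error into a visible value.
                .onErrorResume(TimeoutException.class, e -> Flux.just("timed out"))
                .blockLast();
    }

    public static void main(String[] args) {
        System.out.println(outcomeWhenNoResponseArrives());
    }
}
```

In the solution itself there is no such fallback, so a subscriber on this path would receive the TimeoutException as an error signal and should supply an error handler in subscribe(...).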
Upvotes: 1