somebody
somebody

Reputation: 63

How not to depend on subscription time in Reactor

I've been reading throughout the Reactor documentation, but I was not being able to find proper pattern for the following problem. I have a method that is supposed to do something asynchronously. I returns the result responses in form of a Flux and the consumer could subscribe to it.

The method has following definition:

Flux<ResultMessage> sendRequest(RequestMessage message);

The returning flux is a hot flux, results can come at any given time asynchronously.

The potential consumer could use it in following manner:

sendRequest(message).subscribe(response->doSomethinWithResponse(response);

An implementation can be like this:

Flux<ResultMessage> sendRequest(RequestMessage message) {
   Flux<ResultMessage> result = incomingMessageStream
            .filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
            .take( 2 );
// The message sending is done here...

    return result;
}

Where the incomingMessageStream is a Flux of all messages going through this channel. Problem with this implementation is that consumer is subscribed after the result messages are coming, and it can miss some of them.

So, what I am looking for is a solution that will allow consumer not to depend on time of subscription. A potential consumer may not be required to subscribe to resulting Flux at all. I am looking for a general solution, but if it is not possible you can assume that number of resulting messages is not greater than 2.

Upvotes: 0

Views: 1529

Answers (1)

somebody
somebody

Reputation: 63

After some time I created a solution that seems to work:

Flux<ResultMessage> sendRequest(RequestMessage message) {
  final int maxResponsesCount = 2;
  final Duration responseTimeout = Duration.ofSeconds( 10 );
  final Duration subscriptionTimeout = Duration.ofSeconds( 5 );

  // (1) 
  ConnectableFlux<ResultMessage> result = incomingMessageStream
      .ofType( ResultMessage.class )
      .filter( resultMessage ->Objects.equals(resultMessage.getId(), message.getId() ) )
      .take( maxResponsesCount )
      .timeout( responseTimeout )
      .replay( maxResponsesCount );
  Disposable connectionDisposable = result.connect();

  // (2)
  AtomicReference<Subscription> subscriptionForCancelSubscription = new AtomicReference<>();
  Mono.delay( subscriptionTimeout )
    .doOnSubscribe( subscriptionForCancelSubscription::set )
    .subscribe( x -> connectionDisposable.dispose() );

  // The message sending is done here...

  // (3)
  return result
    .doOnSubscribe(s ->subscriptionForCancelSubscription.get().cancel())
    .doFinally( signalType -> connectionDisposable.dispose() );
}

I am using a ConnectableFlux that connects to the stream immediately, without subscribing, which is set to use reply() method to store all messages, so any subscriber at the later point would not miss response messages (1).

There are few paths this can be executed:

  1. Method is called and no subscription has performed on the flux
    • Solution - there is a timer that removes the connected flux resource after 5 seconds if no subscription is done. (2)
  2. Method is called and subscribed to the flux

    2.1. No message has been returned

    • Solution - there is a timeout set for getting responses (.timeout( responseTimeout )). After that .doFinally(..) cleans the resources (1)(3).

    2.2. Some of response messages have been returned

    • Solution - same as 2.1.

    2.3. All response messages have been returned

    • Solution - The doFinally() is executed due to max number of elements reached ( .take( maxResponsesCount ) ) (1)(3)

I am yet to perform some serious testing on this, if something goes wrong, I'll add the correction to this answer.

Upvotes: 1

Related Questions