Reputation: 23
I connect an RSocketClient to an RSocket server with the configuration below, and this works well. I would like to detect loss of connection. Subscribing to the onClose() Mono of the RSocketClient's underlying RSocket works, but only once. How can I detect every close when the connection is lost and re-established repeatedly?
requester = rsocketRequesterBuilder
        .setupRoute("shell-client")
        .setupData(CLIENT_ID)
        .setupMetadata(accessToken, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
        .rsocketStrategies(rsocketStrategies)
        .rsocketConnector(connector -> connector
                .acceptor(responder)
                .reconnect(Retry.backoff(1000, Duration.ofMillis(500))
                        .doBeforeRetry(x -> log.info("Retry"))
                        .doAfterRetry(x -> log.info(x.toString()))))
        .tcp("x.y.z.40", 8080);
RSocket r = requester.rsocketClient().source().block();
r.onClose()
        .doOnError(x -> log.info("Error"))
        .doFinally(x -> log.info("Disconnected"))
        .subscribe();
Upvotes: 1
Views: 591
Reputation: 12184
You can achieve the required behaviour with the following code:
requester
        .rsocketClient()
        .source()
        .flatMap(rsocket -> rsocket.onClose())
        .repeat()
        .retry()
        .subscribe();
In the above example, once the connection is lost, rsocket.onClose() emits a terminal signal. Because you have configured the reconnect feature, the next subscription to RSocketClient.source() re-establishes the connection. Once that happens, you receive a new RSocket instance and subscribe to its onClose stream again inside the flatMap operator.
To make this happen repeatedly, we use .repeat() and .retry(): .repeat() resubscribes after a normal completion and .retry() resubscribes after an error. So whenever onClose terminates (which indicates a disconnection), the subscription to RSocketClient.source() is repeated, and you will be able to obtain a new connection and start listening to its onClose stream again.
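To tie this back to the logging in the question, the same chain can carry the per-connection callbacks inside flatMap, so they fire for every new RSocket instance rather than only the first. The following is a minimal sketch, assuming Spring Framework 5.3+ (where RSocketRequester exposes rsocketClient()) and SLF4J for logging; the class name ConnectionWatcher and the method name watch are hypothetical and not part of any library.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.Disposable;

// Hypothetical helper: logs every connect and disconnect of the requester's connection.
public final class ConnectionWatcher {

    private static final Logger log = LoggerFactory.getLogger(ConnectionWatcher.class);

    private ConnectionWatcher() {
    }

    // Returns a Disposable; dispose it to stop watching.
    public static Disposable watch(RSocketRequester requester) {
        return requester.rsocketClient()
                .source() // Mono<RSocket>: resolves to the current connection
                .flatMap(rsocket -> {
                    log.info("Connected");
                    // onClose() terminates when this particular connection goes away
                    return rsocket.onClose()
                            .doOnError(e -> log.info("Connection closed with error", e))
                            .doFinally(signal -> log.info("Disconnected ({})", signal));
                })
                .repeat() // resubscribe to source() after a normal close
                .retry()  // resubscribe to source() after an error
                .subscribe();
    }
}
```

It would be called once after the requester is built, for example Disposable watcher = ConnectionWatcher.watch(requester);. Note that a bare retry() resubscribes immediately on every error; if that turns out to be too aggressive, retryWhen(Retry.backoff(...)) from reactor.util.retry is one option for spacing the attempts out.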
Upvotes: 1