Reputation: 111
I need to access a WebSocket service that closes an open connection after 24 hours. How do I implement the reconnect with Spring Boot 2 and WebFlux?
This is what I have so far (taken from https://github.com/artembilan/webflux-websocket-demo):
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreaming() throws URISyntaxException {
    ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
    EmitterProcessor<String> output = EmitterProcessor.create();
    Mono<Void> sessionMono = client.execute(new URI("ws://localhost:8080/echo"),
            session -> session.receive()
                    .timeout(Duration.ofSeconds(3))
                    .map(WebSocketMessage::getPayloadAsText)
                    .subscribeWith(output)
                    .then());
    return output.doOnSubscribe(s -> sessionMono.subscribe());
}
As soon as the connection is lost (no input for 3 seconds), a TimeoutException is thrown. But how can I reconnect the socket?
Upvotes: 5
Views: 4896
Reputation: 33
I did this using Reactor's UnicastProcessor.
...
public abstract class AbstractWsReconnectClient {
    private Logger ...
    protected UnicastProcessor<AbstractWsReconnectClient> reconnectProcessor = UnicastProcessor.create();

    protected AbstractWsReconnectClient(Duration reconnectDuration) {
        reconnect(reconnectDuration);
    }

    public abstract Mono<Void> connect();

    private void reconnect(Duration duration) {
        reconnectProcessor.publish()
                .autoConnect()
                .delayElements(duration)
                .flatMap(AbstractWsReconnectClient::connect)
                .onErrorContinue(throwable -> true,
                        (throwable, o) -> {
                            if (throwable instanceof ConnectException) {
                                logger.warn(throwable.getMessage());
                            } else {
                                logger.error("unexpected error occur during websocket reconnect");
                                logger.error(throwable.getMessage());
                            }
                        })
                .doOnTerminate(() -> logger.error("websocket reconnect processor terminate "))
                .subscribe();
    }
}
When the WebSocketClient terminates, invoke UnicastProcessor.onNext to trigger the next connection attempt:
public Mono<Void> connect() {
    WebSocketClient client = new ReactorNettyWebSocketClient();
    logger.info("trying to connect to sso server {}", uri.toString());
    return client.execute(uri, headers, ssoClientHandler)
            .doOnTerminate(() -> {
                logger.warn("sso server {} disconnect", uri.toString());
                super.reconnectProcessor.onNext(this);
            });
}
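To make the control flow easier to see, here is a minimal sketch of the same "reconnect when the previous session terminates" loop using only JDK classes, so it runs without Spring or Reactor. It is not the Reactor implementation above: the ReconnectLoop class, its connect supplier (a stand-in for client.execute(...)) and the runDemo helper are all hypothetical names made up for illustration.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical, library-agnostic sketch: start a new attempt whenever the previous session ends. */
class ReconnectLoop implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // Assumption: stands in for client.execute(...); the future completes when the session ends.
    private final Supplier<CompletableFuture<Void>> connect;
    private final Duration delay;
    private final AtomicInteger attempts = new AtomicInteger();
    private volatile boolean closed;

    ReconnectLoop(Supplier<CompletableFuture<Void>> connect, Duration delay) {
        this.connect = connect;
        this.delay = delay;
    }

    void start() {
        attempt();
    }

    private void attempt() {
        if (closed) {
            return;
        }
        attempts.incrementAndGet();
        CompletableFuture<Void> session;
        try {
            session = connect.get();
        } catch (RuntimeException e) {
            // Treat a failure to even start the session like a failed session.
            session = new CompletableFuture<>();
            session.completeExceptionally(e);
        }
        // Like doOnTerminate: fires on normal completion and on error.
        session.whenComplete((ok, error) -> scheduleNext());
    }

    private void scheduleNext() {
        if (closed) {
            return;
        }
        try {
            // Like delayElements: wait before the next attempt.
            scheduler.schedule(this::attempt, delay.toMillis(), TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The loop was closed concurrently; nothing left to do.
        }
    }

    int attempts() {
        return attempts.get();
    }

    @Override
    public void close() {
        closed = true;
        scheduler.shutdownNow();
    }

    /** Runs the loop against a fake session that ends immediately until 'wanted' attempts were made. */
    static int runDemo(int wanted) {
        CountDownLatch latch = new CountDownLatch(wanted);
        ReconnectLoop loop = new ReconnectLoop(() -> {
            latch.countDown();
            return CompletableFuture.completedFuture(null);
        }, Duration.ofMillis(10));
        loop.start();
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.close();
        }
        return loop.attempts();
    }
}
```

The fixed delay plays the same role as delayElements(duration) above: without it, a server that refuses connections would be retried in a tight loop.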
Upvotes: 2
Reputation: 2208
There is no out-of-the-box solution; a reconnection mechanism is not part of JSR 356 (Java API for WebSocket). But you can implement it on your own. Here is a simple example with Spring events:
Step 1 - Create an event class.
// Named ReconnectEvent to match the references in Steps 2 and 3.
public class ReconnectEvent extends ApplicationEvent {
    private String url;

    public ReconnectEvent(String url) {
        super(url);
        this.url = url;
    }

    public String getUrl() {
        return url;
    }
}
Step 2 - Provide a method for websocket connection. An example:
...
@Autowired
private ApplicationEventPublisher publisher;
...
public void connect(String url) {
    ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();
    EmitterProcessor<String> output = EmitterProcessor.create();
    Mono<Void> sessionMono = client.execute(URI.create(url),
            session -> session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .log()
                    .subscribeWith(output)
                    .doOnTerminate(() -> publisher.publishEvent(new ReconnectEvent(url)))
                    .then());
    output
            .doOnSubscribe(s -> sessionMono.subscribe())
            .subscribe();
}
Note the doOnTerminate() method: when the Flux terminates, either by completing successfully or with an error, it publishes a ReconnectEvent. If necessary, you can publish the reconnection event from other Flux callbacks instead (for example, only from doOnError()).
Step 3 - Provide a listener that connects again to the given URL when a reconnection event occurs.
@EventListener(ReconnectEvent.class)
public void onApplicationEvent(ReconnectEvent event) {
    connect(event.getUrl());
}
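The difference between reconnecting on every termination and reconnecting only on error can be sketched with plain JDK classes. This is an illustration only, not Spring or Reactor code: EventReconnectSketch, its openSession function (a stand-in for client.execute(...)), the onlyOnError flag and the maxAttempts guard are all hypothetical and not part of the steps above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical JDK-only sketch of the event-driven reconnect from Steps 1 to 3. */
class EventReconnectSketch {
    /** Plain stand-in for the Spring event from Step 1. */
    static final class ReconnectEvent {
        final String url;

        ReconnectEvent(String url) {
            this.url = url;
        }
    }

    private final List<Consumer<ReconnectEvent>> listeners = new ArrayList<>();
    // Assumption: stands in for client.execute(...); the future completes when the session ends.
    private final Function<String, CompletableFuture<Void>> openSession;
    private final boolean onlyOnError; // true behaves like doOnError, false like doOnTerminate
    private final int maxAttempts;     // guard so this sketch cannot reconnect forever
    private int attempts;

    EventReconnectSketch(Function<String, CompletableFuture<Void>> openSession,
                         boolean onlyOnError, int maxAttempts) {
        this.openSession = openSession;
        this.onlyOnError = onlyOnError;
        this.maxAttempts = maxAttempts;
        // Step 3: the listener reconnects to the URL carried by the event.
        listeners.add(event -> connect(event.url));
    }

    /** Step 2: open a session and publish an event when it ends. */
    void connect(String url) {
        if (attempts >= maxAttempts) {
            return;
        }
        attempts++;
        openSession.apply(url).whenComplete((ok, error) -> {
            boolean shouldReconnect = !onlyOnError || error != null;
            if (shouldReconnect) {
                publish(new ReconnectEvent(url));
            }
        });
    }

    private void publish(ReconnectEvent event) {
        for (Consumer<ReconnectEvent> listener : listeners) {
            listener.accept(event);
        }
    }

    int attempts() {
        return attempts;
    }
}
```

With a session that completes normally, the onlyOnError variant connects once and stops, while the other variant keeps reconnecting until the guard is hit. In a real application you would probably also want a delay before reconnecting, since the listener above calls connect() immediately.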
Upvotes: 5