Reputation: 23
I'm experimenting with reactive WebSockets (on Spring Boot 2.1.0), but I'm having trouble holding back the outbound Flux until the client has sent some information.
I know that blocking is not the correct way to handle a reactive flow, but I need to receive some information from the client before proceeding (e.g. an auth key, an id). Is there an acceptable way to manage this in a reactive manner?
For example: I want the client to send an authorization key and a subscription id (to subscribe only to a specific event). I will send the outbound Flux only once I have both pieces of information, or close the session if they are not valid.
I've tried to manage the check inside the handle method:
webSocketSession.receive().subscribe(inMsg -> {
    if (!inMsg.getPayloadAsText().equals("test-key")) {
        log.info("AUTHORIZATION ERROR");
        webSocketSession.close();
    }
});
But this approach does not work and is not correct: it handles the session close asynchronously, and even when a message with the wrong key arrives, the session remains alive.
Another option is to keep a "session" in in-memory storage to track the information received, and handle this at the business logic level.
I'm stuck figuring out a correct way to do this reactively.
My starting point was this example: http://www.baeldung.com/spring-5-reactive-websockets
Thanks in advance
ADDITIONAL INFORMATION:
Using the example https://github.com/eugenp/tutorials/tree/master/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket
I've created a basic Spring Boot application and added a configuration class for the WebSocket:
@Configuration
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/event-emitter-test", new MyWebSocketHandler());
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1); // before annotated controllers
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
The following class contains the main WebSocket handler method (modified with my actual code):
@Component
public class MyWebSocketHandler implements WebSocketHandler {

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/event-emitter-test", webSocketHandler);
        SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
        handlerMapping.setOrder(1);
        handlerMapping.setUrlMap(map);
        return handlerMapping;
    }

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        webSocketSession.receive().subscribe(inMsg -> {
            if (!inMsg.getPayloadAsText().equals("test-key")) {
                // log.info("AUTHORIZATION ERROR");
                webSocketSession.close();
            }
        });
        List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));
        Flux<String> intervalFlux = Flux
                .interval(Duration.ofMillis(500))
                .map(tick -> {
                    if (tick < data.size())
                        return "item " + tick + ": " + data.get(tick.intValue());
                    return "Done (tick == data.size())";
                });
        return webSocketSession.send(intervalFlux
                .map(webSocketSession::textMessage));
    }
}
Upvotes: 2
Views: 2191
Reputation: 59086
You should not subscribe nor block within a reactive pipeline. You can tell you're within such a pipeline because the return type of the handler method is a Mono<Void>, meaning a signal that the handling of the incoming messages is done.
In your case, you probably want to read the first message, check that it contains the subscription info you're expecting, and send messages.
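import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

public class TestWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .flatMap(msg -> {
                    SubscriptionInfo info = extract(msg);
                    if (info == null) {
                        // Invalid subscription info: close() is part of the returned pipeline
                        return session.close();
                    }
                    else {
                        Mono<WebSocketMessage> message = Mono.just(session.textMessage("message"));
                        return session.send(message);
                    }
                })
                .then();
    }

    SubscriptionInfo extract(String message) {
        // parse the message here; the handler treats a null result as invalid
    }

    class SubscriptionInfo {
        //
    }
}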
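The extract method and the SubscriptionInfo class above are left as placeholders. As a minimal sketch, assuming the client sends its first message as "<authKey>:<subscriptionId>" (a hypothetical wire format, not something defined in the question), the parsing could look like the following. The parse method and its expectedKey parameter are illustrative names, and Optional is used here in place of the null return:

```java
import java.util.Optional;

// Hypothetical wire format (an assumption): "<authKey>:<subscriptionId>", e.g. "test-key:42".
final class SubscriptionInfo {
    final String authKey;
    final String subscriptionId;

    SubscriptionInfo(String authKey, String subscriptionId) {
        this.authKey = authKey;
        this.subscriptionId = subscriptionId;
    }

    // Returns an empty Optional when the message is null, malformed,
    // or carries a key other than the expected one.
    static Optional<SubscriptionInfo> parse(String message, String expectedKey) {
        if (message == null) {
            return Optional.empty();
        }
        int sep = message.indexOf(':');
        // Both the key and the id must be non-empty.
        if (sep <= 0 || sep == message.length() - 1) {
            return Optional.empty();
        }
        String key = message.substring(0, sep);
        String id = message.substring(sep + 1);
        if (!key.equals(expectedKey)) {
            return Optional.empty();
        }
        return Optional.of(new SubscriptionInfo(key, id));
    }
}
```

With something like this in place, extract(msg) could return SubscriptionInfo.parse(msg, "test-key").orElse(null), which keeps the null check in the handler unchanged.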
Upvotes: 1