CodeNinja

Reputation: 23

Spring Boot Reactive WebSocket - Block the outbound flux until all information is received from the client

I'm experimenting with reactive WebSockets (on Spring Boot 2.1.0), but I have a problem when trying to block the outbound flux while waiting for information from the client.

I know that blocking is not the correct way to handle a reactive flow, but I need to receive some information from the client before proceeding (e.g. an auth key, an id). Is there an acceptable way to manage this in a reactive manner?

For example: I want the client to send an authorization key and a subscription id (to subscribe only to a specific event). I will send the outbound flux only once I have both pieces of information, or close the session if they are not valid.

I've tried to do the check inside the handle method:

    webSocketSession.receive().subscribe(inMsg -> {

        if(!inMsg.getPayloadAsText().equals("test-key")) {
            log.info("AUTHORIZATION ERROR");
            webSocketSession.close();
        }
    });

But this does not work and is not correct: the session close is handled in an "async" way, and even when a message with the wrong key is received the session still remains alive.

Another way is to keep a "session" in in-memory storage to keep track of the information received, and handle this at the business logic level.

I'm stuck figuring out the correct way to manage this reactively.

My starting point was this example: http://www.baeldung.com/spring-5-reactive-websockets

Thanks in advance

ADDITIONAL INFORMATION:

Using the example https://github.com/eugenp/tutorials/tree/master/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket

I've created a basic Spring Boot application and added a configuration class for the WebSocket:

@Configuration 
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/event-emitter-test", new MyWebSocketHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1); // before annotated controllers
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

The following is the main class containing the main WebSocket method (modified with my actual code):

@Component
public class MyWebSocketHandler  implements WebSocketHandler {

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/event-emitter-test", webSocketHandler);

        SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
        handlerMapping.setOrder(1);
        handlerMapping.setUrlMap(map);
        return handlerMapping;
    }

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {

        webSocketSession.receive().subscribe(inMsg -> {

            if(!inMsg.getPayloadAsText().equals("test-key")) {
                // log.info("AUTHORIZATION ERROR");
                webSocketSession.close();
            }
        });

        List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));
        Flux<String> intervalFlux = Flux
                                    .interval(Duration.ofMillis(500))
                                    .map(tick -> {
                                        if (tick < data.size())
                                            return "item " + tick + ": " + data.get(tick.intValue());
                                        return "Done (tick == data.size())";
                                    });

        return webSocketSession.send(intervalFlux
          .map(webSocketSession::textMessage));
    }


}

Upvotes: 2

Views: 2191

Answers (1)

Brian Clozel

Reputation: 59086

You should neither subscribe nor block within a reactive pipeline. You can tell you're within such a pipeline because the return type of the handler method is Mono<Void>, a signal that the handling of the incoming messages is done.

In your case, you probably want to read the first message, check that it contains the subscription info you're expecting, and then send messages.

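import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

public class TestWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        // Everything stays in one pipeline; the framework subscribes to the returned Mono.
        return session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .flatMap(msg -> {
                    SubscriptionInfo info = extract(msg);
                    if (info == null) {
                        // Invalid subscription info: close the session as part of the pipeline.
                        return session.close();
                    }
                    else {
                        Mono<WebSocketMessage> message = Mono.just(session.textMessage("message"));
                        return session.send(message);
                    }
                })
                .then();
    }

    // Parses an incoming message; expected to return null when the message is not valid.
    SubscriptionInfo extract(String message) {
        // Placeholder: the parsing is application-specific.
        throw new UnsupportedOperationException("extract is not implemented");
    }

    class SubscriptionInfo {
        // Application-specific fields (e.g. auth key, subscription id).
    }
}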
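
The `extract` placeholder in the handler above could be filled in along these lines. This is only a sketch under assumptions that are not in the original posts: the client is assumed to send a single text message of the form `<authKey>:<subscriptionId>`, the only accepted key is the `test-key` value from the question, and the `SubscriptionParser` class and its field names are hypothetical.

```java
final class SubscriptionParser {

    /** Hypothetical holder for the two values the client must supply. */
    static final class SubscriptionInfo {
        final String authKey;
        final String subscriptionId;

        SubscriptionInfo(String authKey, String subscriptionId) {
            this.authKey = authKey;
            this.subscriptionId = subscriptionId;
        }
    }

    // Assumption for illustration: a single accepted key, as in the question's snippet.
    private static final String EXPECTED_KEY = "test-key";

    /**
     * Parses "<authKey>:<subscriptionId>" (an assumed wire format).
     * Returns null when the message is malformed or the key is wrong,
     * which is what the null check in the handler expects.
     */
    static SubscriptionInfo extract(String message) {
        if (message == null) {
            return null;
        }
        // Split on the first ':' only, so the subscription id may itself contain ':'.
        String[] parts = message.split(":", 2);
        if (parts.length != 2 || parts[1].isEmpty()) {
            return null;
        }
        if (!EXPECTED_KEY.equals(parts[0])) {
            return null;
        }
        return new SubscriptionInfo(parts[0], parts[1]);
    }
}
```

Returning null for every failure case keeps the handler's branch simple: anything that is not a well-formed, authorized subscription request leads to `session.close()`.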

Upvotes: 1
