Mauro de Palma
Mauro de Palma

Reputation: 62

Websocket communication versus Netty Environment

I need to implement the communication between two Java environments. The recipient is a SpringBoot reactive application and the snippet to handle the communication is as follows (I'll skip the configuration of the beans)

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive() // <- Step 0
        .map(message -> {
            log.info("Step 1");
            return message.getPayloadAsText();
        })
        .map(message -> {
            log.info("Step 2");
            return webSocketSession.textMessage(this.receiveMessage(message));
        }));
}

The client part is implemented using the Http API from java 11

WebSocket webSocket = HttpClient
    .newBuilder().executor(executor).build()
    .newWebSocketBuilder()
    .buildAsync(URI.create(url), new WebSocket.Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
            WebSocket.Listener.super.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            log.info("onText received with data " + data);
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            log.info("Closed with status " + statusCode + ", reason: " + reason);
            return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            log.error("Error: " + error.getMessage());
            WebSocket.Listener.super.onError(webSocket, error);
        }

    }).join();


webSocket.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true);
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok").thenRun(() -> log.info("Sent close"));

Using debug, I can notice that once the join() is completed and the WebSocket instance is returned, the method at Step 0 of receiver is executed and the Mono<Void> instance is returned.

But the problem is that even if I send some text, Steps 1 and 2 are never executed!

If I try the reverse communication (sending something from the SpringBoot application to the Sender app) the messages are received.

Finally, this is the log from the onClose callback execution after the sendClose statement.

Closed with status 1002, reason: Server internal error

Upvotes: 1

Views: 307

Answers (1)

Mauro de Palma
Mauro de Palma

Reputation: 62

SOLUTION

Since buildAsync method return an instance of CompletableFuture<WebSocket>, we need the chain the send of the messages before flush the messages queue using join()

Here the solution

WebSocket webSocket = HttpClient
    .newBuilder().executor(executor).build()
    .newWebSocketBuilder()
    .buildAsync(URI.create(url), new WebSocket.Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
            WebSocket.Listener.super.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            log.info("onText received with data " + data);
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            log.info("Closed with status " + statusCode + ", reason: " + reason);
            return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            log.error("Error: " + error.getMessage());
            WebSocket.Listener.super.onError(webSocket, error);
        }

    })
    .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), false))
    .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true))
    .thenCompose(ws -> webSocket.sendClose(WebSocket.NORMAL_CLOSURE, ""))
    .join();

Upvotes: 1

Related Questions