Reputation: 62
I need to implement communication between two Java environments. The recipient is a Spring Boot reactive application, and the snippet that handles the communication is as follows (I'll skip the bean configuration):
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive() // <- Step 0
            .map(message -> {
                log.info("Step 1");
                return message.getPayloadAsText();
            })
            .map(message -> {
                log.info("Step 2");
                return webSocketSession.textMessage(this.receiveMessage(message));
            }));
}
The client part is implemented using the HTTP Client API from Java 11:
WebSocket webSocket = HttpClient
        .newBuilder().executor(executor).build()
        .newWebSocketBuilder()
        .buildAsync(URI.create(url), new WebSocket.Listener() {
            @Override
            public void onOpen(WebSocket webSocket) {
                log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
                WebSocket.Listener.super.onOpen(webSocket);
            }

            @Override
            public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                log.info("onText received with data " + data);
                return WebSocket.Listener.super.onText(webSocket, data, last);
            }

            @Override
            public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
                log.info("Closed with status " + statusCode + ", reason: " + reason);
                return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
            }

            @Override
            public void onError(WebSocket webSocket, Throwable error) {
                log.error("Error: " + error.getMessage());
                WebSocket.Listener.super.onError(webSocket, error);
            }
        }).join();
webSocket.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true);
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok").thenRun(() -> log.info("Sent close"));
Using the debugger, I can see that once join() completes and the WebSocket instance is returned, the method at Step 0 of the receiver is executed and the Mono<Void> instance is returned.
But the problem is that even if I send some text, Steps 1 and 2 are never executed!
If I try the reverse communication (sending something from the SpringBoot application to the Sender app) the messages are received.
Finally, this is the log from the onClose callback execution after the sendClose statement:
Closed with status 1002, reason: Server internal error
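As a reading aid for the status above, here is a minimal sketch that maps a few close codes from RFC 6455 to their meanings. The class and method names (CloseCodes, describe) are made up for illustration and are not part of either application. Note that 1002 is the protocol-error code, while an internal server error would normally be reported as 1011, so the code and the reason text in the log do not match.

```java
import java.util.Map;

class CloseCodes {

    // Subset of the status codes defined in RFC 6455, section 7.4.1.
    static final Map<Integer, String> MEANINGS = Map.of(
            1000, "normal closure",
            1001, "going away",
            1002, "protocol error",
            1003, "unsupported data",
            1011, "internal error");

    // Hypothetical helper: returns a readable meaning for a close status code.
    static String describe(int statusCode) {
        return MEANINGS.getOrDefault(statusCode, "unknown or application-defined");
    }

    public static void main(String[] args) {
        System.out.println("1002 -> " + describe(1002));
        System.out.println("1011 -> " + describe(1011));
    }
}
```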
Upvotes: 1
Views: 307
Reputation: 62
SOLUTION
Since the buildAsync method returns a CompletableFuture<WebSocket>, we need to chain the message sends onto that future before flushing the message queue with join().
Here is the solution:
WebSocket webSocket = HttpClient
        .newBuilder().executor(executor).build()
        .newWebSocketBuilder()
        .buildAsync(URI.create(url), new WebSocket.Listener() {
            @Override
            public void onOpen(WebSocket webSocket) {
                log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
                WebSocket.Listener.super.onOpen(webSocket);
            }

            @Override
            public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                log.info("onText received with data " + data);
                return WebSocket.Listener.super.onText(webSocket, data, last);
            }

            @Override
            public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
                log.info("Closed with status " + statusCode + ", reason: " + reason);
                return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
            }

            @Override
            public void onError(WebSocket webSocket, Throwable error) {
                log.error("Error: " + error.getMessage());
                WebSocket.Listener.super.onError(webSocket, error);
            }
        })
        .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), false))
        .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true))
        // Use the ws passed down the chain: the outer webSocket variable is not assigned until join() returns.
        .thenCompose(ws -> ws.sendClose(WebSocket.NORMAL_CLOSURE, ""))
        .join();
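To show the chaining pattern in isolation, here is a minimal sketch that runs with the JDK alone. FakeSocket, connect and runChain are hypothetical stand-ins, not part of java.net.http; the only thing they share with the real API is that each send returns a CompletableFuture of the same object, as WebSocket.sendText and WebSocket.sendClose do. Each thenCompose stage starts only after the future returned by the previous stage has completed, so the sends happen in order and join() returns once the last one is done.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ChainDemo {

    // Hypothetical stand-in for WebSocket: every send returns a future of this same object.
    static final class FakeSocket {
        final List<String> sent = new CopyOnWriteArrayList<>();

        CompletableFuture<FakeSocket> send(String frame) {
            // Complete on another thread to mimic a non-blocking send.
            return CompletableFuture.supplyAsync(() -> {
                sent.add(frame);
                return this;
            });
        }
    }

    // Hypothetical stand-in for buildAsync: the socket only becomes available asynchronously.
    static CompletableFuture<FakeSocket> connect() {
        return CompletableFuture.supplyAsync(FakeSocket::new);
    }

    static List<String> runChain() {
        FakeSocket socket = connect()
                .thenCompose(s -> s.send("part-1"))
                .thenCompose(s -> s.send("part-2"))
                .thenCompose(s -> s.send("close"))
                .join(); // blocks until the whole chain has completed
        return List.copyOf(socket.sent);
    }

    public static void main(String[] args) {
        System.out.println(runChain()); // prints [part-1, part-2, close]
    }
}
```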
Upvotes: 1