Reputation: 35276
Suppose I have this simple WebSocket handler for chat messages:
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    webSocketSession
        .receive()
        .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
        .map(textMessage -> textMessageToGreeting(textMessage))
        .doOnNext(greeting -> greetingPublisher.push(greeting))
        .subscribe();
    final Flux<WebSocketMessage> message = publisher
        .map(greet -> processGreeting(webSocketSession, greet));
    return webSocketSession.send(message);
}
What needs to change here, in general, so that it uses the RSocket protocol instead?
Upvotes: 3
Views: 386
Reputation: 1776
An RSocket controller in Spring looks more like a RestController than a WebSocketHandler, so a simple version of the example above looks like this:
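import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

@Controller
public class RSocketController {
    // Handles requests sent to the "say.hello" route
    @MessageMapping("say.hello")
    public Mono<String> sayHello(String name) {
        return Mono.just("server says hello " + name);
    }
}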
This handler is equivalent to RSocket's requestResponse method: one request in, one response out.
If this answer doesn't satisfy you, please describe in more detail what you want to achieve.
EDIT
If you want to broadcast messages to all clients, they need to subscribe to the same Flux.
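import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;

public class GreetingPublisher {
    // Note: DirectProcessor is deprecated as of Reactor 3.4 in favor of the Sinks API.
    final FluxProcessor<String, String> processor;
    final FluxSink<String> sink;

    public GreetingPublisher() {
        // serialize() gates concurrent producers so pushes from multiple threads are safe
        this.processor = DirectProcessor.<String>create().serialize();
        this.sink = processor.sink();
    }

    // Pushes a greeting to every current subscriber
    public void addGreetings(String greeting) {
        this.sink.next(greeting);
    }

    // The shared stream that all clients subscribe to
    public Flux<String> greetings() {
        return processor;
    }
}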
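import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

@Controller
public class GreetingController {
    final GreetingPublisher greetingPublisher = new GreetingPublisher();

    // No return value: clients call this route as fire-and-forget
    @MessageMapping("greetings.add")
    public void addGreetings(String name) {
        greetingPublisher.addGreetings("Hello, " + name);
    }

    // Returns a Flux: clients call this route as a request-stream
    @MessageMapping("greetings")
    public Flux<String> sayHello() {
        return greetingPublisher.greetings();
    }
}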
Your clients have to call the greetings endpoint with the requestStream method. Whenever you send a message with greetingPublisher.addGreetings(), it is broadcast to all clients that are currently subscribed.
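To make the broadcast behaviour concrete, here is a minimal plain-JDK model of it. It is not Reactor or RSocket code: it swaps in java.util.concurrent.SubmissionPublisher for the shared Flux, and the class name BroadcastSketch and the two "client" lists are hypothetical, for illustration only. It is meant to show one thing that should also hold for a hot, non-replaying source like the DirectProcessor above: every subscriber receives the same messages, but only those published after it subscribed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Plain-JDK model of the broadcast semantics; not Reactor or RSocket code.
public class BroadcastSketch {

    // Returns what each of two hypothetical clients received, in order.
    static List<List<String>> demo() {
        List<String> firstClient = new CopyOnWriteArrayList<>();
        List<String> secondClient = new CopyOnWriteArrayList<>();

        // Stands in for the shared stream returned by greetings()
        SubmissionPublisher<String> greetings = new SubmissionPublisher<>();

        // First client subscribes before anything is published
        CompletableFuture<Void> first = greetings.consume(firstClient::add);
        greetings.submit("Hello, Alice");

        // Second client subscribes late and misses the first greeting
        CompletableFuture<Void> second = greetings.consume(secondClient::add);
        greetings.submit("Hello, Bob");

        // Complete the stream and wait until both clients have drained it
        greetings.close();
        first.join();
        second.join();
        return List.of(firstClient, secondClient);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the first client ends up with both greetings and the second only with "Hello, Bob". If late subscribers should also see earlier messages, a replaying source would be needed instead.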
Upvotes: 2