Reputation: 308
I am new to RSocket and reactive WebSocket, and I am developing an application that needs multiple channels that my clients can subscribe to. So far I have tried two solutions, but neither of them worked as I expected.
First, I implemented a reactive WebSocket server with Spring in which I stored every client connection in a list of FluxSink objects, but with that solution I could not manage to implement channels, which is why I tried RSocket. My first question: is it possible to implement channels with this workflow? Do I understand correctly that only one WebSocket connection should be open between a client and the server? That is why I have spent so much time trying to figure out how to create channels.
With my first solution I could establish a connection between my server and my client, and they could communicate, but I could not create channels. So I tried RSocket, because I read that this protocol makes it quite easy to create bidirectional channels, but I cannot connect to them.
I have a controller with this @MessageMapping:
@MessageMapping("my-channel")
public Flux<String> channel(final Flux<String> message) {
    log.info("Received stream-stream (channel) request... ");
    return message
            // Assuming an SLF4J-style logger: "{}" is needed for the payload to be printed
            .doOnNext(m -> log.info("Request: {}", m))
            .doOnCancel(() -> log.warn("Client cancelled the channel"))
            // Each incoming message restarts a 10-second interval of replies
            .switchMap(setting -> Flux.interval(Duration.ofSeconds(10)).map(index -> "Love spring if it works"))
            .log();
}
When I try to connect with the rsc client, everything works fine:
./rsc --debug --channel --data=asd --route=my-channel --stacktrace ws://localhost:7000
But when I try to connect from my React Native client, the destination route is always ''. Of course, if I change @MessageMapping("my-channel") to @MessageMapping("") it works perfectly, but for obvious reasons that solution is not appropriate for me.
const connector = new RSocketConnector({
  setup: {
    keepAlive: 100,
    lifetime: 10000,
    metadataMimeType: "application/json",
    dataMimeType: "message/x.rsocket.routing.v0"
  },
  transport: new WebsocketClientTransport({
    url: "ws://192.168.0.107:7000",
    wsCreator: (url) => new WebSocket(url) as any,
  }),
});
const rsocket = await connector.connect();
await new Promise((resolve, reject) => {
  const requester = rsocket.requestChannel(
    {
      data: undefined,
      metadata: Buffer.concat([
        Buffer.from(String.fromCharCode('my-channel'.length)),
        Buffer.from('my-channel'),
      ]),
    },
    1,
    false,
    {
      onError: (e) => reject(e),
      onNext: (payload, isComplete) => {
        console.log(
          `payload[data: ${payload.data}; metadata: ${payload.metadata}]|${isComplete}`
        );
        requester.request(1);
        if (isComplete) {
          resolve(payload);
        }
      },
      onComplete: () => {
        resolve(null);
      },
      onExtension: () => { },
      request: (n) => {
        console.log(`request(${n})`);
        requester.onNext(
          {
            data: Buffer.from("Message"),
          },
          true
        );
      },
      cancel: () => { },
    }
  );
}).catch(e => {
  console.log("-------------ERRORRRRRR-------------")
  console.error(e);
});
The error message is this:
-------------ERRORRRRRR-------------
ERROR [Error: Destination '' does not support REQUEST_CHANNEL. Supported interaction(s): [METADATA_PUSH, SETUP]]
What am I doing wrong?
One last question: how can I send additional headers when I establish the connection?
Thank you for your help in advance.
Upvotes: 0
Views: 442
Reputation: 308
With the help of OlegDokuka in this discussion I was able to resolve the issue. The only problem, as he pointed out, was that the two MIME types in the setup were swapped.
Actual:
metadataMimeType: "application/json",
dataMimeType: "message/x.rsocket.routing.v0"
It has to be:
dataMimeType: "application/json",
metadataMimeType: "message/x.rsocket.routing.v0"
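For anyone hitting the same error, here is a minimal sketch of the two pieces that have to agree: the setup MIME types and the route metadata. With metadataMimeType set to "message/x.rsocket.routing.v0", the metadata is expected to be one length byte followed by the UTF-8 bytes of the route. The helpers encodeRoute and utf8Bytes are hypothetical names of my own, not part of rsocket-js, and the sketch avoids Buffer so it runs anywhere:

```typescript
// Returns the UTF-8 bytes of a string. Percent-encoding is used so the
// sketch does not depend on Buffer or TextEncoder being available.
function utf8Bytes(text: string): number[] {
  const bytes: number[] = [];
  const escaped = encodeURIComponent(text);
  for (let i = 0; i < escaped.length; i++) {
    if (escaped[i] === "%") {
      // "%XX" is one raw byte written as two hex digits
      bytes.push(parseInt(escaped.slice(i + 1, i + 3), 16));
      i += 2;
    } else {
      bytes.push(escaped.charCodeAt(i));
    }
  }
  return bytes;
}

// Hypothetical helper (not an rsocket-js API): encodes a single route as
// routing metadata, i.e. a 1-byte length prefix followed by the route bytes.
function encodeRoute(route: string): Uint8Array {
  const routeBytes = utf8Bytes(route);
  if (routeBytes.length > 255) {
    throw new RangeError("route must fit in 255 bytes");
  }
  return new Uint8Array([routeBytes.length].concat(routeBytes));
}

// The corrected MIME types from the answer above.
const setup = {
  dataMimeType: "application/json",
  metadataMimeType: "message/x.rsocket.routing.v0",
};

const metadata = encodeRoute("my-channel");
console.log(setup.metadataMimeType, metadata.length, metadata[0]);
```

In the client from the question, something like Buffer.from(encodeRoute("my-channel")) could then be passed as the metadata of the first payload. The String.fromCharCode approach used in the question produces the same bytes for routes shorter than 128 bytes; beyond that, the default UTF-8 encoding of Buffer.from would turn the length character into two bytes, so writing the length as a raw byte is the safer choice.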
Upvotes: 0