rupweb
rupweb

Reputation: 3328

with spring boot rsocket capture the cancel frame type

I have a spring boot rsocket implementation where if a client cancels or closes their rsocket request then I want to cancel other subscription registrations on the server.

In the logs on the spring boot server I can see that a cancel message is sent or received:

WARN i.r.t.n.s.WebsocketServerTransport$1 [reactor-http-nio-3] received WebSocket Close Frame - connection is closing
INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

How do I capture and handle this cancel signal?

I tried cancel endpoints but these don't capture the signal:

@MessageMapping("cancel")
Flux<Object> onCancel() {
    log.info("Captured cancel signal");
}

or

@ConnectMapping("cancel")
Flux<Object> onCancel2() {
    log.info("Captured cancel2 signal");
}

This question on cancel subscriptions is possibly related, and this question on detecting websocket disconnection

Upvotes: 1

Views: 628

Answers (2)

rupweb
rupweb

Reputation: 3328

It was not a well set out question. The answer is that

INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

is seen by a FluxSink that was setup from the original @MessageMapping endpoint.

For example:

@MessageMapping("hello")
Flux<Object> hello(@Payload String message) {       
    return myService.generateWorld(message);
}

In myService class

public Flux<Object> generateWorld(String hello) {
    EmitterProcessor<Object> emitter = EmitterProcessor.create();
    FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

    // doing stuff with sink here
    sink.next(stuff());

    // This part will handle a cancel from the client
    sink.onCancel(() -> {log.info("********** SINK.onCancel ***********");});

    return Flux.from(emitter));  
}

The sink.onCancel() will handle a cancel of the flux to the hello endpoint, from the client.

Upvotes: 1

kojot
kojot

Reputation: 1776

To capture the cancel signal you can use subscribe to onClose() event.

In your controller

@Controller
class RSocketConnectionController {

    @ConnectMapping("client-id")
    fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
//        rSocketRequester.rsocket().dispose()   //to reject connection
        rSocketRequester
                .rsocket()
                .onClose()
                .subscribe(null, null, {
                    log.info("{} just disconnected", clientId)

                    //TODO here whatever you want
                })
    }
}

Your client needs to send the SETUP frame properly to invoke this @ConnectMapping. If you use rsocket-js you need to add a payload like this:

const client = new RSocketClient({
        // send/receive JSON objects instead of strings/buffers
        serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
        },
        setup: {
          //for connection mapping on server
          payload: {
            data: 'unique-client-id',   //TODO you can receive this data on server side
            metadata: String.fromCharCode("client-id".length) + "client-id"
          },
          // ms btw sending keepalive to server
          keepAlive: 60000,
.....
        }
});

Upvotes: 1

Related Questions