Kongress

Reputation: 2264

How to handle inbound stream cancellation in Spring Boot RSocket Reactive

Goal

I want an RSocket channel endpoint in my Spring Boot application where I can detect cancellation of the inbound, client-driven stream and do some server-side cleanup.

Setup

Relevant dependencies:

I have tried to achieve my goal with both Kotlin coroutine Flow and Reactor Flux. Both client/server pairs below should do the same thing: establish an RSocket channel, send 2 "ping" payloads from the client, have the server respond to each with a "pong" payload, and then have the client close the connection.

Flow server side:

    @MessageMapping("testFlow")
    fun testPingFlow(input: Flow<String>): Flow<String> {
        val cs = CoroutineScope(EmptyCoroutineContext)
        val output = MutableSharedFlow<String>(10)

        cs.launch {
            try {
                input
                    .catch { e ->
                        logger.error("Rsocket server input error", e)
                    }
                    .onCompletion { exception ->
                        logger.debug("Rsocket server input completed")
                        if (exception != null) {
                            logger.error("Exception received while processing Rsocket server input flow", exception)
                        }
                    }
                    // Normal .collect complains about being internal-only
                    .collectIndexed { _, message ->
                        logger.debug("Rsocket server input received $message")
                        output.emit("pong ${System.currentTimeMillis()}")
                    }
            } catch (e: Throwable) {
                logger.error("Rsocket server input connection exception caught", e)
            }
        }
        return output
    }

Flow client side test:

    @Test
    fun testPingFlow() {
        val outToServer = MutableSharedFlow<String>(10)

        runBlocking {
            val socketFlow = rSocketRequester
                .route("testFlow")
                .data(outToServer.asFlux())
                .retrieveFlow<String>()
                .take(2)

            outToServer.emit("Ping ${System.currentTimeMillis()}")
            outToServer.emit("Ping ${System.currentTimeMillis()}")

            socketFlow
                .onCompletion { exception ->
                    logger.debug("Rsocket client output completed")
                    if (exception != null) {
                        logger.error("Exception received while processing Rsocket client output flow", exception)
                    }
                }
                .collect { message ->
                    logger.debug("Received pong from server $message")
                }
        }
    }

Flux server side:

    @MessageMapping("testFlux")
    fun testPingFlux(input: Flux<String>): Flux<String> {
        val output = Sinks.many().unicast().onBackpressureBuffer<String>()
        try {
            input
                .doOnNext { message ->
                    logger.debug("Rsocket server input message received $message")
                }
                .doOnError { e ->
                    logger.error("Rsocket server input connection error", e)
                }
                .doOnCancel {
                    logger.debug("Rsocket server input cancelled")
                }
                .doOnComplete {
                    logger.debug("Rsocket server input completed")
                }
                .subscribe { message ->
                    output.tryEmitNext("pong ${System.currentTimeMillis()}")
                }
        } catch (e: Throwable) {
            logger.error("Rsocket server input connection exception caught", e)
        }
        return output.asFlux()
    }

Flux client side test:

    @Test
    fun testPingFlux() {
        val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()

        rSocketRequester
            .route("testFlux")
            .data(outToServer.asFlux())
            .retrieveFlux<String>()
            .doOnCancel {
                logger.debug("Rsocket client output connection completed")
            }
            .doOnError { e ->
                logger.error("Exception received while processing Rsocket client output flow", e)
            }
            .take(2)
            .subscribe { message ->
                logger.debug("Received pong from server $message")
            }

        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
    }

The Problem

Both client/server snippets above do in fact send ping/pong payloads back and forth, but in neither case does any of my server-side handling run when the client cancels the connection. I get my own log line "Rsocket client output completed" from the client side, then "Operator called default onErrorDropped" from Reactor, followed by this stack trace from RSocket:

java.util.concurrent.CancellationException: Inbound has been canceled
    at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
    at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:267) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:377) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

This is a problem as (beyond this toy example) my application needs to do server-side cleanup on connection close.
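
For reference, here is a minimal Reactor-only sketch of how a downstream take(2) reaches upstream hooks, with no RSocket involved. It shows that doOnCancel and doFinally fire on the stream whose subscriber cancels, which in the snippets above would be the Flux returned by the handler rather than the input Flux. Whether RSocket 1.1.0 actually propagates the client's cancel to that returned Flux is an assumption that this sketch does not verify. The names runDemo, serverSide, received and signals are hypothetical.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks

// Returns (items seen by the subscriber, lifecycle signals seen upstream).
fun runDemo(): Pair<List<String>, List<String>> {
    val signals = mutableListOf<String>()
    val received = mutableListOf<String>()
    val output = Sinks.many().unicast().onBackpressureBuffer<String>()

    // Stands in for the Flux a @MessageMapping handler would return.
    val serverSide: Flux<String> = output.asFlux()
        .doOnCancel { signals += "cancel" }                      // only fires on cancellation
        .doFinally { type -> signals += "finally:${type.name}" } // fires on cancel, complete or error

    // Stands in for the client: take(2) cancels upstream after two items.
    serverSide.take(2).subscribe { received += it }

    output.tryEmitNext("pong 1")
    output.tryEmitNext("pong 2")

    return received to signals
}

fun main() {
    val (received, signals) = runDemo()
    println(received)
    println(signals)
}
```

Since doFinally covers cancel, complete and error in one callback, it is a common place to hang cleanup logic when the exact termination cause does not matter.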

Things I Have Tried Unsuccessfully

Thanks in advance for any help.

Upvotes: 1

Views: 1906

Answers (1)

Kongress

Reputation: 2264

Bug filed, marking this question as answered. Thanks to everyone for the quick responses.

Upvotes: 0
