Reputation: 2264
I want to have an RSocket channel endpoint in my Spring Boot application in which I can handle the cancellation of the inbound, client-driven stream to do some server side cleanup.
Relevant dependencies:
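Roughly, in Gradle terms (this is an approximation; the exact versions are the ones visible in the stack trace further down, i.e. a Spring Boot 2.4.x project):

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-rsocket")    // rsocket-core 1.1.0 / reactor-core 3.4.2 per the stack trace below
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")        // Flow <-> Flux bridging (asFlux, retrieveFlow)
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.security:spring-security-test")   // spring-security-test 5.4.2 per the stack trace below
}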
I have tried to achieve this with both Kotlin coroutine Flows and Reactor Fluxes. Both client/server pairs below should do the same thing: establish an RSocket channel, have the client send two "ping" payloads, have the server respond to each with a "pong" payload, and then have the client close the connection.
Flow server side:
@MessageMapping("testFlow")
fun testPingFlow(input: Flow<String>): Flow<String> {
val cs = CoroutineScope(EmptyCoroutineContext)
val output = MutableSharedFlow<String>(10)
cs.launch {
try {
input
.catch { e ->
logger.error("Rsocket server input error", e)
}
.onCompletion { exception ->
logger.debug("Rsocket server input completed")
if (exception != null) {
logger.error("Exception received while processing Rsocket server input flow", exception)
}
}
// Normal .collect complains about being internal-only
.collectIndexed { _, message ->
logger.debug("Rsocket server input received $message")
output.emit("pong ${System.currentTimeMillis()}")
}
} catch (e: Throwable) {
logger.error("Rsocket server input connection exception caught", e)
}
}
return output
}
Flow client side test:
@Test
fun testPingFlow() {
    val outToServer = MutableSharedFlow<String>(10)
    runBlocking {
        val socketFlow = rSocketRequester
            .route("testFlow")
            .data(outToServer.asFlux())
            .retrieveFlow<String>()
            .take(2)
        outToServer.emit("Ping ${System.currentTimeMillis()}")
        outToServer.emit("Ping ${System.currentTimeMillis()}")
        socketFlow
            .onCompletion { exception ->
                logger.debug("Rsocket client output completed")
                if (exception != null) {
                    logger.error("Exception received while processing Rsocket client output flow", exception)
                }
            }
            .collect { message ->
                logger.debug("Received pong from server $message")
            }
    }
}
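For reference, rSocketRequester in both tests is just a plain requester pointed at the locally running RSocket server. A sketch of that wiring (the port and field names here are placeholders, not my exact test class):

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.rsocket.RSocketRequester

// Sketch only: the Boot-autoconfigured builder, connected over TCP to the test server.
@Autowired
lateinit var requesterBuilder: RSocketRequester.Builder

val rSocketRequester: RSocketRequester by lazy {
    // placeholder port; in the real test it comes from local.rsocket.server.port
    requesterBuilder.tcp("localhost", 7000)
}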
Flux server side:
@MessageMapping("testFlux")
fun testPingFlux(input: Flux<String>): Flux<String> {
val output = Sinks.many().unicast().onBackpressureBuffer<String>()
try {
input
.doOnNext { message ->
logger.debug("Rsocket server input message received $message")
}
.doOnError { e ->
logger.error("Rsocket server input connection error", e)
}
.doOnCancel {
logger.debug("Rsocket server input cancelled")
}
.doOnComplete {
logger.debug("Rsocket server input completed")
}
.subscribe { message ->
output.tryEmitNext("pong ${System.currentTimeMillis()}")
}
} catch (e: Throwable) {
logger.error("Rsocket server input connection exception caught", e)
}
return output.asFlux()
}
Flux client side test:
@Test
fun testPingFlux() {
    val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()
    rSocketRequester
        .route("testFlux")
        .data(outToServer.asFlux())
        .retrieveFlux<String>()
        .doOnCancel {
            logger.debug("Rsocket client output connection completed")
        }
        .doOnError { e ->
            logger.error("Exception received while processing Rsocket client output flow", e)
        }
        .take(2)
        .subscribe { message ->
            logger.debug("Received pong from server $message")
        }
    outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
    outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
}
Both client/server pairs above do send the ping/pong payloads back and forth, but in neither case do any of my server-side hooks fire when the client cancels the connection. What I see is my own "Rsocket client output completed" log line on the client side, then "Operator called default onErrorDropped" from Reactor, and the following stack trace from RSocket:
java.util.concurrent.CancellationException: Inbound has been canceled
at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.2.jar:3.4.2]
at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:267) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:377) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
This is a problem as (beyond this toy example) my application needs to do server-side cleanup on connection close.
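Conceptually, what I want to be able to write on the server side is something along these lines, with the terminal hook actually firing when the client cancels (cleanUpSession is a stand-in for my real cleanup logic, not a method shown above):

@MessageMapping("testFlux")
fun testPingFlux(input: Flux<String>): Flux<String> =
    input
        .map { "pong ${System.currentTimeMillis()}" }
        .doFinally { signal ->
            // I'd expect signal == SignalType.CANCEL here when the client drops the channel;
            // this is where the per-connection cleanup would go.
            logger.debug("Rsocket server channel terminated with $signal")
            cleanUpSession() // hypothetical cleanup hook
        }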
Thanks in advance for any help.
Upvotes: 1
Views: 1906