Reputation: 5699
I have SI flow which consumes org.springframework.web.reactive.socket.WebSocketMessage
, does some work with it, which includes handling it's payload using Netty's ByteBuf
. At some point, an exception occurred in my flow:
org.springframework.messaging.MessageHandlingException: error occurred in message handler [_org.springframework.integration.errorLogger.handler]; nested exception is io.netty.util.IllegalReferenceCountException: refCnt: 0 at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:93) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] ... Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0 at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1356) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] at io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:175) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] at io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1315) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] at org.springframework.core.io.buffer.NettyDataBuffer.hashCode(NettyDataBuffer.java:288) ~[spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] at org.springframework.web.reactive.socket.WebSocketMessage.hashCode(WebSocketMessage.java:134) ~[spring-webflux-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] at java.lang.Object.toString(Object.java:236) ~[?:1.8.0_161] at java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_161] at java.lang.StringBuilder.append(StringBuilder.java:131) ~[?:1.8.0_161]
After that, handling all binary web socket messages fails with the following exception:
2018-11-26T10:38:29,133 ERROR --- [-server-epoll-7] o.s.i.h.LoggingHandler (:) org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'binaryWebSocketMessageChannel'; nested exception is java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages, failedMessage=GenericMessage [payload=MyPayload(payload=org.springframework.web.reactive.socket.WebSocketMessage@38552d5, session=ReactorNettyWebSocketSession[id=3e0be929, uri=http://localhost:8080/]), headers={id=b09a89ff-f7be-1b43-6f62-40e5c0b5695a, timestamp=1543225109132}] at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:183) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:205) at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:55) at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:138) at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:127) at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158) at reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:79) ... Caused by: java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages at org.springframework.util.Assert.state(Assert.java:94) at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ... 57 more
Could anyone point me in any direction to try and solve the problem? As well, in which cases would SI EIP component (router, transformer, filter, service activator) to unsubscribe from the channel?
For reference, channel type is org.springframework.integration.channel.FluxMessageChannel
Edit:
My flow looks like this:
WebSocketMessage -> router: (BINARY) -> binaryWebSocketMessageChannel -> ...
(!BINARY) -> nullChannel
(I know filter fits better here, I plan to refactor later)
@ArtemBilan the repo with example is here: https://github.com/ioreskovic/Spring-Integration-flow-loses-subscriber
Upvotes: 2
Views: 1402
Reputation: 121177
The point is that Publisher
in the FluxMessageChannel
is cancelled in that Spring Integration version.
We started to use onErrorContinue()
from the Reactor 3.2
in version 5.1
. To fix your problem it would be better to consider to upgrade your application to the latest Spring Boot 2.1.1
.
As we workaround you can consider to swallow an exception in the BinaryWsmToBytesTransformer
and don't bubble it into the FluxMessageChannel
back.
Upvotes: 2