ioreskovic
ioreskovic

Reputation: 5699

Spring Integration flow loses subscriber

I have SI flow which consumes org.springframework.web.reactive.socket.WebSocketMessage, does some work with it, which includes handling it's payload using Netty's ByteBuf. At some point, an exception occurred in my flow:

org.springframework.messaging.MessageHandlingException: error occurred in message handler [_org.springframework.integration.errorLogger.handler]; nested exception is io.netty.util.IllegalReferenceCountException: refCnt: 0
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:93) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
...
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1356) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:175) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1315) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at org.springframework.core.io.buffer.NettyDataBuffer.hashCode(NettyDataBuffer.java:288) ~[spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.web.reactive.socket.WebSocketMessage.hashCode(WebSocketMessage.java:134) ~[spring-webflux-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at java.lang.Object.toString(Object.java:236) ~[?:1.8.0_161]
    at java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_161]
    at java.lang.StringBuilder.append(StringBuilder.java:131) ~[?:1.8.0_161]

After that, handling all binary web socket messages fails with the following exception:

2018-11-26T10:38:29,133 ERROR --- [-server-epoll-7] o.s.i.h.LoggingHandler (:) org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'binaryWebSocketMessageChannel'; nested exception is java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages, failedMessage=GenericMessage [payload=MyPayload(payload=org.springframework.web.reactive.socket.WebSocketMessage@38552d5, session=ReactorNettyWebSocketSession[id=3e0be929, uri=http://localhost:8080/]), headers={id=b09a89ff-f7be-1b43-6f62-40e5c0b5695a, timestamp=1543225109132}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:183)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:205)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:55)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:138)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:127)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
    at reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:79)
...
Caused by: java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages
    at org.springframework.util.Assert.state(Assert.java:94)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    ... 57 more

Could anyone point me in any direction to try and solve the problem? As well, in which cases would SI EIP component (router, transformer, filter, service activator) to unsubscribe from the channel?

For reference, channel type is org.springframework.integration.channel.FluxMessageChannel

Edit:

My flow looks like this:

WebSocketMessage -> router: (BINARY)  -> binaryWebSocketMessageChannel -> ...
                            (!BINARY) -> nullChannel

(I know filter fits better here, I plan to refactor later)

@ArtemBilan the repo with example is here: https://github.com/ioreskovic/Spring-Integration-flow-loses-subscriber

Upvotes: 2

Views: 1402

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121177

The point is that Publisher in the FluxMessageChannel is cancelled in that Spring Integration version.

We started to use onErrorContinue() from the Reactor 3.2 in version 5.1. To fix your problem it would be better to consider to upgrade your application to the latest Spring Boot 2.1.1.

As we workaround you can consider to swallow an exception in the BinaryWsmToBytesTransformer and don't bubble it into the FluxMessageChannel back.

Upvotes: 2

Related Questions