theone1one

Reputation: 1520

Error: "Only one connection receive subscriber allowed" for POST method, XML Request

I am writing a simple POST method that sends an XML body to a URL. When I call it, I get the error message "Only one connection receive subscriber allowed".

Below is the service class that I use to call the publish method.

@Service
public class Service {

    WebClient client = WebClient.create();

    private String publishEndpoint = "xyz.com";

    public Mono<Object> publish() {
        String body = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n"
                + "<dealerReview xmlns=\"http://api.cars.com/schemas/dealer_review.xsd\">\n"
                + "    <customerIdentifier>9240112369283</customerIdentifier>\n" + "    <consumerDealerReview>\n"
                + "        <consumerDealerReviewIdentifier>0</consumerDealerReviewIdentifier>\n"
                + "        <visitReason>\n" + "            <id>0</id>\n" + "            <code>TestingReason</code>\n"
                + "        </visitReason>\n" + "        <title>Test comment</title>\n" + "            <rating>\n"
                + "                <id>\n" + "                    <code>OverallFacilities</code>\n"
                + "                    <ratingNumber>4.0</ratingNumber>\n" + "                </id>\n"
                + "            </rating>\n" + "        </categoryRatings>\n"
                + "        <ipAddress>127.0.0.1</ipAddress>\n" + "        <affiliate>\n"
                + "            <name>yahoo</name>\n" + "        </affiliate>\n" + "    </consumerDealerReview>\n"
                + "</dealerReview>";

        return client.post()
                .uri(publishEndpoint)
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE)
                .body(BodyInserters.fromObject(body))
                .retrieve()
                .bodyToMono(Object.class);
    }
}

Below is the stack trace that I am getting.

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:277) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:127) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:290) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.core.publisher.FluxPeek.subscribe(FluxPeek.java:83) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume.subscribe(FluxOnErrorResume.java:47) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreElements.subscribe(MonoIgnoreElements.java:37) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:185) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:251) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1521) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onError(MonoIgnoreThen.java:235) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators.error(Operators.java:181) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1521) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onError(MonoIgnoreThen.java:235) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:76) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators.error(Operators.java:181) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxError.subscribe(FluxError.java:43) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:7734) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators.error(Operators.java:181) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:157) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:86) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators.error(Operators.java:180) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:277) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:127) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:290) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.core.publisher.FluxPeek.subscribe(FluxPeek.java:83) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume.subscribe(FluxOnErrorResume.java:47) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreElements.subscribe(MonoIgnoreElements.java:37) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:118) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:378) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:202) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:343) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:325) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:372) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:511) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1432) [netty-handler-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1199) [netty-handler-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1243) [netty-handler-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:541) [netty-all-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:396) [netty-all-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:189) [netty-all-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:267) [netty-all-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897) [netty-common-4.1.31.Final.jar:4.1.31.Final]

I tried the approach from the question "Spring WebFlux: Only one connection receive subscriber allowed", but it did not work.

Here is the HTTP response:

{ "timestamp": "2019-01-31T06:37:14.306+0000", 
  "path": "/cars/publishReview", "status": 500, 
  "error": "Internal Server Error", "message": "401 Unauthorized" }

In case it helps: GET requests work fine.

Upvotes: 2

Views: 3128

Answers (1)

Brian Clozel

Reputation: 59191

Judging from your WebClient call and your HTTP response, your application is probably using a version prior to Spring Framework 5.1.4.RELEASE and you might be hitting this issue. This only applies if the server is responding with a 4xx / 5xx response status.

Your request is providing a String body and the XML Content-Type header. This looks valid, even if you could provide an object directly and rely on JAXB for XML serialization.

In this case, your request is probably missing some sort of credentials, as the server is responding with HTTP 401 (but actually with a 500 status, which is strange). You should first fix the request to make it valid. I'd argue that this is orthogonal to WebClient and you could craft that request with curl as a first step.
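As a rough illustration of checking the request outside WebClient, here is a minimal sketch that builds (but does not send) the same kind of POST using only JDK 11+ classes. It assumes the server expects HTTP Basic credentials, which the question does not state; the class name, URL, user and password are all hypothetical placeholders.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class AuthenticatedXmlPost {

    // ASSUMPTION: the server uses HTTP Basic auth; the real scheme may differ.
    // Produces "Basic " followed by base64("user:password").
    static String basicAuth(String user, String password) {
        String raw = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Builds a POST with an XML body plus Content-Type and Authorization headers.
    // Nothing is sent here; the request object is only constructed.
    static HttpRequest build(String url, String xmlBody, String user, String password) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/xml")
                .header("Authorization", basicAuth(user, password))
                .POST(HttpRequest.BodyPublishers.ofString(xmlBody))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical endpoint and credentials, for inspection only.
        HttpRequest request = build("https://example.invalid/reviews", "<dealerReview/>", "user", "secret");
        System.out.println(request.method() + " " + request.uri());
        request.headers().map().forEach((name, values) -> System.out.println(name + ": " + values));
    }
}
```

Printing the headers this way makes it easy to compare against a request that is known to work (for example one made with curl) before going back to the WebClient code.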

Once the request is fixed and the server replies as expected, the .bodyToMono(Object.class) part of your client call still looks suspicious: at that point you'll probably hit a deserialization issue. Depending on the expected response format, you should choose a different type for deserialization and possibly send an "Accept" header in your request to tell the server which Content-Type you expect in the response.
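With WebClient, the change described above would roughly amount to adding .accept(MediaType.APPLICATION_XML) to the request and using .bodyToMono(String.class) instead of Object.class. The sketch below shows the same idea with JDK-only classes so it can be run without Spring: ask for XML via "Accept", treat the response body as plain text, and parse it explicitly. The class name, URL and sample response are hypothetical; the actual response format of the service is not known from the question.

```java
import java.io.StringReader;
import java.net.URI;
import java.net.http.HttpRequest;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

final class XmlResponseSketch {

    // Builds (does not send) a POST that declares both what it sends and what it accepts.
    static HttpRequest build(String url, String xmlBody) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/xml")
                .header("Accept", "application/xml")
                .POST(HttpRequest.BodyPublishers.ofString(xmlBody))
                .build();
    }

    // Parses a response body that was read as a String and returns the root element's local name.
    static String rootElementName(String responseBody) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // so getLocalName() works with a default xmlns
            Document document = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(responseBody)));
            return document.getDocumentElement().getLocalName();
        } catch (Exception e) {
            throw new IllegalArgumentException("Response body is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        // ASSUMPTION: a made-up response shape, only to exercise the parser.
        String sample = "<dealerReview xmlns=\"http://api.cars.com/schemas/dealer_review.xsd\">"
                + "<title>Test comment</title></dealerReview>";
        System.out.println(build("https://example.invalid/reviews", "<dealerReview/>").headers().map());
        System.out.println(rootElementName(sample));
    }
}
```

Reading the body as a String first also makes error responses easy to log, since nothing has to be mapped onto a type before you can look at it.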

Upvotes: 1
