Nikhil
Nikhil

Reputation: 365

Reply message received but the receiving thread has already received a reply

I have a flow in which a gateway sends each message to a publish-subscribe channel with two subscribers, each of which forwards the message to its own SQS queue. Each subscriber is a chain that transforms the message before it is sent. An aggregator summarizes the results of every gateway invocation. The happy path works fine, but when a transformer throws an error I get the warning "Reply message received but the receiving thread has already received a reply"; the aggregator is still invoked, but future.get() in the Runner never returns. A sample config and the code to reproduce this are as follows:

Config

<!-- Gateway to Publish Data to SQS -->
    <!-- Executor with a pool size of 10; referenced by dataChannel as its task-executor. -->
    <task:executor id="dataExecutor" pool-size="10"/>
    <!-- Request channel of the gateway and input-channel of both routing chains below.
         The log output in this post shows correlationId, sequenceNumber and sequenceSize headers
         on the messages; presumably they are added because of apply-sequence="true". -->
    <int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" apply-sequence="true"/>
    <!-- Input channel of the aggregator: success-channel of both SQS adapters and output-channel
         of the error chain. -->
    <int:publish-subscribe-channel id="sqsResultChannel"/>
    <!-- error-channel of the gateway and failure-channel of both SQS adapters. -->
    <int:publish-subscribe-channel id="gatewayErrorChannel"/>
    <!-- reply-channel of the gateway method; the aggregator uses it as its output-channel. -->
    <int:publish-subscribe-channel id="gatewayReplyChannel"/>

    <!-- Gateway proxy for com.abc.DataPublishGateway. For publishToDataService the first method
         argument (#args[0]) becomes the payload, requests go to dataChannel and the reply is
         expected on gatewayReplyChannel. gatewayErrorChannel is configured as the error channel. -->
    <int:gateway service-interface="com.abc.DataPublishGateway" id="dataPublishGateway"
                 error-channel="gatewayErrorChannel">
        <int:method name="publishToDataService"
                    payload-expression="#args[0]"
                    request-channel="dataChannel"
                    reply-channel="gatewayReplyChannel">
        </int:method>
    </int:gateway>

    <!-- Error flow: takes messages from gatewayErrorChannel, copies correlationId, sequenceSize and
         sequenceNumber from the failed message's headers onto the error message, and forwards the
         result to sqsResultChannel (the aggregator's input).
         NOTE(review): the replyChannel header of the failed message is not copied here; the
         corrected config in the answer below adds that. -->
    <int:chain input-channel="gatewayErrorChannel" output-channel="sqsResultChannel">
        <int:header-enricher>
            <int:correlation-id expression="payload.failedMessage.headers.correlationId" />
            <int:header name="sequenceSize" expression="payload.failedMessage.headers.sequenceSize" />
            <int:header name="sequenceNumber" expression="payload.failedMessage.headers.sequenceNumber" />
        </int:header-enricher>
    </int:chain>


    <!-- Route to system-a SQS -->
    <!-- Channel between the system-a chain and its SQS outbound adapter. -->
    <int:channel id="sqsSystemAPublishChannel" />
    <!-- Subscriber of dataChannel for system-a: tags the message with targetSystem=system-a, then
         applies a transformer that is meant to fail (see inline comment).
         NOTE(review): no errorChannel header is set in this chain; the corrected config in the
         answer below adds an int:error-channel entry to the header-enricher. -->
    <int:chain input-channel="dataChannel" output-channel="sqsSystemAPublishChannel">
        <int:header-enricher>
            <int:header name="targetSystem" value="system-a" />
        </int:header-enricher>
        <int:transformer expression="payload/2"/> <!-- Simulate transformer error -->
    </int:chain>

    <!-- Sends messages from sqsSystemAPublishChannel to the queue "a-queue" using the amazonSQS
         client; results go to sqsResultChannel on success and to gatewayErrorChannel on failure. -->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="a-queue"
                                          channel="sqsSystemAPublishChannel"
                                          success-channel="sqsResultChannel"
                                          failure-channel="gatewayErrorChannel"/>

    <!-- Route to system-b SQS -->
    <!-- Channel between the system-b chain and its SQS outbound adapter. -->
    <int:channel id="sqsSystemBPublishChannel" />
    <!-- Subscriber of dataChannel for system-b: tags the message with targetSystem=system-b and
         lower-cases the payload before handing it to sqsSystemBPublishChannel. -->
    <int:chain input-channel="dataChannel" output-channel="sqsSystemBPublishChannel">
        <int:header-enricher>
            <int:header name="targetSystem" value="system-b" />
        </int:header-enricher>
        <int:transformer expression="payload.toLowerCase() "/>
    </int:chain>

    <!-- Sends messages from sqsSystemBPublishChannel to the queue "b-queue" using the amazonSQS
         client; results go to sqsResultChannel on success and to gatewayErrorChannel on failure. -->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="b-queue"
                                          channel="sqsSystemBPublishChannel"
                                          success-channel="sqsResultChannel"
                                          failure-channel="gatewayErrorChannel"/>


    <!-- Aggregation step: messages arriving on sqsResultChannel are handed to the
         com.abc.DataResponseAggregator bean and the aggregated output is sent to
         gatewayReplyChannel. -->
    <bean class="com.abc.DataResponseAggregator" id="responseAggregator" />
    <int:aggregator input-channel="sqsResultChannel" output-channel="gatewayReplyChannel" ref="responseAggregator"/>

    <!-- Generic Error Channel Logger -->
    <!-- Logs every message arriving on errorChannel in full (log-full-message="true") at ERROR
         level under the logger name "errorLogger". -->
    <int:logging-channel-adapter log-full-message="true"
                                 logger-name="errorLogger"
                                 level="ERROR"
                                 channel="errorChannel"
                                 id="globalErrorLoggingAdapter"/>

Aggregator

/**
 * Collapses the results of one gateway invocation into a single map keyed by the value of the
 * {@code targetSystem} header.
 *
 * <p>A successful result is recorded as {@code "Ack -> <aws_messageId>"}; an
 * {@link ErrorMessage} is recorded as the message of the underlying failure.
 */
public class DataResponseAggregator {

    /** Key used when no {@code targetSystem} header can be found for a result. */
    private static final String UNKNOWN_TARGET = "unknown";

    /**
     * Builds the summary map for a group of responses.
     *
     * @param responses the grouped messages; {@code null} and {@code null} elements are tolerated
     * @return a mutable map from target system to outcome text, empty when there is nothing to
     *         aggregate
     */
    public Map<String, String> aggregate(List<Message> responses) {

        Map<String, String> resultMap = new HashMap<>();
        if (responses == null) {
            return resultMap;
        }

        for (Message<?> message : responses) {
            if (message == null) {
                continue;
            }
            if (message instanceof ErrorMessage) {
                Throwable failure = ((ErrorMessage) message).getPayload();
                resultMap.put(targetSystemOfFailure(failure, message), describeFailure(failure));
            }
            else {
                // String concatenation renders a missing aws_messageId as "null" instead of
                // throwing, so one incomplete result cannot abort the whole aggregation.
                resultMap.put(targetSystemOf(message),
                        "Ack -> " + message.getHeaders().get("aws_messageId"));
            }
        }
        return resultMap;
    }

    /**
     * Resolves the target system for a failure: the failed message's header when the failure is a
     * {@link MessagingException} carrying one, otherwise the error message's own header.
     */
    private static String targetSystemOfFailure(Throwable failure, Message<?> errorMessage) {
        if (failure instanceof MessagingException) {
            Message<?> failedMessage = ((MessagingException) failure).getFailedMessage();
            if (failedMessage != null) {
                return targetSystemOf(failedMessage);
            }
        }
        return targetSystemOf(errorMessage);
    }

    /** Reads the {@code targetSystem} header, falling back to {@link #UNKNOWN_TARGET}. */
    private static String targetSystemOf(Message<?> message) {
        Object targetSystem = message.getHeaders().get("targetSystem");
        return targetSystem != null ? targetSystem.toString() : UNKNOWN_TARGET;
    }

    /**
     * Describes a failure by the message of its cause, as before; when there is no cause the
     * failure itself is used, and when there is no message text its {@code toString()} is used.
     */
    private static String describeFailure(Throwable failure) {
        Throwable source = failure.getCause() != null ? failure.getCause() : failure;
        String text = source.getMessage();
        return text != null ? text : source.toString();
    }
}

Gateway

/**
 * Service interface of the messaging gateway used to publish a message to the target systems.
 */
public interface DataPublishGateway {

    /**
     * Submits {@code message} for publishing and returns immediately with a future for the result.
     *
     * @param message the text to publish
     * @return a future holding a map of results; NOTE(review): presumably the map built by
     *         DataResponseAggregator (target system to outcome text) - confirm against the flow
     *         configuration
     */
    Future<Map<String, String>> publishToDataService(String message);
}

Runner

/**
 * Start-up runner that publishes two sample messages through the gateway and then prints every
 * entry of each returned result map, one "key - value" line per entry.
 */
@Bean
    CommandLineRunner runner(DataPublishGateway dataPublishGateway) {
        return args -> {
            String[] messages = {"Message 1", "Message 2"};

            // Submit everything first so the invocations can proceed in parallel.
            List<Future<Map<String, String>>> pendingReplies = new ArrayList<>();
            for (String text : messages) {
                pendingReplies.add(dataPublishGateway.publishToDataService(text));
            }

            System.out.println("Processing Futures and Printing Results...");
            for (Future<Map<String, String>> reply : pendingReplies) {
                try {
                    // Blocks until the gateway reply for this invocation is available.
                    Map<String, String> outcomeByTarget = reply.get();
                    for (Map.Entry<String, String> outcome : outcomeByTarget.entrySet()) {
                        System.out.println(outcome.getKey() + " - " + outcome.getValue());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
    }

Logs

14:47:21.650  INFO 91449 --- [pool-1-thread-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-09-07 14:47:21.651  INFO 91449 --- [pool-1-thread-2] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-09-07 14:47:21.655  WARN 91449 --- [pool-1-thread-2] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply: GenericMessage [payload={system-a=Expression evaluation failed: payload/2; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1030E: The operator 'DIVIDE' is not supported between objects of type 'java.lang.String' and 'java.lang.Integer', system-b=Ack -> 90b62dff-f3c3-4288-b5e3-8178e410f60d}, headers={aws_messageId=90b62dff-f3c3-4288-b5e3-8178e410f60d, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b519afe, sequenceNumber=2, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b519afe, sequenceSize=2, correlationId=dc914588-f9a9-537f-0623-94938f84cec4, aws_serviceResult={MD5OfMessageBody: 83b2330607fe8f817ce6d24249dea373,MD5OfMessageAttributes: 5f1f442c363809afbd334ff00232c834,MessageId: 90b62dff-f3c3-4288-b5e3-8178e410f60d,}, id=9a6ad209-5291-6216-100b-502b4c37eb01, targetSystem=system-b, timestamp=1599482841654}]
2020-09-07 14:47:21.655  WARN 91449 --- [pool-1-thread-1] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply: GenericMessage [payload={system-a=Expression evaluation failed: payload/2; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1030E: The operator 'DIVIDE' is not supported between objects of type 'java.lang.String' and 'java.lang.Integer', system-b=Ack -> 3c8a4479-22e1-48d8-aca6-a86c252e90c1}, headers={aws_messageId=3c8a4479-22e1-48d8-aca6-a86c252e90c1, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@f5a99c3, sequenceNumber=2, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@f5a99c3, sequenceSize=2, correlationId=74ba49e0-fef8-02e6-bc4b-b2ccdd7cd13b, aws_serviceResult={MD5OfMessageBody: 1db65a6a0a818fd39655b95e33ada11d,MD5OfMessageAttributes: cd4eae624b7c115f948034aafaba01bc,MessageId: 3c8a4479-22e1-48d8-aca6-a86c252e90c1,}, id=3e3de3f3-e8b0-470e-e62f-31815e662857, targetSystem=system-b, timestamp=1599482841654}]

What am I missing? I would expect the error from the transformer to follow the flow gatewayErrorChannel -> sqsResultChannel -> aggregator -> gatewayReplyChannel, which it apparently does, since the aggregator output appears in the WARN message. But why does future.get() never return, and why does gatewayReplyChannel seem to receive the aggregator output twice?

Upvotes: 0

Views: 1603

Answers (1)

Nikhil
Nikhil

Reputation: 365

With some logging and, most importantly, an understanding of how the error and reply channels are set on gateway messages (from this post), I was able to solve the issue.

What was the issue here?

  1. The errorChannel header was not set on the messages coming into the chains.
  2. When both chains throw exceptions, the error messages reaching the aggregator did not have a reply channel set on them, so the aggregated message could not be returned to the gateway's reply channel.

The final config looks like this:
<!-- Gateway to Publish Data to SQS -->
    <!-- Executor with a pool size of 10; referenced by dataChannel as its task-executor. -->
    <task:executor id="dataExecutor" pool-size="10"/>
    <!-- Request channel of the gateway and input-channel of both routing chains below;
         apply-sequence is enabled on it. -->
    <int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" apply-sequence="true"/>
    <!-- Input channel of the aggregator: success-channel of both SQS adapters and output-channel
         of the error chain. -->
    <int:publish-subscribe-channel id="sqsResultChannel"/>
    <!-- error-channel of the gateway, failure-channel of both SQS adapters, and the value written
         to the errorChannel header by both routing chains. -->
    <int:publish-subscribe-channel id="gatewayErrorChannel"/>
    <!-- reply-channel of the gateway method; the aggregator uses it as its output-channel. -->
    <int:publish-subscribe-channel id="gatewayReplyChannel"/>

    <!-- Gateway proxy for com.abc.DataPublishGateway. For publishToDataService the first method
         argument (#args[0]) becomes the payload, requests go to dataChannel and the reply is
         expected on gatewayReplyChannel. gatewayErrorChannel is configured as the error channel. -->
    <int:gateway service-interface="com.abc.DataPublishGateway" id="dataPublishGateway"
                 error-channel="gatewayErrorChannel">
        <int:method name="publishToDataService"
                    payload-expression="#args[0]"
                    request-channel="dataChannel"
                    reply-channel="gatewayReplyChannel">
        </int:method>
    </int:gateway>

    <!-- Error flow: takes messages from gatewayErrorChannel and copies correlationId, sequenceSize,
         sequenceNumber and replyChannel from the failed message's headers onto the error message
         before forwarding it to sqsResultChannel (the aggregator's input). Per the explanation
         above, the replyChannel copy is what lets the aggregated result find its way back to the
         gateway when the group consists of error messages. -->
    <int:chain input-channel="gatewayErrorChannel" output-channel="sqsResultChannel">
        <int:header-enricher>
            <int:correlation-id expression="payload.failedMessage.headers.correlationId" />
            <int:header name="sequenceSize" expression="payload.failedMessage.headers.sequenceSize" />
            <int:header name="sequenceNumber" expression="payload.failedMessage.headers.sequenceNumber" />
            <int:reply-channel expression="payload.failedMessage.headers.replyChannel" />
        </int:header-enricher>
    </int:chain>


    <!-- Route to system-a SQS -->
    <!-- Channel between the system-a chain and its SQS outbound adapter. -->
    <int:channel id="sqsSystemAPublishChannel" />
    <!-- Subscriber of dataChannel for system-a: tags the message with targetSystem=system-a and
         overwrites its errorChannel header with gatewayErrorChannel (the first fix listed in the
         explanation above), then applies a transformer that is meant to fail (see inline
         comment). -->
    <int:chain input-channel="dataChannel" output-channel="sqsSystemAPublishChannel">
        <int:header-enricher>
            <int:header name="targetSystem" value="system-a" />
            <int:error-channel ref="gatewayErrorChannel" overwrite="true" />
        </int:header-enricher>
        <int:transformer expression="payload/2"/> <!-- Simulate transformer error -->
    </int:chain>

    <!-- Sends messages from sqsSystemAPublishChannel to the queue "a-queue" using the amazonSQS
         client; results go to sqsResultChannel on success and to gatewayErrorChannel on failure. -->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="a-queue"
                                          channel="sqsSystemAPublishChannel"
                                          success-channel="sqsResultChannel"
                                          failure-channel="gatewayErrorChannel"/>

    <!-- Route to system-b SQS -->
    <!-- Channel between the system-b chain and its SQS outbound adapter. -->
    <int:channel id="sqsSystemBPublishChannel" />
    <!-- Subscriber of dataChannel for system-b: tags the message with targetSystem=system-b,
         overwrites its errorChannel header with gatewayErrorChannel, and lower-cases the payload
         before handing it to sqsSystemBPublishChannel. -->
    <int:chain input-channel="dataChannel" output-channel="sqsSystemBPublishChannel">
        <int:header-enricher>
            <int:header name="targetSystem" value="system-b" />
            <int:error-channel ref="gatewayErrorChannel" overwrite="true" />
        </int:header-enricher>
        <int:transformer expression="payload.toLowerCase() "/>
    </int:chain>

    <!-- Sends messages from sqsSystemBPublishChannel to the queue "b-queue" using the amazonSQS
         client; results go to sqsResultChannel on success and to gatewayErrorChannel on failure. -->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="b-queue"
                                          channel="sqsSystemBPublishChannel"
                                          success-channel="sqsResultChannel"
                                          failure-channel="gatewayErrorChannel"/>


    <!-- Aggregation step: messages arriving on sqsResultChannel are handed to the
         com.abc.DataResponseAggregator bean and the aggregated output is sent to
         gatewayReplyChannel. -->
    <bean class="com.abc.DataResponseAggregator" id="responseAggregator" />
    <int:aggregator input-channel="sqsResultChannel" output-channel="gatewayReplyChannel" ref="responseAggregator"/>

    <!-- Generic Error Channel Logger -->
    <!-- Logs every message arriving on errorChannel in full (log-full-message="true") at ERROR
         level under the logger name "errorLogger". -->
    <int:logging-channel-adapter log-full-message="true"
                                 logger-name="errorLogger"
                                 level="ERROR"
                                 channel="errorChannel"
                                 id="globalErrorLoggingAdapter"/>

Upvotes: 1

Related Questions