Ashok.N
Ashok.N

Reputation: 1391

Aggregator is failing after upgrading to Spring-Integration 4

We are using Spring-Integration in our project. We are trying to migrate from spring-integration-core:jar:3.0.1.RELEASE to spring-integration-core:jar:4.3.2.RELEASE, java 8, spring 4. We are running into issues with aggregator. Weirdly, the aggregator's method is not called during the execution. The configuration is shown below:

        <!-- Store the original payload in header for future purpose -->
    <int:header-enricher default-overwrite="true"  should-skip-nulls="true"  >
        <int:header name="${headerNames.originalPayload}" expression="payload" />
    </int:header-enricher>  

    <!-- split the issues-->
    <int-xml:xpath-splitter  >
    <int-xml:xpath-expression expression="//transaction"/>
    </int-xml:xpath-splitter>

    <int:service-activator  ref="httpOutboundGatewayHandler" method="buildHttpOutboundGatewayRequest" />

    <int:header-filter  header-names="accept-encoding"/>    

    <int-http:outbound-gateway  url-expression="headers.restResourceUrl"
                                http-method-expression="headers.httpMethod"
                                extract-request-payload="true"
                                expected-response-type="java.lang.String">
    </int-http:outbound-gateway>

    <int:service-activator ref="msgHandler" method="buildMessageFromExtSysResponse" />   


    <int-xml:xslt-transformer xsl-resource="${stylesheet.PQGetWorklist-Response-MoveSources}"  />
</int:chain>    

    <int:aggregator input-channel="PQGetWorklist-Aggregate-sources" output-channel="PQGetWorklist-MoveSourcesUnderIssues"  
                    ref="xmlAggregator"  method="aggregateSources">      
    </int:aggregator>   

In the above code, <int-http:outbound-gateway is getting executed, but the XmlAggregator.aggregateSources is not called for unknown reasons. I could see that the message is sent to on the channel PQGetWorklist-Aggregate-sources. But from there the aggregator's method aggregateSources is not called. As a result, we are getting No reply received within timeout. Remember the same configuration is working fine with spring-integration-core:jar:3.0.1.RELEASE. The problem is seen only when we upgrade it to spring-integration-core:jar:4.3.2.RELEASE . Here is my XmlAggregator.java

public class XmlAggregator {

    private static final Logger logger = Logger.getLogger(XmlAggregator.class);
    public Message aggregateSources(List < Message > messages) throws DocumentException {
     Document mainDom = XmlParserUtil.convertString2Document("<Results> </Results>");
     Document splitMessageDom = XmlParserUtil.convertString2Document(messages.get(0).getPayload().toString());
     Document IssuesDom = XmlParserUtil.convertString2Document("<Issues> </Issues>");
     Document sourcesDom = XmlParserUtil.convertString2Document("<RetrievedSources> </RetrievedSources>");
     if(messages.get(0).getHeaders().get("jobDesignerJobName").equals("PQIssueInquiry")){
     //extract callerType node
     Element callerType = XmlParserUtil.getXmlElements(XmlParserUtil.convertString2Document(messages.get(0).getPayload().toString()), "//callerType").get(0);
     //add callerType to root node
     mainDom.getRootElement().content().add(callerType);
     }
     //extract sort node
     Element sort = XmlParserUtil.getXmlElements(XmlParserUtil.convertString2Document(messages.get(0).getPayload().toString()), "//sort").get(0);
     //add sort to root node
     mainDom.getRootElement().content().add(sort);

     //get all the issues 
     List < Element > transactionElements = XmlParserUtil.getXmlElements(splitMessageDom, "//transaction");
     for (Element issue: transactionElements) {
      // add all the issues to the IssuesDom
      IssuesDom.getRootElement().content().add(issue);
     }
     //add all the issues to the root node
     XmlParserUtil.appendChild(mainDom, IssuesDom, null);
     for (Message source: messages) {

      Document sourcesTempDom = XmlParserUtil.convertString2Document(source.getPayload().toString());
      Reader xml = new StringReader((String) source.getPayload());
      SAXReader reader = new SAXReader();
      Document document = reader.read(xml);
      //get all the sources
      List < Element > sourceElements = XmlParserUtil.getXmlElements(sourcesTempDom, "//sources");

      for (Element sources: sourceElements) {
       //add all the sources to sourcesDom 
       sourcesDom.getRootElement().content().add(sources);
      }

     }

     // add all the sources to the root node
     XmlParserUtil.appendChild(mainDom, sourcesDom, null);

     MessageBuilder < ? > msgBuilder = MessageBuilder.withPayload(mainDom.asXML());
     Message message = msgBuilder.build();
     logger.debug("aggregateSources Results after aggregation " + mainDom.asXML());
     return message;
    }

}

Any thoughts?

Upvotes: 1

Views: 245

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121560

Starting with version 4.2 the XPathMessageSplitter is based on the iterator functionality by default:

<xsd:attribute name="iterator" default="true">
    <xsd:annotation>
        <xsd:documentation>
            The iterator mode: 'true' (default) to return an 'java.util.Iterator'
            for splitting 'payload', 'false to return a 'java.util.List'.
            Note: the 'list' contains transformed nodes whereas with the
            'iterator' each node is transformed while iterating.
        </xsd:documentation>
    </xsd:annotation>
    <xsd:simpleType>
        <xsd:union memberTypes="xsd:boolean xsd:string" />
    </xsd:simpleType>
</xsd:attribute>

Do not overhead the memory with the size interest and pull the data from the target source on demand.

The functionality can be turned off via iterator="false" option.

Upvotes: 1

Related Questions