Vikas Sharma

Reputation: 1245

Spring Integration: Aggregator is not aggregating split messages into one message

How can I add the following step in the middle of an existing Spring Integration flow?

Query the emp table, which returns, for example, 3 rows. Insert these 3 rows into the dept table, then continue the flow as it was running before.

The splitter is configured manually to implement this. However, the flow does not proceed past <int:aggregator />.

emp-context.xml

...

<int:splitter apply-sequence="false" /> <!-- parent splitter -->

...

<jdbc:outbound-gateway
    query="SELECT * FROM emp"
    row-mapper="empMapper" data-source="myDataSource"
    max-rows-per-poll="100000" />

<int:service-activator ref="myService" method="process" />

<!--     <int:splitter /> child splitter commented out -->

<jdbc:outbound-gateway
    update="INSERT INTO dept (name, dept)
        VALUES(:headers[name], :headers[dept])"
    data-source="myDataSource" />

<int:aggregator />

<!-- flow is not executed after aggregator -->

<jdbc:outbound-gateway
    update="INSERT INTO other_table (mycol)
        VALUES(:headers[mycol])"
    data-source="myDataSource" />

...

MyService.java

public List<Message<String>> process(List<EmpRecord> records, @Headers MessageHeaders headers) {

    List<Message<String>> result = new ArrayList<Message<String>>();

    UUID correlationId = UUID.randomUUID();
    int sequenceNumber = 0;
    int sequenceSize = records.size();

    for (EmpRecord record : records) {

        Message<String> msg = MessageBuilder
            .withPayload("null")
            .copyHeaders(headers) // headers contains mycol key that I want to preserve to use after aggregator.
            .setHeader("name", record.getName())
            .setHeader("dept", record.getDept())
            .pushSequenceDetails(correlationId, ++sequenceNumber, sequenceSize)
            .build();

        result.add(msg);
    }

    return result;
}

EDIT: added stacktrace.

Please note that the header, class, and table names in the code above differ from the actual implementation, so I have modified the stack trace to match the code above.

06:43:35.171 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL query
06:43:35.171 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL statement [SELECT * FROM emp]
06:43:35.171 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] handler 'org.springframework.integration.handler.MessageHandlerChain#2$child#0.handler' sending reply Message: [Payload=[com.model.EmpRecord@9a18a0, com.model.EmpRecord@c6e1ec, com.model.EmpRecord@11a5fd0, com.model.EmpRecord@8890da]][Headers={mycol=c1}]
06:43:35.171 DEBUG [main][org.springframework.integration.handler.ServiceActivatingHandler] ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1de007d] received message: [Payload=[com.model.EmpRecord@9a18a0, com.model.EmpRecord@c6e1ec, com.model.EmpRecord@11a5fd0, com.model.EmpRecord@8890da]][Headers={mycol=c1}]
06:43:35.171 INFO  [main][com.service.MyServiceActivator] START :: process
06:43:35.171 INFO  [main][com.service.MyServiceActivator] END :: process
06:43:35.171 DEBUG [main][org.springframework.integration.handler.ServiceActivatingHandler] handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1de007d]' sending reply Message: [Payload=null][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.171 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] org.springframework.integration.handler.MessageHandlerChain#2$child#3.handler received message: [Payload=null][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.187 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL update
06:43:35.187 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL statement [INSERT INTO dept (name, dept) VALUES(?, ?)]
06:43:35.187 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] SQL update affected 1 rows
06:43:35.187 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] handler 'org.springframework.integration.handler.MessageHandlerChain#2$child#3.handler' sending reply Message: [Payload={UPDATED=1}][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.187 DEBUG [main][org.springframework.integration.aggregator.AggregatingMessageHandler] org.springframework.integration.handler.MessageHandlerChain#2$child#4.handler received message: [Payload={UPDATED=1}][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.187 DEBUG [main][org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler] Handling message with correlationKey [cc7b7fc3-8d7e-bf15-240f-9e893ab7f5cc]: [Payload={UPDATED=1}][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.171 DEBUG [main][org.springframework.integration.handler.ServiceActivatingHandler] handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1de007d]' sending reply Message: [Payload=null][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.171 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] org.springframework.integration.handler.MessageHandlerChain#2$child#3.handler received message: [Payload=null][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.218 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL update
06:43:35.218 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL statement [INSERT INTO dept (name, dept) VALUES(?, ?)]
06:43:35.218 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] SQL update affected 1 rows
06:43:35.218 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] handler 'org.springframework.integration.handler.MessageHandlerChain#2$child#3.handler' sending reply Message: [Payload={UPDATED=1}][Headers={name=ravi, dept=SE, mycol=c1}]
06:43:35.218 DEBUG [main][org.springframework.integration.aggregator.AggregatingMessageHandler] org.springframework.integration.handler.MessageHandlerChain#2$child#4.handler received message: [Payload={UPDATED=1}][Headers={name=ravi, dept=SE, mycol=c1}]
06:43:35.218 DEBUG [main][org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler] Handling message with correlationKey [fc91e0bc-87a3-251f-87fd-fc3901285cde]: [Payload={UPDATED=1}][Headers={name=ravi, dept=SE, mycol=c1}]

Upvotes: 0

Views: 1495

Answers (1)

Artem Bilan

Reputation: 121552

I see the root cause!

Since MyServiceActivator returns a List<Message<String>>, the AbstractReplyProducingMessageHandler treats it as an Iterable<Message<?>> and sends the messages to the output channel one by one. The <splitter> therefore has nothing to split and passes each message through as is: one input message, one output message. From there, the <aggregator> has nothing to collect, because the messages are in different groups. That is why it handles each input message separately instead of combining them.
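
To illustrate why different groups prevent a release, here is a minimal plain-Java sketch of default aggregation as I understand it: messages are grouped by their correlationId header, and a group is released only once it holds sequenceSize messages. The AggregatorSketch class, the Msg type, and the aggregate method are hypothetical names for illustration, not Spring Integration APIs, and the real aggregator does considerably more (message store, expiry, custom strategies).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AggregatorSketch {

    /** Hypothetical stand-in for a message: only the headers this model needs. */
    public static final class Msg {
        final String correlationId;
        final int sequenceSize;
        final String payload;

        public Msg(String correlationId, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    /**
     * Simplified model: group by correlationId, release a group as one list
     * of payloads when its size reaches sequenceSize. Incomplete groups are
     * never released, which is what "the flow stops after the aggregator"
     * looks like from the outside.
     */
    public static List<List<String>> aggregate(List<Msg> incoming) {
        Map<String, List<Msg>> groups = new LinkedHashMap<>();
        List<List<String>> released = new ArrayList<>();
        for (Msg m : incoming) {
            List<Msg> group = groups.computeIfAbsent(m.correlationId, k -> new ArrayList<>());
            group.add(m);
            if (group.size() == m.sequenceSize) {
                List<String> payloads = new ArrayList<>();
                for (Msg g : group) {
                    payloads.add(g.payload);
                }
                released.add(payloads);
                groups.remove(m.correlationId);
            }
        }
        return released;
    }

    public static void main(String[] args) {
        // Same correlation key on both messages: the group completes.
        List<Msg> shared = Arrays.asList(new Msg("A", 2, "john"), new Msg("A", 2, "ravi"));
        System.out.println(aggregate(shared));   // prints [[john, ravi]]

        // Different correlation keys: two groups of one, neither completes.
        List<Msg> distinct = Arrays.asList(new Msg("A", 2, "john"), new Msg("B", 2, "ravi"));
        System.out.println(aggregate(distinct)); // prints []
    }
}
```

The second case mirrors the log in the question, where each message arrives with a different correlationKey, so no group ever reaches its expected size.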

To fix the issue, get rid of the <splitter> and populate the sequence details manually:

public List<Message<String>> process(List<EmpRecord> records, @Headers MessageHeaders headers) {

    List<Message<String>> result = new ArrayList<Message<String>>();

    UUID correlationId = UUID.randomUUID();
    int sequenceNumber = 0;
    int sequenceSize = records.size();

    for (EmpRecord record : records) {

        Message<String> msg = MessageBuilder
            .withPayload("null")
            .copyHeaders(headers) // headers contains mycol key that I want to preserve to use after aggregator.
            .setHeader("name", record.getName())
            .setHeader("dept", record.getDept())

            .setHeader("correlationId", correlationId)
            .setHeader("sequenceNumber", ++sequenceNumber)
            .setHeader("sequenceSize", sequenceSize)
            .build();

        result.add(msg);
    }

    return result;
}

Upvotes: 1
