Reputation: 1245
How can I add the following step in the middle of an existing Spring Integration flow?
Query the emp table, which returns, for example, 3 rows. Insert these 3 rows into the dept table, and then continue the flow as it was running before.
I implemented the splitting manually to achieve this. However, the flow does not proceed after <int:aggregator />.
emp-context.xml
...
<int:splitter apply-sequence="false" /> <!-- parent splitter -->
...
<jdbc:outbound-gateway
query="SELECT * FROM emp"
row-mapper="empMapper" data-source="myDataSource"
max-rows-per-poll="100000" />
<int:service-activator ref="myService" method="process" />
<!-- <int:splitter /> child splitter commented out -->
<jdbc:outbound-gateway
update="INSERT INTO dept (name, dept)
VALUES(:headers[name], :headers[dept])"
data-source="myDataSource" />
<int:aggregator />
<!-- flow is not executed after aggregator -->
<jdbc:outbound-gateway
update="INSERT INTO other_table (mycol)
VALUES(:headers[mycol])"
data-source="myDataSource" />
...
MyService.java
public List<Message<String>> process(List<EmpRecord> records, @Headers MessageHeaders headers) {
    List<Message<String>> result = new ArrayList<Message<String>>();
    UUID correlationId = UUID.randomUUID();
    int sequenceNumber = 0;
    int sequenceSize = records.size();
    for (EmpRecord record : records) {
        Message<String> msg = MessageBuilder
                .withPayload("null")
                .copyHeaders(headers) // headers contains mycol key that I want to preserve to use after aggregator.
                .setHeader("name", record.getName())
                .setHeader("dept", record.getDept())
                // one shared correlationId, so all messages belong to the same group
                .pushSequenceDetails(correlationId, ++sequenceNumber, sequenceSize)
                .build();
        result.add(msg);
    }
    return result;
}
EDIT: added log output.
Please note that the header, class and table names in the code above differ from the actual implementation, so I have adjusted the log output to match that code.
06:43:35.171 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL query
06:43:35.171 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL statement [SELECT * FROM emp]
06:43:35.171 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] handler 'org.springframework.integration.handler.MessageHandlerChain#2$child#0.handler' sending reply Message: [Payload=[com.model.EmpRecord@9a18a0, com.model.EmpRecord@c6e1ec, com.model.EmpRecord@11a5fd0, com.model.EmpRecord@8890da]][Headers={mycol=c1}]
06:43:35.171 DEBUG [main][org.springframework.integration.handler.ServiceActivatingHandler] ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1de007d] received message: [Payload=[com.model.EmpRecord@9a18a0, com.model.EmpRecord@c6e1ec, com.model.EmpRecord@11a5fd0, com.model.EmpRecord@8890da]][Headers={mycol=c1}]
06:43:35.171 INFO [main][com.service.MyServiceActivator] START :: process
06:43:35.171 INFO [main][com.service.MyServiceActivator] END :: process
06:43:35.171 DEBUG [main][org.springframework.integration.handler.ServiceActivatingHandler] handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1de007d]' sending reply Message: [Payload=null][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.171 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] org.springframework.integration.handler.MessageHandlerChain#2$child#3.handler received message: [Payload=null][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.187 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL update
06:43:35.187 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL statement [INSERT INTO dept (name, dept) VALUES(?, ?)]
06:43:35.187 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] SQL update affected 1 rows
06:43:35.187 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] handler 'org.springframework.integration.handler.MessageHandlerChain#2$child#3.handler' sending reply Message: [Payload={UPDATED=1}][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.187 DEBUG [main][org.springframework.integration.aggregator.AggregatingMessageHandler] org.springframework.integration.handler.MessageHandlerChain#2$child#4.handler received message: [Payload={UPDATED=1}][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.187 DEBUG [main][org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler] Handling message with correlationKey [cc7b7fc3-8d7e-bf15-240f-9e893ab7f5cc]: [Payload={UPDATED=1}][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.171 DEBUG [main][org.springframework.integration.handler.ServiceActivatingHandler] handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1de007d]' sending reply Message: [Payload=null][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.171 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] org.springframework.integration.handler.MessageHandlerChain#2$child#3.handler received message: [Payload=null][Headers={name=john, dept=BE, mycol=c1}]
06:43:35.218 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL update
06:43:35.218 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] Executing prepared SQL statement [INSERT INTO dept (name, dept) VALUES(?, ?)]
06:43:35.218 DEBUG [main][org.springframework.jdbc.core.JdbcTemplate] SQL update affected 1 rows
06:43:35.218 DEBUG [main][org.springframework.integration.jdbc.JdbcOutboundGateway] handler 'org.springframework.integration.handler.MessageHandlerChain#2$child#3.handler' sending reply Message: [Payload={UPDATED=1}][Headers={name=ravi, dept=SE, mycol=c1}]
06:43:35.218 DEBUG [main][org.springframework.integration.aggregator.AggregatingMessageHandler] org.springframework.integration.handler.MessageHandlerChain#2$child#4.handler received message: [Payload={UPDATED=1}][Headers={name=ravi, dept=SE, mycol=c1}]
06:43:35.218 DEBUG [main][org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler] Handling message with correlationKey [fc91e0bc-87a3-251f-87fd-fc3901285cde]: [Payload={UPDATED=1}][Headers={name=ravi, dept=SE, mycol=c1}]
Upvotes: 0
Views: 1495
Reputation: 121552
I see the root cause!
Since MyServiceActivator returns a List<Message<String>>, the AbstractReplyProducingMessageHandler treats it as an Iterable<Message<?>> and sends the messages to the output-channel one by one. So the <splitter> doesn't see anything to split and passes each message through as is: one input message, one output message. From there, the <aggregator> has nothing to collect. That's why it produces a separate message for each input: each one ends up in a different group.
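To make the grouping effect concrete, here is a minimal plain-Java sketch of the correlate-and-release rule as I understand the default aggregator to apply it: group by correlationId, release a group once it holds sequenceSize messages. It is a simulation only, not Spring Integration's actual classes; AggregatorGroupingSketch, Msg and aggregate are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AggregatorGroupingSketch {

    /** Hypothetical stand-in for a message: only the sequence headers matter here. */
    public static final class Msg {
        final String correlationId;
        final int sequenceNumber;
        final int sequenceSize;

        public Msg(String correlationId, int sequenceNumber, int sequenceSize) {
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
        }
    }

    /**
     * Groups messages by correlationId and releases a group as soon as it
     * contains sequenceSize messages. Returns the released groups in order.
     */
    public static List<List<Msg>> aggregate(List<Msg> input) {
        Map<String, List<Msg>> groups = new LinkedHashMap<>();
        List<List<Msg>> released = new ArrayList<>();
        for (Msg m : input) {
            List<Msg> group = groups.computeIfAbsent(m.correlationId, k -> new ArrayList<>());
            group.add(m);
            if (group.size() == m.sequenceSize) {
                released.add(groups.remove(m.correlationId));
            }
        }
        return released;
    }

    public static void main(String[] args) {
        // Every message carries its own correlationId and sequenceSize 1:
        // each one forms a complete group by itself, so nothing is collected.
        List<Msg> separate = Arrays.asList(
                new Msg("a", 1, 1), new Msg("b", 1, 1), new Msg("c", 1, 1));
        System.out.println(aggregate(separate).size()); // prints 3

        // All messages share one correlationId and sequenceSize 3:
        // a single group is released after the third message arrives.
        List<Msg> shared = Arrays.asList(
                new Msg("x", 1, 3), new Msg("x", 2, 3), new Msg("x", 3, 3));
        System.out.println(aggregate(shared).size()); // prints 1
    }
}
```

The first case matches the two different correlationKey values in your log; the second is what the shared correlationId is meant to achieve.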
To fix your issue, get rid of the <splitter> and populate the sequence details manually:
public List<Message<String>> process(List<EmpRecord> records, @Headers MessageHeaders headers) {
    List<Message<String>> result = new ArrayList<Message<String>>();
    UUID correlationId = UUID.randomUUID();
    int sequenceNumber = 0;
    int sequenceSize = records.size();
    for (EmpRecord record : records) {
        Message<String> msg = MessageBuilder
                .withPayload("null")
                .copyHeaders(headers) // headers contains mycol key that I want to preserve to use after aggregator.
                .setHeader("name", record.getName())
                .setHeader("dept", record.getDept())
                // same correlationId for every message, so the aggregator sees one group
                .setHeader("correlationId", correlationId)
                .setHeader("sequenceNumber", ++sequenceNumber)
                .setHeader("sequenceSize", sequenceSize)
                .build();
        result.add(msg);
    }
    return result;
}
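On the mycol header mentioned in the code comment: as far as I know, the default aggregator keeps a header in the aggregated message only when its value agrees across the whole group, and drops headers whose values conflict. The following is a plain-Java sketch of that merge rule under this assumption, not the framework's implementation; HeaderMergeSketch, headers and mergeHeaders are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class HeaderMergeSketch {

    /** Builds a header map like the ones in the log output (illustrative helper). */
    public static Map<String, Object> headers(String name, String dept, String mycol) {
        Map<String, Object> h = new LinkedHashMap<>();
        h.put("name", name);
        h.put("dept", dept);
        h.put("mycol", mycol);
        return h;
    }

    /** Keeps a header only if all messages that carry it agree on its value. */
    public static Map<String, Object> mergeHeaders(List<Map<String, Object>> headersPerMessage) {
        Map<String, Object> merged = new LinkedHashMap<>();
        Set<String> conflicting = new HashSet<>();
        for (Map<String, Object> headers : headersPerMessage) {
            for (Map.Entry<String, Object> e : headers.entrySet()) {
                String key = e.getKey();
                if (conflicting.contains(key)) {
                    continue; // already known to differ between messages
                }
                if (!merged.containsKey(key)) {
                    merged.put(key, e.getValue());
                } else if (!Objects.equals(merged.get(key), e.getValue())) {
                    merged.remove(key);
                    conflicting.add(key);
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> merged = mergeHeaders(Arrays.asList(
                headers("john", "BE", "c1"),
                headers("ravi", "SE", "c1")));
        System.out.println(merged); // prints {mycol=c1}
    }
}
```

Under that assumption, mycol should still be available to the INSERT into other_table after the aggregator, while name and dept would not be, since they differ per row.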
Upvotes: 1