Reputation: 31
We have a Spring Integration application and are getting out-of-memory errors during high throughput (a production batch process). The application uses spring-integration-core 4.3.10. We use the org.springframework.integration.annotation.* annotations in the application and on our aggregator class (@MessageEndpoint/@Autowired/@Aggregator), which has a custom 'aggregate' method.
In heap dumps, I see SimpleMessageGroup instances (and other associated objects) both from high-throughput runs and when individual SoapUI requests are run. Please see the attached screenshots.
I have read that a number of properties can be set on the 'AggregatingMessageHandler' object. To gain access to these properties while still using the 'spring.integration.annotation' annotations and a custom 'aggregate' method, I changed my annotated aggregator class to extend 'AggregatingMessageHandler'.
@MessageEndpoint
public class MyAggregator extends AggregatingMessageHandler {

    private final MyLogger myTransactionLogger;

    @Autowired
    public MyAggregator(MyLogger myTransactionLogger) {
        super(new DefaultAggregatingMessageGroupProcessor(), new SimpleMessageStore());
        this.setSendPartialResultOnExpiry(false);
        this.setExpireGroupsUponCompletion(true);
        //this.setMinimumTimeoutForEmptyGroups(500);
        this.setGroupTimeoutExpression(new ValueExpression<>(1000L));
        this.setExpireGroupsUponTimeout(true);
        this.myTransactionLogger = myTransactionLogger;
    }

    @Aggregator(inputChannel = "serviceResponseChannel", outputChannel = "aggregatorResponseChannel")
    public <T> IndividualInquiryResponse aggregate(List<Message> serviceResponses) {...custom code }
}
I have tried different values and combinations of the settings above, but I continue to see the same behavior.
Any assistance would be greatly appreciated.
Update, June 6, 2018: Following instructions from Artem Bilan, I successfully configured the aggregator using the @Bean/@ServiceActivator annotations with an AggregatorFactoryBean. (The MyAggregator class does not contain any annotations.)
@Bean
@ServiceActivator(inputChannel = "serviceResponseChannel")
FactoryBean<MessageHandler> aggregatorFactoryBean() {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(new MyAggregator(myTransactionLogger()));
    aggregatorFactoryBean.setMethodName("aggregate");
    aggregatorFactoryBean.setMessageStore(new SimpleMessageStore());
    aggregatorFactoryBean.setOutputChannel(aggregatorResponseChannel());
    aggregatorFactoryBean.setExpireGroupsUponTimeout(true);
    aggregatorFactoryBean.setGroupTimeoutExpression(new ValueExpression<>(1900L)); // @MessagingGateway defaultReplyTimeout=2000
    aggregatorFactoryBean.setSendPartialResultOnExpiry(false);
    aggregatorFactoryBean.setExpireGroupsUponCompletion(true);
    return aggregatorFactoryBean;
}
I am receiving responses as expected, but I still have memory leak issues with SimpleMessageGroup and associated objects: there is one SimpleMessageGroup per request.
Upvotes: 1
Views: 1089
Reputation: 31
If you are using a custom aggregate method within an aggregator class, define an AggregatorFactoryBean similar to the following. Your aggregator class should not contain any annotations:
@Bean
@ServiceActivator(inputChannel = "serviceResponseChannel")
FactoryBean<MessageHandler> aggregatorFactoryBean() {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(new MyAggregator(myTransactionLogger())); // your aggregator object
    aggregatorFactoryBean.setMethodName("aggregate"); // your aggregator method
    aggregatorFactoryBean.setMessageStore(new SimpleMessageStore());
    aggregatorFactoryBean.setOutputChannel(aggregatorResponseChannel());
    aggregatorFactoryBean.setSendPartialResultOnExpiry(false);
    aggregatorFactoryBean.setExpireGroupsUponCompletion(true);
    aggregatorFactoryBean.setExpireGroupsUponTimeout(true);
    aggregatorFactoryBean.setGroupTimeoutExpression(new ValueExpression<>(1900L)); // @MessagingGateway defaultReplyTimeout=2000
    return aggregatorFactoryBean;
}
public class MyAggregator {

    private final MyLogger myTransactionLogger;

    public MyAggregator(MyLogger myTransactionLogger) {
        this.myTransactionLogger = myTransactionLogger;
    }

    public <T> InquiryResponse aggregate(List<Message> serviceResponses) {
        // your aggregation code.....
    }
}
Upvotes: 1
Reputation: 121347
Please share with us what the ReleaseStrategy is for your use case.
By default, an aggregator tries to group messages by the correlationId header, and it decides when it is time to release a group by the sequenceSize header. If this condition isn't met, the group is not released and remains in memory.
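To make the default rule concrete, here is a minimal plain-Java sketch of it. This is only a model under stated assumptions, not Spring Integration's own code: the class DefaultReleaseModel, the Msg type and the pendingGroups method are hypothetical names invented for illustration, and the exact size comparison is a simplification. It shows how a group whose size never reaches sequenceSize simply stays in the map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of the default grouping rule; NOT Spring Integration code. */
public class DefaultReleaseModel {

    /** Hypothetical stand-in for a message carrying the two headers. */
    public static final class Msg {
        final String correlationId;
        final int sequenceSize;
        final String payload;

        public Msg(String correlationId, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    // Incomplete groups keyed by correlationId; this is what stays on the heap.
    private final Map<String, List<Msg>> groups = new HashMap<>();

    /** Adds a message; returns the group once its size equals sequenceSize. */
    public Optional<List<Msg>> accept(Msg msg) {
        List<Msg> group = groups.computeIfAbsent(msg.correlationId, id -> new ArrayList<>());
        group.add(msg);
        if (group.size() == msg.sequenceSize) {
            groups.remove(msg.correlationId); // released groups are dropped
            return Optional.of(group);
        }
        return Optional.empty(); // not complete yet: the group is retained
    }

    /** Number of groups still waiting for more messages. */
    public int pendingGroups() {
        return groups.size();
    }

    public static void main(String[] args) {
        DefaultReleaseModel model = new DefaultReleaseModel();
        model.accept(new Msg("req-1", 2, "a"));
        System.out.println(model.accept(new Msg("req-1", 2, "b")).isPresent()); // true
        model.accept(new Msg("req-2", 3, "c")); // only 1 of 3 ever arrives
        System.out.println(model.pendingGroups()); // 1
    }
}
```

In this model, "req-2" is never released because only one of its three expected messages arrives, which is the kind of retained group the answer describes.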
Consider using @ServiceActivator on an AggregatorFactoryBean @Bean instead, though.
Upvotes: 0