Reputation: 165
I am struggling to understand the documentation around the use of the Aggregator.
I am using a Poller to take files from a filesystem and then parsing the XML files using a custom class. So far so good. What I want to do is use an Aggregator to collect the parsed XML files from a single poll together, so that I can process them as a unit.
I think I need to inject a correlation id into each message when it is polled, using Poller advice. In the example below I have hard-coded a correlation id into the message during the polling.
I have not managed to activate the aggregator. Could I get some pointers, please?
@Configuration
public class SMBConfig {

    /**
     * SMB Session Factory maintains persistent connection to SMB Share
     *
     * @return
     */
    @Bean
    public SessionFactory<jcifs.smb.SmbFile> cachingSessionFactory() {
        // Context required in order to use anon (guest) credentials
        CIFSContext context = SingletonContext.getInstance().withAnonymousCredentials();
        SmbSessionFactory smbSessionFactory = new SmbSessionFactory(context);
        smbSessionFactory.setHost(host);
        smbSessionFactory.setPort(port);
        smbSessionFactory.setDomain(domain);
        smbSessionFactory.setShareAndDir(share);
        smbSessionFactory.setSmbMinVersion(DialectVersion.SMB210);
        smbSessionFactory.setSmbMaxVersion(DialectVersion.SMB311);
        return new CachingSessionFactory<>(smbSessionFactory);
    }

    @Bean
    public SmbRemoteFileTemplate template(SessionFactory<jcifs.smb.SmbFile> cachingSessionFactory) {
        return new SmbRemoteFileTemplate(cachingSessionFactory);
    }

    /**
     * Transforms a Stream into a parsed and typed Event
     */
    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "xml")
    public org.springframework.integration.transformer.Transformer transformer(XmlMapper xmlMapper) {
        return new XmlStreamTransformer("UTF-8", xmlMapper);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream", poller = @Poller(value = "pollerMetadata"))
    public MessageSource<InputStream> smbMessageSource(CompositeFileListFilter<SmbFile> filter, SmbRemoteFileTemplate template) {
        SmbStreamingMessageSource messageSource = new SmbStreamingMessageSource(template);
        messageSource.setRemoteDirectory(dir);
        messageSource.setMaxFetchSize(2);
        return messageSource;
    }

    @Bean
    public PollerMetadata pollerMetadata() {
        return Pollers.fixedDelay(30000)
                .maxMessagesPerPoll(10000) // from file system on each polling cycle
                .advice(new ReceiveMessageAdvice() {
                    @Override
                    public Message<?> afterReceive(Message<?> result, Object source) {
                        return result == null
                                ? null
                                : MessageBuilder.fromMessage(result)
                                        .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "123")
                                        .build();
                    }
                })
                .getObject();
    }
}

@MessageEndpoint
@Slf4j
class EventAggregator {

    @Aggregator(inputChannel = "xml")
    public Long aggregatingMethod(List<EventWrapper> events) {
        log.info("SAggregator");
        return 1L;
    }
}
Upvotes: 0
Views: 51
Reputation: 121552
Setting IntegrationMessageHeaderAccessor.CORRELATION_ID alone is not enough. That header is only used for grouping messages. Nothing in your config says at what point the group should be released.
See @ReleaseStrategy, which needs to be added to your EventAggregator.
See also docs for more info: https://docs.spring.io/spring-integration/reference/aggregator.html#releasestrategy.
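As a rough sketch of what that could look like, a release-strategy method can sit next to the aggregating method in the same endpoint class. EXPECTED_BATCH_SIZE and the method name canRelease are hypothetical and only illustrate the shape; EventWrapper is the type from the question. The real release condition depends on how you can tell that a poll is complete.

```java
import java.util.List;

import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ReleaseStrategy;

import lombok.extern.slf4j.Slf4j;

@MessageEndpoint
@Slf4j
class EventAggregator {

    // Hypothetical threshold, not taken from the question.
    private static final int EXPECTED_BATCH_SIZE = 10;

    // Called once the release strategy below reports that the group is complete.
    @Aggregator(inputChannel = "xml")
    public Long aggregatingMethod(List<EventWrapper> events) {
        log.info("Aggregating {} events", events.size());
        return 1L;
    }

    // Called as messages arrive for the group; returning true releases the group.
    @ReleaseStrategy
    public boolean canRelease(List<EventWrapper> events) {
        return events.size() >= EXPECTED_BATCH_SIZE;
    }
}
```

A pure size check like this never fires when a poll yields fewer files than the threshold, so on its own it probably does not fit the use case in the question.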
It is not clear, though, from what you expect of the polling result, what exactly that strategy could be, since you are not always going to receive exactly maxMessagesPerPoll(10000) messages.
A groupTimeout might be an option, to schedule a release when there are not enough messages in the group. But that is not possible with simple annotation configuration.
See here: https://docs.spring.io/spring-integration/reference/aggregator.html#agg-and-group-to.
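A minimal sketch of a handler-based configuration with a group timeout, assuming it replaces the annotated EventAggregator endpoint on the "xml" channel. The class name AggregatorConfig, the "aggregated" output channel, the 5-second timeout and the count threshold are all assumptions chosen for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.MessageHandler;

@Configuration
class AggregatorConfig {

    @Bean
    @ServiceActivator(inputChannel = "xml")
    public MessageHandler aggregator() {
        // Collects the payloads of a group into a single List payload.
        AggregatingMessageHandler handler = new AggregatingMessageHandler(
                new DefaultAggregatingMessageGroupProcessor(),
                new SimpleMessageStore());
        // Release early if the (assumed) maximum number of messages arrives.
        handler.setReleaseStrategy(new MessageCountReleaseStrategy(10000));
        // Otherwise expire the group 5 seconds (assumed) after its last message.
        handler.setGroupTimeoutExpression(new ValueExpression<>(5000L));
        // Emit whatever was collected when the timeout fires instead of discarding it.
        handler.setSendPartialResultOnExpiry(true);
        // Remove completed groups so the same correlation id can start a new group.
        handler.setExpireGroupsUponCompletion(true);
        handler.setOutputChannelName("aggregated"); // hypothetical channel name
        return handler;
    }
}
```

With the default correlation strategy the handler groups on the CORRELATION_ID header, so the header set in the poller advice is still what ties the messages together.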
You may also send a fake message from afterReceive() as a marker to release the group when result is null. But to avoid an infinite loop from that afterReceive(), you might need to keep state in your ReceiveMessageAdvice, to be reset in beforeReceive().
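One possible shape for that state, written as plain Java without Spring types so it can be tested in isolation. PollCycleMarker and onReceive are hypothetical names; the idea is that afterReceive() would delegate to onReceive(result). As a simplification, this sketch clears its flag when the marker is emitted rather than in beforeReceive().

```java
import java.util.function.Supplier;

/** Hypothetical helper that decides what afterReceive() should return. */
final class PollCycleMarker<T> {

    private final Supplier<T> markerFactory;

    // True once a real message has been seen since the last marker was emitted.
    private boolean groupOpen;

    PollCycleMarker(Supplier<T> markerFactory) {
        this.markerFactory = markerFactory;
    }

    /**
     * Passes real results through, emits one marker when the source runs dry
     * after at least one real result, and returns null otherwise so that the
     * poll cycle can end.
     */
    synchronized T onReceive(T result) {
        if (result != null) {
            groupOpen = true;
            return result;
        }
        if (groupOpen) {
            groupOpen = false; // emit the marker only once per batch
            return markerFactory.get();
        }
        return null;
    }
}
```

Because the marker is sent to the same channel as the real messages, the downstream transformer and the release strategy would both need to recognise it, for example through a dedicated header.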
Upvotes: 0