jdh961502
jdh961502

Reputation: 165

Spring Integration Aggregator with Poller difficulties

I am struggling to understand the documentation around the use of the Aggregator.

I am using a Poller to take files from a filesystem and then parsing the XML files using a custom class. So far so good. What I want to do is use an Aggregator to collect the parsed xml files from a single poll together to process them as a unit.

I think I need to inject a correlation id into each message when polled using Poller advice and in the example below I have hard coded a correlation id into the message during the polling.

I've not managed to activate the aggregator, please could I get some pointers.

@Configuration
public class SMBConfig {
    

    /**
     * SMB Session Factory maintains persistent connection to SMB Share
     *
     * @return
     */
    @Bean
    public SessionFactory<jcifs.smb.SmbFile> cachingSessionFactory() {
        // Context required in order to use anon (guest) credentials
        CIFSContext context = SingletonContext.getInstance().withAnonymousCredentials();
        SmbSessionFactory smbSessionFactory = new SmbSessionFactory(context);
        smbSessionFactory.setHost(host);
        smbSessionFactory.setPort(port);
        smbSessionFactory.setDomain(domain);

        smbSessionFactory.setShareAndDir(share);
      
        smbSessionFactory.setSmbMinVersion(DialectVersion.SMB210);
        smbSessionFactory.setSmbMaxVersion(DialectVersion.SMB311);
        return new CachingSessionFactory<>(smbSessionFactory);
    }


    @Bean
    public SmbRemoteFileTemplate template(SessionFactory<jcifs.smb.SmbFile> cachingSessionFactory) {
        return new SmbRemoteFileTemplate(cachingSessionFactory);
    }

    /**
     * Transforms a Stream into a parsed and typed Event
     */
    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "xml")
    public org.springframework.integration.transformer.Transformer transformer(XmlMapper xmlMapper) {
        return new XmlStreamTransformer("UTF-8", xmlMapper);
    }


    @Bean
    @InboundChannelAdapter(channel = "stream", poller = @Poller(value = "pollerMetadata"))
    public MessageSource<InputStream> smbMessageSource(CompositeFileListFilter<SmbFile> filter, SmbRemoteFileTemplate template) {
        SmbStreamingMessageSource messageSource = new SmbStreamingMessageSource(template);
        messageSource.setRemoteDirectory(dir);
        messageSource.setMaxFetchSize(2);
        return messageSource;
    }


    @Bean
    public PollerMetadata pollerMetadata() {

        return Pollers.fixedDelay(30000)
                .maxMessagesPerPoll(10000) // from file system on each polling cycle
                .advice(new ReceiveMessageAdvice() {
                    @Override
                    public Message<?> afterReceive(Message<?> result, Object source) {
                        return result == null ? null : MessageBuilder.fromMessage(result).setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "123").build();

                    }
                })
                .getObject();

    }

}


@MessageEndpoint
@Slf4j
class EventAggregator {
    @Aggregator(inputChannel = "xml")
    public Long aggregatingMethod(List<EventWrapper> events) {

        log.info("SAggregator");
        return 1L;
    }
}

Upvotes: 0

Views: 51

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121552

Just IntegrationMessageHeaderAccessor.CORRELATION_ID is not enough. That is only for groupping message. There is not enough info in your config at what point to release the group.

See @ReleaseStrategy to be added into your EventAggregator. See also docs for more info: https://docs.spring.io/spring-integration/reference/aggregator.html#releasestrategy.

not clear, though, from your expectation on the polling result, what exactly that strategy could be since you are not always going to receive exactly maxMessagesPerPoll(10000).

It might be some groupTimeout to schedule when not enough messages in group. But that is not possible with simple annotation configuration.

See here: https://docs.spring.io/spring-integration/reference/aggregator.html#agg-and-group-to.

You may also send a fake message in the afterReceive() as a marker to release the group when result is null. But to avoid infinite loop from that afterReceive() you might need to support state in your ReceiveMessageAdvice to be reset in the beforeReceive().

Upvotes: 0

Related Questions