Danny Petrunov

Reputation: 101

Spring Integration poll aggregator programmatically

I've been using Spring Boot and removing all XML files from my project. Unfortunately, the project also uses Spring Integration, which in my experience is very heavily XML-based.

I have a scenario that requires an aggregator, and that aggregator has to be polled every x seconds.

This can be done using XML like so (example taken from a previous SO question):

<!-- 
    the poller will process 100 messages every minute 
    if the size of the group is 100 (the poll reached the max messages) or 60 seconds time out (poll has less than 100 messages) then the payload with the list of messages is passed to defined output channel
-->
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel"
    send-partial-result-on-expiry="true"
    group-timeout="60000"
    correlation-strategy-expression="T(Thread).currentThread().id"
    release-strategy-expression="size() == 100">
    <int:poller max-messages-per-poll="100" fixed-rate="60000"/>
</int:aggregator>

I've managed to find a class that more or less does the trick; its bean definition is:

@Bean(name = "aggregatingMessageHandler")
public AggregatingMessageHandler aggregatingMessageHandler() {

    AggregatingMessageHandler aggregatingMessageHandler =
            new AggregatingMessageHandler(messageGroupProcessorBean(),
                    new SimpleMessageStore(10));

    aggregatingMessageHandler.setCorrelationStrategy(customCorrelationStrategyBean());

    aggregatingMessageHandler.setReleaseStrategy(
            new TimeoutCountSequenceSizeReleaseStrategy(3,
                    TimeoutCountSequenceSizeReleaseStrategy.DEFAULT_TIMEOUT));

    aggregatingMessageHandler.setExpireGroupsUponCompletion(true);

    aggregatingMessageHandler.setOutputChannel(outputAggregatedChannelBean());

    return aggregatingMessageHandler;
}

However, this triggers the canRelease() method of the ReleaseStrategy only when a new message arrives on the inbound channel associated with this handler, not at a fixed time interval, which is not the desired result. I want all groups older than one minute to be sent to the output channel. My question is: is there a way to programmatically attach a poller such as the one in the XML definition?

Upvotes: 0

Views: 1829

Answers (1)

Artem Bilan

Reputation: 121262

For Java & Annotation configuration take a look here and here.

The Aggregator component has AggregatorFactoryBean for easier Java Configuration.

In any case, note that the handler definition carries a @ServiceActivator annotation together with @Bean, and it is the @ServiceActivator annotation that has the poller attribute.
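As a rough illustration, the XML from the question might translate to Java configuration along these lines. This is a minimal sketch, not a verified drop-in: the class name AggregatorConfig, the bean method names, and the choice of DefaultAggregatingMessageGroupProcessor, MessageCountReleaseStrategy and ValueExpression are assumptions made for the example, and the group timeout relies on the default task scheduler that @EnableIntegration registers.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

// Hypothetical configuration class; names mirror the XML in the question.
@Configuration
@EnableIntegration
public class AggregatorConfig {

    // The input channel must be pollable (a queue) for a poller to make sense.
    @Bean
    public QueueChannel logEntryChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel logEntryAggrChannel() {
        return new DirectChannel();
    }

    // @ServiceActivator on the @Bean method wires the handler to the channel;
    // its poller attribute plays the role of the <int:poller> element.
    @Bean
    @ServiceActivator(inputChannel = "logEntryChannel",
            poller = @Poller(fixedRate = "60000", maxMessagesPerPoll = "100"))
    public MessageHandler aggregator() {
        AggregatingMessageHandler handler = new AggregatingMessageHandler(
                new DefaultAggregatingMessageGroupProcessor(),
                new SimpleMessageStore());
        // Same idea as correlation-strategy-expression in the XML.
        handler.setCorrelationStrategy(message -> Thread.currentThread().getId());
        // Same idea as release-strategy-expression="size() == 100".
        handler.setReleaseStrategy(new MessageCountReleaseStrategy(100));
        // Counterparts of group-timeout and send-partial-result-on-expiry.
        handler.setGroupTimeoutExpression(new ValueExpression<>(60000L));
        handler.setSendPartialResultOnExpiry(true);
        handler.setOutputChannel(logEntryAggrChannel());
        return handler;
    }
}
```

In this sketch the poller only controls how messages are pulled from logEntryChannel; releasing incomplete groups after a minute is the job of the group timeout settings, not of the poller.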

Also note that there is a Java DSL for Spring Integration.

Another part of your question is a bit confusing. The poller is not related to the release strategy at all. Its responsibility in this case is to receive messages from the PollableChannel, which is that logEntryChannel. Only after that are the polled messages passed to the aggregator for its correlation and release logic.

What is done in that sample is an entirely different story, and we can discuss it in a separate SO thread.

Upvotes: 1
