Alex
Alex

Reputation: 17

Spring Integration resequencer does not release the last group of messages

I have the following configuration:

    @Bean
    public IntegrationFlow messageFlow(JdbcMessageStore groupMessageStore, TransactionSynchronizationFactory syncFactory, TaskExecutor te, ThreadPoolTaskScheduler ts, RealTimeProcessor processor) {
        return IntegrationFlows
                .from("inputChannel")
                .handle(processor, "handleInputMessage", consumer -> consumer
                        .taskScheduler(ts)
                        .poller(poller -> poller
                                .fixedDelay(pollerFixedDelay)
                                .receiveTimeout(pollerReceiveTimeout)
                                .maxMessagesPerPoll(pollerMaxMessagesPerPoll)
                                .taskExecutor(te)
                                .transactional()
                                .transactionSynchronizationFactory(syncFactory)))
                .resequence(s -> s.messageStore(groupMessageStore)
                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 30000)))
                .channel("sendingChannel")
                .handle(processor, "sendMessage")
                .get();
    }

If I send a single batch of e.g. 100 messages to the inputChannel it works as expected until there are no messages in the inputChannel. After the inputChannel becomes empty it also stops processing for messages that were waiting for sequencing. As a result there are always a couple of messages left in the groupMessageStore even after the set release timeout.

I'm guessing it's because the poller is configured only for the inputChannel and if there are no messages in there it will never get to the sequencer (so will never call canRelease on the release strategy). But if I try adding a separate poller for the resequencer I get the following error A poller should not be specified for endpoint since channel x is a SubscribableChannel (not pollable).

Is there a different way to configure it so that the last group of messages is always released?

Upvotes: 0

Views: 211

Answers (1)

Gary Russell
Gary Russell

Reputation: 174484

The release strategy is passive and needs something to trigger it to be called.

Add .groupTimeout(...) to release the partial sequence after the specified time elapses.

EDIT

@SpringBootApplication
public class So67993972Application {

    private static final Logger log = LoggerFactory.getLogger(So67993972Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So67993972Application.class, args);
    }

    @Bean
    IntegrationFlow flow(MessageGroupStore mgs) {
        return IntegrationFlows.from(MessageChannels.direct("input"))
                .resequence(e -> e.messageStore(mgs)
                                    .groupTimeout(5_000)
                                    .sendPartialResultOnExpiry(true)
                                    .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 2000)))
                .channel(MessageChannels.queue("output"))
                .get();
    }

    @Bean
    MessageGroupStore mgs() {
        return new SimpleMessageStore();
    }

    @Bean
    public ApplicationRunner runner(MessageChannel input, QueueChannel output, MessageGroupStore mgs) {
        return args -> {
            MessagingTemplate template = new MessagingTemplate(input);
            log.info("Sending");
            template.send(MessageBuilder.withPayload("foo")
                    .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "bar")
                    .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
                    .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                    .build());
            log.info(output.receive(10_000).toString());
            Thread.sleep(1000);
            log.info(mgs.getMessagesForGroup("bar").toString());
        };
    }

}

Upvotes: 1

Related Questions