Ilya Sereb

Reputation: 2561

Why can't a Spring Integration channel sort my messages properly?

I have this simple pipeline configuration; I'm just tinkering with Spring Integration. However, the output is not what I expect.

This is my code:

@Slf4j
@Configuration
@RequiredArgsConstructor
public class FileToConsoleIntegration {

    @Bean public IntegrationFlow fileToConsoleIntegrationFlow() {
        return IntegrationFlows.from(sameSourceDirectory(), spec -> spec.poller(this::getPollerSpec))
                .log(LoggingHandler.Level.WARN, "before.sort", m -> m.getHeaders().get("file_name"))
                .channel(MessageChannels.queue(5))
                .log(LoggingHandler.Level.INFO, "after.sort", m -> m.getHeaders().get("file_name"))
                .channel(alphabeticallyReversed())
                .log(LoggingHandler.Level.ERROR, "after.sort", m -> m.getHeaders().get("file_name"))
                .handle(logInfoMessageHandler())
                .get();
    }

    private PollerSpec getPollerSpec(PollerFactory p) {
        return p
                .fixedRate(1000)
                .maxMessagesPerPoll(10);
    }

    @Bean public MessageSource<File> sameSourceDirectory() {
        FileReadingMessageSource messageSource = new FileReadingMessageSource();
        messageSource.setDirectory(new File("input_dir"));
        return messageSource;
    }

    @Bean public MessageHandler logInfoMessageHandler() {
        return message ->
                log.info("Handling message with headers: {} and payload: {}",
                        message.getHeaders(), message.getPayload());
    }

    @Bean
    public PriorityChannel alphabeticallyReversed() {
        return MessageChannels.priority()
                .capacity(5)
                .comparator(Comparator.comparing(getFilename(), Comparator.reverseOrder()))
                .get();
    }

    private Function<Message<?>, String> getFilename() {
        return a -> (String) a.getHeaders().get("file_name");
    }

}

This is my input:

/input_dir
    01_zhasdfha.txt
    02_usfhahjf.txt
    05_bsdfasdf.txt
    06_asdfasdf.txt

This is my output:

2019-10-10 17:24:48.604  WARN 46024 --- [ask-scheduler-1] before.sort                              : 01_zhasdfha.txt
2019-10-10 17:24:48.605  INFO 46024 --- [ask-scheduler-1] after.sort                               : 01_zhasdfha.txt
2019-10-10 17:24:48.605 ERROR 46024 --- [ask-scheduler-2] after.sort                               : 01_zhasdfha.txt
2019-10-10 17:24:48.605  WARN 46024 --- [ask-scheduler-1] before.sort                              : 02_usfhahjf.txt
2019-10-10 17:24:48.605  INFO 46024 --- [ask-scheduler-1] after.sort                               : 02_usfhahjf.txt
2019-10-10 17:24:48.606  WARN 46024 --- [ask-scheduler-1] before.sort                              : 05_bsdfasdf.txt
2019-10-10 17:24:48.606 ERROR 46024 --- [ask-scheduler-2] after.sort                               : 02_usfhahjf.txt
2019-10-10 17:24:48.606  INFO 46024 --- [ask-scheduler-1] after.sort                               : 05_bsdfasdf.txt
2019-10-10 17:24:48.606 ERROR 46024 --- [ask-scheduler-2] after.sort                               : 05_bsdfasdf.txt
2019-10-10 17:24:48.606  WARN 46024 --- [ask-scheduler-1] before.sort                              : 06_asdfasdf.txt
2019-10-10 17:24:48.606  INFO 46024 --- [ask-scheduler-1] after.sort                               : 06_asdfasdf.txt
2019-10-10 17:24:48.606 ERROR 46024 --- [ask-scheduler-2] after.sort                               : 06_asdfasdf.txt
2019-10-10 17:24:48.605  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/01_zhasdfha.txt, id=5fcfb1cc-7187-86a8-dcf4-23e9b31b2376, file_name=01_zhasdfha.txt, file_relativePath=01_zhasdfha.txt, timestamp=1570728288603} and payload: input_dir/01_zhasdfha.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/06_asdfasdf.txt, id=cf1a978b-c577-bb14-cf45-cf5b7ce8f2a7, file_name=06_asdfasdf.txt, file_relativePath=06_asdfasdf.txt, timestamp=1570728288606} and payload: input_dir/06_asdfasdf.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/05_bsdfasdf.txt, id=c81dbcdc-6dd0-b549-4923-fe567ff6ca23, file_name=05_bsdfasdf.txt, file_relativePath=05_bsdfasdf.txt, timestamp=1570728288606} and payload: input_dir/05_bsdfasdf.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/02_usfhahjf.txt, id=3371143b-a097-a9ee-e303-d4d359875188, file_name=02_usfhahjf.txt, file_relativePath=02_usfhahjf.txt, timestamp=1570728288605} and payload: input_dir/02_usfhahjf.txt
2019-10-10 17:24:48.609  INFO 46024 --- [           main] c.s.i.siexample.SiExampleApplication     : Started SiExampleApplication in 0.743 seconds (JVM running for 1.232)

Desired output:

2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/06_asdfasdf.txt, id=cf1a978b-c577-bb14-cf45-cf5b7ce8f2a7, file_name=06_asdfasdf.txt, file_relativePath=06_asdfasdf.txt, timestamp=1570728288606} and payload: input_dir/06_asdfasdf.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/05_bsdfasdf.txt, id=c81dbcdc-6dd0-b549-4923-fe567ff6ca23, file_name=05_bsdfasdf.txt, file_relativePath=05_bsdfasdf.txt, timestamp=1570728288606} and payload: input_dir/05_bsdfasdf.txt
2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/02_usfhahjf.txt, id=3371143b-a097-a9ee-e303-d4d359875188, file_name=02_usfhahjf.txt, file_relativePath=02_usfhahjf.txt, timestamp=1570728288605} and payload: input_dir/02_usfhahjf.txt
2019-10-10 17:24:48.605  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/01_zhasdfha.txt, id=5fcfb1cc-7187-86a8-dcf4-23e9b31b2376, file_name=01_zhasdfha.txt, file_relativePath=01_zhasdfha.txt, timestamp=1570728288603} and payload: input_dir/01_zhasdfha.txt

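The only difference between the desired output and the current output is the order in which the handler receives the files. I expect reverse alphabetical order, but 01_zhasdfha.txt is handled first. How can I fix that?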

Upvotes: 0

Views: 238

Answers (1)

Gary Russell

Reputation: 174494

Setting the queue capacities to 5 doesn't mean the framework will wait for 5 messages to accumulate before the downstream poller pulls them; the capacity is only an upper bound. If a poller is already waiting for a message (by default it waits up to 1 second on each poll), it receives the message immediately, so there is nothing else in the channel to sort it against. You could try setting the poller's receive timeout to 0, but there will still be a (small) race condition where messages can be fetched earlier than you expected.
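The effect can be reproduced without Spring at all. The sketch below uses a plain `java.util.PriorityQueue` as a simplified stand-in for the priority channel's queue (an assumption for illustration, not the framework's actual implementation) and compares two schedules: a consumer that removes each message as soon as it arrives, as a waiting poller does, and a consumer that drains the queue only after every message has been added.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class PriorityRaceDemo {

    // Same ordering as the alphabeticallyReversed() channel: file names in reverse order.
    static final Comparator<String> REVERSED = Comparator.reverseOrder();

    // A poller that is already waiting takes each message the moment it is queued,
    // so the queue never holds more than one element and has nothing to sort.
    static List<String> consumeImmediately(List<String> arrivals) {
        PriorityQueue<String> queue = new PriorityQueue<>(REVERSED);
        List<String> handled = new ArrayList<>();
        for (String name : arrivals) {
            queue.offer(name);
            handled.add(queue.poll());
        }
        return handled;
    }

    // Only when all messages sit in the queue at the same time does the
    // comparator decide the order in which they are removed.
    static List<String> consumeAfterAllArrived(List<String> arrivals) {
        PriorityQueue<String> queue = new PriorityQueue<>(REVERSED);
        arrivals.forEach(queue::offer);
        List<String> handled = new ArrayList<>();
        while (!queue.isEmpty()) {
            handled.add(queue.poll());
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> arrivals = List.of(
                "01_zhasdfha.txt", "02_usfhahjf.txt", "05_bsdfasdf.txt", "06_asdfasdf.txt");
        // Prints the arrival order unchanged: 01, 02, 05, 06
        System.out.println(consumeImmediately(arrivals));
        // Prints reverse order: 06, 05, 02, 01
        System.out.println(consumeAfterAllArrived(arrivals));
    }
}
```

The output in the question (01 first, then 06, 05, 02) appears to fall between these two extremes: the first file was taken as soon as it arrived, and the remaining three happened to be queued together before the next receive.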

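It would be better to use an aggregator to collect the messages into a single List<>, followed by a transformer to sort that list (or a custom output processor on the aggregator that does the sorting), and then a splitter to emit the messages one by one in sorted order.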
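The aggregate, sort, split sequence can be sketched in plain Java to show why it is deterministic. This is not Spring Integration code: the `Aggregator` class and its fixed `groupSize` release rule are hypothetical stand-ins for an aggregator's release strategy, and plain file-name strings stand in for `Message<File>`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

public class AggregateSortSplitDemo {

    // Hypothetical stand-in for an aggregator: buffers messages until the
    // group is complete, then releases the whole group as one List.
    static final class Aggregator {
        private final int groupSize; // hypothetical release rule: fixed group size
        private final List<String> buffer = new ArrayList<>();
        private final Consumer<List<String>> output;

        Aggregator(int groupSize, Consumer<List<String>> output) {
            this.groupSize = groupSize;
            this.output = output;
        }

        void accept(String fileName) {
            buffer.add(fileName);
            if (buffer.size() == groupSize) {
                output.accept(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    // Transformer step: sort the released group (reverse alphabetical, as in the question).
    static List<String> sort(List<String> group) {
        List<String> sorted = new ArrayList<>(group);
        sorted.sort(Comparator.reverseOrder());
        return sorted;
    }

    // Splitter step: emit each element of the sorted list as its own message.
    static void split(List<String> sorted, Consumer<String> handler) {
        sorted.forEach(handler);
    }

    // Wires aggregator -> sort -> split -> handler and returns what the handler received.
    static List<String> run(List<String> arrivals, int groupSize) {
        List<String> handled = new ArrayList<>();
        Aggregator aggregator = new Aggregator(groupSize, group -> split(sort(group), handled::add));
        arrivals.forEach(aggregator::accept);
        return handled;
    }

    public static void main(String[] args) {
        List<String> arrivals = List.of(
                "01_zhasdfha.txt", "02_usfhahjf.txt", "05_bsdfasdf.txt", "06_asdfasdf.txt");
        // Prints 06, 05, 02, 01 regardless of thread timing, because sorting
        // happens only after the whole group has been collected.
        System.out.println(run(arrivals, arrivals.size()));
    }
}
```

Two caveats carry over to a real flow: the sort only covers one released group at a time, and messages in a group that never completes stay buffered in this sketch, so a real aggregator would likely need something like a group timeout to flush partial groups.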

Upvotes: 1
