夢のの夢
夢のの夢

Reputation: 5856

Spring Integration (SFTP) message source isn't getting more than 1 file per poll despite setting to unlimited

I have following code to read xml files from a sftp server as InputStream:

@Configuration
public class SftpConfig {
    ...

    @Bean
    @InboundChannelAdapter(channel = "stream", poller = @Poller(fixedDelay="60000"))
    public MessageSource<InputStream> messageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory(sftpProperties.getBaseDir());
        messageSource.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        // messageSource.setMaxFetchSize(-1); no matter what i set this to, it only fetches one file
        return messageSource;
    }

    @ServiceActivator(inputChannel = "stream", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return message -> {
            Assert.isTrue(message.getPayload() instanceof InputStream, "Payload must be of type $InputStream");
            String filename = (String) message.getHeaders().get(FileHeaders.REMOTE_FILE);
            InputStream is = (InputStream) message.getPayload();
            log.info("I am here"); // each poll only prints this once
        };
    }
    ...
}

When I debugged or checked the logs for MessageHanlder$handleMessage, I continuously only saw one message (file object) came through. And there are more than one .xml file sitting on the sftp server as I could verify by seeing file coming through in the next poll. The documentation says

    /**
     * Set the maximum number of objects the source should fetch if it is necessary to
     * fetch objects. Setting the
     * maxFetchSize to 0 disables remote fetching, a negative value indicates no limit.
     * @param maxFetchSize the max fetch size; a negative value means unlimited.
     */
    void setMaxFetchSize(int maxFetchSize);

So that I fiddled with different numbers but to no avail. What am I missing here?

Upvotes: 0

Views: 1013

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121560

Sorry for misleading, but fetch doesn't mean poll. The fetch options just take as many remote entities to the local cache on a first poll and every single subsequent polls just take entries from that cache until it is exhausted.

The option about max messages per poll belongs to that @Poller configuration. See a respective option:

/**
 * @return The maximum number of messages to receive for each poll.
 * Can be specified as 'property placeholder', e.g. {@code ${poller.maxMessagesPerPoll}}.
 * Defaults to -1 (infinity) for polling consumers and 1 for polling inbound channel adapters.
 */
String maxMessagesPerPoll() default "";

Pay attention to that 1 for polling inbound channel adapters. That's how you see only one message coming through.

Nevertheless the logic is like push only one message to the channel. There is no batching for how many files you have a the moment. Independently of fetch perPoll only one message is sent to the channel. Although I agree that with infinite perPoll all the messages are sent in the same thread and during the same poll cycle.

Upvotes: 3

Related Questions