Reputation: 5856
I have the following code to read XML files from an SFTP server as an InputStream:
@Configuration
public class SftpConfig {
    ...
    @Bean
    @InboundChannelAdapter(channel = "stream", poller = @Poller(fixedDelay = "60000"))
    public MessageSource<InputStream> messageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory(sftpProperties.getBaseDir());
        messageSource.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        // messageSource.setMaxFetchSize(-1); no matter what I set this to, it only fetches one file
        return messageSource;
    }

    @ServiceActivator(inputChannel = "stream", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return message -> {
            Assert.isTrue(message.getPayload() instanceof InputStream, "Payload must be of type $InputStream");
            String filename = (String) message.getHeaders().get(FileHeaders.REMOTE_FILE);
            InputStream is = (InputStream) message.getPayload();
            log.info("I am here"); // each poll only prints this once
        };
    }
    ...
}
When I debugged or checked the logs for MessageHandler$handleMessage, I consistently saw only one message (file object) come through per poll. There is more than one .xml file sitting on the SFTP server, which I could verify by seeing the next file come through on the next poll. The documentation says:
/**
* Set the maximum number of objects the source should fetch if it is necessary to
* fetch objects. Setting the
* maxFetchSize to 0 disables remote fetching, a negative value indicates no limit.
* @param maxFetchSize the max fetch size; a negative value means unlimited.
*/
void setMaxFetchSize(int maxFetchSize);
So I tried different values, but to no avail. What am I missing here?
Upvotes: 0
Reputation: 121560
Sorry for the confusion, but fetch doesn't mean poll. The fetch option only controls how many remote entries are pulled into the local cache on the first poll; every subsequent poll just takes entries from that cache until it is exhausted.
The option for the maximum number of messages per poll belongs to the @Poller configuration. See the respective attribute:
/**
* @return The maximum number of messages to receive for each poll.
* Can be specified as 'property placeholder', e.g. {@code ${poller.maxMessagesPerPoll}}.
* Defaults to -1 (infinity) for polling consumers and 1 for polling inbound channel adapters.
*/
String maxMessagesPerPoll() default "";
Pay attention to the "1 for polling inbound channel adapters" part. That is why you see only one message coming through per poll.
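As a hedged sketch of what that implies for the configuration in the question: setting maxMessagesPerPoll on the @Poller to a negative value should let a single poll cycle keep receiving until the source returns nothing. The bean below only mirrors the question's code; template() and sftpProperties are the asker's own members and are not shown here, so this is a fragment rather than a complete class.

```java
// Fragment, assumed to live inside the question's SftpConfig class.
// Only the @Poller attributes differ from the original bean.
@Bean
@InboundChannelAdapter(channel = "stream",
        poller = @Poller(fixedDelay = "60000", maxMessagesPerPoll = "-1"))
public MessageSource<InputStream> messageSource() {
    SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
    messageSource.setRemoteDirectory(sftpProperties.getBaseDir());
    messageSource.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
    return messageSource;
}
```

A positive value such as "10" would instead cap each poll cycle at that many messages.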
Nevertheless, the logic is still to push one message at a time to the channel. There is no batching of however many files you have at the moment: independently of fetch and perPoll, each message sent to the channel carries a single file. That said, I agree that with an infinite perPoll all the messages are sent in the same thread and during the same poll cycle.
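To make the difference between fetch and perPoll concrete, here is a toy model in plain Java. It is not Spring Integration's implementation: ToySource, pollOnce and the file names are hypothetical and only imitate the behaviour described above (fetch fills a local cache, each receive hands out one entry, and the poller decides how many receives happen per cycle).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class FetchVsPollSketch {

    /** Hypothetical source: fetches remote names into a cache, returns one per receive(). */
    static final class ToySource {
        private final Deque<String> remote;
        private final Deque<String> cache = new ArrayDeque<>();
        private final int maxFetchSize;

        ToySource(List<String> remoteFiles, int maxFetchSize) {
            this.remote = new ArrayDeque<>(remoteFiles);
            this.maxFetchSize = maxFetchSize;
        }

        /** Refills the cache only when it is empty; negative maxFetchSize means no limit. */
        String receive() {
            if (cache.isEmpty()) {
                int fetched = 0;
                while (!remote.isEmpty() && (maxFetchSize < 0 || fetched < maxFetchSize)) {
                    cache.add(remote.poll());
                    fetched++;
                }
            }
            return cache.poll(); // null when nothing is left
        }
    }

    /** One poll cycle: up to maxMessagesPerPoll single-file messages (negative = until empty). */
    static List<String> pollOnce(ToySource source, int maxMessagesPerPoll) {
        List<String> delivered = new ArrayList<>();
        while (maxMessagesPerPoll < 0 || delivered.size() < maxMessagesPerPoll) {
            String file = source.receive();
            if (file == null) {
                break;
            }
            delivered.add(file); // each delivery stands for one message with one file
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> files = List.of("a.xml", "b.xml", "c.xml");
        // Unlimited fetch, but one message per poll: only one file is delivered.
        System.out.println(pollOnce(new ToySource(files, -1), 1));  // prints [a.xml]
        // Unlimited fetch and unlimited messages per poll: all files in one cycle.
        System.out.println(pollOnce(new ToySource(files, -1), -1)); // prints [a.xml, b.xml, c.xml]
    }
}
```

In this model, changing maxFetchSize alone never changes how many messages one poll delivers, which matches what the question observed.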
Upvotes: 3