I have a file system directory that I want to poll for files and then process each file concurrently, with a single thread per file. My impression was that, under the covers, `InboundFileAdapter` puts each file into a queue, so I could then use an executor channel downstream to have later calls processed concurrently. I implemented that in Java config as follows:
```java
return IntegrationFlows
        .from(s -> s.file(inboundMessageDirectory.toFile(), Comparator.comparing(File::lastModified)) // serve oldest first
                        .scanner(directoryScanner) // we know the directory structure, so we can take advantage of that with a custom scanner
                        .filter(new AcceptOnceFileListFilter<>(MAX_FILTER_CAPACITY)), // limit number of references in memory
                e -> e.poller(Pollers
                        .fixedDelay(fileSystemPollDelay)
                        .get()))
        .channel(MessageChannels.executor(executor).get())
        .transform(File::toPath)
        .enrichHeaders(cleanUpConfigurer)
        .get();
```
Every channel downstream of the executor channel is itself a direct channel.
However, I see poor concurrency in the downstream services. With a cached thread pool, the same thread executes the downstream code essentially serially; with a fixed thread pool, different threads take turns, but execution is still serial.
I've also tried putting a bridge between the poller and the executor channel, but to no avail.
That's just because of this logic in `SourcePollingChannelAdapterFactoryBean` under the hood:
```java
if (this.pollerMetadata.getMaxMessagesPerPoll() == Integer.MIN_VALUE) {
    // the default is 1 since a source might return
    // a non-null and non-interruptible value every time it is invoked
    this.pollerMetadata.setMaxMessagesPerPoll(1);
}
```
So, on each `.fixedDelay(fileSystemPollDelay)` poll, only one `File` is taken from the queue for processing.
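To make the effect of that default concrete, here is a simplified plain-Java model of a poll loop. It is not Spring Integration code; `PollBatchModel`, `effectiveMaxMessagesPerPoll` and `pollBatches` are hypothetical names. Each outer iteration stands for one `fixedDelay` poll that receives at most `maxMessagesPerPoll` files, with the unset `Integer.MIN_VALUE` sentinel turned into 1 as in the snippet above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PollBatchModel {

    /** Mirrors the factory-bean check: an unset value (Integer.MIN_VALUE) becomes 1. */
    static int effectiveMaxMessagesPerPoll(int configured) {
        return configured == Integer.MIN_VALUE ? 1 : configured;
    }

    /**
     * Hypothetical model: each iteration of the outer loop is one poll, which receives
     * at most maxMessagesPerPoll files. Returns how many files each poll handed downstream.
     */
    static List<Integer> pollBatches(int pendingFiles, int configuredMaxMessagesPerPoll) {
        int max = effectiveMaxMessagesPerPoll(configuredMaxMessagesPerPoll);
        if (max <= 0) {
            // this model deliberately does not cover "unlimited" settings
            throw new IllegalArgumentException("this model only handles positive limits");
        }
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < pendingFiles; i++) {
            queue.add(i);
        }
        List<Integer> batches = new ArrayList<>();
        while (!queue.isEmpty()) {                // one fixedDelay trigger
            int handedOff = 0;
            while (handedOff < max && queue.poll() != null) {
                handedOff++;                      // in the real flow: one send to the ExecutorChannel
            }
            batches.add(handedOff);
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(pollBatches(10, Integer.MIN_VALUE)); // [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
        System.out.println(pollBatches(10, 4));                 // [4, 4, 2]
    }
}
```

With the default, the executor receives a single task per poll interval, which is consistent with the serial behaviour described in the question.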
So, just increase `.maxMessagesPerPoll()` to a value appropriate for your system and have fun with the concurrency!
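The following rough JDK-only sketch shows why a larger per-poll batch helps once there is an executor hand-off downstream. The `ExecutorService` stands in for the `ExecutorChannel` from the question; `ExecutorHandOffDemo` and `peakInFlight` are hypothetical names. The sketch assumes the pool has at least as many threads as files per poll. Each task holds its thread until every file of the same poll has started, so the result is the peak number of files in flight at once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorHandOffDemo {

    /** Hypothetical model: one poll hands filesPerPoll files to an executor. Assumes poolSize >= filesPerPoll. */
    static int peakInFlight(int filesPerPoll, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        CountDownLatch allStarted = new CountDownLatch(filesPerPoll);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < filesPerPoll; i++) {
            executor.execute(() -> {
                // record how many files of this poll are being processed right now
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                allStarted.countDown();
                try {
                    // stand-in for processing that overlaps with the other files of this poll
                    allStarted.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println(peakInFlight(1, 8)); // one file per poll: 1
        System.out.println(peakInFlight(4, 8)); // four files per poll: 4
    }
}
```

In a real flow, the achievable overlap is also capped by the executor's pool size.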
BTW, there is no reason to introduce an `ExecutorChannel` just after the polling adapter. You can simply set `.taskExecutor()` on the `.poller()` for exactly the same concurrency reason.
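The next JDK-only model covers the `.taskExecutor()` option. It rests on an assumption, based on my reading of Spring Integration's `AbstractPollingEndpoint` and worth checking for your version: each trigger hands the whole poll to the task executor, and the poll's receives, plus the direct-channel processing behind them, run one after another on that single thread. Under that assumption, files from one poll share a thread, and concurrency comes from successive polls overlapping. `PollerExecutorModel` and `runPolls` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PollerExecutorModel {

    /**
     * Hypothetical model of a poller with a task executor. Each trigger submits the whole
     * poll, which receives up to maxMessagesPerPoll files on one thread (direct channels
     * downstream). Returns, per poll, the names of the threads that processed its files.
     */
    static List<Set<String>> runPolls(int files, int polls, int maxMessagesPerPoll, int poolSize) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < files; i++) {
            queue.add(i);
        }
        List<Set<String>> threadsPerPoll = new ArrayList<>();
        ExecutorService taskExecutor = Executors.newFixedThreadPool(poolSize);
        for (int p = 0; p < polls; p++) {
            Set<String> threads = ConcurrentHashMap.newKeySet();
            threadsPerPoll.add(threads);
            // the trigger returns right after submitting, so the next poll can overlap this one
            taskExecutor.execute(() -> {
                for (int n = 0; n < maxMessagesPerPoll; n++) {
                    if (queue.poll() == null) {
                        break; // queue drained: this poll ends early
                    }
                    // "processing" the file happens here, on the poll's own thread
                    threads.add(Thread.currentThread().getName());
                }
            });
        }
        taskExecutor.shutdown();
        try {
            taskExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for polls", e);
        }
        return threadsPerPoll;
    }

    public static void main(String[] args) {
        // 6 files, 2 polls of up to 3 files each: one thread name per poll
        System.out.println(runPolls(6, 2, 3, 4));
    }
}
```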