Reputation: 55
I have a Integration flow configured using Java DSL which pulls file from Ftp server using Ftp.inboundChannelAdapter
then transforms it to JobRequest
, then I have a .handle()
method which triggers my batch job, everything is working as per required but the process in running sequentially for each file inside the FTP folder
I added currentThreadName
in my Transformer Endpoint it was printing same thread name for each file
Here is what I have tried till now
1.task executor bean
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("Integration");
}
2.Integration flow
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
.remoteDirectory("/bar")
.localDirectory(localDir.getFile())
,c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
.transform(fileMessageToJobRequest(importUserJob(step1())))
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.route(JobExecution.class,j->j.getStatus().isUnsuccessful()?"jobFailedChannel":"jobSuccessfulChannel")
.get();
}
3.I also read in another SO thread that I need ExecutorChannel
so I configured one but I don't know how to inject this channel into my Ftp.inboundAdapter
, from logs is see that the channel is always integrationFlow.channel#0
which I guess is a DirectChannel
@Bean
public MessageChannel inputChannel() {
return new ExecutorChannel(taskExecutor());
}
I dont know what I'm missing here, or I might have not properly understood Spring Messaging System as I'm very much new to Spring and Spring-Integration
Any help is appreciated
Thanks
Upvotes: 1
Views: 489
Reputation: 121542
The ExecutorChannel
you can simply inject into the flow and it is going to be applied to the SourcePollingChannelAdapter
by the framework. So, having that inputChannel
defined as a bean you just do this:
.channel(inputChannel())
before your .transform(fileMessageToJobRequest(importUserJob(step1())))
.
See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels
On the other hand to process your files in parallel according your .taskExecutor(taskExecutor())
configuration, you just need to have a .maxMessagesPerPoll(20)
as 1
. The logic in the AbstractPollingEndpoint
is like this:
this.taskExecutor.execute(() -> {
int count = 0;
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
if (pollForMessage() == null) {
break;
}
count++;
}
So, we do have tasks in parallel, but only when they reach that maxMessagesPerPoll
where it is 20
in your current case. There is also some explanation in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer
The maxMessagesPerPoll property specifies the maximum number of messages to receive within a given poll operation. This means that the poller continues calling receive() without waiting, until either null is returned or the maximum value is reached. For example, if a poller has a ten-second interval trigger and a maxMessagesPerPoll setting of 25, and it is polling a channel that has 100 messages in its queue, all 100 messages can be retrieved within 40 seconds. It grabs 25, waits ten seconds, grabs the next 25, and so on.
Upvotes: 0