abdur rehman

Reputation: 55

Run Spring Integration flow concurrently for each Ftp file

I have an integration flow configured with the Java DSL. It pulls files from an FTP server using Ftp.inboundAdapter, transforms each file into a JobRequest, and then a .handle() step triggers my batch job. Everything works as required, but the files in the FTP folder are processed sequentially.

I logged the current thread name in my transformer endpoint, and it printed the same thread name for every file.

Here is what I have tried so far:

1. Task executor bean

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("Integration");
}

2. Integration flow

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
    return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
                    .remoteDirectory("/bar")
                    .localDirectory(localDir.getFile()),
            c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
            .transform(fileMessageToJobRequest(importUserJob(step1())))
            .handle(jobLaunchingGateway)
            .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
            .route(JobExecution.class, j -> j.getStatus().isUnsuccessful() ? "jobFailedChannel" : "jobSuccessfulChannel")
            .get();
}

3. I also read in another SO thread that I need an ExecutorChannel, so I configured one, but I don't know how to inject this channel into my Ftp.inboundAdapter. From the logs I see that the channel is always integrationFlow.channel#0, which I guess is a DirectChannel.

@Bean
public MessageChannel inputChannel() {
    return new ExecutorChannel(taskExecutor());
}

I don't know what I'm missing here, or I may not have properly understood the Spring messaging system, as I'm very new to Spring and Spring Integration.

Any help is appreciated

Thanks

Upvotes: 1

Views: 489

Answers (1)

Artem Bilan

Reputation: 121542

You can simply inject the ExecutorChannel into the flow, and the framework will apply it to the SourcePollingChannelAdapter. So, with that inputChannel defined as a bean, you just add this:

.channel(inputChannel())

before your .transform(fileMessageToJobRequest(importUserJob(step1()))). See more in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels
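Why the channel type matters can be seen with a small plain-Java sketch. This is not Spring Integration code: `direct`, `executorBacked` and `handlerThread` are hypothetical stand-ins that only mimic the threading behaviour, assuming a direct channel calls its subscriber on the sender's thread and an executor-backed channel hands each message to its executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class ChannelHandOffSketch {

    // Toy stand-in for a direct channel: the subscriber runs on the sender's thread.
    static Consumer<String> direct(Consumer<String> subscriber) {
        return subscriber;
    }

    // Toy stand-in for an executor-backed channel: each send is handed to the executor.
    static Consumer<String> executorBacked(Executor executor, Consumer<String> subscriber) {
        return message -> executor.execute(() -> subscriber.accept(message));
    }

    // Sends one message and reports the name of the thread that handled it.
    static String handlerThread(boolean useExecutor) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            CompletableFuture<String> seenOn = new CompletableFuture<>();
            Consumer<String> subscriber =
                    message -> seenOn.complete(Thread.currentThread().getName());
            Consumer<String> channel =
                    useExecutor ? executorBacked(pool, subscriber) : direct(subscriber);
            channel.accept("file.csv");
            return seenOn.join(); // wait until the subscriber has run
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct:   " + handlerThread(false)); // the calling thread's name
        System.out.println("executor: " + handlerThread(true));  // worker
    }
}
```

With the direct variant everything downstream stays on the thread that produced the message, which matches the single thread name you saw in your transformer.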

On the other hand, to process your files in parallel according to your .taskExecutor(taskExecutor()) configuration, you just need to set maxMessagesPerPoll to 1 instead of 20. The logic in the AbstractPollingEndpoint is like this:

this.taskExecutor.execute(() -> {
    int count = 0;
    while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
        if (pollForMessage() == null) {
            break;
        }
        count++;
    }
});

So, we do have tasks in parallel, but each task keeps polling on its own thread until it reaches maxMessagesPerPoll, which is 20 in your current case. There is also some explanation in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer

The maxMessagesPerPoll property specifies the maximum number of messages to receive within a given poll operation. This means that the poller continues calling receive() without waiting, until either null is returned or the maximum value is reached. For example, if a poller has a ten-second interval trigger and a maxMessagesPerPoll setting of 25, and it is polling a channel that has 100 messages in its queue, all 100 messages can be retrieved within 40 seconds. It grabs 25, waits ten seconds, grabs the next 25, and so on.
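To make the effect of maxMessagesPerPoll concrete, here is a minimal plain-Java sketch of the same loop shape. It is only an illustration, not the framework code: `PollLoopSketch` and `simulate` are hypothetical names, an in-memory queue stands in for the FTP message source, and each trigger firing gets a fresh thread named with the "Integration" prefix from your executor bean. Each firing is awaited before the next one starts purely to keep the output deterministic; with a real task executor the firings may overlap.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class PollLoopSketch {

    // Returns, per simulated poller thread, the files that thread handled.
    static Map<String, List<String>> simulate(List<String> files, int maxMessagesPerPoll) {
        Queue<String> source = new ArrayDeque<>(files); // stands in for the message source
        Map<String, List<String>> handledBy = new LinkedHashMap<>();
        int firing = 0;
        while (!source.isEmpty()) {
            firing++;
            List<String> handled = new ArrayList<>();
            // One new thread per trigger firing.
            Thread task = new Thread(() -> {
                int count = 0;
                // Keep receiving until the source is empty or the limit is reached;
                // a limit <= 0 means "no limit", as in the loop quoted above.
                while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) {
                    String file = source.poll();
                    if (file == null) {
                        break;
                    }
                    handled.add(file);
                    count++;
                }
            }, "Integration" + firing);
            task.start();
            try {
                task.join(); // awaited only to keep this demo deterministic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            handledBy.put(task.getName(), handled);
        }
        return handledBy;
    }

    public static void main(String[] args) {
        List<String> files = List.of("a.csv", "b.csv", "c.csv");
        // {Integration1=[a.csv, b.csv, c.csv]}
        System.out.println(simulate(files, 20));
        // {Integration1=[a.csv], Integration2=[b.csv], Integration3=[c.csv]}
        System.out.println(simulate(files, 1));
    }
}
```

With a limit of 20 a single task drains every file on one thread, while a limit of 1 gives each file its own task, which is what lets the task executor spread the files across threads.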

Upvotes: 0
