Diego Magalhães

Reputation: 725

Spring Batch with Spring Boot terminates before child tasks finish with AsyncItemProcessor

I'm using Spring Batch with an AsyncItemProcessor and things are behaving unexpectedly. Let me show the code first:

I followed a simple example from the Spring Batch project:

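import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
    public static void main(String[] args) throws Exception {// NOSONAR
        // Run the job, close the context and exit the JVM with the job's exit code
        System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        //SpringApplication.run(PerfilEletricoApp.class, args);
    }
}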

-- EDIT

If I just make the main thread sleep for a few seconds to give SLF4J time to flush the logs, everything works as expected.

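import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Import;

@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {

    public static void main(String[] args) throws Exception {// NOSONAR
        //System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);

        // Workaround: wait 5 seconds before closing the context and exiting the JVM
        Thread.sleep(1000 * 5);
        System.exit(SpringApplication.exit(context));
    }

}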

-- END OF EDIT

I'm reading a text file with one field per line and using an AsyncItemProcessor to get multithreaded processing; each item is processed with an HTTP GET on a URL to fetch some data. I'm also using a NoOpItemWriter, so the write phase does nothing. I'm saving the results of the GET in the processor part of the job (using log.trace / log.warn).
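Conceptually, an AsyncItemProcessor submits each call to its delegate to the TaskExecutor and returns a Future right away, and the NoOpItemWriter then receives those Futures and does nothing with them. The following is a minimal plain-JDK analogy of that flow, not Spring Batch code: `FireAndForgetDemo`, `noOpWrite` and the latch standing in for a slow HTTP GET are hypothetical names chosen for illustration. It reports how many tasks had finished at the moment the no-op "writer" returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy (hypothetical names, not Spring Batch code) of an async
// "processor" that returns Futures and a no-op "writer" that ignores them.
class FireAndForgetDemo {

    /** Returns how many tasks had finished when the no-op writer returned. */
    static int finishedWhenWriterReturned(int items) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CountDownLatch gate = new CountDownLatch(1); // keeps the simulated HTTP GETs in flight
        AtomicInteger finished = new AtomicInteger();

        // "Process" phase: every item becomes a Future immediately.
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < items; i++) {
            final int n = i;
            futures.add(executor.submit(() -> {
                gate.await();
                finished.incrementAndGet();
                return "response-" + n;
            }));
        }

        // "Write" phase: nobody calls get(), so nothing waits for the work.
        noOpWrite(futures);
        int finishedAtCommit = finished.get();

        // Let the tasks run and shut the pool down so the demo does not leak threads.
        gate.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finishedAtCommit;
    }

    static void noOpWrite(List<? extends Future<String>> items) {
        // Intentionally empty, like NoOpItemWriter.
    }

    public static void main(String[] args) {
        System.out.println("Tasks finished when the writer returned: " + finishedWhenWriterReturned(10));
    }
}
```

Because the writer never calls get(), nothing ties the end of the chunk to the end of the background work, so the step can report completion while tasks are still running.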

@Configuration
public class HttpClientConfigurer {
    // [... property and configs omitted] 
    @Bean
    public CloseableHttpClient createHttpClient() {
      // ... creates and returns a poolable http client etc
    }
}

As for the Job:

@Configuration
public class BatchJobConfigurer {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Value("${async.tps:10}")
    private Integer tps;

    @Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
    private String sourceDir;

    @Bean
    public ItemReader<String> reader() {
        MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
        reader.setResources( new Resource[] { new FileSystemResource(sourceDir)});
        reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
        return reader;
    }

    @Bean
    public ItemReader<String> flatItemReader() {
        FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
        itemReader.setLineMapper(new DefaultLineMapper<String>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "sample-field-001"});
            }});
            setFieldSetMapper(new SimpleStringFieldSetMapper<>());
        }});
        return itemReader;
    }


    @Bean
    public ItemProcessor asyncItemProcessor(){
        AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(processor());
        asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<String,OiPaggoResponse> processor(){
        return new PerfilEletricoItemProcessor();
    }

    /**
     * Uses a NoOpItemWriter<T> to satisfy the Spring Batch chunk flow without writing anything.
     * @return a NoOpItemWriter<OiPaggoResponse>
     */
    @Bean
    public ItemWriter<OiPaggoResponse> writer() {
        return new NoOpItemWriter<>();
    }

    @Bean
    protected Step step1() throws Exception {
        /*
         * Problem starts here: if I use processor(), everything ends nicely, but if I insist on
         * asyncItemProcessor(), the job ends and the logs from the processor are not written to disk.
         */
        return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
                .reader(reader())
                .processor(asyncItemProcessor())
                .writer(writer())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
    }

    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(tps);
        executor.setMaxPoolSize(tps);
        executor.setQueueCapacity(tps * 1000);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("AsyncExecutor-");
        return executor;
    }
}
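The asyncExecutor bean should behave roughly like a plain java.util.concurrent.ThreadPoolExecutor with a fixed pool of tps threads, a bounded LinkedBlockingQueue of tps * 1000 slots and CallerRunsPolicy, so that once the queue is full the submitting thread (the step's own thread) runs the task itself. The sketch below assumes that mapping; `AsyncExecutorSketch` is a hypothetical name, the 60-second keep-alive is assumed to mirror ThreadPoolTaskExecutor's default, and `rejectedTaskRunsOnCaller()` uses a one-thread, one-slot pool only to make the rejection path easy to observe.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical plain-JDK approximation of the "asyncExecutor" bean.
class AsyncExecutorSketch {

    /** Mirrors the bean: core = max = tps, queue capacity = tps * 1000. */
    static ThreadPoolExecutor create(int tps) {
        return create(tps, tps * 1000);
    }

    static ThreadPoolExecutor create(int poolSize, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory named = r -> new Thread(r, "AsyncExecutor-" + counter.incrementAndGet());
        return new ThreadPoolExecutor(
                poolSize, poolSize,                      // setCorePoolSize / setMaxPoolSize
                60, TimeUnit.SECONDS,                    // assumed default keep-alive
                new LinkedBlockingQueue<>(queueCapacity),
                named,                                   // setThreadNamePrefix("AsyncExecutor-")
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Once the queue is full, CallerRunsPolicy runs the task on the submitting thread. */
    static boolean rejectedTaskRunsOnCaller() {
        ThreadPoolExecutor pool = create(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> awaitQuietly(release));  // occupies the only worker
            pool.execute(() -> { });                     // fills the single queue slot
            pool.execute(() -> ranOn.set(Thread.currentThread().getName())); // rejected, runs here
            return Thread.currentThread().getName().equals(ranOn.get());
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Rejected task ran on caller: " + rejectedTaskRunsOnCaller());
    }
}
```

With a queue of tps * 1000 slots, the caller-runs path is only reached under heavy backlog; in the normal case every item goes to a pool thread and the step thread moves on immediately.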

-- UPDATED WITH AsyncItemWriter (Working version)

    /* Wrapped writer */
    @Bean
    public ItemWriter asyncItemWriter() {
        AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(writer());
        return asyncItemWriter;
    }

    /* AsyncItemWriter defined on the step */
    @Bean
    protected Step step1() throws Exception {
        return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
                .reader(reader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

-- Any thoughts on why the AsyncItemProcessor doesn't wait for all the child tasks to complete before sending an OK/COMPLETED signal to the context?

Upvotes: 2

Views: 3104

Answers (1)

Michael Minella

Reputation: 21463

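The issue is that the AsyncItemProcessor is creating Futures that no one is waiting for. Wrap your NoOpItemWriter in an AsyncItemWriter so that something waits on the Futures: AsyncItemWriter calls get() on each Future and passes the unwrapped results to its delegate, so the chunk does not finish until every item has been processed. That will cause the job to complete as expected.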
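The unwrapping step can be illustrated with a plain-JDK analogy: block on every Future first, then hand the plain results to the delegate. This is a sketch under that assumption, not the real AsyncItemWriter (which has other responsibilities omitted here); `UnwrappingWriterDemo`, `UnwrappingWriter` and the 10 ms sleep standing in for the HTTP GET are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical analogy of the fix: wait on each Future before delegating.
class UnwrappingWriterDemo {

    static final class UnwrappingWriter<T> {
        private final Consumer<List<T>> delegate;

        UnwrappingWriter(Consumer<List<T>> delegate) {
            this.delegate = delegate;
        }

        void write(List<? extends Future<T>> items) throws InterruptedException, ExecutionException {
            List<T> results = new ArrayList<>();
            for (Future<T> future : items) {
                results.add(future.get()); // blocks until this item's processing is done
            }
            delegate.accept(results);
        }
    }

    static List<String> processAndWrite(int items) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < items; i++) {
                final int n = i;
                futures.add(executor.submit(() -> {
                    Thread.sleep(10); // stands in for the HTTP GET
                    return "response-" + n;
                }));
            }
            List<String> written = new ArrayList<>();
            new UnwrappingWriter<String>(written::addAll).write(futures);
            return written; // every result is present once write() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processAndWrite(5));
    }
}
```

Because write() cannot return until every get() has returned, the end of the chunk is tied to the end of the background work, which is why the wrapped writer lets the job finish only after the processor's tasks (and their log output) are done.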

Upvotes: 6
