Reputation: 725
I'm using Spring Batch with an AsyncItemProcessor and it is behaving unexpectedly. Let me show the code first.
I followed a simple example from the Spring Batch project:
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
    public static void main(String[] args) throws Exception { // NOSONAR
        System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        //SpringApplication.run(PerfilEletricoApp.class, args);
    }
}
-- EDIT
If I just sleep the main thread for a few seconds to give slf4j time to flush the logs, everything works as expected.
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
    public static void main(String[] args) throws Exception { // NOSONAR
        //System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);
        Thread.sleep(1000 * 5);
        System.exit(SpringApplication.exit(context));
    }
}
-- ENDOF EDIT
I'm reading a text file with a field and then using an AsyncItemProcessor to get multithreaded processing, which consists of an HTTP GET on a URL to fetch some data. I'm also using a NoOpWriter that does nothing in the write part. I record the results of the GET in the processor part of the job (using log.trace / log.warn).
@Configuration
public class HttpClientConfigurer {
    // [... property and configs omitted]
    @Bean
    public CloseableHttpClient createHttpClient() {
        // ... creates and returns a poolable http client etc
    }
}
As for the Job:
@Configuration
public class BatchJobConfigurer {
    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Value("${async.tps:10}")
    private Integer tps;

    @Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
    private String sourceDir;

    @Bean
    public ItemReader<String> reader() {
        MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
        reader.setResources(new Resource[] { new FileSystemResource(sourceDir) });
        reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
        return reader;
    }

    @Bean
    public ItemReader<String> flatItemReader() {
        FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
        itemReader.setLineMapper(new DefaultLineMapper<String>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "sample-field-001" });
            }});
            setFieldSetMapper(new SimpleStringFieldSetMapper<>());
        }});
        return itemReader;
    }

    @Bean
    public ItemProcessor asyncItemProcessor() {
        AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(processor());
        asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<String, OiPaggoResponse> processor() {
        return new PerfilEletricoItemProcessor();
    }

    /**
     * Using a NoOpItemWriter<T> so we satisfy spring batch flow but don't use writer for anything else.
     * @return a NoOpItemWriter<OiPaggoResponse>
     */
    @Bean
    public ItemWriter<OiPaggoResponse> writer() {
        return new NoOpItemWriter<>();
    }

    @Bean
    protected Step step1() throws Exception {
        /*
         The problem starts here. If I use processor(), everything ends nicely, but if I insist on asyncItemProcessor(), the job ends and the logs from the processor are not written to disk.
         */
        return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
                .reader(reader())
                .processor(asyncItemProcessor())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
    }

    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(tps);
        executor.setMaxPoolSize(tps);
        executor.setQueueCapacity(tps * 1000);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("AsyncExecutor-");
        return executor;
    }
}
-- UPDATED WITH AsyncItemWriter (Working version)
/* Wrapped writer */
@Bean
public ItemWriter asyncItemWriter() {
    AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(writer());
    return asyncItemWriter;
}

/* AsyncItemWriter defined on the step */
@Bean
protected Step step1() throws Exception {
    return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
            .reader(reader())
            .processor(asyncItemProcessor())
            .writer(asyncItemWriter())
            .build();
}
-- Any thoughts on why the AsyncItemProcessor doesn't wait for all of its child tasks to complete before sending an OK-Completed signal to the context?
Upvotes: 2
Views: 3104
Reputation: 21463
The issue is that the AsyncItemProcessor is creating Futures that no one is waiting for. Wrap your NoOpItemWriter in the AsyncItemWriter so that someone is waiting for the Futures. That will cause the job to complete as expected.
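To illustrate the mechanism, here is a minimal sketch in plain java.util.concurrent, with no Spring Batch classes involved. The names FutureWaitSketch, slowProcess, processAsync and writeAsync are hypothetical stand-ins invented for this example: processAsync plays the role of the AsyncItemProcessor (it returns Futures immediately), and writeAsync plays the role of the AsyncItemWriter (it calls get() on each Future, which is what makes the caller wait).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; these are not Spring Batch types.
class FutureWaitSketch {

    // Counts how many items the simulated delegate has finished processing.
    static final AtomicInteger completed = new AtomicInteger();

    // Stand-in for the delegate processor: slow work such as an HTTP GET.
    static String slowProcess(String item) throws InterruptedException {
        Thread.sleep(50);
        completed.incrementAndGet();
        return item.toUpperCase();
    }

    // Stand-in for AsyncItemProcessor: submits each item and returns at once,
    // handing back Futures instead of finished results.
    static List<Future<String>> processAsync(ExecutorService pool, List<String> chunk) {
        List<Future<String>> futures = new ArrayList<>();
        for (String item : chunk) {
            Callable<String> task = () -> slowProcess(item);
            futures.add(pool.submit(task));
        }
        return futures;
    }

    // Stand-in for AsyncItemWriter: blocks on every Future before "writing".
    static List<String> writeAsync(List<Future<String>> futures) throws Exception {
        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            results.add(future.get()); // waits until this item is done
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = processAsync(pool, Arrays.asList("a", "b", "c"));
            // Skipping the next call is the situation in the question: the
            // "step" would finish while the work is still in flight.
            List<String> results = writeAsync(futures);
            System.out.println(results + " completed=" + completed.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

If the writeAsync call is removed, main reaches the end of the "step" while the submitted tasks may still be running, which matches the behaviour where a forced exit cuts off the processor's log output.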
Upvotes: 6