Reputation: 129
I am quite new to Spring Batch and started by running a job with a single thread. Now I need to add multithreading to the step and have the configuration below, but parallel processing hangs after some time: once it has processed some records, nothing more is logged to the console. For the single-threaded version I used JdbcCursorItemReader, and I then switched to JdbcPagingItemReader to get a thread-safe reader. The reader reads entries from a Postgres DB; the processor (which calls another REST web service and returns the response to the writer) and the writer (which creates a new file and updates status data in the DB) can then execute in parallel.
@Bean
public Job job(JobBuilderFactory jobBuilderFactory,
               StepBuilderFactory stepBuilderFactory,
               ItemReader<OrderRequest> itemReader,
               ItemProcessor<OrderRequest, OrderResponse> dataProcessor,
               ItemWriter<OrderResponse> fileWriter, JobExecutionListener jobListener,
               ItemReadListener<OrderRequest> stepItemReadListener,
               SkipListener<OrderRequest, OrderResponse> stepSkipListener, TaskExecutor taskExecutor) {
    Step step1 = stepBuilderFactory.get("Process-Data")
            .<OrderRequest, OrderResponse>chunk(10)
            .listener(stepItemReadListener)
            .reader(itemReader)
            .processor(dataProcessor)
            .writer(fileWriter)
            .faultTolerant()
            .processorNonTransactional()
            .skipLimit(5)
            .skip(CustomException.class)
            .listener(stepSkipListener)
            .taskExecutor(taskExecutor)
            .throttleLimit(5)
            .build();
    return jobBuilderFactory.get("Batch-Job")
            .incrementer(new RunIdIncrementer())
            .listener(jobListener)
            .start(step1)
            .build();
}
@StepScope
@Bean
public JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
        @Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {
    // reading database records using JDBC in a paging fashion
    JdbcPagingItemReader<OrderRequest> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(rowMapper);
    // Sort Keys
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("OrderRequestID", Order.ASCENDING);
    // Postgres implementation of a PagingQueryProvider using database specific features.
    PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
    queryProvider.setSelectClause("*");
    queryProvider.setFromClause("FROM OrderRequest");
    queryProvider.setWhereClause("CUSTOMER = '" + customerId + "'");
    queryProvider.setSortKeys(sortKeys);
    reader.setQueryProvider(queryProvider);
    return reader;
}
@StepScope
@Bean
public SynchronizedItemStreamReader<OrderRequest> itemReader(JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader) {
    return new SynchronizedItemStreamReaderBuilder<OrderRequest>().delegate(jdbcPagingItemReader).build();
}
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(5);
    taskExecutor.setQueueCapacity(0);
    return taskExecutor;
}
@StepScope
@Bean
ItemProcessor<OrderRequest, OrderResponse> dataProcessor() {
    return new BatchDataFileProcessor();
}
@StepScope
@Bean
ItemWriter<OrderResponse> fileWriter() {
    return new BatchOrderFileWriter();
}
@StepScope
@Bean
public ItemReadListener<OrderRequest> stepItemReadListener() {
    return new StepItemReadListener();
}
@Bean
public JobExecutionListener jobListener() {
    return new JobListener();
}
@StepScope
@Bean
public SkipListener<OrderRequest, OrderResponse> stepSkipListener() {
    return new StepSkipListener();
}
What is the problem with the multithreading configuration here? The batch works fine, processing a single record at a time, when I use JdbcCursorItemReader and no TaskExecutor bean:
@StepScope
@Bean
public JdbcCursorItemReader<OrderRequest> jdbcCursorItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
        @Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {
    return new JdbcCursorItemReaderBuilder<OrderRequest>()
            .name("jdbcCursorItemReader")
            .dataSource(dataSource)
            .queryArguments(customerId)
            .sql(CommonConstant.FETCH_QUERY)
            .rowMapper(rowMapper)
            .saveState(true)
            .build();
}
Upvotes: 2
Views: 1338
Reputation: 129
After changing the TaskExecutor as follows, it's working now:
@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(concurrencyLimit);
    return taskExecutor;
}
I still don't understand what the problem was with the earlier configuration.
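One difference between the two executors that may be relevant (the thread does not confirm it as the cause of the hang): a ThreadPoolTaskExecutor with a queue capacity of 0 uses a SynchronousQueue, so a task submitted while all pool threads are busy is rejected, whereas a SimpleAsyncTaskExecutor with a concurrency limit makes the submitting thread wait for a free slot. The plain-JDK sketch below only imitates those two behaviours; the class and method names (ExecutorSaturationDemo, countRejected, runThrottled) are hypothetical and are not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: imitates the two executor styles with JDK classes only.
class ExecutorSaturationDemo {

    // Fixed pool with no queue (like core = max = poolSize, queue capacity 0).
    // Every task blocks until released, so submissions beyond poolSize are rejected.
    static int countRejected(int poolSize, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // default AbortPolicy: no free thread and no queue slot
            }
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected;
    }

    // One new thread per task, throttled by a semaphore: the caller waits
    // for a permit instead of having the task rejected.
    static int runThrottled(int limit, int tasks) throws InterruptedException {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger completed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            permits.acquire(); // blocks the submitting thread when the limit is reached
            Thread t = new Thread(() -> {
                try {
                    completed.incrementAndGet();
                } finally {
                    permits.release();
                }
            });
            t.start();
            threads.add(t);
        }
        for (Thread t : threads) {
            t.join();
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected by pool: " + countRejected(5, 6));
        System.out.println("completed with throttle: " + runThrottled(5, 6));
    }
}
```

With 5 threads and 6 tasks, the queue-less pool rejects one task while the throttled variant completes all six. If the original setup is kept, giving the pool some headroom (a non-zero queue capacity or a pool larger than the throttle limit) is one thing to try, again as an assumption rather than a confirmed fix.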
Upvotes: 0