Reputation: 2472
I have a question regarding how best to pursue a Spring Batch chunking implementation for my needs. Currently, I have a working job where I read a collection off a database. This collection essentially maps a data grouping to retrieval information, sort of like:
GROUPING    RETRIEVAL INSTRUCTIONS
GRP-01      <instructions for group 01>
GRP-02      <instructions for group 02>
...
GRP-N       <instructions for group N>
Currently I have something similar to below (some details left out for clarity):
public class BatchConfig {
    .
    .
    @Bean
    public ItemReader<CollectionDto> databaseCursorItemReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<CollectionDto>()
                .name("cursorItemReader")
                .dataSource(dataSource)
                .sql(GET_DATA)
                .rowMapper(new BeanPropertyRowMapper<>(CollectionDto.class))
                .build();
    }

    @Bean
    public ItemProcessor<CollectionDto, CollectionDto> databaseXmlItemProcessor() {
        return new QueryLoggingProcessor();
    }

    @Bean
    public ItemWriter<CollectionDto> databaseCursorItemWriter() {
        return new GroupingWriter();
    }
    .
    .
}
public class GroupingWriter implements ItemWriter<CollectionDto> {
    @Override
    public void write(List<? extends CollectionDto> list) {
        for (CollectionDto group : list) {
            // <processing here one group at a time>
        }
    }
}
The problem I am encountering is that the nature of the systems I am interfacing with makes this too slow. So I would like to split the work currently performed in the GroupingWriter above, probably by chunking, so that I can process each group in parallel. I was trying to figure out a way to do this group processing with chunking in a separate step, but I cannot figure out how to assign each item in the collection to a separate chunk.
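The closest I have come is the skeleton below (a rough sketch only; groupStep is just a placeholder name, not working code from my project). A commit interval of 1 would make each CollectionDto its own chunk, but as far as I can tell the chunks would still be processed sequentially on a single thread:

@Bean
public Step groupStep(StepBuilderFactory stepBuilderFactory, DataSource dataSource) {
    return stepBuilderFactory.get("groupStep")
            .<CollectionDto, CollectionDto>chunk(1)
            .reader(databaseCursorItemReader(dataSource))
            .writer(databaseCursorItemWriter())
            .build();
}

I would be grateful for any ideas. Thanks.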
Upvotes: 0
Views: 815
Reputation: 539
Here is my Spring Batch configuration for chunk-oriented processing:
@Bean
public Job myJob() {
    return jobBuilders.get("myJob")
            .start(chunkStep())
            .build();
}

@Bean
public Step chunkStep() {
    return stepBuilderFactory.get("my_step")
            .<InputData, OutputData>chunk(20)
            .faultTolerant()
            .reader(databaseCursorItemReader())
            .processor(processor())
            .writer(databaseCursorItemWriter())
            .build();
}
In chunk-oriented processing, each item is read by the ItemReader, handed to the ItemProcessor, and aggregated into a chunk. Once the number of items read equals the commit interval (i.e. 20 in the example above), the entire chunk is written out via the ItemWriter, and then the transaction is committed.
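Since the question is specifically about processing each group in parallel: one option (a sketch built on the configuration above, not something the step does by default) is to make the chunk step multi-threaded by giving it a TaskExecutor. With a commit interval of 1, each item becomes its own chunk, and chunks are then read, processed, and written on concurrent threads. The bean name parallelChunkStep, the thread-name prefix, and the throttle limit are illustrative, and threadSafeItemReader() is a hypothetical bean; note that JdbcCursorItemReader is not thread-safe, so a multi-threaded step needs a thread-safe reader (for example a JdbcPagingItemReader, or the cursor reader wrapped in a SynchronizedItemStreamReader):

@Bean
public Step parallelChunkStep() {
    return stepBuilderFactory.get("parallel_chunk_step")
            .<InputData, OutputData>chunk(1)      // one item per chunk
            .reader(threadSafeItemReader())       // hypothetical thread-safe reader bean
            .processor(processor())
            .writer(databaseCursorItemWriter())
            // org.springframework.core.task.SimpleAsyncTaskExecutor
            .taskExecutor(new SimpleAsyncTaskExecutor("chunk-"))
            .throttleLimit(8)                     // cap the number of concurrent chunks
            .build();
}

Each concurrent chunk then runs in its own transaction, so one slow group no longer blocks the others.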
Upvotes: 1