Reputation: 342
I have a large file which may contain 100K to 500K records. I am planning to use chunk oriented processing and my thought is
1) Split the large file into smaller based on the count let say 10K in each file.
2) If there are 100K records then I will get 10 files each containing 10K reocrds
3) I would like to partition these 10 files and would like to process using 5 threads. I am thinking to use custom MultiResourcePartioner
4) The 5 threads should process all the 10 files created in split process.
5) I don't want to create same number of threads equal to file count as in that case I may face memory issues. What I am looking is whatever the number of files I would like to process them using only 5 threads (I can increase based on my requirements).
Expert could you let me know this can be achieved using spring batch? If yes could you please share pointers or reference implementations
Thanks in advance
The working job-config xml
<description>Spring Batch File Chunk Processing</description>
<import resource="../config/batch-context.xml" />
<batch:job id="file-partition-batch" job-repository="jobRepository" restartable="false">
<batch:step id="master">
<batch:partition partitioner="partitioner" handler="partitionHandler" />
</batch:step>
</batch:job>
<batch:step id="slave">
<batch:tasklet>
<batch:chunk reader="reader" processor="compositeProcessor"
writer="compositeWriter" commit-interval="5">
</batch:chunk>
</batch:tasklet>
</batch:step>
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="slave" />
<property name="gridSize" value="5" />
</bean>
<bean id="partitioner" class="com.poc.partitioner.FileMultiResourcePartitioner">
<property name="resources" value="file:/Users/anupghosh/Documents/Spring_Batch/FilePartitionBatch/*.txt" />
<property name="threadName" value="feed-processor" />
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="5" />
</bean>
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="resource" value="#{stepExecutionContext['fileName']}" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value="|"/>
<property name="names" value="key,docName,docTypCD,itemType,itemNum,launchDate,status" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="com.poc.mapper.FileRowMapper" />
</property>
</bean>
</property>
</bean>
<bean id="validatingProcessor" class="org.springframework.batch.item.validator.ValidatingItemProcessor">
<constructor-arg ref="feedRowValidator" />
</bean>
<bean id="feedProcesor" class="com.poc.processor.FeedProcessor" />
<bean id="compositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
<property name="delegates">
<list>
<ref bean="validatingProcessor" />
<ref bean="feedProcesor" />
</list>
</property>
</bean>
<bean id="recordDecWriter" class="com.poc.writer.RecordDecWriter" />
<bean id="reconFlatFileCustomWriter" class="com.poc.writer.ReconFileWriter">
<property name="reconFlatFileWriter" ref="reconFlatFileWriter" />
</bean>
<bean id="reconFlatFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="file:/Users/anupghosh/Documents/Spring_Batch/recon-#{stepExecutionContext[threadName]}.txt" />
<property name="shouldDeleteIfExists" value="true" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="|" />
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="validationError" />
</bean>
</property>
</bean>
</property>
</bean>
<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<list>
<ref bean="recordDecWriter" />
<ref bean="reconFlatFileCustomWriter" />
</list>
</property>
</bean>
<bean id="feedRowValidator" class="org.springframework.batch.item.validator.SpringValidator">
<property name="validator">
<bean class="com.poc.validator.FeedRowValidator"/>
</property>
</bean>
Upvotes: 5
Views: 13034
Reputation: 11055
was able to solve this using MultiResourcePartitioner. below are java config
@Bean
public Partitioner partitioner() {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
ClassLoader cl = this.getClass().getClassLoader();
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);
Resource[] resources = resolver.getResources("file:" + filePath + "/"+"*.csv");
partitioner.setResources(resources);
partitioner.partition(10);
return partitioner;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(4);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
@Bean
@Qualifier("masterStep")
public Step masterStep() {
return stepBuilderFactory.get("masterStep")
.partitioner(ProcessDataStep())
.partitioner("ProcessDataStep",partitioner())
.taskExecutor(taskExecutor())
.listener(pcStressStepListener)
.build();
}
@Bean
@Qualifier("processData")
public Step processData() {
return stepBuilderFactory.get("processData")
.<pojo, pojo> chunk(5000)
.reader(reader)
.processor(processor())
.writer(writer)
.build();
}
@Bean(name="reader")
@StepScope
public FlatFileItemReader<pojo> reader(@Value("#{stepExecutionContext['fileName']}") String filename) {
FlatFileItemReader<pojo> reader = new FlatFileItemReader<>();
reader.setResource(new UrlResource(filename));
reader.setLineMapper(new DefaultLineMapper<pojo>() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
setNames(FILE HEADER);
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<pojo>() {
{
setTargetType(pojo.class);
}
});
}
});
return reader;
}
Upvotes: 3