Sundar
Sundar

Reputation: 105

Spring Batch: MultiResourcePartitioner how to set resources lazily

Question: How to set/Inject resources lazily like how it is done using the XML config in Java config.

We have spring batch program that is currently using XML configuration for uploading multiple files using the MultiResourcePartitioner. This works as intended see below config.

<?xml version="1.0" encoding="UTF-8"?>
<beans  xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">

    <job id="fileLoaderJob" xmlns="http://www.springframework.org/schema/batch">

        <step id="moveFiles" next="batchFileUploader">
            <tasklet ref="moveFilesTasklet" />          
        </step>

        <step id="batchFileUploader" parent="batchFileUpload:master" >
            <next on="*" to="archiveFiles" />
        </step>

        <step id="archiveFiles" >
            <batch:tasklet ref="archiveFilesTasklet" />         
        </step>

    </job>

    <!--This Tasklet moves the file from say Input to Work dir -->
    <bean id="moveFilesTasklet" class="com.spring.batch.fileloader.MoveFilesTasklet" scope="step" />

    <step id="batchFileUpload" xmlns="http://www.springframework.org/schema/batch">
        <tasklet>
            <chunk reader="fileReader"      
                   commit-interval="10000"
                   writer="fileWriter"
            />
        </tasklet>
    </step>

    <bean name="batchFileUpload:master" class="org.springframework.batch.core.partition.support.PartitionStep">
        <property name="jobRepository" ref="jobRepository"/>
        <property name="stepExecutionSplitter">
            <bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
                <constructor-arg ref="jobRepository"/>
                <constructor-arg ref="batchFileUpload"/>
                <constructor-arg>
                    <bean class="org.springframework.batch.core.partition.support.MultiResourcePartitioner" scope="step">
                        <property name="resources" ref="fileResources" />
                    </bean>
                </constructor-arg>
            </bean>
        </property> 
        <property name="partitionHandler">
            <bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
                <property name="taskExecutor">
                    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor">
                        <property name="concurrencyLimit" value="5" />
                    </bean>
                </property>
                <property name="step" ref="batchFileUpload"/>
            </bean>
        </property>
    </bean>

    <bean id="fileResources" class="com.spring.batch.fileloader.fileResources" />

    <bean id="fileReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="resource" value="#{stepExecutionContext[fileName]}" />
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="delimiter" value="," />
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="...fileFieldSetMapper" />
                </property>
            </bean>
        </property>
    </bean>

    <bean id="fileWriter" class="....fileWriter" scope="step" />

    <bean id="archiveFilesTasklet" class="....ArchiveFilesTasklet" scope="step" />  

</beans>

This works well. When I try to covert this to Java Config. I am getting the resources as NULL. Here is my config class.

@Configuration
@EnableBatchProcessing
@ComponentScan(basePackages = {"com.spring.batch.fileloader"})
public class SpringBatchConfig{

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private ResourcePatternResolver resourcePatternResolver;

    @Autowired
    private ReadPropertiesFile properties;

    @Bean
    BatchConfigurer configurer(@Qualifier("dataSource") DataSource dataSource){
        return new DefaultBatchConfigurer(dataSource);
    }

    @Bean(name = "fileLoaderJob")
    public Job csiAuditFileLoaderJob() throws Exception{
        return jobBuilders.get("csiAuditFileLoaderJob")
                .start(moveFiles())
                .next(batchFileUploader())
                .next(archiveFiles())
                .build();
    }

    @Bean
    public Step moveFiles(){
        return stepBuilders.get("moveFiles")
                .tasklet(new MoveFilesTasklet(properties))
                .build();
    }

    @Bean
    @Lazy
    public Step batchFileUploader() throws Exception{
        return stepBuilders.get("batchFileUploader")
                .partitioner(batchFileUploadStep().getName(), partitioner())
                .step(batchFileUploadStep())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step archiveFiles(){
        return stepBuilders.get("archiveFiles")
                .tasklet(new ArchiveFilesTasklet(properties))
                .build();
    }

    @Bean
    public Step batchFileUploadStep(){
        return stepBuilders.get("batchFileUploadStep")
                .<MyDomain, MyDomain>chunk(10000)
                .reader(fileReader(null))
                .writer(fileWriter())
                .build();
    }

    @Bean
    @Lazy
    public Partitioner partitioner() throws Exception{

        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();

        Resource[] resources;
        try{
            /*
            Here the resources is selected from a path where the previous MoveFilesTasklet moves the file
            This returns null since Spring Initialize this bean eagerly before the step is called for execution.
            */
            resources = resourcePatternResolver.getResources("file:" + properties.getPath() + "/*.csv");
        }
        catch(IOException e){
            throw new RuntimeException("I/O problems when resolving the input file pattern.", e);
        }

        partitioner.setResources(resources);
        return partitioner;
    }

    @Bean
    public TaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    @Bean
    @StepScope
    public FlatFileItemReader<MyDomain> fileReader(
            @Value("#{stepExecutionContext['fileName']}") String filename){
        FlatFileItemReader<MyDomain> reader = new FlatFileItemReader<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        DefaultLineMapper<MyDomain> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new MyFieldSetMapper());
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
        reader.setResource(new PathResource(filename));

        return reader;
    }

    @Bean
    public ItemWriter<MyDomain> fileWriter(){
        return new FileWriter();
    }


    private JobRepository getJobRepository() throws Exception{
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
        jobRepositoryFactoryBean.setDatabaseType("MySql");
        jobRepositoryFactoryBean.afterPropertiesSet();
        return jobRepositoryFactoryBean.getObject();
    }

    private PlatformTransactionManager getTransactionManager(){
        return new ResourcelessTransactionManager();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception{
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

}

Upvotes: 1

Views: 3918

Answers (2)

Thomas Escolan
Thomas Escolan

Reputation: 1529

My 2p with Spring Boot:

@Bean
@StepScope
public Partitioner logsPartitioner(@Value("file:${my.resources.path}/${my.resources.input:*}.csv") Resource[] resources) {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    partitioner.setResources(resources);
    partitioner.partition(resources.length);
    return partitioner;
}

Upvotes: 0

Niraj Sonawane
Niraj Sonawane

Reputation: 11055

Below might be the issue

1 No need to do reader.setResource(new PathResource(filename)); inside FlatFileItemReader as it will override the resources set by partitioner Bean

2 Use PathMatchingResourcePatternResolver to load files in partitioner Bean Sample code

 ClassLoader cl = this.getClass().getClassLoader();
            ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);
            resources = resolver.getResources("file:" + properties.getPath() + "/*.csv");

3: you can also add @StepScope on partitioner bean (Not sure abot this)

Hope this helps :)

Upvotes: 1

Related Questions