storm_buster
storm_buster

Reputation: 7568

How to run implement spring-batch to process csv files line by line?

I have a spring batch application which reads data from a csv file, pass all the lines and process it, pass all the processed lines and write it to a database. Very classic. Now my problem is that the csv file is too large, I have a java heap space, so I thought I could optimize that by processing the file per x lines, let's say per 10000 lignes (to release memory each 10000 instead of loading all the lines in the memory).

Is there anyway to tell spring-batch to process a step in recursive way ? Or is there any other way to solve my problem?

Any advise will be much appreciated. Thanks

Upvotes: 3

Views: 15615

Answers (1)

incomplete-co.de
incomplete-co.de

Reputation: 2137

here's an example of processing the following csv file into a bean

headerA,headerB,headerC
col1,col2,col3

the first row (header) is ignored and the other columns are mapped directly into a 'matching' object. (this is only done this way for brevity).

here's the job configuration using Spring Batch Out Of The Box components;

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <batch:job id="fileJob">
        <batch:step id="fileJob.step1">
            <batch:tasklet>
                <batch:chunk reader="fileReader" writer="databaseWriter" commit-interval="10000"/>
            </batch:tasklet>
        </batch:step>
        <batch:validator>
            <bean class="org.springframework.batch.core.job.DefaultJobParametersValidator">
                <property name="requiredKeys" value="fileName"/>
            </bean>
        </batch:validator>
    </batch:job>

    <bean id="fileReader"
        class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="lineMapper" ref="lineMapper"/>
        <property name="resource" value="file:#{jobParameters['fileName']}"/>
        <property name="linesToSkip" value="1"/>
    </bean>

    <bean id="lineMapper"
        class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="fieldSetMapper" ref="fieldSetMapper"/>
        <property name="lineTokenizer" ref="lineTokenizer"/>
    </bean>


    <bean id="lineTokenizer"
        class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <property name="delimiter" value=","/>
        <property name="names" value="col1,col2,col3"/>
    </bean>

    <bean id="fieldSetMapper"
        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="targetType" value="de.incompleteco.spring.batch.domain.SimpleEntity"/>
    </bean>

    <bean id="databaseWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource"/>
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
        <property name="sql" value="insert into simple_entity (col1,col2,col3) values (:col1,:col2,:col3)"/>
    </bean>
</beans>

there are a couple of note;

  1. this job needs a parameter 'fileName' to tell the fileReader where to find the file.
  2. there's a jobParametersValidator set to make sure the parameter is there

here's the batch resource configuration;

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <batch:job-repository id="jobRepository"/>

    <bean id="jobExplorer"
        class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
        <property name="dataSource" ref="dataSource"/>
    </bean>
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
        <property name="taskExecutor" ref="taskExecutor"/>
    </bean>

    <beans profile="junit">
        <jdbc:embedded-database id="dataSource" type="H2">
            <jdbc:script location="classpath:/org/springframework/batch/core/schema-h2.sql"/>
            <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
        </jdbc:embedded-database>

        <task:executor id="taskExecutor"/>

        <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="dataSource"/>
        </bean>
    </beans>
</beans>

here's a unit test for it too

package de.incompleteco.spring.batch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileOutputStream;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:/META-INF/spring/*-context.xml"})
@ActiveProfiles("junit")
public class FileJobIntegrationTest {

    @Autowired
    private Job job;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private DataSource dataSource;

    private int recordCount = 1000000;

    private String fileName = System.getProperty("java.io.tmpdir") + File.separator + "test.csv";

    @Before
    public void before() throws Exception {
        if (new File(fileName).exists()) {
            new File(fileName).delete();
        }//end if
    }

    @Test
    public void test() throws Exception {
        //create a file
        FileOutputStream fos = new FileOutputStream(fileName);
        fos.write("col1,col2,col3".getBytes());
        fos.flush();
        for (int i=0;i<=recordCount;i++) {
            fos.write(new String(i + "," + (i+1) + "," + (i+2) + "\n").getBytes());
            fos.flush();//flush it
        }//end for
        fos.close();
        //lets get the size of the file
        long length = new File(fileName).length();
        System.out.println("file size: " + ((length / 1024) / 1024));
        //execute the job
        JobParameters jobParameters = new JobParametersBuilder().addString("fileName",fileName).toJobParameters();
        JobExecution execution = jobLauncher.run(job,jobParameters);
        //monitor
        while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
            Thread.sleep(1000);
        }//end while
        //load again
        execution = jobExplorer.getJobExecution(execution.getId());
        //test
        assertEquals(ExitStatus.COMPLETED.getExitCode(),execution.getExitStatus().getExitCode());
        //lets see what's in the database
        int count = new JdbcTemplate(dataSource).queryForObject("select count(*) from simple_entity", Integer.class);
        //test
        assertTrue(count == recordCount);
    }

}

Upvotes: 6

Related Questions