gautam

Reputation: 31

Spring Batch multi-threaded file reading

In a Spring Batch job, I am trying to read a CSV file and assign each row to a separate thread for processing. I tried to achieve this with a Task Executor, and it works as long as I do not get the file name through a job parameter. When I do read the file name from job parameters, all threads read the same line from the file because the reader has scope="step". Would this be resolved by changing the scope to "job"? If yes, please suggest how. Currently I am getting the error below:

Caused by: java.lang.IllegalStateException: No Scope registered for scope name 'job'

Kindly help...
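
(For reference: from what I have read, the "job" scope was only added in Spring Batch 3.0, where the batch XML namespace registers it automatically. If the error still appears on 3.0+, declaring the scope bean explicitly might be what is missing; this is untested and just my understanding:)

<bean class="org.springframework.batch.core.scope.JobScope" />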

Find the Job.xml below

<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch"        restartable="true">
    <step id="step" allow-start-if-complete="true">
        <partition step="step2" partitioner="partitioner">
            <handler grid-size="3" task-executor="taskExecutor" />
        </partition>
    </step>
</job>

    <bean id="partitioner" class="com.range.part.RangePartitioner">
</bean>

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

<step id="step2" xmlns="http://www.springframework.org/schema/batch">
    <tasklet transaction-manager="transactionManager">
        <chunk  reader="itemReader" writer="cutomitemWriter" processor="itemProcessor" commit-interval="100" />
    </tasklet>
</step>
<bean id="itemProcessor" class="com.range.processor.UserProcessor" scope="step">
<property name="threadName" value="#{stepExecutionContext[name]}"/>
</bean>

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="job">
 <property name="resource" value="file:#{jobParameters[file]}"> 
 </property>    
  <!-- <property name="linesToSkip" value="1"/> -->
<property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value="," />
                    <!--  <property name="names" value="transactionBranch,batchEntryDate,batchNo,channelID,CountryCode" />-->
        </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.fieldset.FieldsetMapper">

                </bean>
            </property>
        </bean>
    </property>
    </bean>

<bean id="cutomitemWriter" class="com.range.processor.customitemWritter">
</bean>

Upvotes: 2

Views: 5922

Answers (2)

M. Mohamed

Reputation: 31

You can see this example (on GitHub) of a multi-threaded job that imports a big CSV file (around 200,000 lines) into a database and then exports it from the database to a JSON file (the FileReader and FileWriter are not thread-safe).

<batch:job id="transformJob">
    <batch:step id="deleteDir" next="cleanDB">
        <batch:tasklet ref="fileDeletingTasklet" />
    </batch:step>
    <batch:step id="cleanDB" next="countThread">
        <batch:tasklet ref="cleanDBTasklet" />
    </batch:step>
    <batch:step id="countThread" next="split">
        <batch:tasklet ref="countThreadTasklet" />
    </batch:step>
    <batch:step id="split" next="partitionerMasterImporter">
        <batch:tasklet>
            <batch:chunk reader="largeCSVReader" writer="smallCSVWriter"
                commit-interval="#{jobExecutionContext['chunk.count']}" />
        </batch:tasklet>
    </batch:step>
    <batch:step id="partitionerMasterImporter" next="partitionerMasterExporter">
        <partition step="importChunked" partitioner="filePartitioner">
            <handler grid-size="10" task-executor="taskExecutor" />
        </partition>
    </batch:step>
    <batch:step id="partitionerMasterExporter" next="concat">
        <partition step="exportChunked" partitioner="dbPartitioner">
            <handler grid-size="10" task-executor="taskExecutor" />
        </partition>
    </batch:step>
    <batch:step id="concat">
        <batch:tasklet ref="concatFileTasklet" />
    </batch:step>
</batch:job>

<batch:step id="importChunked">
    <batch:tasklet>
        <batch:chunk reader="smallCSVFileReader" writer="dbWriter"
            processor="importProcessor" commit-interval="500">
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<batch:step id="exportChunked">
    <batch:tasklet>
        <batch:chunk reader="dbReader" writer="jsonFileWriter"
            processor="exportProcessor" commit-interval="#{jobExecutionContext['chunk.count']}">
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<bean id="jsonFileWriter" class="com.batch.writer.PersonWriterToFile"
    scope="step">
    <property name="outputPath" value="csv/chunked/paged-#{stepExecutionContext[page]}.json" />
</bean>

<bean id="dbReader" class="com.batch.reader.PersonReaderFromDataBase" scope="step">
    <property name="iPersonRepository" ref="IPersonRepository" />
    <property name="page" value="#{stepExecutionContext[page]}"/>
    <property name="size" value="#{stepExecutionContext[size]}"/>
</bean>

<bean id="countThreadTasklet" class="com.batch.tasklet.CountingTasklet"
    scope="step">
    <property name="input" value="file:csv/input/#{jobParameters[filename]}" />
</bean>

<bean id="cleanDBTasklet" class="com.batch.tasklet.CleanDBTasklet" />

<bean id="fileDeletingTasklet" class="com.batch.tasklet.FileDeletingTasklet">
    <property name="directory" value="file:csv/chunked/" />
</bean>

<bean id="concatFileTasklet" class="com.batch.tasklet.FileConcatTasklet">
    <property name="directory" value="file:csv/chunked/" />
    <property name="outputFilename" value="csv/output/export.json" />
</bean>

<bean id="filePartitioner" class="com.batch.partitioner.FilePartitioner">
    <property name="outputPath" value="csv/chunked/" />
</bean>

<bean id="dbPartitioner" class="com.batch.partitioner.DBPartitioner" scope="step">
    <property name="pageSize" value="#{jobExecutionContext['chunk.count']}" />
</bean>

<bean id="largeCSVReader" class="com.batch.reader.LineReaderFromFile"
    scope="step">
    <property name="inputPath" value="csv/input/#{jobParameters[filename]}" />
</bean>

<bean id="smallCSVWriter" class="com.batch.writer.LineWriterToFile"
    scope="step">
    <property name="outputPath" value="csv/chunked/"></property>
</bean>

<bean id="smallCSVFileReader" class="com.batch.reader.PersonReaderFromFile"
    scope="step">
    <constructor-arg value="csv/chunked/#{stepExecutionContext[file]}" />
</bean>

<bean id="importProcessor" class="com.batch.processor.ImportPersonItemProcessor" />

<bean id="exportProcessor" class="com.batch.processor.ExportPersonItemProcessor" />

<bean id="dbWriter" class="com.batch.writer.PersonWriterToDataBase">
    <property name="iPersonRepository" ref="IPersonRepository" />
</bean>

In both cases, a partitioner is used to split the work into 10 files (one file per thread) for the import, and to export to 10 files (one file per thread as well); then all of them are concatenated into a single file.
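
The partitioner classes themselves are not shown here; purely as a rough sketch, the file-based one could look something like the class below. Only the class name and the outputPath property come from the config above; the directory-listing logic and the "file" context key (which smallCSVFileReader resolves via #{stepExecutionContext[file]}) are assumptions.

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

// Sketch only: one partition per chunked file, each exposing the file name
// under the "file" key consumed by the step-scoped reader.
public class FilePartitioner implements Partitioner {

    private String outputPath;

    public void setOutputPath(String outputPath) {
        this.outputPath = outputPath;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        File[] chunkFiles = new File(outputPath).listFiles();
        if (chunkFiles != null) {
            for (int i = 0; i < chunkFiles.length; i++) {
                ExecutionContext context = new ExecutionContext();
                context.put("file", chunkFiles[i].getName());
                partitions.put("partition" + i, context);
            }
        }
        return partitions;
    }
}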

Hope this helps.

Upvotes: 0

Nghia Do

Reputation: 2658

I am thinking of a way in which we can use a Partitioner on top of this. At the partitioner level, we can read the file (using any CSV reader; a Spring reader is also fine) and then process each line.

Every line is added to the partitioner's queue (a Map), which achieves your requirement.

I have posted the code here for your reference:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Value;

public class LinePartitioner implements Partitioner {

    @Value("#{jobParameters['fileName']}")
    private String fileName;

    private final Map<String, ExecutionContext> queue = new HashMap<>();

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int count = 0;
        // One partition per line: each ExecutionContext carries the line content
        // and its line number, so every line can be processed by its own thread.
        try (BufferedReader reader = new BufferedReader(new FileReader(this.fileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                count++;
                ExecutionContext value = new ExecutionContext();
                value.put("lineContent", line);
                value.put("lineCount", count);
                queue.put("partition" + count, value);
            }
        } catch (IOException e) {
            throw new IllegalStateException("Unable to read " + this.fileName, e);
        }
        return queue;
    }
}

As in the code above, you can replace the plain reader with any CSV reader (or a Spring reader) to simplify mapping the fields to a POJO.
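
For example, a minimal sketch of doing that mapping with Spring's own flat-file classes inside the partitioner. The TransactionRecord POJO is an assumption (its field names are borrowed from the commented-out column names in the question's config), not something from the original post:

import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

// Hypothetical POJO plus a factory for a line mapper; calling
// lineMapper().mapLine(line, count) inside the partitioner's loop would put
// a mapped object into the ExecutionContext instead of the raw String.
public class TransactionRecord {

    private String transactionBranch;
    private String batchNo;

    public String getTransactionBranch() { return transactionBranch; }
    public void setTransactionBranch(String transactionBranch) { this.transactionBranch = transactionBranch; }
    public String getBatchNo() { return batchNo; }
    public void setBatchNo(String batchNo) { this.batchNo = batchNo; }

    public static DefaultLineMapper<TransactionRecord> lineMapper() throws Exception {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(",");
        tokenizer.setNames(new String[] {"transactionBranch", "batchNo"});

        BeanWrapperFieldSetMapper<TransactionRecord> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(TransactionRecord.class);

        DefaultLineMapper<TransactionRecord> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        mapper.setFieldSetMapper(fieldSetMapper);
        mapper.afterPropertiesSet();
        return mapper;
    }
}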

Please let me know if you need the full program; I will write and upload it for you.

Thanks, Nghia

-- Update with an example that builds the Partitioner with chunks of 1000 items per reader

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    try {
        Map<String, ExecutionContext> queue = new HashMap<>();

        List<List<String>> trunks = new ArrayList<>();

        // read the file and store the lines in trunks (chunks) of 1000 items
        int chunkSize = 1000;
        int count = 0;
        try (BufferedReader br = new BufferedReader(new FileReader("your file"))) {
            String line;
            List<String> items = null;
            while ((line = br.readLine()) != null) {
                // start a new trunk every chunkSize lines
                if (count % chunkSize == 0) {
                    items = new ArrayList<>();
                    trunks.add(items);
                }
                items.add(line);
                count++;
            }
        }

        // add each trunk to the queue so it is processed as its own partition
        for (int i = 0; i < trunks.size(); i++) {
            ExecutionContext value = new ExecutionContext();
            value.put("items", trunks.get(i));
            queue.put("trunk" + i, value);
        }

        return queue;
    } catch (IOException e) {
        throw new IllegalStateException("Failed to partition the input file", e);
    }
}
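
The worker step then needs a step-scoped reader that serves the lines from its own "items" entry. A minimal sketch using Spring Batch's built-in ListItemReader; the bean id and the SpEL wiring here are assumptions, not part of the original answer:

<bean id="trunkItemReader" class="org.springframework.batch.item.support.ListItemReader" scope="step">
    <constructor-arg value="#{stepExecutionContext['items']}" />
</bean>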

Upvotes: 1
