
Reputation: 5790

How to make Spring Batch step execution parallel with a configurable thread count?

I have the following Spring Batch application:

package com.spbt.job.sample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

public class SpringBatchApplication {
    public static void main(String[] args) {, args);

package com.spbt.job.sample;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring Batch configuration for the single-step job "TraverseJob" (step "TraverseStep").
 *
 * <p>NOTE(review): this excerpt appears damaged by extraction. Annotations imported above
 * (Configuration, EnableBatchProcessing, Autowired, Bean, StepScope) are not visible on any
 * declaration, the builder chains in job() and traverseStep() stop mid-expression, and the
 * closing braces are missing. Restore from the original source before use.
 */
public class TraverseJob {

    // Presumably injected by Spring (Autowired is imported) - confirm.
    protected JobBuilderFactory jobBuilderFactory;

    // Presumably injected by Spring (Autowired is imported) - confirm.
    protected StepBuilderFactory stepBuilderFactory;

    // Hard-coded root folder; presumably handed to the tasklet via setJobDirPath (see the note
    // in traverseJobTasklet). For per-sub-folder parallelism this would be the root whose
    // child directories become partitions.
    private String inputFolderPath = "/tmp/inputFolder";

    /**
     * Builds the job named "TraverseJob".
     *
     * <p>The RunIdIncrementer supplies a new run id on each launch so the job can be re-run.
     * NOTE(review): RunIdIncrementer (org.springframework.batch.core.launch.support) has no
     * visible import, and the rest of the chain - presumably
     * {@code .start(traverseStep()).build();} - is missing from this excerpt.
     */
    public Job job() {
        return jobBuilderFactory.get("TraverseJob")
                .incrementer(new RunIdIncrementer())

    /**
     * Builds the single step "TraverseStep".
     *
     * <p>NOTE(review): builder chain truncated; presumably
     * {@code .tasklet(traverseJobTasklet(null)).build();}. As a single tasklet step, the whole
     * traversal runs inside one tasklet invocation on one thread.
     */
    public Step traverseStep() {
        return stepBuilderFactory.get("TraverseStep")

    /**
     * Creates the traversal tasklet, injecting the "date" job parameter via SpEL.
     *
     * <p>{@code jobParameters} is only resolvable for a step/job-scoped bean; StepScope is
     * imported but the annotation is not visible here - confirm it is present.
     *
     * @param date value of the "date" job parameter
     * @return the tasklet instance
     */
    public TraverseJobTasklet traverseJobTasklet(@Value("#{jobParameters[date]}") String date) {
        TraverseJobTasklet tasklet = new TraverseJobTasklet();
        // NOTE(review): the two blank lines below presumably held tasklet.setJobDate(date) and
        // tasklet.setJobDirPath(inputFolderPath) - confirm. As shown, jobDirPath stays null and
        // `new File(jobDirPath)` in TraverseJobTasklet.execute() would throw NullPointerException.


        return tasklet;

package com.spbt.job.sample;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Tasklet that walks the directory at {@code jobDirPath} and, for each entry, consults
 * {@link RemoteFilePush}. The branch bodies are missing from this excerpt; presumably they
 * create missing remote directories and push files - confirm.
 *
 * <p>The whole traversal happens inside a single execute() call on one thread. To process
 * sub-folders in parallel, one option is a partitioned step that runs one tasklet instance per
 * sub-folder.
 *
 * <p>NOTE(review): java.io.File has no visible import, and annotations (e.g. Autowired on
 * remoteFilePush, Override on execute) and closing braces appear stripped by extraction.
 */
public class TraverseJobTasklet implements Tasklet {

    // Root directory to traverse; set via setJobDirPath.
    private String jobDirPath;
    // Job date; stored via setJobDate but not used by the visible traversal logic.
    private String jobDate;

    // Presumably injected by Spring (Autowired is imported) - confirm. If the traversal is
    // parallelized, this shared instance is called from several threads and must be thread-safe.
    private RemoteFilePush remoteFilePush;

    /**
     * Traverses {@code jobDirPath} once.
     *
     * @return {@link RepeatStatus#FINISHED}, so the tasklet is not invoked again for this step
     * @throws Exception whatever traverseDir throws, rethrown unchanged
     */
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        try {
            traverseDir(new File(jobDirPath));
        } catch (Exception ex) {
            // No-op rethrow: removing this try/catch would not change behavior.
            throw ex;
        return RepeatStatus.FINISHED;

    /**
     * Processes the entries of {@code filePath}. Presumably recurses into sub-directories, but
     * the recursive call is not visible in this excerpt.
     *
     * @param filePath directory to list
     * @throws Exception if {@code filePath} cannot be listed, or from RemoteFilePush calls
     */
    private void traverseDir(File filePath) throws Exception {
        try {
            // listFiles() returns null if filePath is not a directory or an I/O error occurs.
            // An existing but empty directory yields an empty array and is NOT rejected, so the
            // "empty/null dir" exception below only fires for the null case.
            File[] files = filePath.listFiles();
            if (files != null) {
                for (File file : files) {
                    // In the visible code only the bare name (not the relative path) reaches
                    // RemoteFilePush. NOTE(review): nested dirs sharing a name could collide.
                    String name = file.getName();
                    if (file.isDirectory()) {
                        // NOTE(review): branch bodies are missing from this excerpt; presumably
                        // the else-branch calls createRemoteDir(name), sub-directories are
                        // recursed into, and files go to pushFile(...) - confirm.
                        if (remoteFilePush.isRemoteDirExist(name)) {
                        } else {
                    } else {
            } else {
                throw new Exception("empty/null dir -> " + filePath.getName());
        } catch (Exception ex) {
            // No-op rethrow, as in execute().
            throw ex;

    public String getJobDirPath() {
        return jobDirPath;

    public void setJobDirPath(String jobDirPath) {
        this.jobDirPath = jobDirPath;

    public String getJobDate() {
        return jobDate;

    public void setJobDate(String jobDate) {
        this.jobDate = jobDate;

package com.spbt.job.sample;

import org.springframework.stereotype.Component;

public class RemoteFilePush {

    public boolean isRemoteDirExist(String name) throws InterruptedException {
        boolean isRemoteDirExist = false;
        // code to check dir on remote server
        return isRemoteDirExist;

    public void createRemoteDir(String name) throws InterruptedException {
        // code to create dir on remote server

    public void pushFile(String path) throws InterruptedException {
        // code to push file on remote server

I want to parallelize the traversal and processing in the `traverseDir` method of `TraverseJobTasklet` while keeping my `RemoteFilePush` logic intact. My `inputFolderPath` can contain multiple child directories, each of which contains some files.

I have tried to follow the documentation link for the Spring Batch version I am using, but it is XML-based, and I don't see how to create multiple steps out of the single `traverseStep` I have.

Upvotes: 0

Views: 2084

Answers (1)

Mahmoud Ben Hassine
Mahmoud Ben Hassine

Reputation: 31600

Passing a sub-folder path as input to each worker step is where I am hitting a wall with the Spring code. If you can point me to some reference, it would be helpful; most of the examples on the net are XML-based.

Here is a quick self-contained example with Java config:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * Self-contained Java-config example of a partitioned step. The manager step splits a root
 * folder into one partition per sub-folder, storing each path under the "filePath" key, and
 * runs workerStep for each partition on taskExecutor().
 *
 * <p>NOTE(review): this excerpt appears damaged by extraction. The following are missing:
 * annotations (presumably {@code @Configuration}, {@code @EnableBatchProcessing},
 * {@code @Bean}, {@code @StepScope} and {@code @Override}), several builder-chain tails, two
 * statements and the closing braces. Partitioner
 * (org.springframework.batch.core.partition.support.Partitioner) also has no visible import.
 */
public class PartitionJobSample {

    private final JobBuilderFactory jobs;
    private final StepBuilderFactory steps;

    // NOTE(review): the first statement below is garbled by extraction; presumably
    // `this.jobs = jobs;`.
    public PartitionJobSample(JobBuilderFactory jobs, StepBuilderFactory steps) { = jobs;
        this.steps = steps;

    /**
     * Manager step: asks partitioner() for one ExecutionContext per sub-folder and launches
     * workerStep once per partition.
     *
     * <p>The {@code null} argument is a placeholder. The step-scoped proxy is expected to
     * resolve rootFolder from the job parameters at runtime, which requires the partitioner bean
     * to be step-scoped (confirm).
     *
     * <p>NOTE(review): chain truncated; presumably continues with {@code .step(workerStep())},
     * {@code .gridSize(...)}, {@code .taskExecutor(taskExecutor())} and {@code .build()}.
     */
    public Step managerStep() {
        return steps.get("masterStep")
                .partitioner(workerStep().getName(), partitioner(null))

    /**
     * Executor on which the worker partitions run.
     *
     * <p>SimpleAsyncTaskExecutor starts a new thread per task and, by default, does not limit
     * concurrency. To make the thread count configurable, either call
     * {@code setConcurrencyLimit(n)} on it, or use a ThreadPoolTaskExecutor whose core/max pool
     * size comes from a property.
     */
    public SimpleAsyncTaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();// TODO useful for testing, use a more robust task executor in production

    /**
     * Builds one ExecutionContext per sub-folder of {@code rootFolder}. Each is keyed
     * "partition-for-" + folder and carries the sub-folder path under "filePath".
     *
     * <p>{@code gridSize} is only the HashMap's initial capacity. The number of partitions is
     * always subFolders.size(), regardless of gridSize.
     *
     * @param rootFolder value of the "rootFolder" job parameter (late-bound; see managerStep)
     * @return a Partitioner producing one partition per sub-folder
     */
    public Partitioner partitioner(@Value("#{jobParameters['rootFolder']}") String rootFolder) {
        // Sub-folders are computed once when this method runs, not inside partition().
        List<String> subFolders = getSubFolders(rootFolder);
        return new Partitioner() {
            public Map<String, ExecutionContext> partition(int gridSize) {
                Map<String, ExecutionContext> map = new HashMap<>(gridSize);
                for (String folder : subFolders) {
                    ExecutionContext executionContext = new ExecutionContext();
                    executionContext.put("filePath", folder);
                    map.put("partition-for-" + folder, executionContext);
                // NOTE(review): the for-loop's closing brace is missing here; `return map`
                // belongs after the loop.
                return map;

    // Placeholder: ignores rootFolder and always returns two fixed paths. A real version could
    // list rootFolder's child directories (e.g. java.nio.file.Files.list filtered by
    // Files::isDirectory).
    private List<String> getSubFolders(String rootFolder) {
        // TODO implement this
        return Arrays.asList("/data/folder1", "/data/folder2");

    // Worker step run once per partition. NOTE(review): chain truncated; presumably
    // `.tasklet(getTasklet(null)).build();`.
    public Step workerStep() {
        return steps.get("workerStep")

    // Reads this partition's "filePath" (the key written by partitioner()) from the step
    // execution context. This is only resolvable if the bean is step-scoped.
    public Tasklet getTasklet(@Value("#{stepExecutionContext['filePath']}") String filePath) {
        return new TraverseJobTasklet(filePath);

    // NOTE(review): chain truncated; presumably `.start(managerStep()).build();`.
    public Job job() {
        return jobs.get("job")

    /**
     * Runs the example with the job parameter rootFolder=/data.
     *
     * <p>NOTE(review): the application context is never closed.
     */
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(PartitionJobSample.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        // NOTE(review): the statement after toJobParameters() is garbled by extraction;
        // presumably `jobLauncher.run(job, jobParameters);`.
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("rootFolder", "/data")
                .toJobParameters();, jobParameters);

    // Demo tasklet: prints the worker thread name and its sub-folder. It is a non-static inner
    // class, so each instance holds a reference to the enclosing configuration object;
    // declaring it `static` would avoid that.
    class TraverseJobTasklet implements Tasklet {

        private String filePath;

        public TraverseJobTasklet(String filePath) {
            this.filePath = filePath;

        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            // TODO call traversePath for filePath which is a sub-folder here
            System.out.println(Thread.currentThread().getName() + " processing sub-folder " + filePath);
            return RepeatStatus.FINISHED;

It passes the root directory as a job parameter and executes a partitioned step where each worker processes a sub-folder (calling your tasklet).

If you run it, you should see something like:

SimpleAsyncTaskExecutor-2 processing sub-folder /data/folder1
SimpleAsyncTaskExecutor-1 processing sub-folder /data/folder2

I will let you adapt it to your situation accordingly.

Upvotes: 1

Related Questions