arbit

Reputation: 15

Step is running only on one thread with Task Executor.

I have a job that runs at a specific time every day. It has two steps, each of which makes a REST call set up in the reader() and processor() parts. The resources are account numbers stored in a MySQL DB. The Spring Batch job runs fine and we get the expected output, but it runs on only one thread. I tried to parallelize it, went through the documentation and a few examples, and after some time used this particular example. Here is my job configuration code in Java.

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(60);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}
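A likely cause worth checking (an assumption on my part, not confirmed by the output alone): ThreadPoolTaskExecutor defaults to a corePoolSize of 1 with an unbounded queue, and the underlying JDK ThreadPoolExecutor only grows past the core size once its queue is full, so setting only maxPoolSize leaves the pool at a single thread. The same mechanic in plain java.util.concurrent (the class name PoolGrowthDemo is just for illustration):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {
    public static void main(String[] args) throws InterruptedException {
        // core = 1, max = 60, unbounded queue: every submission beyond the
        // first is queued, so the pool never grows past the one core thread.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 60, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 10; i++) {
            pool.submit(() -> {
                try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // Despite max = 60, only one worker thread was ever created.
        System.out.println("largest pool size = " + pool.getLargestPoolSize());
        // prints: largest pool size = 1
    }
}
```

If that is the cause, calling taskExecutor.setCorePoolSize(...) with a value above 1 (or giving the executor a bounded queue) should let the step actually fan out.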

@Bean
public Job processJob(BatchListener listener) {
    return jobBuilderFactory.get("Job").incrementer(new RunIdIncrementer()).listener(listener)
            .flow(processStep1()).on("*").to(processStep2()).end().build();
}
@Bean
public Step processStep1() {
public Step processStep1() {
    return stepBuilderFactory.get("Step1")
            .<response, response>chunk(3)
            .reader(getItemReader())
            .processor(getItemProcess())
            .writer(getItemWriter())
            .taskExecutor(taskExecutor())
            .throttleLimit(2)
            .build();
}


@Bean 
public Step processStep2() {

    SimpleStepBuilder<AccountResponse,batch_details> process = stepBuilderFactory.get("processStep2")
            .<AccountResponse,batch_details>chunk(5).reader(getBatchReader()).processor(getBatchProcessor());
    return process.writer(getBatchWriter()).build();

}

This configuration runs on only one thread even though a task executor is configured. Can someone tell me what I am doing wrong or missing to make it run on multiple threads? I would like to parallelize step 1 and step 2; data concurrency is not an issue. Once I get step 1 parallelized, I will replicate the change for step 2. Thank you.

Sample output:

Thread # 37 is doing this task
Hibernate: Select * from batch_details where status != 'complete' and session_id = '' and status != 'in_solve' ORDER BY RAND() LIMIT 3
Hibernate: update batch_details set status = 'in_cs' where account_id= ?
Hibernate: update batch_details set session_completion_time=?, session_id=?, status=? where account_id=?
accountnumber1

Thread # 37 is doing this task
Hibernate: Select * from batch_details where status != 'complete' and session_id = '' and status != 'in_solve' ORDER BY RAND() LIMIT 3
Hibernate: update batch_details set status = 'in_cs' where account_id= ?
accountnumber2

And another question: if I change the chunk size, the reader is repeated chunk-size times, but on the same thread. I cannot comprehend what this means at this stage; if you can also explain why this happens, many thanks.
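On the chunk-size question: a chunk-oriented step reads and processes items one at a time until the chunk size is reached, then writes the whole chunk in one go, and that entire read-process-write loop runs on a single worker thread per chunk, which is why the reader repeats on the same thread. A minimal plain-Java sketch of that loop (no Spring; all names here are illustrative, not Spring Batch API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class ChunkLoopDemo {
    // Reads and processes items one by one until chunkSize is reached
    // (or the reader is exhausted), then "writes" the whole chunk at once.
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor,
                                    int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next())); // reader + processor per item
            if (chunk.size() == chunkSize) {
                written.add(new ArrayList<>(chunk));   // writer per chunk
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) written.add(chunk);      // final partial chunk
        return written;
    }

    public static void main(String[] args) {
        List<String> items = List.of("a", "b", "c", "d", "e");
        System.out.println(run(items.iterator(), String::toUpperCase, 3));
        // prints: [[A, B, C], [D, E]]
    }
}
```

With a task executor on the step, each chunk is handed to a worker thread, but the items inside one chunk still go through the reader sequentially on that thread.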

Upvotes: 1

Views: 2763

Answers (1)

Sander_M

Reputation: 1119

Your job does not incorporate a parallel flow. Currently it executes step 1 and then, upon its completion, step 2 sequentially.

In this question, Hansjoerg Wingeier showed a nice way to execute steps in parallel using a set of helper methods:

// helper method to create a split flow out of a List of steps
private static Flow createParallelFlow(List<Step> steps) {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    // a limit of -1 means no concurrency limit at all; steps.size() allows
    // one thread per step (2 in this case), and 1 would run them serially
    taskExecutor.setConcurrencyLimit(steps.size());

    List<Flow> flows = steps.stream() // each step has to be wrapped in its own flow
        .map(step -> //
                new FlowBuilder<Flow>("flow_" + step.getName()) //
                .start(step) //
                .build()) //
            .collect(Collectors.toList());

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) //
        .add(flows.toArray(new Flow[flows.size()])) //
        .build();
}

Your job would look something like this:

@Bean
public Job myJob() {

    List<Step> steps = new ArrayList<>();
    steps.add(processStep1);
    steps.add(processStep2);

    return jobBuilderFactory.get("yourJobName")
            .start(createParallelFlow(steps))
            .end()
            .build();
}

Upvotes: 1
