Reputation: 15
I have a job that runs at a specific time every day. It has two steps, each of which makes a REST call set up in its reader() and processor(). The resources are account numbers stored in a MySQL DB. The Spring Batch job runs fine and we get the expected output, but it runs on only one thread. I tried to parallelize it, went through the documentation and a few examples, and ended up using this particular example. Here is my job configuration code in Java:
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(60);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}
@Bean
public Job processJob(BatchListener listener) {
    return jobBuilderFactory.get("Job")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(processStep1()).on("*").to(processStep2())
            .end()
            .build();
}
@Bean
public Step processStep1() {
    return stepBuilderFactory.get("Step1")
            .<response, response>chunk(3)
            .reader(getItemReader())
            .processor(getItemProcess())
            .writer(getItemWriter())
            .taskExecutor(taskExecutor())
            .throttleLimit(2)
            .build();
}
@Bean
public Step processStep2() {
    SimpleStepBuilder<AccountResponse, batch_details> process = stepBuilderFactory.get("processStep2")
            .<AccountResponse, batch_details>chunk(5)
            .reader(getBatchReader())
            .processor(getBatchProcessor());
    return process.writer(getBatchWriter()).build();
}
This configuration runs on only one thread even though a task executor is configured. Can someone tell me what I am doing wrong or missing to make it run on multiple threads? I would like to parallelize step 1 and step 2; data concurrency is not an issue. Once step 1 is parallelized, I will replicate the setup for step 2. Thank you.
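For reference, the executor bean above has the same shape as the plain-JDK pool in this sketch (outside Spring Batch; the class name is made up): only the max pool size is set, so the pool keeps its default core size of 1 and an unbounded work queue. A `ThreadPoolExecutor` only grows past its core size when the queue rejects a task, and an unbounded queue never does, so the max size is never reached:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // Same shape as a ThreadPoolTaskExecutor with only setMaxPoolSize(60):
        // core size 1, max size 60, unbounded queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 60, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        CountDownLatch done = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ignored) {
                }
                done.countDown();
            });
        }
        done.await();
        // The unbounded queue absorbs the backlog, so extra threads
        // beyond the core size are never created.
        System.out.println("largest pool size = " + pool.getLargestPoolSize()); // prints 1
        pool.shutdown();
    }
}
```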
Sample output:
Thread # 37 is doing this task
Hibernate: Select * from batch_details where status != 'complete' and session_id = '' and status != 'in_solve' ORDER BY RAND() LIMIT 3
Hibernate: update batch_details set status = 'in_cs' where account_id= ?
Hibernate: update batch_details set session_completion_time=?, session_id=?, status=? where account_id=?
accountnumber1
Thread # 37 is doing this task
Hibernate: Select * from batch_details where status != 'complete' and session_id = '' and status != 'in_solve' ORDER BY RAND() LIMIT 3
Hibernate: update batch_details set status = 'in_cs' where account_id= ?
accountnumber2
And another question: if I change the chunk size, the reader is repeated chunk-size times, but on the same thread. I cannot work out what this means at this stage; if you can also explain why this happens, many thanks.
Upvotes: 1
Views: 2763
Reputation: 1119
Your job does not contain a parallel flow. Currently it simply executes step 1 and then, upon its completion, step 2, sequentially.
In this question, Hansjoerg Wingeier gave a nice way to execute steps in parallel with a set of helper methods:
// Helper method to create a split flow out of a list of steps
private static Flow createParallelFlow(List<Step> steps) {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    // Concurrency limit: -1 means no limit at all, 1 means a single thread;
    // steps.size() gives one thread per step (2 in this case).
    taskExecutor.setConcurrencyLimit(steps.size());

    // Each step has to be wrapped in a flow of its own
    List<Flow> flows = steps.stream()
            .map(step -> new FlowBuilder<Flow>("flow_" + step.getName())
                    .start(step)
                    .build())
            .collect(Collectors.toList());

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
            .split(taskExecutor)
            .add(flows.toArray(new Flow[flows.size()]))
            .build();
}
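The concurrency limit used above can be mimicked with a plain-JDK semaphore. This sketch (class and method names are made up, not Spring Batch API) runs every task on its own thread but lets at most `limit` of them execute at once, which is the same idea as `SimpleAsyncTaskExecutor.setConcurrencyLimit(...)`:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class ParallelFlowSketch {
    // Start all tasks, but allow at most 'limit' of them to run
    // concurrently; block until every task has finished.
    static void runWithLimit(List<Runnable> tasks, int limit) throws InterruptedException {
        Semaphore permits = new Semaphore(limit);
        CountDownLatch done = new CountDownLatch(tasks.size());
        for (Runnable task : tasks) {
            new Thread(() -> {
                try {
                    permits.acquire(); // blocks while 'limit' tasks are already running
                    task.run();
                } catch (InterruptedException ignored) {
                } finally {
                    permits.release();
                    done.countDown();
                }
            }).start();
        }
        done.await();
    }

    public static void main(String[] args) throws InterruptedException {
        // Two "steps" running in parallel, each on its own thread
        runWithLimit(List.of(
                () -> System.out.println("step1 on " + Thread.currentThread().getName()),
                () -> System.out.println("step2 on " + Thread.currentThread().getName())),
                2);
    }
}
```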
Your job would look something like this:
@Bean
public Job myJob() {
    List<Step> steps = new ArrayList<>();
    steps.add(processStep1);
    steps.add(processStep2);
    return jobBuilderFactory.get("yourJobName")
            .start(createParallelFlow(steps))
            .end()
            .build();
}
Upvotes: 1