Reputation: 26503
I'm having simple chunk CSV processing job.
I would like to change execution flow when there is particular type of error during processing (eg. invalid line structure)
In order to prevent throwing errors I need to provide custom exceptionHandler
that will swallow parsing exception:
fun processCsvStep(
stepBuilderFactory: StepBuilderFactory,
reader: ItemReader<InputRow>,
processor: ItemProcessor<InputRow, OutputObject>,
writer: ItemWriter<OutputObject>
) = stepBuilderFactory.get(PROCESS_CSV_STEP)
.chunk<InputRow, OutputObject>(
.exceptionHandler { context: RepeatContext, throwable: Throwable ->
logger.error { "Exception during parsing: ${throwable.message}" }
Then in my Job I can rely only on rollback count:
fun createCsvJob(jobs: JobBuilderFactory, processCsvStep: Step, moveCsvStep: Step, moveFailedCsvStep: Step) = jobs.get(PROCESS_CSV_JOB)
.next { jobExecution: JobExecution, stepExecution: StepExecution ->
return@next when (stepExecution.rollbackCount) {
0 -> FlowExecutionStatus.COMPLETED
else -> FlowExecutionStatus.FAILED
Is there any way to pass information from exception handler to JobExecutionDecider
? I would like to make execution decision based on type of exception that happened during parsing. Is this possible?
Upvotes: 2
Views: 1221
Reputation: 31650
I would like to make execution decision based on type of exception that happened during parsing. Is this possible?
You can get access to the exception that happened during the step from the decider through stepExecution#getFailureExceptions
. Here is an example:
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class MyJob {
private JobBuilderFactory jobs;
private StepBuilderFactory steps;
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
public ItemWriter<Integer> itemWriter() {
return items -> {
for (Integer item : items) {
if (items.contains(3)) {
throw new IllegalArgumentException("no 3!");
System.out.println("item = " + item);
public Step step1() {
return steps.get("step1")
.<Integer, Integer>chunk(5)
public Step step2() {
return steps.get("step2")
.tasklet((contribution, chunkContext) -> {
return RepeatStatus.FINISHED;
public Step step3() {
return steps.get("step3")
.tasklet((contribution, chunkContext) -> {
return RepeatStatus.FINISHED;
public JobExecutionDecider decider() {
return (jobExecution, stepExecution) -> {
int rollbackCount = stepExecution.getRollbackCount();
List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
System.out.println("rollbackCount = " + rollbackCount);
System.out.println("failureExceptions = " + failureExceptions);
// make the decision based on rollbackCount and/or failureExceptions and return status accordingly
return FlowExecutionStatus.COMPLETED;
public Job job() {
return jobs.get("job")
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);, new JobParameters());
In this example, if an exception occurs during step1
, the decider can get it from the step execution and make the decision accordingly (go to step2
or step3
So I'm not sure you really need an exception handler and a way to pass information to the decider. The same idea applies of you want to make the decision based on the rollbackCount
, commitCount
, readCount
, or any other metric.
Hope this helps.
Upvotes: 1