Reputation: 26503
I'm having simple chunk CSV processing job.
I would like to change execution flow when there is particular type of error during processing (eg. invalid line structure)
In order to prevent throwing errors I need to provide custom exceptionHandler
that will swallow parsing exception:
@Bean
fun processCsvStep(
stepBuilderFactory: StepBuilderFactory,
reader: ItemReader<InputRow>,
processor: ItemProcessor<InputRow, OutputObject>,
writer: ItemWriter<OutputObject>
) = stepBuilderFactory.get(PROCESS_CSV_STEP)
.chunk<InputRow, OutputObject>(
CHUNKS_NUMBER
)
.reader(reader)
.processor(processor)
.writer(writer)
.exceptionHandler { context: RepeatContext, throwable: Throwable ->
context.setTerminateOnly()
logger.error { "Exception during parsing: ${throwable.message}" }
}
.build()!!
Then in my Job I can rely only on rollback count:
@Bean
fun createCsvJob(jobs: JobBuilderFactory, processCsvStep: Step, moveCsvStep: Step, moveFailedCsvStep: Step) = jobs.get(PROCESS_CSV_JOB)
.start(processCsvStep)
.next { jobExecution: JobExecution, stepExecution: StepExecution ->
return@next when (stepExecution.rollbackCount) {
0 -> FlowExecutionStatus.COMPLETED
else -> FlowExecutionStatus.FAILED
}
}
.on(FlowExecutionStatus.FAILED.name)
.to(moveFailedCsvStep)
.on(FlowExecutionStatus.COMPLETED.name)
.to(moveCsvStep)
.end()
.build()!!
Is there any way to pass information from exception handler to JobExecutionDecider
? I would like to make execution decision based on type of exception that happened during parsing. Is this possible?
Upvotes: 2
Views: 1221
Reputation: 31650
I would like to make execution decision based on type of exception that happened during parsing. Is this possible?
You can get access to the exception that happened during the step from the decider through stepExecution#getFailureExceptions
. Here is an example:
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}
@Bean
public ItemWriter<Integer> itemWriter() {
return items -> {
for (Integer item : items) {
if (items.contains(3)) {
throw new IllegalArgumentException("no 3!");
}
System.out.println("item = " + item);
}
};
}
@Bean
public Step step1() {
return steps.get("step1")
.<Integer, Integer>chunk(5)
.reader(itemReader())
.writer(itemWriter())
.build();
}
@Bean
public Step step2() {
return steps.get("step2")
.tasklet((contribution, chunkContext) -> {
System.out.println("step2");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step step3() {
return steps.get("step3")
.tasklet((contribution, chunkContext) -> {
System.out.println("step3");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public JobExecutionDecider decider() {
return (jobExecution, stepExecution) -> {
int rollbackCount = stepExecution.getRollbackCount();
List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
System.out.println("rollbackCount = " + rollbackCount);
System.out.println("failureExceptions = " + failureExceptions);
// make the decision based on rollbackCount and/or failureExceptions and return status accordingly
return FlowExecutionStatus.COMPLETED;
};
}
@Bean
public Job job() {
return jobs.get("job")
.start(step1())
.on("*").to(decider())
.from(decider()).on("COMPLETED").to(step2())
.from(decider()).on("FAILED").to(step3())
.build()
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
In this example, if an exception occurs during step1
, the decider can get it from the step execution and make the decision accordingly (go to step2
or step3
).
So I'm not sure you really need an exception handler and a way to pass information to the decider. The same idea applies of you want to make the decision based on the rollbackCount
, commitCount
, readCount
, or any other metric.
Hope this helps.
Upvotes: 1