pixel

Controlling job execution based on exceptions in simple chunk job in Spring Batch

I have a simple chunk-oriented CSV processing job.

I would like to change the execution flow when a particular type of error occurs during processing (e.g. an invalid line structure).

To keep the error from propagating, I have to provide a custom exceptionHandler that swallows the parsing exception:

@Bean
fun processCsvStep(
    stepBuilderFactory: StepBuilderFactory,
    reader: ItemReader<InputRow>,
    processor: ItemProcessor<InputRow, OutputObject>,
    writer: ItemWriter<OutputObject>
) = stepBuilderFactory.get(PROCESS_CSV_STEP)
    .chunk<InputRow, OutputObject>(
        CHUNKS_NUMBER
    )
    .reader(reader)
    .processor(processor)
    .writer(writer)
    .exceptionHandler { context: RepeatContext, throwable: Throwable ->
        context.setTerminateOnly()
        logger.error { "Exception during parsing: ${throwable.message}" }
    }
    .build()!!

Then, in my Job, the only thing I can rely on is the rollback count:

@Bean
fun createCsvJob(jobs: JobBuilderFactory, processCsvStep: Step, moveCsvStep: Step, moveFailedCsvStep: Step) = jobs.get(PROCESS_CSV_JOB)
    .start(processCsvStep)
    .next { jobExecution: JobExecution, stepExecution: StepExecution ->
        return@next when (stepExecution.rollbackCount) {
            0 -> FlowExecutionStatus.COMPLETED
            else -> FlowExecutionStatus.FAILED
        }

    }
    .on(FlowExecutionStatus.FAILED.name)
    .to(moveFailedCsvStep)
    .on(FlowExecutionStatus.COMPLETED.name)
    .to(moveCsvStep)
    .end()
    .build()!!

Is there any way to pass information from the exception handler to the JobExecutionDecider? I would like to make the execution decision based on the type of exception that occurred during parsing. Is this possible?

Answers (1)

Mahmoud Ben Hassine

"I would like to make the execution decision based on the type of exception that occurred during parsing. Is this possible?"

You can access the exceptions that occurred during the step from the decider through StepExecution#getFailureExceptions. Here is an example:

import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                if (items.contains(3)) {
                    throw new IllegalArgumentException("no 3!");
                }
                System.out.println("item = " + item);
            }
        };
    }

    @Bean
    public Step step1() {
        return steps.get("step1")
                .<Integer, Integer>chunk(5)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Step step2() {
        return steps.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step step3() {
        return steps.get("step3")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step3");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public JobExecutionDecider decider() {
        return (jobExecution, stepExecution) -> {
            int rollbackCount = stepExecution.getRollbackCount();
            List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
            System.out.println("rollbackCount = " + rollbackCount);
            System.out.println("failureExceptions = " + failureExceptions);
            // make the decision based on rollbackCount and/or failureExceptions and return status accordingly
            return FlowExecutionStatus.COMPLETED;
        };
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step1())
                .on("*").to(decider())
                .from(decider()).on("COMPLETED").to(step2())
                .from(decider()).on("FAILED").to(step3())
                .build()
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

In this example, if an exception occurs during step1, the decider can get it from the step execution and make the decision accordingly (go to step2 or step3).

So I'm not sure you really need an exception handler or a way to pass information to the decider. The same idea applies if you want to make the decision based on the rollbackCount, commitCount, readCount, or any other metric.
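
To make the "decide by exception type" part more concrete, here is a minimal sketch of the decision logic in plain Java, assuming you want a dedicated status for parsing errors. FailureRouting, InvalidLineException, and the INVALID_INPUT status name are hypothetical names made up for illustration; none of them are part of Spring Batch. The helper also walks the cause chain, on the assumption that the exception you care about may arrive wrapped in another one:

```java
import java.util.List;

/** Hypothetical helper: maps a step's failure exceptions to a flow status name. */
final class FailureRouting {

    /** Stand-in for a parsing error (assumption: your reader throws something similar). */
    static class InvalidLineException extends RuntimeException {
        InvalidLineException(String message) {
            super(message);
        }
    }

    /** Returns true if the throwable or any of its causes is an instance of the given type. */
    static boolean hasCause(Throwable throwable, Class<? extends Throwable> type) {
        // Walk the cause chain; the depth bound guards against cyclic cause chains.
        Throwable current = throwable;
        for (int depth = 0; current != null && depth < 32; depth++) {
            if (type.isInstance(current)) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    /** Picks a status name: COMPLETED if nothing failed, INVALID_INPUT for parsing errors, FAILED otherwise. */
    static String decide(List<Throwable> failureExceptions) {
        if (failureExceptions.isEmpty()) {
            return "COMPLETED";
        }
        for (Throwable failure : failureExceptions) {
            if (hasCause(failure, InvalidLineException.class)) {
                return "INVALID_INPUT"; // hypothetical custom status name
            }
        }
        return "FAILED";
    }
}
```

In the decider you could then return new FlowExecutionStatus(FailureRouting.decide(stepExecution.getFailureExceptions())) and add a matching .from(decider()).on("INVALID_INPUT") transition to the job. As far as I can tell, this only sees exceptions that actually failed the step, so a handler that swallows them would likely leave the list empty.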

Hope this helps.
