igarciadev

Reputation: 131

Running job stops if a new run of the same job is started - Spring Batch

Issue

I am a bit confused: I start the execution of a Spring Batch job via an HTTP request, and if I receive another HTTP request to start the same job, but with different parameters, while the first job is still executing, the running job stops unfinished and processing of the new job starts.

Context

I've developed a REST API to load and process the content of Excel files. The web service exposes two endpoints: one to load, validate and store the content of Excel files in the database, and another to start processing the records stored in the database.

How it works

Some code

@PostMapping(produces = {APPLICATION_JSON_VALUE})
public ResponseEntity<Page<ExcelLoad>> post(@RequestParam("file") MultipartFile multipartFile)
{
    return super.getResponse().returnPage(service.upload(multipartFile));
}

@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public DeferredResult<ResponseEntity<Void>> get(@RequestParam("id") Integer idCarga)
{
    DeferredResult<ResponseEntity<Void>> response = new DeferredResult<>(1000L);
    response.onTimeout(() -> response.setResult(super.getResponse().returnVoid()));

    ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));

    return response;
}

I use DeferredResult to send a response to the client after receiving the request, without waiting for the job to finish:

public void startJob(int idCarga)
{
    JobParameters params = new JobParametersBuilder()
            .addString("mainJob", String.valueOf(System.currentTimeMillis()))
            .addString("idCarga", String.valueOf(idCarga))
            .toJobParameters();

    try
    {
        jobLauncher.run(job, params);
    }
    catch (JobExecutionException e)
    {
        log.error("---ERROR: {}", e.getMessage());
    }
}
@Bean
public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
{
    return stepBuilderFactory.get("step")
            .<List<ExcelLoad>, Invoice>chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
            .listener(stepSkipListener)
            .build();
}

@Bean
public Job mainJob(Step mainStep)
{
    return jobBuilderFactory.get("mainJob")
                            .listener(mainJobExecutionListener)
                            .incrementer(new RunIdIncrementer())
                            .start(mainStep)
                            .build();
}

Performing some tests, I have observed the following behavior:

  1. If I make a request to the /process endpoint to process each file at different times: in this case, all the records stored in the temporary table are processed:

    • Records processed for file1: 3606 (expected 3606).
    • Records processed for file2: 1776 (expected 1776).
  2. If I make a request to the /process endpoint to first process file1, and before it finishes I make another request to process file2: in this case, not all the records stored in the temporary table are processed:

    • Records processed for file1: 1080 (expected 3606).
    • Records processed for file2: 1774 (expected 1776).

Upvotes: 0

Views: 1209

Answers (2)

igarciadev

Reputation: 131

Thanks to the answer from @Mahmoud Ben Hassine, I was able to resolve the issue. In case someone comes across this question, I share the code that, in my case, solved the problem:

  • Controller
@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job job;

@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public void get(@RequestParam("id") Integer idCarga) throws JobExecutionException
{
    JobParameters params = new JobParametersBuilder()
            .addString("mainJob", String.valueOf(System.currentTimeMillis()))
            .addString("idCarga", String.valueOf(idCarga))
            .toJobParameters();

    jobLauncher.run(job, params);
}
  • Batch config, job and steps
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer
{
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private StepSkipListener stepSkipListener;

    @Autowired
    private MainJobExecutionListener mainJobExecutionListener;

    @Bean
    public TaskExecutor taskExecutor()
    {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setThreadNamePrefix("batch-thread-");

        return taskExecutor;
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception
    {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.setTaskExecutor(taskExecutor());
        jobLauncher.afterPropertiesSet();

        return jobLauncher;
    }

    @Bean
    public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
    {
        return stepBuilderFactory.get("step")
                .<List<ExcelLoad>, Invoice>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
                .listener(stepSkipListener)
                .build();
    }

    @Bean
    public Job mainJob(Step mainStep)
    {
        return jobBuilderFactory.get("mainJob")
                                .listener(mainJobExecutionListener)
                                .incrementer(new RunIdIncrementer())
                                .start(mainStep)
                                .build();
    }
}

If, after applying this code, you also run into problems inserting the records in the database (as happened to me), you can go through this question, where I also posted the code that works for me.

Upvotes: 1

Mahmoud Ben Hassine

Reputation: 31710

The JobLauncher does not stop job executions, it only launches them. The default job launcher provided by Spring Batch is the SimpleJobLauncher, which delegates job launching to a TaskExecutor. Now, depending on the task executor implementation you use and how it is configured to launch concurrent tasks, you can see different behaviours. For example, when you launch a new job execution and a new task is submitted to the task executor, the task executor can decide to reject this submission if all workers are busy, put it in a waiting queue, or stop another task and submit the new one. Those strategies depend on several parameters (the TaskExecutor implementation, the type of queue used behind the scenes, the RejectedExecutionHandler implementation, etc.).
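
As a minimal sketch of how those parameters interact, assuming Spring's ThreadPoolTaskExecutor with arbitrary example values for the pool size, queue capacity and rejection policy:

import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

public class TaskExecutorBehaviourSketch
{
    public static void main(String[] args)
    {
        // Bounded executor: 2 concurrent workers, 1 queued task, further submissions rejected.
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(1);
        // The RejectedExecutionHandler decides what happens when all workers and the queue are full:
        // AbortPolicy (the default) throws, CallerRunsPolicy runs the task in the calling thread, etc.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();

        for (int i = 1; i <= 4; i++)
        {
            try
            {
                executor.execute(() -> {
                    try
                    {
                        Thread.sleep(2000); // simulate a long-running job execution
                    }
                    catch (InterruptedException e)
                    {
                        Thread.currentThread().interrupt();
                    }
                });
                System.out.println("Submission " + i + " accepted");
            }
            catch (TaskRejectedException e)
            {
                System.out.println("Submission " + i + " rejected");
            }
        }

        executor.shutdown();
    }
}

With these values, the first two submissions run immediately, the third waits in the queue and the fourth is rejected.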

In your case, you seem to be using the following:

ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));

So you need to check the behaviour of this pool with regard to how it handles new task submissions (I guess this is what is stopping your jobs, but you need to confirm that). That said, I don't see why you need this. If your requirement is the following:

I use DeferredResult to send a response to the client after receiving the request without waiting for the job to finish

Then you can use an asynchronous task executor implementation (like the ThreadPoolTaskExecutor) in your job launcher, see Running Jobs from within a Web Container.
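
As a minimal sketch of that approach (the controller class name is illustrative, and the JobLauncher is assumed to be backed by an asynchronous task executor as described above):

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProcessController
{
    private final JobLauncher jobLauncher; // configured with an asynchronous TaskExecutor
    private final Job job;

    public ProcessController(JobLauncher jobLauncher, Job job)
    {
        this.jobLauncher = jobLauncher;
        this.job = job;
    }

    @GetMapping("/process")
    public ResponseEntity<Void> process(@RequestParam("id") Integer idCarga) throws JobExecutionException
    {
        JobParameters params = new JobParametersBuilder()
                .addString("mainJob", String.valueOf(System.currentTimeMillis()))
                .addString("idCarga", String.valueOf(idCarga))
                .toJobParameters();

        // With an asynchronous launcher, run() returns as soon as the execution has been started,
        // so no DeferredResult or manual thread pool is needed in the controller.
        JobExecution execution = jobLauncher.run(job, params);

        // Expose the initial status (typically STARTING/STARTED) to the caller; the job keeps running in the background.
        return ResponseEntity.accepted()
                .header("Job-Status", execution.getStatus().toString())
                .build();
    }
}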

Upvotes: 2
