Reputation: 131
I am a bit confused: when I start the execution of a Spring Batch job via an HTTP request and, while that job is still executing, receive another HTTP request to start the same job with different parameters, the job that is being executed stops unfinished and processing of the new job starts.
I've developed a REST API to load and process the content of Excel files. The web service exposes two endpoints: one to load, validate and store the content of Excel files in the database, and another to start the processing of the records stored in the database.
POST /api/excel/upload This endpoint receives the Excel files. When a request is received, each file is assigned a unique identifier and its content is validated. If the content is correct, it is inserted into a temporary table, waiting to be processed.
GET /api/excel/process?id=x This endpoint receives the identifiers of the files to be processed. When a request is received, a Spring Batch job is started to process the records in the temporary table.
@PostMapping(produces = {APPLICATION_JSON_VALUE})
public ResponseEntity<Page<ExcelLoad>> post(@RequestParam("file") MultipartFile multipartFile)
{
    return super.getResponse().returnPage(service.upload(multipartFile));
}

@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public DeferredResult<ResponseEntity<Void>> get(@RequestParam("id") Integer idCarga)
{
    DeferredResult<ResponseEntity<Void>> response = new DeferredResult<>(1000L);
    response.onTimeout(() -> response.setResult(super.getResponse().returnVoid()));
    ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));
    return response;
}
I use DeferredResult to send a response to the client after receiving the request, without waiting for the job to finish.
public void startJob(int idCarga)
{
    JobParameters params = new JobParametersBuilder()
            .addString("mainJob", String.valueOf(System.currentTimeMillis()))
            .addString("idCarga", String.valueOf(idCarga))
            .toJobParameters();
    try
    {
        jobLauncher.run(job, params);
    }
    catch (JobExecutionException e)
    {
        log.error("---ERROR: {}", e.getMessage());
    }
}
@Bean
public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
{
    return stepBuilderFactory.get("step")
            .<List<ExcelLoad>, Invoice>chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
            .listener(stepSkipListener)
            .build();
}

@Bean
public Job mainJob(Step mainStep)
{
    return jobBuilderFactory.get("mainJob")
            .listener(mainJobExecutionListener)
            .incrementer(new RunIdIncrementer())
            .start(mainStep)
            .build();
}
Performing some tests, I have observed the following behavior:
If I make a request to the endpoint /process to process each file at different times: in this case, all the records stored in the temporary table are processed.
If I make a request to the endpoint /process to first process file1, and before it finishes I make another request to process file2: in this case, not all the records stored in the temporary table are processed.
Upvotes: 0
Views: 1209
Reputation: 131
Thanks to the help from @Mahmoud Ben Hassine's answer, I was able to resolve the issue. In case someone else comes across this question, I share the code that, in my case, solved the problem:
@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job job;

@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public void get(@RequestParam("id") Integer idCarga) throws JobExecutionException
{
    JobParameters params = new JobParametersBuilder()
            .addString("mainJob", String.valueOf(System.currentTimeMillis()))
            .addString("idCarga", String.valueOf(idCarga))
            .toJobParameters();
    jobLauncher.run(job, params);
}
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer
{
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private StepSkipListener stepSkipListener;

    @Autowired
    private MainJobExecutionListener mainJobExecutionListener;

    @Bean
    public TaskExecutor taskExecutor()
    {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setThreadNamePrefix("batch-thread-");
        return taskExecutor;
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception
    {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.setTaskExecutor(taskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
    {
        return stepBuilderFactory.get("step")
                .<List<ExcelLoad>, Invoice>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
                .listener(stepSkipListener)
                .build();
    }

    @Bean
    public Job mainJob(Step mainStep)
    {
        return jobBuilderFactory.get("mainJob")
                .listener(mainJobExecutionListener)
                .incrementer(new RunIdIncrementer())
                .start(mainStep)
                .build();
    }
}
If, after applying this code, you also have problems inserting the records in the database (as happened to me), you can go through this question, where I also posted the code that works for me.
Upvotes: 1
Reputation: 31710
The JobLauncher does not stop job executions, it only launches them. The default job launcher provided by Spring Batch is the SimpleJobLauncher, which delegates job launching to a TaskExecutor. Now, depending on the task executor implementation you use and how it is configured to launch concurrent tasks, you can see different behaviours. For example, when you launch a new job execution and a new task is submitted to the task executor, the task executor can decide to reject this submission if all workers are busy, or put it in a waiting queue, or stop another task and submit the new one. Those strategies depend on several parameters (the TaskExecutor implementation, the type of the queue used behind the scenes, the RejectedExecutionHandler implementation, etc.).
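To make those parameters concrete, here is a minimal sketch of a ThreadPoolTaskExecutor where the queue and the rejection handler are explicit (the values are purely illustrative):

import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Illustrative configuration: pool size, queue capacity and rejection handler
// together determine what happens to a job launched while all workers are busy.
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);      // threads kept ready for job executions
executor.setMaxPoolSize(4);       // hard limit on concurrent executions
executor.setQueueCapacity(10);    // extra submissions wait here instead of being dropped
// When pool and queue are both full, the handler decides: AbortPolicy rejects the
// submission, CallerRunsPolicy runs it on the calling thread, and so on.
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();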
In your case, you seem to be using the following:

ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));

So you need to check the behaviour of this pool with regard to how it handles new task submissions (I guess this is what is stopping your jobs, but you need to confirm that). That said, I don't see why you need this. If your requirement is the following:

I use DeferredResult to send a response to the client after receiving the request, without waiting for the job to finish.

then you can use an asynchronous task executor implementation (like the ThreadPoolTaskExecutor) in your job launcher; see Running Jobs from within a Web Container.
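As a sketch of that setup (the bean name asyncJobLauncher and the pool size are only examples; see the linked documentation section for the reference configuration):

@Bean
public JobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception
{
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(10);
    taskExecutor.initialize();

    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    // With an asynchronous task executor, run() returns as soon as the execution
    // is submitted, so the HTTP thread does not wait for the job to finish.
    jobLauncher.setTaskExecutor(taskExecutor);
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

With such a launcher, the controller can call jobLauncher.run(job, params) directly and return to the client immediately, without DeferredResult or ForkJoinPool.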
Upvotes: 2