Reputation: 677
I want to implement job restart functionality so that the job starts from the initial stage. I am facing two issues.
First problem: The very first time I restart the job, it creates a new job instance id and behaves like a fresh job. The second time, it restarts and runs with the same job instance id. (I send the execution id from the REST controller.)
Second problem: When I restart it, it starts from the initial stage.
Custom Reader:
package com.orange.alc.dabekdataload.reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.core.io.FileSystemResource;
import org.springframework.stereotype.Component;
import com.orange.alc.dabekdataload.constants.PostalHeader;
import com.orange.alc.dabekdataload.dto.PostalDto;
@Component("itemReader")
@Scope(value = "step", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class PostalReader implements ItemReader<PostalDto>, ItemStream{
private static final Logger LOGGER = LoggerFactory.getLogger(PostalReader.class);
@Value("#{jobParameters[fullPathFileName]}")
public String fileName;
private int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
private FlatFileItemReader<PostalDto> reader;
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
LOGGER.info("Executing batch reader...");
reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource(fileName));
reader.setLinesToSkip(1);
reader.setLineMapper(new DefaultLineMapper<PostalDto>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(PostalHeader.getPostalColumnNames());
}});
setFieldSetMapper(new PostalFieldSetMapper());
}});
reader.setSaveState(true);
reader.open(stepExecution.getExecutionContext());
}
@Override
public PostalDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
reader.setCurrentItemCount(currentIndex++);
return reader.read();
}
@AfterStep
public void afterStep(StepExecution stepExecution) {
LOGGER.info("Closing the reader...");
reader.close();
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
if(executionContext.containsKey(CURRENT_INDEX)){
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
} else{
currentIndex = 0;
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
@Override
public void close() throws ItemStreamException {
}
}
Job Restart Code:
@Override
public void restartJob(Long jobId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {
LOGGER.info("Restarting job with JobId: {}", jobId);
jobOperator.restart(jobId);
}
Please let me know in case you need any code from my side.
Upvotes: 0
Views: 1631
Reputation: 89
The update method from ItemStream puts the state into the current step's execution context. But when you restart the job, the step re-runs as a new step and cannot get the execution context of the previous step. To resume processing from where it left off on restart, you need to save the state in the job execution context.
Upvotes: 0
Reputation: 31620
The delegate reader (FlatFileItemReader) used in your custom reader (PostalReader) is not honouring the ItemStream contract. You need to call open/update/close on the delegate reader in the corresponding open/update/close methods of your item reader. Something like:
public class PostalReader implements ItemReader<PostalDto>, ItemStream{
private FlatFileItemReader<PostalDto> reader;
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
reader.open(executionContext);
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
reader.update(executionContext);
}
@Override
public void close() throws ItemStreamException {
reader.close();
}
}
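To see why the delegation matters, here is a minimal plain-Java sketch of the same idea. It is not Spring Batch code: the Stream interface, LineReader, WrappingReader, the Map used as an execution context, and the key name are all hypothetical stand-ins for ItemStream, FlatFileItemReader, PostalReader and ExecutionContext. It only illustrates that a wrapper which forwards open/update/close lets the delegate save and restore its own position.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RestartSketch {

    /** Hypothetical stand-in for ItemStream; a Map plays the execution context. */
    interface Stream {
        void open(Map<String, Object> context);
        void update(Map<String, Object> context);
        void close();
    }

    /** Hypothetical stateful delegate in the role of FlatFileItemReader. */
    static final class LineReader implements Stream {
        private static final String KEY = "line.reader.position"; // made-up key
        private final List<String> lines;
        private int position;

        LineReader(List<String> lines) { this.lines = lines; }

        @Override public void open(Map<String, Object> context) {
            // Resume from the saved position if an earlier run stored one.
            position = (Integer) context.getOrDefault(KEY, 0);
        }

        @Override public void update(Map<String, Object> context) {
            // Checkpoint: record how far reading has progressed.
            context.put(KEY, position);
        }

        @Override public void close() { }

        String read() {
            return position < lines.size() ? lines.get(position++) : null;
        }
    }

    /** Wrapper in the role of PostalReader: forwards every lifecycle call. */
    static final class WrappingReader implements Stream {
        private final LineReader delegate;

        WrappingReader(LineReader delegate) { this.delegate = delegate; }

        @Override public void open(Map<String, Object> context) { delegate.open(context); }
        @Override public void update(Map<String, Object> context) { delegate.update(context); }
        @Override public void close() { delegate.close(); }

        String read() { return delegate.read(); }
    }

    /** Simulates one execution: open, read up to maxItems with a checkpoint
     *  after each item, close. Returns the last item read, or null. */
    static String runOnce(List<String> lines, Map<String, Object> context, int maxItems) {
        WrappingReader reader = new WrappingReader(new LineReader(lines));
        reader.open(context);
        String last = null;
        for (int i = 0; i < maxItems; i++) {
            String item = reader.read();
            if (item == null) {
                break;
            }
            last = item;
            reader.update(context);
        }
        reader.close();
        return last;
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        List<String> lines = List.of("a", "b", "c", "d");
        System.out.println(runOnce(lines, context, 2)); // first run stops after two items
        System.out.println(runOnce(lines, context, 2)); // second run resumes from the saved position
    }
}
```

Running main prints b and then d: the second run picks up at the third line because the delegate restored its position from the shared context. If the wrapper tracked its own counter instead of forwarding these calls, the delegate would never see the saved state.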
Upvotes: 1