Vimit Dhawan

Reputation: 677

spring batch restart job starts from initial stage instead of where it left off?

I want to implement restart functionality so that a job resumes from where it left off instead of starting again from the initial stage. I am facing two issues.

First Problem: When I restart the job the very first time, it creates a new job instance id and behaves like a fresh job. The second time, it restarts and runs with the same job instance id. (I send the execution id from a REST controller.)

Second Problem: When I restart it, it starts from the initial stage instead of resuming where it left off.

Custom Reader:

    package com.orange.alc.dabekdataload.reader;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.annotation.AfterStep;
    import org.springframework.batch.core.annotation.BeforeStep;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemStream;
    import org.springframework.batch.item.ItemStreamException;
    import org.springframework.batch.item.NonTransientResourceException;
    import org.springframework.batch.item.ParseException;
    import org.springframework.batch.item.UnexpectedInputException;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Scope;
    import org.springframework.context.annotation.ScopedProxyMode;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.stereotype.Component;

    import com.orange.alc.dabekdataload.constants.PostalHeader;
    import com.orange.alc.dabekdataload.dto.PostalDto;

    @Component("itemReader")
    @Scope(value = "step", proxyMode = ScopedProxyMode.TARGET_CLASS)
    public class PostalReader implements ItemReader<PostalDto>, ItemStream{

        private static final Logger LOGGER = LoggerFactory.getLogger(PostalReader.class);

        @Value("#{jobParameters[fullPathFileName]}")
        public String fileName;

        private int currentIndex = 0;

        private static final String CURRENT_INDEX = "current.index";

        private FlatFileItemReader<PostalDto> reader;

        @BeforeStep
        public void beforeStep(StepExecution stepExecution) {
            LOGGER.info("Executing batch reader...");
            reader = new FlatFileItemReader<>();
            reader.setResource(new FileSystemResource(fileName));
            reader.setLinesToSkip(1);
            reader.setLineMapper(new DefaultLineMapper<PostalDto>() {{
                setLineTokenizer(new DelimitedLineTokenizer() {{
                    setNames(PostalHeader.getPostalColumnNames());
                }});
                setFieldSetMapper(new PostalFieldSetMapper());
            }});
            reader.setSaveState(true);
            reader.open(stepExecution.getExecutionContext());

        }

        @Override
        public PostalDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
            reader.setCurrentItemCount(currentIndex++);
           return reader.read();
        }

        @AfterStep
        public void afterStep(StepExecution stepExecution) {
            LOGGER.info("Closing the reader...");
            reader.close();
        }

        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            if(executionContext.containsKey(CURRENT_INDEX)){
                currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
            } else{
                currentIndex = 0;
            }

        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());

        }

        @Override
        public void close() throws ItemStreamException {


        }


    }

Job Restart Code:

@Override
public void restartJob(Long jobId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {
    LOGGER.info("Restarting job with JobId: {}", jobId);
    jobOperator.restart(jobId);
}
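
For reference, the REST endpoint that triggers the restart looks roughly like this (the controller and service names here are just illustrative):

import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class JobRestartController {

    // hypothetical service exposing the restartJob(Long) method shown above
    private final JobService jobService;

    public JobRestartController(JobService jobService) {
        this.jobService = jobService;
    }

    // JobOperator.restart expects the id of a failed/stopped JobExecution,
    // not the JobInstance id.
    @PostMapping("/jobs/executions/{executionId}/restart")
    public void restart(@PathVariable Long executionId) throws Exception {
        jobService.restartJob(executionId);
    }
}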

Please let me know if you need any more code from my side.

Upvotes: 0

Views: 1631

Answers (2)

Zubair A.

Reputation: 89

The update method from ItemStream puts the state into the current step's execution context. But when you restart the job, the step runs again as a new step execution and cannot see the execution context of the previous step. To resume processing from where it left off on restart, you need to save the state in the job execution context.
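
For example, Spring Batch ships an ExecutionContextPromotionListener that copies named keys from the step execution context into the job execution context when the step finishes. A minimal sketch of that idea, assuming Java config; the step name, chunk size and writer bean are illustrative, and "current.index" is the key the reader's update() writes:

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.orange.alc.dabekdataload.dto.PostalDto;

@Configuration
public class LoadStepConfig {

    // Copies the listed keys from the step execution context to the job
    // execution context when the step finishes.
    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"current.index"});           // key written by the reader's update()
        listener.setStatuses(new String[] {"COMPLETED", "FAILED"}); // promote even when the step fails
        return listener;
    }

    @Bean
    public Step loadStep(StepBuilderFactory steps,
                         ItemReader<PostalDto> itemReader,
                         ItemWriter<PostalDto> itemWriter) {
        return steps.get("loadStep")
                .<PostalDto, PostalDto>chunk(100)
                .reader(itemReader)
                .writer(itemWriter)
                .listener(promotionListener())
                .build();
    }
}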

Upvotes: 0

Mahmoud Ben Hassine

Reputation: 31620

The delegate reader (FlatFileItemReader) used in your custom reader (PostalReader) is not honouring the ItemStream contract. You need to call open/update/close on the delegate reader in the corresponding open/update/close methods of your item reader. Something like:

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.file.FlatFileItemReader;

public class PostalReader implements ItemReader<PostalDto>, ItemStream {

   private FlatFileItemReader<PostalDto> reader;

   // read() and the reader's configuration stay as in the question;
   // only the ItemStream callbacks below are delegated.

   @Override
   public void open(ExecutionContext executionContext) throws ItemStreamException {
      // lets the delegate restore its saved position on a restart
      reader.open(executionContext);
   }

   @Override
   public void update(ExecutionContext executionContext) throws ItemStreamException {
      // lets the delegate record how far it has read
      reader.update(executionContext);
   }

   @Override
   public void close() throws ItemStreamException {
      reader.close();
   }
}
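
With the open/update/close calls delegated like this, the FlatFileItemReader's own saveState mechanism records how far it has read in the step's execution context and jumps back to that position when a failed execution is restarted, so the manual current.index bookkeeping and the setCurrentItemCount call in read() should no longer be necessary.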

Upvotes: 1
