KCMS

Reputation: 217

Spring Batch: processor called twice after skip

I have defined a chunk with a commit-interval of 10 and a skip-limit of 10. A processor class manipulates a field by applying some arithmetic operations. In the processor, an exception occurs for one of the records (say, the 6th). After this, records 1 to 5 are processed again, the 6th is skipped, and records 7-10 are processed and written to XML (by a custom XML writer class). Because the processor handles records 1-5 twice, the field value is wrong: the calculation is applied twice. Can you please suggest a way to have the processor process each record only once, skip only the failed record, and write the processed records to XML?

I implemented a SkipListener with onSkipInProcess(), onSkipInRead(), and onSkipInWrite(), but the output is still the same.

jobconfig.xml

<batch:job id="job">
    <batch:step id="step">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter" 
                processor="itemProcessor" commit-interval="10" skip-limit="5" retry-limit="0" >
                <batch:skippable-exception-classes>
                    <batch:include class="java.lang.Exception"/>
                </batch:skippable-exception-classes>
                <batch:listeners>
                    <batch:listener ref="skipListener" />
                </batch:listeners>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>  
<bean id="itemWriter" class="a.b.XWriter" scope="step"/>
<bean id="skipListener" class="a.b.SkipListener"/>
<bean id="itemProcessor" class="a.b.XProcessor" scope="step"/>
<bean id="itemReader" class="a.b.XReader"/>

ItemReader class:

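import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;

public class XReader implements ItemReader<Object> {

    @Autowired
    private XRepository classDao;

    private List<?> lst = new ArrayList<>();
    private int index = 0;

    // Loads all rows on the first call, then returns them one at a time.
    // Returning null tells Spring Batch that the input is exhausted.
    @Override
    public Object read() throws Exception {
        if (lst.isEmpty()) {
            lst = classDao.findAll();
        }
        if (index < lst.size()) {
            return lst.get(index++);
        }
        return null;
    }
}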

ItemProcessor class:

public class XProcessor<T> implements ItemProcessor<T, T> {

    // The signature must use T (not Object) to implement ItemProcessor<T, T>.
    @Override
    public T process(T item) throws Exception {
        // logic here
    }
}

ItemWriter class:

public class XWriter<T> implements ItemWriter<T> {

    @Override
    public void write(List<? extends T> items) throws Exception {
        // logic here to write to XML
    }
}

SkipListener class:

public class SkipListener<T, S> implements org.springframework.batch.core.SkipListener<T, S> {

    @Override
    public void onSkipInProcess(T item, Throwable t) {
    }

    @Override
    public void onSkipInRead(Throwable t) {
    }

    @Override
    public void onSkipInWrite(S item, Throwable t) {
    }
}

Upvotes: 2

Views: 4035

Answers (2)

Michael Minella

Reputation: 21453

ItemProcessors used in a fault-tolerant step are expected to be idempotent, because there is a risk that they will be called multiple times for the same item (as seen in your example). You can read more about this in section 6.3.3 of the documentation here: http://docs.spring.io/spring-batch/reference/html/readersAndWriters.html
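
To illustrate the idempotency point, here is a minimal sketch in plain Java. It is not Spring Batch's actual implementation: `Record`, `DoublingProcessor`, and `processChunk` are hypothetical names, the `ItemProcessor` interface is a local stand-in for the Spring Batch one so the example runs without the framework, and the loop only imitates the way a chunk is reprocessed after a skip. The processor returns a new object instead of modifying its input, so calling it a second time on the same input gives the same result.

```java
import java.util.ArrayList;
import java.util.List;

public class IdempotentProcessorSketch {

    // Hypothetical immutable item; the real domain type is not shown in the question.
    static final class Record {
        final int id;
        final int amount;

        Record(int id, int amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    // Local stand-in for org.springframework.batch.item.ItemProcessor (assumption:
    // used only so this sketch compiles without Spring Batch on the classpath).
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Idempotent: builds a new Record from the input and never mutates the input.
    static final class DoublingProcessor implements ItemProcessor<Record, Record> {
        @Override
        public Record process(Record item) {
            if (item.id == 6) {
                throw new IllegalStateException("bad record " + item.id);
            }
            return new Record(item.id, item.amount * 2);
        }
    }

    // Simplified imitation of a fault-tolerant chunk: when an item fails, drop it
    // from the inputs and process the remaining items again from the start.
    static List<Record> processChunk(List<Record> chunk,
                                     ItemProcessor<Record, Record> processor) {
        List<Record> inputs = new ArrayList<>(chunk);
        while (true) {
            List<Record> outputs = new ArrayList<>();
            Record failed = null;
            for (Record r : inputs) {
                try {
                    outputs.add(processor.process(r));
                } catch (Exception e) {
                    failed = r;
                    break;
                }
            }
            if (failed == null) {
                return outputs;
            }
            inputs.remove(failed);
        }
    }

    public static void main(String[] args) {
        List<Record> chunk = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            chunk.add(new Record(i, 100));
        }
        List<Record> written = processChunk(chunk, new DoublingProcessor());
        System.out.println(written.size() + " items, first amount = " + written.get(0).amount);
    }
}
```

Running `main` prints `9 items, first amount = 200`: records 1-5 pass through the processor twice, but because each call starts from the untouched input, the doubling is applied only once to the value that is finally written. A processor that modified the input object in place would end up with 400 for those records instead.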

Upvotes: 2

Shankar

Reputation: 8957

You need a listener implementation like the one below. Whenever an exception occurs, it calls the corresponding method; you can handle the exception there if you want, or just leave the method empty. Either way, it will not fail the job.

Also, it will not call the processor twice.

XML configuration:

<batch:listeners>
    <batch:listener ref="recordSkipListener"/>
</batch:listeners>

Listener class:

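import org.springframework.batch.core.SkipListener;

// SkipListener takes two type parameters: the input item type and the output item type.
public class RecordSkipListener implements SkipListener<Model, Model> {

    @Override
    public void onSkipInRead(Throwable t) {
        // called when an item is skipped during read
    }

    @Override
    public void onSkipInWrite(Model item, Throwable t) {
        // called when an item is skipped during write
    }

    @Override
    public void onSkipInProcess(Model item, Throwable t) {
        // called when an item is skipped during processing
    }
}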

Upvotes: 0
