user3813035

Reputation: 141

Spring Batch job throwing ItemStreamException when restarting; has a processor that skips records and the skip listener writes to a flat file

I could use your assistance in what is most likely a very simple mistake on my part.

I have a very basic test Spring Batch job. It uses a FlatFileItemReader to read from a file and a FlatFileItemWriter to write the output. The custom processor throws a custom SkipObjectException if the item's id ends in the letter "a". The skip listener catches the SkipObjectException and writes a message (a string) to a separate file, also using a FlatFileItemWriter.

When restarting, depending on when it fails, I get an ItemStreamException: Current file size is smaller than size at last commit. This exception occurs on the skip file. If I do not write to the file in the skip listener everything works fine.

Input file contents

    001;first record
    002;second record
    003;third record
    003a;first skip record
    004;fourth record
    005;fifth record
    006;sixth record
    007;seventh record
    007a;second skip record
    008;eighth record
    009;ninth record
    010;tenth record
    011;eleventh record
    012;twelfth record

Successful test – The job runs fine. It writes 12 records (all but 003a and 007a) to the output file and two records (003a and 007a) to the skipped file. Great.

Failure test 1 – With a commit-interval of 5, have the processor fail on item 004. The job fails, as expected, with both the output file and skip file empty, which makes sense since the exception occurred before the first commit.

Restart failure test 1 – same as above but remove the exception from the processor. Restart the job and it runs as expected with the same output as the successful test.

Failure test 2 – With a commit-interval of 5; have the processor fail on item 004. Same result as before.

Restart failure test 2 (first restart) – With a commit-interval of 5, have the processor fail on item 008. The job restarts, then fails where expected. The output file has 4 records (001 through 004, excluding 003a) and the skip file has one record, 003a. So the output was as expected.

Restart failure test 2 (second restart) – With a commit-interval of 5, remove the exception from the processor. The job fails to restart with ItemStreamException: Current file size is smaller than size at last commit. This exception occurs on the skip file.

Failure test 3 – Set the commit-interval to 10 and have the processor fail on item 012. The job fails as expected on the last item. The output file contains items 001 through 008 except for 003a and 007a. The skip file contains 003a and 007a. All good.

Restart failure test 3 – Leave the commit-interval set to 10 and remove the exception from the processor. When I restart the job I get ItemStreamException: Current file size is smaller than size at last commit. This is occurring on the skip output file.

Debug information – I ran failure test 3 in a debugger. After the restart, the FlatFileItemWriter for the skip file thinks 8 records were written when there should have been only 2.

I have tried implementing a SkipListener, extending SkipListenerSupport, using the annotations, implementing ItemStream, all with the same result. Yes, I can just write it using a logger (log4j for example), but I am trying to figure out what I am doing wrong implementing a FlatFileItemWriter.

As I stated earlier, this is about as simple as it gets, but the significant parts of the code are below.

Job.xml

    <batch:step id="step1">
        <batch:tasklet allow-start-if-complete="false">
            <batch:chunk reader="fileReader" processor="fileProcessor"
                writer="fileWriter" commit-interval="5" skip-limit="10">
                <batch:skippable-exception-classes>
                    <batch:include
                        class="com.myexception.SkipObjectException" />
                </batch:skippable-exception-classes>
                <batch:streams>
                    <batch:stream ref="skipListenerWriter" />
                </batch:streams>
            </batch:chunk>
            <batch:listeners>
                <batch:listener ref="customSkipListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
</batch:job>

<bean id="fileReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="file:TestFiles/input/input.txt" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean
                    class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value=";" />
                </bean>
            </property>
            <property name="fieldSetMapper" ref="fileMapper" />
        </bean>
    </property>
</bean>

<bean id="fileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" value="file:TestFiles/output/output.txt" />
    <property name="shouldDeleteIfExists" value="true" />
    <property name="lineAggregator">
        <bean
            class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
    </property>
</bean>

<bean id="skipListenerWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" value="file:TestFiles/output/skipListener.txt" />
    <property name="shouldDeleteIfExists" value="true" />
    <property name="lineAggregator">
        <bean
            class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
    </property>
</bean>

Processor - does nothing but throw SkipObjectException if the item being processed has an id ending in a.

CustomSkipListener

@Autowired
private FlatFileItemWriter<String> skipListenerWriter;

@OnSkipInProcess
public void processLog(FileObject theItem, Throwable theException) {
    String myMessage = "Process skipped item: " + theException.getMessage()
            + " item: " + theItem.getNumber();

    List<String> theItems = new ArrayList<String>();
    theItems.add(myMessage);
    logger.fatal(myMessage);
    try {
        this.skipListenerWriter.write(theItems);
    } catch (Exception e) {
        // TODO wouldn't actually do this...
        throw new RuntimeException(e);
    }
}

Upvotes: 2

Views: 2786

Answers (1)

user3813035

Reputation: 141

My solution, right or wrong, was to give each writer a unique name using the name property from the ItemStreamSupport abstract class, which FlatFileItemWriter extends.

Spring Batch stores the amount written to a file in the execution context under the key name.written, where name is the writer's name. The default name of a FlatFileItemWriter is FlatFileItemWriter. Since I had two FlatFileItemWriters with the default name, there was only one entry in the execution context (FlatFileItemWriter.written), shared by both. Since one file had more written to it than the other, the saved state did not match the skip file on restart, which caused the exception. Uniquely naming the writers resolved this. If there is a more appropriate way to handle this, I would appreciate hearing it.
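
For reference, a minimal sketch of that change against the bean definitions from the question. The name values are arbitrary ones I picked (any two distinct strings should do); everything else is unchanged. As far as I can tell, the writer's restart position is stored under the same name prefix as the written count, which would explain why the file-size check failed on the skip file, but treat that as my reading rather than a confirmed detail.

```xml
<!-- Sketch only: same writers as in the question, each given a distinct "name"
     so their execution-context keys no longer collide
     (e.g. fileWriter.written vs. skipListenerWriter.written). -->
<bean id="fileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <!-- "name" is the ItemStreamSupport property; the value is an arbitrary choice -->
    <property name="name" value="fileWriter" />
    <property name="resource" value="file:TestFiles/output/output.txt" />
    <property name="shouldDeleteIfExists" value="true" />
    <property name="lineAggregator">
        <bean
            class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
    </property>
</bean>

<bean id="skipListenerWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <!-- must differ from the name given to fileWriter above -->
    <property name="name" value="skipListenerWriter" />
    <property name="resource" value="file:TestFiles/output/skipListener.txt" />
    <property name="shouldDeleteIfExists" value="true" />
    <property name="lineAggregator">
        <bean
            class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
    </property>
</bean>
```

Note that a job execution that failed before this change still carries the old shared key in its stored execution context, so it is probably safest to test the fix with a fresh job instance.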

Upvotes: 3
