Sunil M
Sunil M

Reputation: 95

Spring Batch :Aggregated reader / writer Issue

I am trying to use Spring batch and implement an aggregated reader (batch file, where multiple records should be treated as one record while writing). Here is the code snippet for my reader:

public class AggregatePeekableReader implements ItemReader<List<T>>, ItemStream {



    private SingleItemPeekableItemReader<T> reader;



    private boolean process(T currentRecord , InvoiceLineItemsHolder holder) throws UnexpectedInputException, ParseException, Exception {

        next = peekNextInvoiceRecord();

        // finish processing if we hit the end of file
        if (currentRecord == null ) {
                LOG.info("Exhausted ItemReader ( END OF FILE)");
                holder.exhausted = true;
                return false;
        }

        if ( currentRecord.hasSameInvoiceNumberAndVendorNumber(next)){
                LOG.info("Found new line item to current invocie record");
                holder.records.add(currentRecord);
                currentRecord = null;
                return true;
        }else{ 

            holder.records.add(currentRecord);
                return false;           
        }

}

    private T getNextInvoiceRecord () {

        T record=null;

        try {
            record=reader.read();
        } catch (UnexpectedInputException e) {
            ALERT.error(LogMessageFormatter.format(Severity.HIGH,
                    BATCH_FILE_READ_EXCEPTION, e), e);
            throw e;
        } catch (ParseException e) {
            ALERT.error(LogMessageFormatter.format(Severity.HIGH,
                    BATCH_FILE_READ_EXCEPTION, e), e);
            throw e;
        } catch (Exception e) {
            ALERT.error(LogMessageFormatter.format(Severity.HIGH,
                    BATCH_FILE_READ_EXCEPTION, e), e);


        }

        return record;
    }

    private T peekNextInvoiceRecord() {

        T next=null;

        try {
            next=reader.peek();
        } catch (UnexpectedInputException e) {
            ALERT.error(LogMessageFormatter.format(Severity.HIGH,
                    BATCH_FILE_READ_EXCEPTION, e), e);
            throw e;
        } catch (ParseException e) {
            ALERT.error(LogMessageFormatter.format(Severity.HIGH,
                    BATCH_FILE_READ_EXCEPTION, e), e);
            throw e;
        } catch (Exception e) {
            ALERT.error(LogMessageFormatter.format(Severity.HIGH,
                    BATCH_FILE_READ_EXCEPTION, e), e);
        }
        return next;
    }

    public   void close () {
        reader.close();
    }

    public SingleItemPeekableItemReader<T> getReader() {
        return reader;
    }


    public   void setReader(SingleItemPeekableItemReader<T> reader) {
        this.reader = reader;
    }

    private class InvoiceLineItemsHolder {
        List<T> records = new ArrayList<T>();

        boolean exhausted = false;
}


    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // 
        reader.open(executionContext);

    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // TODO 

    }

    @Override
    public List<T> read() throws Exception, UnexpectedInputException, ParseException,
            NonTransientResourceException {
        CLASS holder = new SOMECLASS()

        synchronized (this) {

           while (process(getNextInvoiceRecord(), holder)) {
                continue;
            }
            if (!holder.exhausted) {

                return holder.records;
            } else {
                //When you hit the end of the file,close the reader.
                close();
                return null;
            }

        }

    }

}

The above is a working example for implementing a peekable reader.This peeks the next line (doesnt read it) and determines whether a logical end of line is reached (some times multiple lines can make up a single transaction)

Upvotes: 4

Views: 7484

Answers (2)

dma_k
dma_k

Reputation: 10639

You need to implement ItemStream interface for reader. This will give a hint to Spring Batch, that your reader requires some actions to open/close a stream:

public class InvoiceLineItemAggregatePeekableReader extends AbstractItemStreamItemReader<List<SAPInvoicePaymentRecord>> {

    @Override
    public void close() {
    ...
    }
}

Streams are closed whatever error occurred during step execution. For more examples check classes from Spring Batch itself (e.g. FlatFileItemReader).

Upvotes: 1

Michael Pralow
Michael Pralow

Reputation: 6630

I can't move the input file to to an Error folder because the Reader is not closed

you could copy the file and either use File.deleteOnExit() on the old file for later deletion or delete the old file in an extra step, e.g. with a simple tasklet and a flow which calls the deleteTaskletStep only if the business step had an exception

Upvotes: 0

Related Questions