Sukh

Reputation: 540

Total number of processed records across all chunks in Spring Batch

I am using Spring Batch to read records from a relational database and write them to a NoSQL database in a chunk-oriented step. I want to log the total number of records processed once the job has finished. I could not find a listener that helps with this: there are item-level and step-level listeners, but none of them gives me the total number of processed records. Any help would be appreciated.

    @Bean
    public Step step1(JobRepository jobRepository,
                      PlatformTransactionManager transactionManager) {
        return new StepBuilder("Stepname", jobRepository)
                .allowStartIfComplete(true)
                .<OriginEntity, DestinationEntity>chunk(chunkSize, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .skip(ProcessorException.class)
                .skip(WriterException.class)
                .skipLimit(skipLimit)
                .listener(itemReaderExecutionListener)
                .listener(itemProcessorExecutionListener)
                .listener(itemWriterExecutionListener)
                .build();
    }

Upvotes: 0

Views: 254

Answers (1)

Mahmoud Ben Hassine

Reputation: 31710

You did not define what "processed records" means (i.e., whether it includes filtered records, skipped records, etc.), but the total can be calculated from the counters that Spring Batch provides in the StepExecution (writeCount, processSkipCount, filterCount, etc.). For example, if you want to include items that were skipped or filtered during the processing phase, the total would be:

processedCount = writeCount + processSkipCount + filterCount;

The writeCount represents the number of items correctly processed (i.e., passed to the writer and successfully written).
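To make the arithmetic concrete, here is a minimal plain-Java sketch of the formula above. The class name ProcessedCountExample and the sample numbers are made up for illustration; in a real job the three arguments would presumably come from StepExecution.getWriteCount(), getProcessSkipCount() and getFilterCount(), for instance inside a StepExecutionListener's afterStep callback.

```java
// Hypothetical helper, not part of Spring Batch: it only mirrors the
// formula processedCount = writeCount + processSkipCount + filterCount.
final class ProcessedCountExample {

    private ProcessedCountExample() {
    }

    // In a real step these values would be read from the StepExecution, e.g.
    // total(stepExecution.getWriteCount(),
    //       stepExecution.getProcessSkipCount(),
    //       stepExecution.getFilterCount());
    static long total(long writeCount, long processSkipCount, long filterCount) {
        return writeCount + processSkipCount + filterCount;
    }

    public static void main(String[] args) {
        // Assumed sample run: 10 items reached the processor,
        // 2 were filtered (processor returned null),
        // 1 was skipped because the processor threw a skippable exception,
        // and the remaining 7 were written.
        long processed = total(7, 1, 2);
        System.out.println("Processed records: " + processed); // prints "Processed records: 10"
    }
}
```

Whether skipped and filtered items should count at all depends on your own definition of "processed"; drop the corresponding term if they should not.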

Otherwise, you can write a custom ItemProcessListener that does the calculations. Here is a quick example:

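import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

class ItemCountingListener<I, O> implements ItemProcessListener<I, O>, StepExecutionListener {

    private long processCount;
    private long filterCount;
    private long errorCount;

    private StepExecution stepExecution;

    // Keep a reference to the current step execution so the counters
    // can be stored in its execution context.
    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    // A null result means the processor filtered the item out.
    @Override
    public void afterProcess(I item, O result) {
        if (result == null) {
            filterCount++;
            stepExecution.getExecutionContext().putLong("filter.count", filterCount);
        } else {
            processCount++;
            stepExecution.getExecutionContext().putLong("process.count", processCount);
        }
    }

    // Called when the processor throws an exception for an item.
    @Override
    public void onProcessError(I item, Exception e) {
        errorCount++;
        stepExecution.getExecutionContext().putLong("error.count", errorCount);
    }
}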

Upvotes: 0
