Reputation: 540
I am using Spring Batch to read records from a relational database and write them to a NoSQL database with a chunk-oriented step. I want to log the total number of records processed after the job has finished. I could not find a listener that helps with this. There are item-based and step-based listeners, but none of them gives me the total number of processed records. Any help would be appreciated.
@Bean
public Step step1(JobRepository jobRepository,
                  PlatformTransactionManager transactionManager) {
    return new StepBuilder("Stepname", jobRepository)
            .allowStartIfComplete(true)
            .<OriginEntity, DestinationEntity>chunk(chunkSize, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skip(ProcessorException.class)
            .skip(WriterException.class)
            .skipLimit(skipLimit)
            .listener(itemReaderExecutionListener)
            .listener(itemProcessorExecutionListener)
            .listener(itemWriterExecutionListener)
            .build();
}
Upvotes: 0
Views: 254
Reputation: 31710
You did not define what "processed records" means (i.e. whether it includes filtered records, skipped records, etc.), but the total can be calculated from the counters that Spring Batch provides in the StepExecution (writeCount, processSkipCount, filterCount, etc.). For example, if you want to include the items that were skipped or filtered during the processing phase, the total would be:
processedCount = writeCount + processSkipCount + filterCount;
The writeCount represents the number of items correctly processed (i.e. passed to the writer and successfully written).
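The formula above can be sketched in plain Java. This is only an illustration: `StepCounts` is a hypothetical stand-in for the values you would read from a real `StepExecution` through `getWriteCount()`, `getProcessSkipCount()` and `getFilterCount()`, and the sample numbers are made up.

```java
// Hypothetical stand-in for the counters exposed by Spring Batch's StepExecution
// (getWriteCount(), getProcessSkipCount(), getFilterCount()). Not a Spring Batch type.
record StepCounts(long writeCount, long processSkipCount, long filterCount) {

    // Total as defined above: written + skipped during processing + filtered.
    long processedCount() {
        return writeCount + processSkipCount + filterCount;
    }
}

class ProcessedCountExample {
    public static void main(String[] args) {
        // Made-up sample values: 90 written, 4 skipped in the processor, 6 filtered.
        StepCounts counts = new StepCounts(90, 4, 6);
        System.out.println("Processed: " + counts.processedCount()); // prints "Processed: 100"
    }
}
```

In a real job, one place to do this after the job has finished would be a `JobExecutionListener`: in `afterJob(JobExecution)`, iterate over `jobExecution.getStepExecutions()`, read the counters from each `StepExecution`, and log the sum.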
Otherwise, you can write a custom ItemProcessListener that does the calculations. Here is a quick example:
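import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

// Assumes Spring Batch 5, where the unused listener methods have default implementations.
class ItemCountingListener<I, O> implements ItemProcessListener<I, O>, StepExecutionListener {

    private long processCount;
    private long filterCount;
    private long errorCount;
    private StepExecution stepExecution;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Keep a reference so the counters can be stored in the step's execution context.
        this.stepExecution = stepExecution;
    }

    @Override
    public void afterProcess(I item, O result) {
        if (result == null) {
            // A null result means the processor filtered the item.
            filterCount++;
            stepExecution.getExecutionContext().put("filter.count", filterCount);
        } else {
            processCount++;
            stepExecution.getExecutionContext().put("process.count", processCount);
        }
    }

    @Override
    public void onProcessError(I item, Exception e) {
        // The processor threw an exception for this item.
        errorCount++;
        stepExecution.getExecutionContext().put("error.count", errorCount);
    }
}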
Upvotes: 0