Reputation: 1119
I have set up a simple read job in Spring Batch using Java config and I am trying to write a simple listener. The listener should display the amount of time it took, in seconds, to read a certain number of records.
The bean looks as follows:
@Bean
public SimpleItemReaderListener listener(){
    SimpleItemReaderListener listener = new SimpleItemReaderListener<>();
    listener.setLogInterval(50000);
    return listener;
}
Based on the set log interval, a message will be displayed and the message will look like this:
14:42:11,445 INFO main SimpleItemReaderListener:45 - Read records [0] to [50.000] in average 1,30 seconds
14:42:14,453 INFO main SimpleItemReaderListener:45 - Read records [50.000] to [100.000] in average 2,47 seconds
14:42:15,489 INFO main SimpleItemReaderListener:45 - Read records [100.000] to [150.000] in average 1,03 seconds
14:42:16,448 INFO main SimpleItemReaderListener:45 - Read records [150.000] to [200.000] in average 0,44 seconds
Exactly how I want it, perfect. However, when I change the chunk size in my BatchConfiguration from 100.000 to, let's say, 1.000, the logging changes and I don't know what is causing the change:
14:51:24,893 INFO main SimpleItemReaderListener:45 - Read records [0] to [50.000] in average 0,90 seconds
14:51:50,657 INFO main SimpleItemReaderListener:45 - Read records [50.000] to [100.000] in average 0,57 seconds
14:52:16,392 INFO main SimpleItemReaderListener:45 - Read records [100.000] to [150.000] in average 0,59 seconds
14:52:42,125 INFO main SimpleItemReaderListener:45 - Read records [150.000] to [200.000] in average 0,61 seconds
Being under the impression that the beforeRead and afterRead methods of the ItemReadListener are executed for each individual item, I was expecting the reported time for each 50.000 records to be more in line with the elapsed time shown by the slf4j timestamps (i.e. around 26 seconds per 50.000).
What part of my listener is causing this unwanted behaviour when I change the chunk size?
My implementation of the ItemReadListener is as follows:
public class SimpleItemReaderListener<Item> implements ItemReadListener<Item> {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleItemReaderListener.class);
    private static final double NANO_TO_SECOND_DIVIDER_NUMBER = 1_000_000_000.0;
    private static final String PATTERN = ",###";

    private int startCount = 0;
    private int logInterval = 50000;
    private int currentCount;
    private int totalCount;
    private long timeElapsed;
    private long startTime;
    private DecimalFormat decimalFormat = new DecimalFormat(PATTERN);

    @Override
    public void beforeRead() {
        startTime = System.nanoTime();
    }

    @Override
    public void afterRead(Item item) {
        updateTimeElapsed();
        if (currentCount == logInterval) {
            displayMessage();
            updateStartCount();
            resetCount();
        } else {
            increaseCount();
        }
    }

    private void updateTimeElapsed() {
        timeElapsed += System.nanoTime() - startTime;
    }

    private void displayMessage() {
        LOG.info(String.format("Read records [%s] to [%s] in average %.2f seconds",
                decimalFormat.format(startCount),
                decimalFormat.format(totalCount),
                timeElapsed / NANO_TO_SECOND_DIVIDER_NUMBER));
    }

    private void updateStartCount() {
        startCount += currentCount;
    }

    private void resetCount() {
        currentCount = 0;
        timeElapsed = 0;
    }

    private void increaseCount() {
        currentCount++;
        totalCount++;
    }

    @Override
    public void onReadError(Exception arg0) {
        // NO-OP
    }

    public void setLogInterval(int logInterval){
        this.logInterval = logInterval;
    }
}
The full BatchConfiguration class:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
                .flow(validateInput())
                .end()
                .build();
    }

    @Bean
    public Step validateInput() {
        return stepBuilderFactory.get("validateInput")
                .chunk(1000)
                .reader(reader())
                .listener(listener())
                .writer(writer())
                .build();
    }

    @Bean
    public HeaderTokenizer tokenizeHeader(){
        HeaderTokenizer tokenizer = new HeaderTokenizer();
        // optional setting, custom delimiter is set to ','
        // tokenizer.setDelimiter(",");
        return tokenizer;
    }

    @Bean
    public SimpleItemReaderListener listener(){
        SimpleItemReaderListener listener = new SimpleItemReaderListener<>();
        // optional setting, log interval defaults to 50000; increase for less verbose logging
        listener.setLogInterval(50000);
        return listener;
    }

    @Bean
    public FlatFileItemReader reader() {
        FlatFileItemReader reader = new FlatFileItemReader();
        reader.setLinesToSkip(1);
        reader.setSkippedLinesCallback(tokenizeHeader());
        reader.setResource(new ClassPathResource("majestic_million.csv"));
        reader.setLineMapper(new DefaultLineMapper() {{
            setLineTokenizer(tokenizeHeader());
            setFieldSetMapper(new PassThroughFieldSetMapper());
        }});
        return reader;
    }

    @Bean
    public DummyItemWriter writer(){
        DummyItemWriter writer = new DummyItemWriter();
        return writer;
    }
}
Or use the Spring Boot example from http://projects.spring.io/spring-batch/ and add the SimpleItemReaderListener bean.
Upvotes: 0
Views: 1587
Reputation: 3583
Your application is spending more time outside the reader when the chunk size is small. Your listener only measures the time spent inside the reader (between each beforeRead and afterRead call), so the time spent processing and writing each chunk is never added to timeElapsed. The timestamps in the log, on the other hand, reflect total wall-clock time, which does include that per-chunk work. With a chunk size of 1.000 and a log interval of 50.000, the step leaves the reader 50 times per interval, which is why the logged averages stay low while the timestamps drift apart.
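If you want the logged figures to match the wall-clock timestamps, the listener can time whole intervals instead of summing per-read durations: start the clock at the first beforeRead of an interval and only read it again when the interval completes, so the processing and writing time between chunks is included. A minimal sketch of that idea (the class name and the getCount helper are illustrative; in the real application the class would implement Spring Batch's ItemReadListener<T>, which is omitted here so the timing logic stands alone):

```java
// Sketch: measures wall-clock time per log interval instead of summing
// only the time spent inside the reader.
public class WallClockReadListener<T> {

    private int logInterval = 50_000;
    private int count = 0;
    private int startCount = 0;
    private long intervalStart = -1L; // nanoTime at the first read of the current interval

    public void beforeRead() {
        // Start the clock only once per interval, not once per item, so the
        // time spent processing and writing between chunks is included.
        if (intervalStart < 0) {
            intervalStart = System.nanoTime();
        }
    }

    public void afterRead(T item) {
        count++;
        if (count - startCount >= logInterval) {
            double seconds = (System.nanoTime() - intervalStart) / 1_000_000_000.0;
            System.out.printf("Read records [%,d] to [%,d] in %.2f seconds%n",
                    startCount, count, seconds);
            startCount = count;
            intervalStart = -1L; // restart the clock at the next beforeRead
        }
    }

    public int getCount() {
        return count;
    }

    public void setLogInterval(int logInterval) {
        this.logInterval = logInterval;
    }
}
```

With this approach the reported interval durations should line up with the gaps between the log timestamps, regardless of the chunk size.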
Upvotes: 1