Reputation: 804
I'm using Spring 4.0.5 and Spring Batch 3.0.1
I have a simple step like this, and it works perfectly:
<step id="myStep" next="nextStep">
<tasklet transaction-manager="myTxManager" task-executor="myTaskExecutor" throttle-limit="10">
<batch:chunk reader="myItemReader" processor="myPDFItemProcessor" writer="myItemWriter" commit-interval="20">
</batch:chunk>
</tasklet>
</step>
I have tried a simple skip-limit example (Configuring Skip Logic) like this:
<step id="myStep" next="nextStep">
<tasklet transaction-manager="myTxManager" task-executor="myTaskExecutor" throttle-limit="10">
<batch:chunk reader="myItemReader" processor="myPDFItemProcessor" writer="myItemWriter" commit-interval="20" skip-limit="10000000">
<batch:skippable-exception-classes>
<batch:include class="java.lang.Exception" />
</batch:skippable-exception-classes>
</batch:chunk>
</tasklet>
</step>
When I try to add this logic, this warning is writen in log files:
2015-03-24 16:03:50 [WARN ] [org.springframework.batch.core.step.builder.FaultTolerantStepBuilder.detectStreamInReader(FaultTolerantStepBuilder.java:504)] Asynchronous TaskExecutor detected with ItemStream reader. This is probably an error, and may lead to incorrect restart data being stored.
2015-03-24 16:04:18 [WARN ] [org.springframework.batch.core.step.item.ChunkMonitor.open(ChunkMonitor.java:118)] No ItemReader set (must be concurrent step), so ignoring offset data.
2015-03-24 16:04:18 [WARN ] [org.springframework.batch.core.step.item.ChunkMonitor.getData(ChunkMonitor.java:155)] ItemStream was opened in a different thread. Restart data could be compromised.
The reader is JdbcPagingItemReader with saveState setted to false.
The processor is a CompositeItemProcessor.
The writer is a CompositeItemWriter.
Is there anything wrong with my configuration? Maybe do I need any additional configuration for skip logic to work?
Any help is very appreciated. Thanks
Upvotes: 2
Views: 3483
Reputation: 804
Finally, I could solve my issue with Samwise's hint.
When you have an ItemReader in a multithreaded step, to make that reader thread safe, the code is:
public SynchronizedItemReader<T> implements ItemReader<T> {
private final ItemReader<T> delegate;
public SynchronizedItemReader(ItemReader<T> delegate) {
this.delegate = delegate;
}
public synchronized T read () {
return delegate.read();
}
}
The delegated ItemReader in this class is the current ItemReader you want to make thread-safe.
This sample class is in the link suggested by Samwise.
IMPORTANT EDIT: Docs in Item Readers and Writers, section 6.5, explain that delegated reader must be injected as a stream. I don't know why, but in my current issue in this question, the delegate reader must not be injected as a stream reader in the chunk. When I injected it, it failed.
Hope it would be helpful. Thanks.
Upvotes: 4