Reputation: 11985
I am making a POC around Spring Batch SkipListener
and ItemWriteListener
because my business demands it and I need that way only. In Spring Batch ItemWriteListener
and SkipListener
doesn't work hand in hand it seems to me.
Here is some POC code I developed, but @BeforeWrite
doesn't save anything into DB.
MySkipListener.java
public class MySkipListener implements SkipListener<Integer, Integer> {
@Override
public void onSkipInRead(Throwable t) {
System.out.println("@@@MySkipListener| On Skip in Read Error : " + t.getMessage());
}
@Override
public void onSkipInWrite(Integer item, Throwable t) {
System.out.println("@@@MySkipListener | Skipped in write due to : " + t.getMessage());
}
@Override
public void onSkipInProcess(Integer item, Throwable t) {
System.out.println("@@@MySkipListener | Skipped in process due to: " + t.getMessage());
}
}
MyStepListener.java
public class MyStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("beforeStep");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("afterStep");
return ExitStatus.COMPLETED;
}
}
MyItemWriteListener.java
public class MyItemWriteListener {
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeWrite
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void beforeWrite(List<? extends Integer> items) {
System.out.println("########### ItemWriteListener | beforeWrite " + items);
// MUST FOR ME to save data into DB here !!!
jdbcTemplate.update("INSERT INTO `test`.`mytest`(`name`) VALUES( 'A')");
}
}
MyJob.java
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3));
}
@Bean
public ItemWriter<Integer> itemWriter() {
return items -> {
for (Integer item : items) {
if (item.equals(1)) {
throw new Exception("No 1 here!");
}
System.out.println("item = " + item);
}
};
}
@Bean
public Step step() {
return steps.get("step")
.<Integer, Integer>chunk(5)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.skip(Exception.class)
.skipLimit(10)
.listener(mySkipListener())
.listener(myStepListener())
.listener(myItemWriteListener())
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
@Bean
public MySkipListener mySkipListener() {
return new MySkipListener();
}
@Bean
public MyStepListener myStepListener() {
return new MyStepListener();
}
@Bean
public MyItemWriteListener myItemWriteListener() {
return new MyItemWriteListener();
}
}
Upvotes: 2
Views: 5995
Reputation: 31600
POC code I developed, but @BeforeWrite doesn't save anything into DB. Any quick help?
The reason for that is explained in the javadoc of ItemWriteListener:
in fault tolerant scenarios, any transactional work that is done in one of these methods
would be rolled back and not re-applied. Because of this, it is recommended to not perform
any logic using this listener that participates in a transaction.
All methods from ItemWriterListener
and SkipListener
are executed in the scope of the transaction driven by Spring Batch. ChunkListener
is what you are looking for (I guess). ChunkListener#beforeChunk
is called inside the transaction whereas ChunkListener#afterChunk
and ChunkListener#afterChunkError
are called outside the transaction. This is to allow compensating actions to take place in their own transaction if needed. In this case, it is up to you manage the lifecycle of this separate transaction. Here is an excerpt from the javadoc:
data access code within this callback will still "participate" in the original transaction
**unless it declares that it runs in its own transaction**.
Hence: Use PROPAGATION_REQUIRES_NEW for any transactional operation that is called from here.
Upvotes: 2
Reputation: 90457
It is because when you use @BeforeWrite
to declare a listener method , it will create a spring AOP proxy behind the scene that wraps this method, but this proxy is not a spring bean so it does not know how to react to @Transactional
and hence it does not have effect.
Try to implement it with ItemWriteListener
:
@Component
public class MyItemWriteListener implements ItemWriteListener<Integer> {
void beforeWrite(List<Integer> items){
}
}
On the other hands, I will not control the transaction related stuff for the items that require batch processing (i.e object read by ItemReader
) by my own because spring-batch
already helps to manage the transaction for them . It already started a new transaction whenever processing a new chunk of item. Managing it by yourself feels like will mess things up related to the rollback things when it retries or skips a failure item.
Upvotes: 0