PAA

Reputation: 11985

Spring Batch ItemWriteListener and SkipListener don't work hand in hand

I am building a POC around Spring Batch's SkipListener and ItemWriteListener, because my business requirements call for exactly this combination. It seems to me that ItemWriteListener and SkipListener don't work hand in hand in Spring Batch.

Here is the POC code I developed; the @BeforeWrite method doesn't save anything into the DB.

MySkipListener.java

public class MySkipListener implements SkipListener<Integer, Integer> {
    
    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println("@@@MySkipListener| On Skip in Read Error : " + t.getMessage());
    }

    @Override
    public void onSkipInWrite(Integer item, Throwable t) {
        System.out.println("@@@MySkipListener | Skipped in write due to : " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(Integer item, Throwable t) {
        System.out.println("@@@MySkipListener | Skipped in process due to: " + t.getMessage());
    }
}

MyStepListener.java

public class MyStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("beforeStep");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("afterStep");
        return ExitStatus.COMPLETED;
    }
}

MyItemWriteListener.java

public class MyItemWriteListener {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @BeforeWrite
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void beforeWrite(List<? extends Integer> items) {
        System.out.println("########### ItemWriteListener | beforeWrite " + items);
        
        // MUST FOR ME to save data into DB here !!!
        jdbcTemplate.update("INSERT INTO `test`.`mytest`(`name`) VALUES( 'A')");
    }
}

MyJob.java

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                if (item.equals(1)) {
                    throw new Exception("No 1 here!");
                }
                System.out.println("item = " + item);
            }
        };
    }

    @Bean
    public Step step() {
        return steps.get("step")
                .<Integer, Integer>chunk(5)
                .reader(itemReader())
                .writer(itemWriter())
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(10)
                .listener(mySkipListener())
                .listener(myStepListener())
                .listener(myItemWriteListener())
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step())
                .build();
    }
    
    @Bean
    public MySkipListener mySkipListener() {
        return new MySkipListener();
    }
    
    @Bean
    public MyStepListener myStepListener() {
        return new MyStepListener();
    }
    
    @Bean
    public MyItemWriteListener myItemWriteListener() {
        return new MyItemWriteListener();
    }
}

Upvotes: 2

Views: 5995

Answers (2)

Mahmoud Ben Hassine

Reputation: 31600

POC code I developed, but @BeforeWrite doesn't save anything into DB. Any quick help?

The reason for that is explained in the javadoc of ItemWriteListener:

in fault tolerant scenarios, any transactional work that is done in one of these methods
would be rolled back and not re-applied. Because of this, it is recommended to not perform
any logic using this listener that participates in a transaction.

All methods of ItemWriteListener and SkipListener are executed within the scope of the transaction driven by Spring Batch. ChunkListener is what you are looking for (I guess). ChunkListener#beforeChunk is called inside the transaction, whereas ChunkListener#afterChunk and ChunkListener#afterChunkError are called outside the transaction. This allows compensating actions to take place in their own transaction if needed. In this case, it is up to you to manage the lifecycle of this separate transaction. Here is an excerpt from the javadoc:

data access code within this callback will still "participate" in the original transaction
**unless it declares that it runs in its own transaction**.
Hence: Use PROPAGATION_REQUIRES_NEW for any transactional operation that is called from here.
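
To make the difference concrete, here is a minimal plain-Java sketch of the two cases. ToyTable and ListenerTransactionDemo are hypothetical names invented for this illustration; they model commit and rollback with in-memory lists and are not Spring or JDBC APIs. The first method inserts a row through the chunk's own transaction, which is then rolled back. The second inserts through a separate transaction after the rollback, which is roughly the effect PROPAGATION_REQUIRES_NEW is meant to give.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a transactional table (hypothetical, not a Spring or JDBC API). */
class ToyTable {
    private final List<String> committed = new ArrayList<>();

    /** One unit of work: rows become visible only once commit() is called. */
    class Tx {
        private final List<String> pending = new ArrayList<>();

        void insert(String row) { pending.add(row); }

        void commit() { committed.addAll(pending); pending.clear(); }

        void rollback() { pending.clear(); }
    }

    Tx begin() { return new Tx(); }

    /** Snapshot of the committed rows. */
    List<String> rows() { return new ArrayList<>(committed); }
}

class ListenerTransactionDemo {

    /** The listener row goes through the chunk's own transaction. */
    static List<String> insertInsideChunkTransaction() {
        ToyTable table = new ToyTable();
        ToyTable.Tx chunkTx = table.begin();
        chunkTx.insert("audit row from beforeWrite");
        chunkTx.rollback(); // the writer failed, so the whole chunk is rolled back
        return table.rows();
    }

    /** The listener row goes through its own transaction after the rollback. */
    static List<String> insertInSeparateTransaction() {
        ToyTable table = new ToyTable();
        ToyTable.Tx chunkTx = table.begin();
        chunkTx.rollback(); // the writer failed, so the whole chunk is rolled back
        ToyTable.Tx ownTx = table.begin(); // plays the role of REQUIRES_NEW
        ownTx.insert("audit row from afterChunkError");
        ownTx.commit();
        return table.rows();
    }

    public static void main(String[] args) {
        System.out.println(insertInsideChunkTransaction()); // prints []
        System.out.println(insertInSeparateTransaction());  // prints [audit row from afterChunkError]
    }
}
```

In a real step, one way to start that separate transaction would be Spring's TransactionTemplate with its propagation behavior set to TransactionDefinition.PROPAGATION_REQUIRES_NEW, invoked from ChunkListener#afterChunkError.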

Upvotes: 2

Ken Chan

Reputation: 90457

This happens because, when you use @BeforeWrite to declare a listener method, Spring Batch creates a Spring AOP proxy behind the scenes that wraps the method. That proxy is not a Spring bean, so it does not know how to react to @Transactional, and the annotation has no effect.

Try implementing ItemWriteListener instead:

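import java.util.List;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.stereotype.Component;

@Component
public class MyItemWriteListener implements ItemWriteListener<Integer> {
    @Override // must be public to implement the interface method
    public void beforeWrite(List<? extends Integer> items) {
        // save to the DB here
    }
    @Override
    public void afterWrite(List<? extends Integer> items) { } // intentionally a no-op
    @Override
    public void onWriteError(Exception exception, List<? extends Integer> items) { } // intentionally a no-op
}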

On the other hand, I would not manage transactions myself for the items being batch processed (i.e. the objects read by the ItemReader), because Spring Batch already manages the transaction for them: it starts a new transaction whenever it processes a new chunk of items. Managing transactions yourself is likely to interfere with the rollback that takes place when a failed item is retried or skipped.
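
The interplay between rollback and skipping can be pictured with a toy model in plain Java. This is a deliberately simplified sketch, not Spring Batch's actual implementation: SkipScanModel is a hypothetical class, transactions are modelled as lists that are either copied on commit or discarded on rollback, and it assumes that after a failed chunk write the items are retried one at a time so the bad item can be isolated and skipped. The listener row is inserted only during the first attempt and, in line with the javadoc quoted in the other answer, is not re-applied afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model of a fault-tolerant chunk write (hypothetical, not Spring Batch code). */
class SkipScanModel {

    final List<Integer> written = new ArrayList<>();      // committed item writes
    final List<Integer> skipped = new ArrayList<>();      // items handed to the skip listener
    final List<String> listenerRows = new ArrayList<>();  // committed rows from the write listener

    /** Mirrors the writer from the question: item 1 always fails. */
    private static void write(List<Integer> items, List<Integer> sink) throws Exception {
        for (Integer item : items) {
            if (item.equals(1)) {
                throw new Exception("No 1 here!");
            }
            sink.add(item);
        }
    }

    void processChunk(List<Integer> chunk) {
        // Attempt 1: the whole chunk, including the listener's insert, in one transaction.
        List<Integer> pendingItems = new ArrayList<>();
        List<String> pendingRows = new ArrayList<>();
        try {
            pendingRows.add("beforeWrite " + chunk);
            write(chunk, pendingItems);
            written.addAll(pendingItems);      // commit
            listenerRows.addAll(pendingRows);  // commit
            return;
        } catch (Exception e) {
            // Rollback: the pending lists, listener row included, are simply discarded.
        }

        // Attempt 2: one item per transaction; the listener's insert is not re-applied.
        for (Integer item : chunk) {
            List<Integer> pending = new ArrayList<>();
            try {
                write(Collections.singletonList(item), pending);
                written.addAll(pending);       // commit this single item
            } catch (Exception e) {
                skipped.add(item);             // rollback, then report the item as skipped
            }
        }
    }

    static SkipScanModel run(List<Integer> chunk) {
        SkipScanModel model = new SkipScanModel();
        model.processChunk(chunk);
        return model;
    }

    public static void main(String[] args) {
        SkipScanModel m = run(Arrays.asList(1, 2, 3));
        // prints written=[2, 3] skipped=[1] listenerRows=[]
        System.out.println("written=" + m.written + " skipped=" + m.skipped
                + " listenerRows=" + m.listenerRows);
    }
}
```

With the question's data, items 2 and 3 are written, item 1 is skipped, and the listener's row never reaches the table in this model, which matches the symptom described.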

Upvotes: 0
