Julien Bourdon

Reputation: 1723

Does Spring Batch have a feature to keep track of processed rows?

I currently use Spring Batch to update my entities through an external application which writes directly in my database through an ODBC connection.

For the sake of simplicity, I will show a simplified version of the table schema here (referred to as `importshipmentdata` in the Java code below):

id (integer)
entity1_data1 (character varying)
entity1_data2 (character varying)
entity2_data1 (character varying)
import_date (timestamp with time zone)

And here is my job configuration:

@Configuration
@EnableBatchProcessing
public class ImportShippingConfig {

    @Inject
    private JobBuilderFactory jobs;

    @Inject
    private StepBuilderFactory steps;

    @Inject
    private JobRepository jobRepository;

    @Inject
    private DataSource dataSource;


    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return jobLauncher;
    }

    @Bean
    public ItemReader<ImportShippingItem> reader() {
        JdbcCursorItemReader<ImportShippingItem> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        BeanPropertyRowMapper<ImportShippingItem> mapper = new BeanPropertyRowMapper<>(ImportShippingItem.class);
        reader.setSql("SELECT * FROM importshipmentdata");
        reader.setRowMapper(mapper);
        return reader;
    }

    @Bean
    public ItemProcessor<ImportShippingItem, ImportShippingItem> processor() {
        return new ImportShippingItemProcessor();
    }

    @Bean
    public ItemWriter<ImportShippingItem> writer() {
        return new ImportShippingItemWriter();
    }

    @Bean
    public Job ShippingImporter() {
        return jobs.get("ShippingImporter").start(importShipping()).build();
    }

    @Bean
    public Step importShipping() {
        return steps.get("importShipping")
            .<ImportShippingItem, ImportShippingItem>chunk(5)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
    }

}

(Do not get offended by the SELECT *, I am still in the POC phase :) )

I could use the `import_date` field, but I am afraid it would not handle job failures very well.

Upvotes: 0

Views: 2472

Answers (2)

Akhachkhouch

Reputation: 338

You can use the JobRepository to get the job status and job ID; this repository exposes all of the Spring Batch metadata.

Upvotes: 0

Julien Bourdon

Reputation: 1723

I found the answer while writing the question, so I will post it here.

As stated in the documentation:

By default, all of the ItemReader and ItemWriter implementations store their current state in the ExecutionContext before it is committed. However, this may not always be the desired behavior. For example, many developers choose to make their database readers 'rerunnable' by using a process indicator. An extra column is added to the input data to indicate whether or not it has been processed. When a particular record is being read (or written out) the processed flag is flipped from false to true. The SQL statement can then contain an extra statement in the where clause, such as "where PROCESSED_IND = false", thereby ensuring that only unprocessed records will be returned in the case of a restart. In this scenario, it is preferable to not store any state, such as the current row number, since it will be irrelevant upon restart. For this reason, all readers and writers include the 'saveState' property:

So I will just add a `processed` column to my table and change my query to `SELECT entity1_data1, entity1_data2, entity2_data1 FROM importshipmentdata WHERE processed = false`.
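Concretely, the reader bean from the question would change along these lines (an untested sketch: `setSaveState(false)` is the standard `JdbcCursorItemReader` property mentioned in the documentation quote above, and the column name `processed` is my own choice):

    @Bean
    public ItemReader<ImportShippingItem> reader() {
        JdbcCursorItemReader<ImportShippingItem> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT id, entity1_data1, entity1_data2, entity2_data1 "
                + "FROM importshipmentdata WHERE processed = false");
        reader.setRowMapper(new BeanPropertyRowMapper<>(ImportShippingItem.class));
        // Do not persist reader state in the ExecutionContext;
        // restartability comes from the process-indicator column itself.
        reader.setSaveState(false);
        return reader;
    }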

Then I will have my writer set that column to `true` when each item is written, and configure the reader with `saveState=false`.
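The restart behaviour this buys can be illustrated with a small, Spring-free sketch (the table is stood in for by an in-memory list; all names here are illustrative, not Spring Batch APIs):

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessIndicatorDemo {
    // A row with a process-indicator column, as in the pattern above.
    static final class Row {
        final int id;
        boolean processed;
        Row(int id) { this.id = id; }
    }

    // Equivalent of "SELECT ... WHERE processed = false".
    static List<Row> readUnprocessed(List<Row> table) {
        List<Row> out = new ArrayList<>();
        for (Row r : table) {
            if (!r.processed) out.add(r);
        }
        return out;
    }

    // The writer flips the flag for every item it writes.
    static void write(List<Row> chunk) {
        for (Row r : chunk) r.processed = true;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        for (int i = 1; i <= 5; i++) table.add(new Row(i));

        // First run: process a chunk of 3 rows, then pretend the job crashed.
        write(readUnprocessed(table).subList(0, 3));

        // Restart: only unprocessed rows come back, with no saved reader state.
        List<Row> remaining = readUnprocessed(table);
        System.out.println(remaining.size()); // prints 2
        for (Row r : remaining) System.out.print(r.id + " "); // prints 4 5
    }
}
```

Because the query itself excludes already-processed rows, a restarted job picks up exactly where the previous one left off, which is why the row-number state would be irrelevant.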

Upvotes: 2
