W.Phromma
W.Phromma

Reputation: 133

How to get the JobExecutionId in ItemReader in Spring Batch 4

I need some help.

I want to create a batch application for the following scenario:

  1. Read data from the database and then write it to a text file. (Consider this the first step.)
  2. When the first step is done, the second step writes a ctl file which contains the writeCount of the first step.

My approach is to create a StepExecutionListener that puts the job ID into the JobExecutionContext.
Then, in the ItemReader of the second step, I can read that context back from the database. But I don't know how to get the jobExecutionId inside the reader so that I can query MySQL for the right record.

Here is the code:

public class WriteDataCtlFile {


private static final Logger log = LoggerFactory.getLogger(WriteDataCtlFile.class);

@Autowired
private StepBuilderFactory stepBuilderFactory;


/**
 * Builds the chunk-oriented step registered under the name "writeCtlFile".
 *
 * @param ctlReader    supplies the {@code JobContext} items for the step
 * @param ctlProcessor converts each {@code JobContext} into a {@code CtlFile}
 * @param ctlWriter    receives the resulting {@code CtlFile} items
 * @return the assembled step, using a chunk size of 100
 */
@Bean
public Step writeCtlFile(ItemReader<JobContext> ctlReader,
                         ItemProcessor<JobContext, CtlFile> ctlProcessor,
                         ItemWriter<CtlFile> ctlWriter) {
    final String stepName = "writeCtlFile";
    final int chunkSize = 100;

    return stepBuilderFactory
            .get(stepName)
            .<JobContext, CtlFile>chunk(chunkSize)
            .reader(ctlReader)
            .processor(ctlProcessor)
            .writer(ctlWriter)
            .build();
}

/**
 * Job-scoped reader bean that runs a parameterized query against
 * BATCH_JOB_EXECUTION_CONTEXT and maps each row with the supplied mapper.
 *
 * @param dataSource       connection source handed to the cursor reader
 * @param jobContextMapper row mapper producing {@code JobContext} items
 * @return the configured cursor reader, exposed as {@code ItemReader<JobContext>}
 */
// NOTE(review): the bean is exposed through the ItemReader interface rather than the concrete
// JdbcCursorItemReader type; with a scoped proxy this may hide the reader's stream callbacks —
// confirm that the step actually opens the reader.
@JobScope
@Bean
public ItemReader<JobContext> ctlReader(DataSource dataSource, JobContextMapper jobContextMapper) {
    JdbcCursorItemReader<JobContext> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    // The single "?" placeholder is bound by the PreparedStatementSetter configured below.
    reader.setSql("SELECT short_context FROM BATCH_JOB_EXECUTION_CONTEXT WHERE JOB_EXECUTION_ID = ?");
    // THIS IS WHERE I WANT TO GET jobId
    // NOTE(review): `jobId` is not declared anywhere in this method, so the next line does not
    // compile as written; the value has to be supplied from outside (e.g. as a method parameter).
    reader.setPreparedStatementSetter(new JobIdPrepareStatement(jobId));
    reader.setRowMapper(jobContextMapper);
    return reader;
}


/**
 * Processor bean that wraps the short context of each {@code JobContext}
 * in a new {@code CtlFile}.
 *
 * @return a processor mapping {@code JobContext} to {@code CtlFile}
 */
@Bean
public ItemProcessor<JobContext, CtlFile> ctlProcessor() {
    return jobContext -> new CtlFile(jobContext.getShort_context());
}


/**
 * Writer bean for the ctl file. Each {@code CtlFile} item is expected to carry a JSON
 * document; the line written for it is the string form of that document's
 * "writeCount" entry.
 *
 * @return a flat-file writer targeting the hard-coded data-output.ctl path
 * @throws IllegalStateException (from the line aggregator, at write time) when an item's
 *         JSON payload is empty or null and therefore cannot be parsed into a map
 */
@Bean
public FlatFileItemWriter<CtlFile> ctlWriter(){
    // One Gson instance is reused for every item instead of allocating a new one per line.
    final Gson gson = new Gson();

    FlatFileItemWriter<CtlFile> flatFileItemWriter = new FlatFileItemWriter<>();
    flatFileItemWriter.setResource(new FileSystemResource("C:\\Users\\wathanyu.phromma\\data-output.ctl"));
    flatFileItemWriter.setLineAggregator(new LineAggregator<CtlFile>() {
        @Override
        public String aggregate(CtlFile ctlFile) {
            Map<String, Object> map = gson.fromJson(ctlFile.getWrittenRecordsCount(), Map.class);
            if (map == null) {
                // fromJson yields null for empty input; fail with context instead of a bare NPE.
                throw new IllegalStateException("Job execution context is empty; cannot read writeCount");
            }
            // NOTE(review): Gson appears to map untyped JSON numbers to Double, so this may
            // print e.g. "5.0" rather than "5" — confirm the expected ctl format.
            return String.valueOf(map.get("writeCount"));
        }
    });
    return flatFileItemWriter;
}

}

public class WriteDataTxtFile {

private static final Logger log = LoggerFactory.getLogger(WriteDataTxtFile.class);



@Autowired
private StepBuilderFactory stepBuilderFactory;


/**
 * Builds the chunk-oriented step registered under the name "writeTxtFile" and
 * attaches a {@code WriteDataTxtStepListener} to it.
 *
 * @param reader    supplies the {@code Account} items
 * @param processor processes each {@code Account} before it is written
 * @param writer    receives the processed {@code Account} items
 * @return the assembled step, using a chunk size of 2
 */
@Bean
public Step writeTxtFile(
        ItemReader<Account> reader,
        ItemProcessor<Account, Account> processor,
        ItemWriter<Account> writer) {
    final String stepName = "writeTxtFile";
    final int chunkSize = 2;

    return stepBuilderFactory
            .get(stepName)
            .<Account, Account>chunk(chunkSize)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .listener(new WriteDataTxtStepListener())
            .build();
}


/**
 * Step-scoped cursor reader that selects every row of the {@code account} table
 * and maps each one with the supplied mapper.
 *
 * @param dataSource    connection source for the cursor
 * @param accountMapper row mapper producing {@code Account} items
 * @return the configured cursor reader
 */
@Bean
@StepScope
public JdbcCursorItemReader<Account> reader(DataSource dataSource, AccountMapper accountMapper) {
    log.info("test");

    final JdbcCursorItemReader<Account> accountReader = new JdbcCursorItemReader<>();
    accountReader.setSql("SELECT * FROM account");
    accountReader.setRowMapper(accountMapper);
    accountReader.setDataSource(dataSource);
    return accountReader;
}



/**
 * Pass-through processor bean: every {@code Account} is returned unchanged.
 *
 * @return an identity processor for {@code Account} items
 */
@Bean
public ItemProcessor<Account, Account> processor() {
    return account -> account;
}

/**
 * Writer bean for the account text file. Each {@code Account} becomes one line made of
 * its id, accountId, accountName, createdAt and updatedAt properties joined by "|".
 *
 * @return a flat-file writer targeting the hard-coded data-output.txt path
 */
@Bean
public FlatFileItemWriter<Account> writer(){
    // Plain instances configured through setters replace the former double-brace
    // initialization, which created anonymous subclasses tied to the enclosing instance.
    BeanWrapperFieldExtractor<Account> fieldExtractor = new BeanWrapperFieldExtractor<>();
    fieldExtractor.setNames(new String[]{ "id", "accountId", "accountName", "createdAt", "updatedAt"});

    DelimitedLineAggregator<Account> lineAggregator = new DelimitedLineAggregator<>();
    lineAggregator.setDelimiter("|");
    lineAggregator.setFieldExtractor(fieldExtractor);

    FlatFileItemWriter<Account> flatFileItemWriter = new FlatFileItemWriter<>();
    flatFileItemWriter.setResource(new FileSystemResource("C:\\Users\\wathanyu.phromma\\data-output.txt"));
    flatFileItemWriter.setLineAggregator(lineAggregator);
    return flatFileItemWriter;
}


public class WriteDataTxtStepListener implements StepExecutionListener {

private static final Logger log = LoggerFactory.getLogger(WriteDataTxtStepListener.class);


/**
 * Stores the job execution id (key "jobId") and today's date (key "date") in the
 * job-level execution context, then logs the id and the current write count.
 *
 * @param stepExecution the execution of the step that is about to start
 */
@Override
public void beforeStep(StepExecution stepExecution) {
    Date date = new Date();
    // "yyyy" is the calendar year and "MM" the month. The previous pattern "YYYY-mm-dd"
    // used the week-based year and the minute-of-hour, producing wrong dates.
    String currentDate = new SimpleDateFormat("yyyy-MM-dd").format(date);
    stepExecution.getJobExecution().getExecutionContext().put("jobId", stepExecution.getJobExecutionId());
    stepExecution.getJobExecution().getExecutionContext().put("date", currentDate);
    log.info("JobId = " + stepExecution.getJobExecutionId());
    log.info("Before Step Count = " + stepExecution.getWriteCount());
}

/**
 * Copies the step's write count into the job-level execution context under the key
 * "writeCount", logs the count and the exit code, and leaves the exit status untouched.
 *
 * @param stepExecution the execution of the step that has just finished
 * @return the step's own exit status, unchanged
 */
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
    stepExecution.getJobExecution()
            .getExecutionContext()
            .put("writeCount", stepExecution.getWriteCount());

    final String countMessage = "After Step Count = " + stepExecution.getWriteCount();
    log.info(countMessage);

    final String statusMessage = "ExitStatus = " + stepExecution.getExitStatus().getExitCode();
    log.info(statusMessage);

    return stepExecution.getExitStatus();
}

}

public class WriteDataToFlatFile {


@Autowired
private JobBuilderFactory jobBuilderFactory;


/**
 * Defines the job "readFromApiToFlatFile": it runs {@code writeTxtFile} first and
 * {@code writeCtlFile} next, with a {@code RunIdIncrementer} as its incrementer.
 *
 * @param writeTxtFile the first step of the job
 * @param writeCtlFile the step executed after {@code writeTxtFile}
 * @return the assembled job
 */
@Bean
public Job readFromApi(Step writeTxtFile, Step writeCtlFile) {
    final String jobName = "readFromApiToFlatFile";

    return jobBuilderFactory
            .get(jobName)
            .incrementer(new RunIdIncrementer())
            .start(writeTxtFile)
            .next(writeCtlFile)
            .build();
}


/**
 * Data source bean backed by {@code DriverManagerDataSource}, configured with the
 * MySQL driver class, a local JDBC URL and placeholder credentials.
 *
 * @return the configured data source
 */
@Bean
public DataSource dataSource() {
    final DriverManagerDataSource mysql = new DriverManagerDataSource();
    mysql.setDriverClassName("com.mysql.jdbc.Driver");
    mysql.setUrl("jdbc:mysql://localhost:3306/xxxx?useSSL=false");

    // Credentials
    mysql.setUsername("xxxx");
    mysql.setPassword("xxxx");
    return mysql;
}

}

Upvotes: 2

Views: 5767

Answers (1)

Mahmoud Ben Hassine
Mahmoud Ben Hassine

Reputation: 31590

To get data from the job execution context in the reader of your second step, you can inject the value as a parameter in your bean definition method like this:

/**
 * Sketch of the job-scoped reader bean with the job id injected as an extra parameter:
 * the {@code @Value} expression reads the "jobId" entry of the job execution context.
 * The reader construction itself is left out of this snippet.
 */
@JobScope
@Bean
public ItemReader<JobContext> ctlReader(DataSource dataSource, JobContextMapper jobContextMapper, @Value("#{jobExecutionContext['jobId']}") int jobId) {
   // use jobId
}

Hope this helps.

Upvotes: 3

Related Questions