Reputation: 133
I need help. I want to create a batch application where the second step writes out the writeCount of the first step. My approach is to use a StepExecutionListener on the first step to put the jobId and the writeCount into the JobExecutionContext. Then, in the ItemReader of the second step, I can read that context back from the database. But I don't know how to get the jobExecutionId there, so that I can query MySQL for the right record.
Here is the code:
public class WriteDataCtlFile {

    private static final Logger log = LoggerFactory.getLogger(WriteDataCtlFile.class);

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step writeCtlFile(ItemReader<JobContext> ctlReader,
                             ItemProcessor<JobContext, CtlFile> ctlProcessor,
                             ItemWriter<CtlFile> ctlWriter) {
        return stepBuilderFactory.get("writeCtlFile")
                .<JobContext, CtlFile>chunk(100)
                .reader(ctlReader)
                .processor(ctlProcessor)
                .writer(ctlWriter)
                .build();
    }

    @JobScope
    @Bean
    public ItemReader<JobContext> ctlReader(DataSource dataSource, JobContextMapper jobContextMapper) {
        JdbcCursorItemReader<JobContext> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT short_context FROM BATCH_JOB_EXECUTION_CONTEXT WHERE JOB_EXECUTION_ID = ?");
        // THIS IS WHERE I WANT TO GET jobId
        reader.setPreparedStatementSetter(new JobIdPrepareStatement(jobId));
        reader.setRowMapper(jobContextMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<JobContext, CtlFile> ctlProcessor() {
        return new ItemProcessor<JobContext, CtlFile>() {
            @Override
            public CtlFile process(JobContext jobContext) throws Exception {
                return new CtlFile(jobContext.getShort_context());
            }
        };
    }

    @Bean
    public FlatFileItemWriter<CtlFile> ctlWriter() {
        FlatFileItemWriter<CtlFile> flatFileItemWriter = new FlatFileItemWriter<>();
        flatFileItemWriter.setResource(new FileSystemResource("C:\\Users\\wathanyu.phromma\\data-output.ctl"));
        flatFileItemWriter.setLineAggregator(new LineAggregator<CtlFile>() {
            @Override
            public String aggregate(CtlFile ctlFile) {
                Gson gson = new Gson();
                Map<String, Object> map = gson.fromJson(ctlFile.getWrittenRecordsCount(), Map.class);
                return String.valueOf(map.get("writeCount"));
            }
        });
        return flatFileItemWriter;
    }
}
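JobContext, CtlFile and JobContextMapper are referenced above but not shown in the question. For orientation only, here is a minimal hypothetical sketch of what they could look like (class and column names are assumptions, not from the original post), assuming all that is carried around is the raw short_context string:

public class JobContext {

    // Raw JSON read from the BATCH_JOB_EXECUTION_CONTEXT.SHORT_CONTEXT column
    private final String short_context;

    public JobContext(String short_context) {
        this.short_context = short_context;
    }

    public String getShort_context() {
        return short_context;
    }
}

public class CtlFile {

    // Holds the serialized execution context so the writer can extract writeCount from it
    private final String writtenRecordsCount;

    public CtlFile(String writtenRecordsCount) {
        this.writtenRecordsCount = writtenRecordsCount;
    }

    public String getWrittenRecordsCount() {
        return writtenRecordsCount;
    }
}

@Component // registered as a bean so it can be injected into ctlReader
public class JobContextMapper implements RowMapper<JobContext> {

    @Override
    public JobContext mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new JobContext(rs.getString("short_context"));
    }
}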
public class WriteDataTxtFile {

    private static final Logger log = LoggerFactory.getLogger(WriteDataTxtFile.class);

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step writeTxtFile(ItemReader<Account> reader,
                             ItemProcessor<Account, Account> processor,
                             ItemWriter<Account> writer) {
        return stepBuilderFactory.get("writeTxtFile")
                .<Account, Account>chunk(2)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .listener(new WriteDataTxtStepListener())
                .build();
    }

    @Bean
    @StepScope
    public JdbcCursorItemReader<Account> reader(DataSource dataSource, AccountMapper accountMapper) {
        log.info("test");
        JdbcCursorItemReader<Account> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT * FROM account");
        reader.setRowMapper(accountMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Account, Account> processor() {
        return new ItemProcessor<Account, Account>() {
            @Override
            public Account process(Account account) throws Exception {
                return account;
            }
        };
    }

    @Bean
    public FlatFileItemWriter<Account> writer() {
        FlatFileItemWriter<Account> flatFileItemWriter = new FlatFileItemWriter<>();
        flatFileItemWriter.setResource(new FileSystemResource("C:\\Users\\wathanyu.phromma\\data-output.txt"));
        flatFileItemWriter.setLineAggregator(new DelimitedLineAggregator<Account>() {{
            setDelimiter("|");
            setFieldExtractor(new BeanWrapperFieldExtractor<Account>() {{
                setNames(new String[]{"id", "accountId", "accountName", "createdAt", "updatedAt"});
            }});
        }});
        return flatFileItemWriter;
    }
}
public class WriteDataTxtStepListener implements StepExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(WriteDataTxtStepListener.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        Date date = new Date();
        // "yyyy-MM-dd": lowercase yyyy is the calendar year and uppercase MM is the month
        String currentDate = new SimpleDateFormat("yyyy-MM-dd").format(date);
        stepExecution.getJobExecution().getExecutionContext().put("jobId", stepExecution.getJobExecutionId());
        stepExecution.getJobExecution().getExecutionContext().put("date", currentDate);
        log.info("JobId = " + stepExecution.getJobExecutionId());
        log.info("Before Step Count = " + stepExecution.getWriteCount());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        stepExecution.getJobExecution().getExecutionContext().put("writeCount", stepExecution.getWriteCount());
        log.info("After Step Count = " + stepExecution.getWriteCount());
        log.info("ExitStatus = " + stepExecution.getExitStatus().getExitCode());
        return stepExecution.getExitStatus();
    }
}
public class WriteDataToFlatFile {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public Job readFromApi(Step writeTxtFile, Step writeCtlFile) {
        return jobBuilderFactory.get("readFromApiToFlatFile")
                .incrementer(new RunIdIncrementer())
                .start(writeTxtFile)
                .next(writeCtlFile)
                .build();
    }

    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/xxxx?useSSL=false");
        dataSource.setUsername("xxxx");
        dataSource.setPassword("xxxx");
        return dataSource;
    }
}
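Similarly, Account and AccountMapper are referenced but not shown. A minimal hypothetical sketch consistent with the field names used by the BeanWrapperFieldExtractor above (the database column names here are guesses):

public class Account {

    private Long id;
    private String accountId;
    private String accountName;
    private Date createdAt;
    private Date updatedAt;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getAccountId() { return accountId; }
    public void setAccountId(String accountId) { this.accountId = accountId; }
    public String getAccountName() { return accountName; }
    public void setAccountName(String accountName) { this.accountName = accountName; }
    public Date getCreatedAt() { return createdAt; }
    public void setCreatedAt(Date createdAt) { this.createdAt = createdAt; }
    public Date getUpdatedAt() { return updatedAt; }
    public void setUpdatedAt(Date updatedAt) { this.updatedAt = updatedAt; }
}

@Component // registered as a bean so it can be injected into reader()
public class AccountMapper implements RowMapper<Account> {

    @Override
    public Account mapRow(ResultSet rs, int rowNum) throws SQLException {
        Account account = new Account();
        account.setId(rs.getLong("id"));
        account.setAccountId(rs.getString("account_id"));      // column names are assumptions
        account.setAccountName(rs.getString("account_name"));
        account.setCreatedAt(rs.getTimestamp("created_at"));
        account.setUpdatedAt(rs.getTimestamp("updated_at"));
        return account;
    }
}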
Upvotes: 2
Views: 5767
Reputation: 31590
To get data from the job execution context in the reader of your second step, you can inject the value as a parameter in your bean definition method like this:
@JobScope
@Bean
public ItemReader<JobContext> ctlReader(DataSource dataSource, JobContextMapper jobContextMapper,
                                        @Value("#{jobExecutionContext['jobId']}") long jobId) {
    // use jobId
}
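For reference, here is one way the injected value could be wired back into the original ctlReader. This is only a sketch: it assumes the lookup key is the "jobId" entry that the first step's listener put into the context, and it binds the id with a lambda in place of the JobIdPrepareStatement class from the question (which could equally take jobId as a constructor argument):

@JobScope
@Bean
public ItemReader<JobContext> ctlReader(DataSource dataSource, JobContextMapper jobContextMapper,
                                        @Value("#{jobExecutionContext['jobId']}") long jobId) {
    JdbcCursorItemReader<JobContext> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    reader.setSql("SELECT short_context FROM BATCH_JOB_EXECUTION_CONTEXT WHERE JOB_EXECUTION_ID = ?");
    // Bind the injected job execution id to the single '?' placeholder
    reader.setPreparedStatementSetter(ps -> ps.setLong(1, jobId));
    reader.setRowMapper(jobContextMapper);
    return reader;
}

Because the bean is @JobScope, the SpEL expression is resolved while the job is running, by which time the first step's listener has already put "jobId" into the job execution context.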
Hope this helps.
Upvotes: 3