Nithin
Nithin

Reputation: 381

Spring Boot + Spring Batch Multiple Job Creation and Scheduling

I created a Spring Boot with Spring Batch Application and Scheduling. When i create only one job, things are working fine . But when i try to create another job using the modular approach, I am getting few errors like reader is already closed and some errors related to version even though i am using different readers. The jobs and it's step are running many times and they are getting duplicated.

Can anyone Please guide me how to resolve these issues and run the jobs in a parallel way independent of each other ?

Below are the configuration Classes : ModularJobConfiguration.java , DeptBatchConfiguration.java and CityBatchConfiguration.java and BatchScheduler.java

@Configuration
@EnableBatchProcessing(modular=true)
public class ModularJobConfiguration {

    @Bean
    public ApplicationContextFactory firstJob() {
        return new GenericApplicationContextFactory(DeptBatchConfiguration.class);
    }

    @Bean
    public ApplicationContextFactory secondJob() {
        return new GenericApplicationContextFactory(CityBatchConfiguration.class);
    }

}


@Configuration
@EnableBatchProcessing
@Import({BatchScheduler.class})
public class DeptBatchConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeptBatchConfiguration.class);

    @Autowired
    private SimpleJobLauncher jobLauncher;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobExecutionListener listener;

    public ItemReader<DepartmentModelReader> deptReaderSO;


    @Autowired
    @Qualifier("dataSourceReader")
    private DataSource dataSourceReader;


    @Autowired
    @Qualifier("dataSourceWriter")
    private DataSource dataSourceWriter;



    @Scheduled(cron = "0 0/1 * * * ?")
    public void performFirstJob() throws Exception {

        long startTime = System.currentTimeMillis();
        LOGGER.info("Job1 Started at :" + new Date());
        JobParameters param = new JobParametersBuilder().addString("JobID1",String.valueOf(System.currentTimeMillis())).toJobParameters();

        JobExecution execution = (JobExecution) jobLauncher.run(importDeptJob(jobBuilderFactory,stepdept(deptReaderSO,customWriter()),listener), param);

        long endTime = System.currentTimeMillis();
        LOGGER.info("Job1 finished at " + (endTime - startTime) / 1000  + " seconds with status :" + execution.getExitStatus());
    }

    @Bean
    public ItemReader<DepartmentModelReader> deptReaderSO() {
        //LOGGER.info("Inside deptReaderSO Method");
        JdbcCursorItemReader<DepartmentModelReader> deptReaderSO = new JdbcCursorItemReader<>();
        //deptReaderSO.setSql("select id, firstName, lastname, random_num from reader");
        deptReaderSO.setSql("SELECT DEPT_CODE,DEPT_NAME,FULL_DEPT_NAME,CITY_CODE,CITY_NAME,CITY_TYPE_NAME,CREATED_USER_ID,CREATED_G_DATE,MODIFIED_USER_ID,MODIFIED_G_DATE,RECORD_ACTIVITY,DEPT_CLASS,DEPT_PARENT,DEPT_PARENT_NAME FROM TBL_SAMPLE_SAFTY_DEPTS");
        deptReaderSO.setDataSource(dataSourceReader);
        deptReaderSO.setRowMapper(
                (ResultSet resultSet, int rowNum) -> {
                    if (!(resultSet.isAfterLast()) && !(resultSet.isBeforeFirst())) {
                        DepartmentModelReader recordSO = new DepartmentModelReader();
                        recordSO.setDeptCode(resultSet.getString("DEPT_CODE"));
                        recordSO.setDeptName(resultSet.getString("DEPT_NAME"));
                        recordSO.setFullDeptName(resultSet.getString("FULL_DEPT_NAME"));
                        recordSO.setCityCode(resultSet.getInt("CITY_CODE"));
                        recordSO.setCityName(resultSet.getString("CITY_NAME"));
                        recordSO.setCityTypeName(resultSet.getString("CITY_TYPE_NAME"));
                        recordSO.setCreatedUserId(resultSet.getInt("CREATED_USER_ID"));
                        recordSO.setCreatedGDate(resultSet.getDate("CREATED_G_DATE"));
                        recordSO.setModifiedUserId(resultSet.getString("MODIFIED_USER_ID"));
                        recordSO.setModifiedGDate(resultSet.getDate("MODIFIED_G_DATE"));
                        recordSO.setRecordActivity(resultSet.getInt("RECORD_ACTIVITY"));
                        recordSO.setDeptClass(resultSet.getInt("DEPT_CLASS"));
                        recordSO.setDeptParent(resultSet.getString("DEPT_PARENT"));
                        recordSO.setDeptParentName(resultSet.getString("DEPT_PARENT_NAME"));

                       // LOGGER.info("RowMapper record : {}", recordSO.getDeptCode() +" | "+recordSO.getDeptName());
                        return recordSO;
                    } else {
                        LOGGER.info("Returning null from rowMapper");
                        return null;
                    }
                });
        return deptReaderSO;
    }

    @Bean
    public ItemProcessor<DepartmentModelReader, DepartmentModelWriter> processor() {
        //LOGGER.info("Inside Processor Method");
        return new RecordProcessor();
    }

    @Bean
    public ItemWriter<DepartmentModelWriter> customWriter(){
        //LOGGER.info("Inside customWriter Method");
        return new CustomItemWriter();
    }

    @Bean
    public Job importDeptJob(JobBuilderFactory jobs, Step stepdept,JobExecutionListener listener){
        return jobs.get("importDeptJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(stepdept).end().build();
    }

    @Bean
    public Step stepdept(ItemReader<DepartmentModelReader> deptReaderSO,
            ItemWriter<DepartmentModelWriter> writerSO) {
        LOGGER.info("Inside stepdept Method");

        return stepBuilderFactory.get("stepdept").<DepartmentModelReader, DepartmentModelWriter>chunk(5)
                .reader(deptReaderSO).processor(processor()).writer(customWriter()).transactionManager(platformTransactionManager(dataSourceWriter)).build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionNotificationListener();
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean
    public BatchWriteService batchWriteService() {
        return new BatchWriteService();
    }

    @Bean
    public PlatformTransactionManager platformTransactionManager(@Qualifier("dataSourceWriter") DataSource dataSourceWriter) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setDataSource(dataSourceWriter);
        return transactionManager;
    }
}



@Configuration
@EnableBatchProcessing
@Import({BatchScheduler.class})
public class CityBatchConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(CityBatchConfiguration.class);

    @Autowired
    private SimpleJobLauncher jobLauncher;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobExecutionListener listener;

    public ItemReader<CitiesModelReader> citiesReaderSO;

    @Autowired
    @Qualifier("dataSourceReader")
    private DataSource dataSourceReader;


    @Autowired
    @Qualifier("dataSourceWriter")
    private DataSource dataSourceWriter;


    @Scheduled(cron = "0 0/1 * * * ?")
    public void performSecondJob() throws Exception {

        long startTime = System.currentTimeMillis();
        LOGGER.info("\n Job2 Started at :" + new Date());

        JobParameters param = new JobParametersBuilder().addString("JobID2",String.valueOf(System.currentTimeMillis())).toJobParameters();

        JobExecution execution = (JobExecution) jobLauncher.run(importCitiesJob(jobBuilderFactory,stepcity(citiesReaderSO,customCitiesWriter()),listener), param);

        long endTime = System.currentTimeMillis();
        LOGGER.info("Job2 finished at " + (endTime - startTime) / 1000  + " seconds with status :" + execution.getExitStatus());
    }


    @Bean
    public ItemReader<CitiesModelReader> citiesReaderSO() {
        //LOGGER.info("Inside readerSO Method");
        JdbcCursorItemReader<CitiesModelReader> readerSO = new JdbcCursorItemReader<>();
        readerSO.setSql("SELECT CITY_CODE,CITY_NAME,PARENT_CITY,CITY_TYPE,CITY_TYPE_NAME,CREATED_G_DATE,CREATED_USER_ID,MODIFIED_G_DATE,MODIFIED_USER_ID,RECORD_ACTIVITY FROM TBL_SAMPLE_SAFTY_CITIES");
        readerSO.setDataSource(dataSourceReader);
        readerSO.setRowMapper(
                (ResultSet resultSet, int rowNum) -> {
                    if (!(resultSet.isAfterLast()) && !(resultSet.isBeforeFirst())) {
                        CitiesModelReader recordSO = new CitiesModelReader();
                        recordSO.setCityCode(resultSet.getLong("CITY_CODE"));
                        recordSO.setCityName(resultSet.getString("CITY_NAME"));
                        recordSO.setParentCity(resultSet.getInt("PARENT_CITY"));
                        recordSO.setCityType(resultSet.getString("CITY_TYPE"));
                        recordSO.setCityTypeName(resultSet.getString("CITY_TYPE_NAME"));
                        recordSO.setCreatedGDate(resultSet.getDate("CREATED_G_DATE"));
                        recordSO.setCreatedUserId(resultSet.getString("CREATED_USER_ID"));
                        recordSO.setModifiedGDate(resultSet.getDate("MODIFIED_G_DATE"));
                        recordSO.setModifiedUserId(resultSet.getString("MODIFIED_USER_ID"));
                        recordSO.setRecordActivity(resultSet.getInt("RECORD_ACTIVITY"));

                        //LOGGER.info("RowMapper record : {}", recordSO.toString());
                        return recordSO;
                    } else {
                        LOGGER.info("Returning null from rowMapper");
                        return null;
                    }
                });
        return readerSO;
    }


    @Bean
    public ItemProcessor<CitiesModelReader,CitiesModelWriter> citiesProcessor() {
        //LOGGER.info("Inside Processor Method");
        return new RecordCitiesProcessor();
    }


    @Bean
    public ItemWriter<CitiesModelWriter> customCitiesWriter(){
        LOGGER.info("Inside customCitiesWriter Method");
        return new CustomCitiesWriter();
    }           

    @Bean
    public Job importCitiesJob(JobBuilderFactory jobs, Step stepcity,JobExecutionListener listener) {

        LOGGER.info("Inside importCitiesJob Method");
        return jobs.get("importCitiesJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(stepcity).end().build();
    }


    @Bean
    public Step stepcity(ItemReader<CitiesModelReader> readerSO,
            ItemWriter<CitiesModelWriter> writerSO) {
        LOGGER.info("Inside stepCity Method");

        return stepBuilderFactory.get("stepcity").<CitiesModelReader, CitiesModelWriter>chunk(5)
                .reader(readerSO).processor(citiesProcessor()).writer(customCitiesWriter()).transactionManager(platformTransactionManager(dataSourceWriter)).build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionNotificationListener();
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean
    public BatchWriteService batchWriteService() {
        return new BatchWriteService();
    }

    @Bean
    public PlatformTransactionManager platformTransactionManager(@Qualifier("dataSourceWriter") DataSource dataSourceWriter) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setDataSource(dataSourceWriter);
        return transactionManager;
    }
}





@Configuration
@EnableScheduling
public class BatchScheduler {

    private static final Logger LOGGER = LoggerFactory.getLogger(BatchScheduler.class);

    @Bean
    public ResourcelessTransactionManager resourcelessTransactionManager() {
        return new ResourcelessTransactionManager();
    }

    @Bean
    public MapJobRepositoryFactoryBean mapJobRepositoryFactory(
            ResourcelessTransactionManager txManager) throws Exception {

        LOGGER.info("Inside mapJobRepositoryFactory method");

        MapJobRepositoryFactoryBean factory = new 
                MapJobRepositoryFactoryBean(txManager);

        factory.afterPropertiesSet();

        return factory;
    }

    @Bean
    public JobRepository jobRepository(
            MapJobRepositoryFactoryBean factory) throws Exception {

        LOGGER.info("Inside jobRepository method");

        return factory.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {

        LOGGER.info("Inside jobLauncher method");

        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        final SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
        launcher.setTaskExecutor(simpleAsyncTaskExecutor);
        return launcher;
    }
}

Upvotes: 0

Views: 7592

Answers (1)

Michael Minella
Michael Minella

Reputation: 21453

Your readers are not thread safe and not step scoped. Because of that, you're running into concurrency issues. Configure each of your stateful ItemReaders (the ones that implement ItemStream like the JdbcCursorItemReader), to be step scoped by adding the @StepScope annotation and things should work fine.

Upvotes: 1

Related Questions