Amardeep

Reputation: 378

Spring Batch Multiple Threads

I am writing a Spring Batch job with the idea of scaling it when required. My ApplicationContext looks like this:

@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@ComponentScan(basePackages = "in.springbatch")
@PropertySource(value = {"classpath:springbatch.properties"})

public class ApplicationConfig {

@Autowired
Environment environment;

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job job() throws Exception {

    return jobs.get("spring_batch")
            .flow(step()).end()
            .build();
}

@Bean(name = "dataSource", destroyMethod = "close")
public DataSource dataSource() {

    BasicDataSource basicDataSource = new BasicDataSource();
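    // driver class, JDBC URL and credentials are configured here (omitted for brevity)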



    return basicDataSource;
}

@Bean
public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setTransactionManager(transactionManager());
    jobRepositoryFactoryBean.setDataSource(dataSource());
    return jobRepositoryFactoryBean.getObject();
}

@Bean(name = "batchstep")
public Step step() throws Exception {

    return stepBuilderFactory.get("batchstep")
            .allowStartIfComplete(true)
            .transactionManager(transactionManager())
            .chunk(2)
            .reader(batchReader())
            .processor(processor())
            .writer(writer())
            .build();
}



@Bean
ItemReader batchReader() throws Exception {
    System.out.println(Thread.currentThread().getName()+"reader");
    HibernateCursorItemReader<Source> hibernateCursorItemReader = new HibernateCursorItemReader<>();
    hibernateCursorItemReader.setQueryString("from Source");
    hibernateCursorItemReader.setFetchSize(2);
    hibernateCursorItemReader.setSessionFactory(sessionFactory().getObject());


    hibernateCursorItemReader.close();
    return hibernateCursorItemReader;
}

@Bean
 public ItemProcessor processor() {
     return new BatchProcessor();
 }

@Bean
public ItemWriter writer() {
    return new BatchWriter();
}

public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor("spring_batch");
    asyncTaskExecutor.setConcurrencyLimit(5);
    return asyncTaskExecutor;
}
@Bean
public LocalSessionFactoryBean sessionFactory() {
    LocalSessionFactoryBean sessionFactory = new LocalSessionFactoryBean();
    sessionFactory.setDataSource(dataSource());
    sessionFactory.setPackagesToScan(new String[]{"in.springbatch.entity"});
    sessionFactory.setHibernateProperties(hibernateProperties());

    return sessionFactory;
}

@Bean
public PersistenceExceptionTranslationPostProcessor exceptionTranslation() {
    return new PersistenceExceptionTranslationPostProcessor();
}

@Bean
@Autowired
public HibernateTransactionManager transactionManager() {
    HibernateTransactionManager txManager = new HibernateTransactionManager();
    txManager.setSessionFactory(sessionFactory().getObject());

    return txManager;
}

Properties hibernateProperties() {
    return new Properties() {
        {
            setProperty("hibernate.hbm2ddl.auto", environment.getProperty("hibernate.hbm2ddl.auto"));
            setProperty("hibernate.dialect", environment.getProperty("hibernate.dialect"));
            setProperty("hibernate.globally_quoted_identifiers", "false");

        }
    };
}

}

  1. With the above configuration I am able to read from the DB, process the data and write it back to the DB.
  2. I am using a chunk size of 2 and reading 2 records from the cursor using HibernateCursorItemReader; my query reads from the DB based on date, picking only the current date's records (a rough sketch of such a query is shown after this list).
  3. So far I have achieved the desired behavior as well as restartability, with the job picking up only the records that were not processed due to a failure in the previous run.
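
To give an idea, a date-filtered reader could look roughly like the snippet below (the creationDate field and the parameter binding are only placeholders, not the actual query; it needs java.time.LocalDate and java.util.HashMap/Map imports):

@Bean
ItemReader<Source> batchReader() throws Exception {
    HibernateCursorItemReader<Source> reader = new HibernateCursorItemReader<>();
    reader.setSessionFactory(sessionFactory().getObject());
    reader.setFetchSize(2);
    // placeholder date filter; "creationDate" stands in for the real column
    reader.setQueryString("from Source s where s.creationDate = :today");

    Map<String, Object> parameters = new HashMap<>();
    parameters.put("today", LocalDate.now());   // bind today's date (assuming a LocalDate-mapped column)
    reader.setParameterValues(parameters);
    return reader;
}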

Now my requirement is to make the batch use multiple threads to process the data and write to the DB.

My processor and writer look like this:

@Component
public class BatchProcessor implements ItemProcessor<Source,DestinationDto>{

@Override
public DestinationDto process(Source source) throws Exception {

        System.out.println(Thread.currentThread().getName()+":"+source);
        DestinationDto destination=new DestinationDto();
        destination.setName(source.getName());
        destination.setValue(source.getValue());
        destination.setSourceId(source.getSourceId().toString());

    return destination;
    }
}

@Component
public class BatchWriter implements ItemWriter<DestinationDto>{

@Autowired
IBatchDao batchDao;

@Override
public void write(List<? extends DestinationDto> list) throws Exception {
   System.out.println(Thread.currentThread().getName()+":"+list);
    batchDao.saveToDestination((List<DestinationDto>)list);
    }
}

I updated my step to add the task executor (the SimpleAsyncTaskExecutor shown above) as follows:

@Bean(name = "batchstep")
public Step step() throws Exception {

    return stepBuilderFactory.get("batchstep")
            .allowStartIfComplete(true)
            .transactionManager(transactionManager())
            .chunk(1)
            .reader(batchReader())
            .processor(processor())
            .writer(writer())
            .taskExecutor(taskExecutor())
            .build();
}

After this my processor is getting called by multiple threads, but with the same source data. Is there anything extra I need to do?

Upvotes: 2

Views: 14932

Answers (1)

Saifuddin Merchant

Reputation: 1171

This is a big question, but here are a few pointers:

  1. Your best bet for getting a good answer would be to look through the Scaling and Parallel Processing chapter in the Spring Batch documentation (here).

  2. There might be some multi-threading samples among the Spring Batch examples (here).

  3. An easy way to thread the Spring Batch job is to create a "future processor": you put all of your processing logic in a Future, your processor class only submits the work and returns the Future, and your writer class then waits on those futures to finish before performing the write. I don't have a complete sample to point you to, but a rough sketch of the idea follows below; if you have specific questions I can try to answer them!
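
A minimal sketch of that idea (the class names FutureItemProcessor and FutureItemWriter are made up for illustration, and the wiring assumes the Source, DestinationDto, BatchProcessor and BatchWriter types from your question; Spring Batch also ships AsyncItemProcessor and AsyncItemWriter in spring-batch-integration, which implement the same pattern):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.core.task.TaskExecutor;

public class FutureItemProcessor implements ItemProcessor<Source, Future<DestinationDto>> {

    private final ItemProcessor<Source, DestinationDto> delegate;  // e.g. the existing BatchProcessor
    private final TaskExecutor taskExecutor;                       // e.g. the SimpleAsyncTaskExecutor from your config

    public FutureItemProcessor(ItemProcessor<Source, DestinationDto> delegate,
                               TaskExecutor taskExecutor) {
        this.delegate = delegate;
        this.taskExecutor = taskExecutor;
    }

    @Override
    public Future<DestinationDto> process(Source item) {
        // hand the real processing off to another thread and return immediately
        FutureTask<DestinationDto> task = new FutureTask<>(() -> delegate.process(item));
        taskExecutor.execute(task);
        return task;
    }
}

public class FutureItemWriter implements ItemWriter<Future<DestinationDto>> {

    private final ItemWriter<DestinationDto> delegate;  // e.g. the existing BatchWriter

    public FutureItemWriter(ItemWriter<DestinationDto> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(List<? extends Future<DestinationDto>> futures) throws Exception {
        List<DestinationDto> items = new ArrayList<>();
        for (Future<DestinationDto> future : futures) {
            items.add(future.get());  // block until this item has been processed
        }
        delegate.write(items);
    }
}

You would then plug FutureItemProcessor in as the step's processor and FutureItemWriter as its writer, and drop the taskExecutor from the step itself, so the reader keeps running on a single thread.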

Upvotes: 1
