Reputation: 73
I am using Spring Batch with Spring Cloud Task to automate reading data from a file and writing it to MongoDB. My use case has 2 steps (I will add one more step after I get these two working). I am trying to use remote partitioning with Spring Cloud Task's DeployerPartitionHandler, where the master node launches the worker nodes directly instead of going through a broker such as ActiveMQ/RabbitMQ via Spring Integration; that is my understanding of how it works. I have created 2 partitioner beans and 2 partition handler beans, one pair per step. I am getting the exception below, followed by my sample code.
2020-03-11 12:03:59 - o.s.batch.core.step.AbstractStep - Encountered an error executing step step1 in job Job669228617
java.lang.NullPointerException: null
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347)
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313)
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302)
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy77.run(Unknown Source)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192)
@Configuration
@ComponentScan(basePackageClasses = DbConfig.class)
public class JobConfig {

    private static final int GRID_SIZE = 2;

    @Autowired
    private TaskLauncher taskLauncher;
    @Autowired
    private JobExplorer jobExplorer;
    @Autowired
    private TaskRepository taskRepository;
    @Autowired
    private Reader1 reader1;
    @Autowired
    private Writer1 writer1;
    @Autowired
    private Reader2 reader2;
    @Autowired
    private Writer2 writer2;
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DelegatingResourceLoader resourceLoader;
    @Autowired
    private ConfigurableApplicationContext context;
    @Autowired
    public JobRepository jobRepository;
    @Autowired
    private Environment environment;
    @Autowired
    @Qualifier("partitionHandler1")
    private PartitionHandler partitionHandler1;
    @Autowired
    @Qualifier("partitionHandler2")
    private PartitionHandler partitionHandler2;

    @Bean
    @Profile("master")
    public Job masterJob() {
        Random random = new Random();
        return this.jobBuilderFactory.get("masterJob" + random.nextInt())
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    @Profile("master")
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
                .partitioner("slaveStep1", partitioner1())
                // no local TaskExecutor needed: the partition handler launches the workers
                .partitionHandler(partitionHandler1)
                .build();
    }

    @Bean
    @Profile("master")
    public Step step2() {
        return this.stepBuilderFactory.get("step2")
                .partitioner("slaveStep2", partitioner2())
                .partitionHandler(partitionHandler2)
                .build();
    }

    @Bean
    @Profile("worker")
    public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
        return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
    }

    @Bean
    public Step slaveStep1() {
        return this.stepBuilderFactory.get("slaveStep1")
                .<Domain1, Domain1>chunk(50)
                .reader(reader1)
                .writer(writer1)
                .listener(stepExecutionListner())
                .build();
    }

    @Bean
    public Step slaveStep2() {
        return this.stepBuilderFactory.get("slaveStep2")
                .<Domain2, Domain2>chunk(50)
                .reader(reader2)
                .writer(writer2)
                .listener(stepExecutionListner())
                .build();
    }

    @Bean
    public Partitioner partitioner1() {
        FilePartitioner filePartitioner = new FilePartitioner("classpath:input/test1*.csv");
        return filePartitioner.getFilesPartitioner();
    }

    @Bean
    public Partitioner partitioner2() {
        FilePartitioner filePartitioner = new FilePartitioner("classpath:input/test2*.csv");
        return filePartitioner.getFilesPartitioner();
    }

    @Bean(name = "partitionHandler1")
    public PartitionHandler partitionHandler1(TaskLauncher taskLauncher,
            JobExplorer jobExplorer, TaskRepository taskRepository) {
        Resource resource = this.resourceLoader.getResource("maven://com.abc:test:1.0-SNAPSHOT");
        // the step name passed here must match the worker step bean, i.e. "slaveStep1"
        DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "slaveStep1", taskRepository);
        List<String> commandLineArgs = new ArrayList<>(3);
        commandLineArgs.add("--spring.profiles.active=worker");
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");
        partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        SimpleEnvironmentVariablesProvider environmentVariablesProvider = new SimpleEnvironmentVariablesProvider(this.environment);
        partitionHandler.setEnvironmentVariablesProvider(environmentVariablesProvider);
        partitionHandler.setMaxWorkers(3);
        partitionHandler.setApplicationName("Job");
        return partitionHandler;
    }

    @Bean(name = "partitionHandler2")
    //@Scope(value = "prototype")
    public PartitionHandler partitionHandler2(TaskLauncher taskLauncher,
            JobExplorer jobExplorer, TaskRepository taskRepository) {
        Resource resource = this.resourceLoader.getResource("maven://com.abc:test:1.0-SNAPSHOT");
        // the step name passed here must match the worker step bean, i.e. "slaveStep2"
        DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "slaveStep2", taskRepository);
        List<String> commandLineArgs = new ArrayList<>(3);
        commandLineArgs.add("--spring.profiles.active=worker");
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");
        partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        SimpleEnvironmentVariablesProvider environmentVariablesProvider = new SimpleEnvironmentVariablesProvider(this.environment);
        partitionHandler.setEnvironmentVariablesProvider(environmentVariablesProvider);
        partitionHandler.setMaxWorkers(3);
        partitionHandler.setApplicationName("CVAJob");
        return partitionHandler;
    }

    @Bean
    @StepScope
    public StepExecutionListner stepExecutionListner() {
        return new StepExecutionListner();
    }
}
Below is the DB config:
@Configuration
public class DbConfig implements BatchConfigurer {

    @ConfigurationProperties(prefix = "spring.datasource")
    @Bean(name = "batchDataSource")
    @Primary
    public DataSource dataSource() {
        return DataSourceBuilder.create().build();
    }

    @Override
    public JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
        factoryBean.setDatabaseType("ORACLE");
        factoryBean.setDataSource(dataSource());
        factoryBean.setTransactionManager(getTransactionManager());
        factoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
        factoryBean.setTablePrefix("SCHEMA.BATCH_");
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    @Override
    public PlatformTransactionManager getTransactionManager() throws Exception {
        return new DataSourceTransactionManager(dataSource());
    }

    @Override
    public JobLauncher getJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Override
    public JobExplorer getJobExplorer() throws Exception {
        JobExplorerFactoryBean factory = new JobExplorerFactoryBean();
        factory.setDataSource(dataSource());
        factory.setTablePrefix("SCHEMA.BATCH_"); // keep the explorer on the same prefixed tables
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    @Bean
    public TaskConfigurer taskConfigurer(
            @Qualifier("batchDataSource") DataSource batchDataSource) {
        return new DefaultTaskConfigurer(batchDataSource);
    }
}
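For context, the application entry point is not shown above; both the master and the launched workers are assumed to boot the same class (the class name below is hypothetical). @EnableTask matters here, since DeployerPartitionHandler receives the current TaskExecution through the task lifecycle:

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;

@SpringBootApplication
@EnableTask            // registers the app as a Spring Cloud Task
@EnableBatchProcessing // enables the Spring Batch infrastructure
public class BatchTaskApplication {
    public static void main(String[] args) {
        SpringApplication.run(BatchTaskApplication.class, args);
    }
}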
How can I accomplish my use case using remote partitioning?
Upvotes: 1
Views: 540
Reputation: 11
I think you can have just one partitionHandler bean: instead of multiple steps each with its own partition handler, use a partitioned subflow that contains the multiple steps.
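To illustrate the idea, here is a minimal sketch that reuses the partitioner, slave step, and partition handler beans from the question (the workerFlowStep name and wiring are my assumption, not tested code): the master partitions a single worker step, and that worker step is a flow step chaining slaveStep1 and slaveStep2, so a single DeployerPartitionHandler constructed with the step name "workerFlowStep" is enough.

import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;

// Master side: one partitioned step driven by one partition handler.
@Bean
@Profile("master")
public Step masterStep() {
    return this.stepBuilderFactory.get("masterStep")
            .partitioner("workerFlowStep", partitioner1())
            // the single DeployerPartitionHandler, constructed with
            // step name "workerFlowStep" instead of "slaveStep1"/"slaveStep2"
            .partitionHandler(partitionHandler1)
            .build();
}

// Worker side: wrap both slave steps in a flow so each launched worker
// executes them in sequence for its partition.
@Bean
public Step workerFlowStep() {
    Flow flow = new FlowBuilder<Flow>("workerFlow")
            .start(slaveStep1())
            .next(slaveStep2())
            .build();
    return this.stepBuilderFactory.get("workerFlowStep")
            .flow(flow)
            .build();
}

Note that the single partitioner then has to put everything both readers need into each partition's ExecutionContext, so the file-per-partition logic of the two separate partitioners would need to be merged.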
Upvotes: 1