Reputation:
I have to compose two processors as follows:
Processor 1 implements the `ItemProcessor` interface as `ItemProcessor<A, B>` (transforming data).
Processor 2 implements the `ItemProcessor` interface as `ItemProcessor<B, B>` (treating the transformed data).
The `CompositeItemProcessor<I, O>` requires the delegates to be of the same type; moreover, when passing it to the step, the step is already configured with the fixed types `<A, B>`.
How can I chain these processors with different types and assign the resulting composite as the step's processor?
Upvotes: 10
Views: 7504
Reputation: 31620
You need to declare your step as well as your composite processor with `<A, B>`. Here is a quick example:
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Sample Spring Batch configuration that chains two item processors with
 * different generic signatures ({@code ItemProcessor<A, B>} followed by
 * {@code ItemProcessor<B, B>}) behind a single {@code CompositeItemProcessor<A, B>}.
 *
 * <p>The composite and the step are both declared with the overall input/output
 * types {@code <A, B>}; the intermediate type of each delegate is not part of
 * the composite's signature.
 */
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    /**
     * Supplies the input items for the step.
     *
     * @return a reader over two in-memory {@code A} items named "a1" and "a2"
     */
    @Bean
    public ItemReader<A> itemReader() {
        return new ListItemReader<>(Arrays.asList(new A("a1"), new A("a2")));
    }

    /**
     * First delegate: converts an {@code A} into a {@code B} carrying the same name.
     *
     * @return the transforming processor
     */
    @Bean
    public ItemProcessor<A, B> itemProcessor1() {
        return item -> new B(item.name);
    }

    /**
     * Second delegate: receives the already-transformed {@code B}. Currently a
     * pass-through placeholder.
     *
     * @return the post-transformation processor
     */
    @Bean
    public ItemProcessor<B, B> itemProcessor2() {
        return item -> item; // TODO process item as needed
    }

    /**
     * Combines {@link #itemProcessor1()} and {@link #itemProcessor2()}, in that
     * order, into one processor typed with the overall input ({@code A}) and
     * output ({@code B}) of the chain.
     *
     * @return the composite processor registered on the step
     */
    @Bean
    public ItemProcessor<A, B> compositeItemProcessor() {
        CompositeItemProcessor<A, B> compositeItemProcessor = new CompositeItemProcessor<>();
        compositeItemProcessor.setDelegates(Arrays.asList(itemProcessor1(), itemProcessor2()));
        return compositeItemProcessor;
    }

    /**
     * Prints the name of every processed item to standard output.
     *
     * @return the writer used by the step
     */
    @Bean
    public ItemWriter<B> itemWriter() {
        return items -> {
            for (B item : items) {
                System.out.println("item = " + item.name);
            }
        };
    }

    /**
     * Defines a job named "job" with a single chunk-oriented step named "step"
     * (chunk size 2) that reads {@code A} items, runs them through the composite
     * processor and writes the resulting {@code B} items.
     *
     * @param jobs  factory used to build the job
     * @param steps factory used to build the step
     * @return the configured job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<A, B>chunk(2)
                        .reader(itemReader())
                        .processor(compositeItemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    /**
     * Boots the application context from this configuration, launches the job
     * with empty parameters and then closes the context.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context so its beans are destroyed and
        // resources released even when the launch throws.
        // NOTE(review): assumes the JobLauncher runs the job synchronously, so the
        // job has finished before the context closes — confirm if an asynchronous
        // launcher is ever configured.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(MyJobConfiguration.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /**
     * Input item type. Declared {@code static} so instances do not hold a hidden
     * reference to the enclosing configuration object.
     */
    static class A {
        final String name;

        public A(String name) {
            this.name = name;
        }
    }

    /**
     * Output item type. Declared {@code static} so instances do not hold a hidden
     * reference to the enclosing configuration object.
     */
    static class B {
        final String name;

        public B(String name) {
            this.name = name;
        }
    }
}
Upvotes: 12