user13552573
user13552573

Reputation:

Spring batch. How to chain multiple itemProcessors with different types?

i have to compose 2 processors as following :

the CompositeItemProcessor<I, O> requires the delegates to be in the same type , moreover when passing it to the Step the step is already configure with fixed types <A,B>.

how i could chain these processors with different types and assign it to the step processor ?

Upvotes: 10

Views: 7504

Answers (1)

Mahmoud Ben Hassine
Mahmoud Ben Hassine

Reputation: 31620

You need to declare your step as well as your composite processor with <A, B>. Here is a quick example:

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    @Bean
    public ItemReader<A> itemReader() {
        return new ListItemReader<>(Arrays.asList(new A("a1"), new A("a2")));
    }

    @Bean
    public ItemProcessor<A, B> itemProcessor1() {
        return item -> new B(item.name);
    }

    @Bean
    public ItemProcessor<B, B> itemProcessor2() {
        return item -> item; // TODO process item as needed
    }

    @Bean
    public ItemProcessor<A, B> compositeItemProcessor() {
        CompositeItemProcessor<A, B> compositeItemProcessor = new CompositeItemProcessor<>();
        compositeItemProcessor.setDelegates(Arrays.asList(itemProcessor1(), itemProcessor2()));
        return compositeItemProcessor;
    }

    @Bean
    public ItemWriter<B> itemWriter() {
        return items -> {
            for (B item : items) {
                System.out.println("item = " + item.name);
            }
        };
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<A, B>chunk(2)
                        .reader(itemReader())
                        .processor(compositeItemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }
    
    class A {
        String name;
        public A(String name) { this.name = name; }
    }

    class B {
        String name;
        public B(String name) { this.name = name; }
    }

}

Upvotes: 12

Related Questions