Eddy Bayonne

Reputation: 2668

Passing Data to Future Steps - Spring Batch

The official Spring Batch documentation has a section called "Passing Data to Future Steps", so this is clearly possible. Still, many of us struggle with it, because the documentation mentions two places where data can live: one at the step level and one at the job level. The problem is: how do you retrieve data that was saved at the step level?

I first followed the approach described in the official documentation, but it didn't work. So I decided to do the following:

1- Create a bean for the promotion listener:

<beans:bean id="promotionListener"
                class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
        <beans:property name="keys" value="someValues"/>
</beans:bean>

2- Set the listener in your step (the step in which you wish to save the data):

        <listeners>
            <listener ref="promotionListener"/>
        </listeners>

3- In the same step (the step in which the data will be saved), save the data in your writer:

private StepExecution stepExecution;

@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
    this.stepExecution = stepExecution;

    ExecutionContext executionContext = stepExecution.getExecutionContext();
    Map<String, String> fieldsMap = new HashMap<>();
    executionContext.put("keys", someValues);
}

@Override
public void write(List<? extends Map<String, String>> items) throws Exception {
    LOGGER.info(items.toString());

    Map<String, String> fieldsMap = new ConcurrentHashMap<>();
    items.forEach(item -> item.forEach(fieldsMap::put));

    ExecutionContext stepContext = this.stepExecution.getExecutionContext();
    stepContext.put("keys", fieldsMap);
}

As you can see, in my case I am saving the data in a Map (a ConcurrentHashMap).

4- IMPORTANT: In the next step, you want to retrieve the data that was saved in the previous step. To do so:

Pay attention to JobExecution

[Image: screenshot of the debugging session]

@BeforeStep
public void retrieveInterStepData(StepExecution stepExecution) {
    JobExecution jobExecution = stepExecution.getJobExecution();
    Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
    for (StepExecution steps : stepExecutions) {
        ExecutionContext executionContext = steps.getExecutionContext();
        if (executionContext.containsKey("keys")) {
            this.nationalityMap = (Map<String, String>) executionContext.get("keys");
        }
    }
}

That's it! You may wonder why I didn't follow the approach described in the official documentation. The reason is that I am working with steps in the same job, and they share the same job execution. Take a look at the screenshot of my debugging session above.

Please suggest another way if there is one.

NOTE: Please don't just copy and paste code from the official documentation; provide your own answer or implementation.

The section of the Spring Batch documentation that covers this issue is "Passing Data to Future Steps".

Upvotes: 7

Views: 13506

Answers (1)

Mahmoud Ben Hassine

Reputation: 31600

You are confusing the keys to be promoted from the step execution context to the job execution context with the data itself. This confusion comes from two places:

  • <beans:property name="keys" value="someValues"/>: someValues should be someKeys
  • calling executionContext.put("keys", someValues); in the @BeforeStep is incorrect
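
To illustrate the difference between key names and data, here is a minimal sketch of what promotion amounts to, as I understand it: after the step finishes, each configured key that exists in the step execution context is copied into the job execution context. This is a simplified model and not Spring Batch's actual implementation. Plain maps stand in for the two execution contexts, and the class name PromotionSketch, the method promote and the key "scratch" are hypothetical names made up for the illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the promotion idea; not a Spring Batch class. */
final class PromotionSketch {

    /**
     * Copies every listed key that is present in the step context
     * into the job context. Keys missing from the step context are skipped.
     */
    static void promote(List<String> keys,
                        Map<String, Object> stepContext,
                        Map<String, Object> jobContext) {
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = new HashMap<>();
        Map<String, Object> jobContext = new HashMap<>();

        // The writer stores DATA under the key "count" ...
        stepContext.put("count", 4);
        // ... and may store other entries that are not meant to be shared.
        stepContext.put("scratch", "stays in the step context");

        // The listener is configured with the key NAME only, never with the data.
        promote(Arrays.asList("count"), stepContext, jobContext);

        System.out.println(jobContext); // prints {count=4}
    }
}
```

In this model only "count" reaches the job context, which is why the keys property has to list the names under which the writer stores its data.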

Let me make this a bit clearer. Imagine you have a job with two steps:

  • step 1: reads/writes some items and writes the item count in the step execution context
  • step 2: needs to access the item count and print it to the console

In this case, you can use the promotion listener to promote the key "count" from the step execution context of step 1 to the job execution context so that step 2 can access it. Here is an example:

import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return new ItemWriter<Integer>() {

            private StepExecution stepExecution;

            @Override
            public void write(List<? extends Integer> items) {
                for (Integer item : items) {
                    System.out.println("item = " + item);
                }
                ExecutionContext stepContext = this.stepExecution.getExecutionContext();
                int count = stepContext.containsKey("count") ? stepContext.getInt("count") : 0;
                stepContext.put("count", count + items.size());
            }

            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
            }
        };
    }

    @Bean
    public Step step1() {
        return steps.get("step1")
                .<Integer, Integer>chunk(2)
                .reader(itemReader())
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    @Bean
    public Step step2() {
        return steps.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    // retrieve the key from the job execution context
                    Integer count = (Integer) chunkContext.getStepContext().getJobExecutionContext().get("count");
                    System.out.println("In step 2: step 1 wrote " + count + " items");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"count"});
        return listener;
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step1())
                .next(step2())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

This prints:

item = 1
item = 2
item = 3
item = 4
In step 2: step 1 wrote 4 items
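
The writer reads the existing "count" before putting the new value because write is called once per chunk, not once per step. With a chunk size of 2 and four items, it is called twice. The sketch below replays that bookkeeping outside of Spring Batch, under the assumption that a plain map can stand in for the step execution context. The class name ChunkCountSketch and the method recordChunk are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Replays the writer's running count with a plain map; not a Spring Batch class. */
final class ChunkCountSketch {

    /** Adds the size of the current chunk to the running "count" entry. */
    static void recordChunk(Map<String, Object> stepContext, List<Integer> chunk) {
        // Start from 0 on the first chunk, otherwise continue from the stored value.
        int count = stepContext.containsKey("count") ? (Integer) stepContext.get("count") : 0;
        stepContext.put("count", count + chunk.size());
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = new HashMap<>();

        recordChunk(stepContext, Arrays.asList(1, 2)); // first chunk
        recordChunk(stepContext, Arrays.asList(3, 4)); // second chunk

        System.out.println(stepContext.get("count")); // prints 4
    }
}
```

Without the containsKey check, each call would overwrite the previous value and step 2 would only see the size of the last chunk.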

Hope this helps.

Upvotes: 6
