Reputation: 2668
As the official Spring Batch documentation shows in "Passing Data to Future Steps - Spring Batch", passing data between steps is possible, but most of us have been struggling with it because the documentation mentions two possibilities: one at the step level and one at the job level. The problem is: how do you retrieve the data at the step level?
My solution was the same as the one described in the official documentation, but it didn't work. So I decided to do the following:
1- Create a bean for the promotion listener:
<beans:bean id="promotionListener"
            class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
    <beans:property name="keys" value="someValues"/>
</beans:bean>
2- Set the listener in your step (the step in which you wish to save the data):
<listeners>
    <listener ref="promotionListener"/>
</listeners>
3- In the writer of that same step (the step in which the data will be saved), save the data:
private StepExecution stepExecution;

@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
    ExecutionContext executionContext = stepExecution.getExecutionContext();
    Map<String, String> fieldsMap= new HashMap<>();
    executionContext.put("keys", someValues);
}

@Override
public void write(List<? extends Map<String, String>> items) throws Exception {
    LOGGER.info(items.toString());
    Map<String, String> fieldsMap= new ConcurrentHashMap<>();
    items.forEach(item -> item.forEach(fieldsMap::put));
    ExecutionContext stepContext = this.stepExecution.getExecutionContext();
    stepContext.put("keys", fieldsMap);
}
As you can see, in my case I am saving the data in a Map (ConcurrentHashMap).
4- IMPORTANT: In the next step, we want to retrieve the data that we saved in the previous step. To do that:
Declare the object that will hold the retrieved value:
private Map<String, String> fieldsMap;
Then pay attention to the JobExecution:
@BeforeStep
public void retrieveInterStepData(StepExecution stepExecution) {
    JobExecution jobExecution = stepExecution.getJobExecution();
    Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
    // Look through every step execution of this job for the saved entry
    for (StepExecution steps : stepExecutions) {
        ExecutionContext executionContext = steps.getExecutionContext();
        if (executionContext.containsKey("keys")) {
            this.fieldsMap = (Map<String, String>) executionContext.get("keys");
        }
    }
}
That's it! You may wonder why I didn't follow the approach in the official documentation. The reason is that I am working with steps in the same job, so they share the same job execution. Take a look at the picture from my debugging session.
Please suggest another way if there is one.
NOTE: Please don't just copy and paste code from the official documentation; provide your own answer or implementation.
The Spring Batch documentation section that covers this issue is "Passing Data to Future Steps".
Upvotes: 7
Views: 13506
Reputation: 31600
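You are confusing the keys to be promoted from the step execution context to the job execution context with the data itself. This confusion comes from two places:
1- In <beans:property name="keys" value="someValues"/>, someValues should be someKeys: the property lists the names of the keys to promote, not the values.
2- Calling executionContext.put("keys", someValues); in the @BeforeStep method is incorrect.
Let me make this a bit clearer. Imagine you have a job with two steps: step 1 writes some items and stores the item count in its step execution context, and step 2 needs to read that count and print it.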
In this case, you can use the promotion listener to promote the key "count" from the step execution context of step 1 to the job execution context so that step 2 can access it. Here is an example:
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return new ItemWriter<Integer>() {
            private StepExecution stepExecution;

            @Override
            public void write(List<? extends Integer> items) {
                for (Integer item : items) {
                    System.out.println("item = " + item);
                }
                ExecutionContext stepContext = this.stepExecution.getExecutionContext();
                int count = stepContext.containsKey("count") ? stepContext.getInt("count") : 0;
                stepContext.put("count", count + items.size());
            }

            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
            }
        };
    }

    @Bean
    public Step step1() {
        return steps.get("step1")
                .<Integer, Integer>chunk(2)
                .reader(itemReader())
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    @Bean
    public Step step2() {
        return steps.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    // retrieve the key from the job execution context
                    Integer count = (Integer) chunkContext.getStepContext().getJobExecutionContext().get("count");
                    System.out.println("In step 2: step 1 wrote " + count + " items");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"count"});
        return listener;
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step1())
                .next(step2())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }
}
This prints:
item = 1
item = 2
item = 3
item = 4
In step 2: step 1 wrote 4 items
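To see why the listener must be given key names rather than data, here is a minimal plain-Java sketch of the promotion idea. It is a simplified model and not Spring Batch's implementation: PromotionSketch, writeChunk, and promote are hypothetical names, plain HashMaps stand in for the step and job execution contexts, and the real listener's exit-status check and strict mode are left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of key promotion; these are NOT Spring Batch classes. */
public class PromotionSketch {

    /** Mimics the writer above: adds the chunk size to the running "count". */
    public static void writeChunk(Map<String, Object> stepContext, List<Integer> items) {
        int count = stepContext.containsKey("count") ? (Integer) stepContext.get("count") : 0;
        stepContext.put("count", count + items.size());
    }

    /** Copies only the named keys from the step context to the job context. */
    public static void promote(Map<String, Object> stepContext,
                               Map<String, Object> jobContext,
                               String... keys) {
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for the step and job execution contexts
        Map<String, Object> stepContext = new HashMap<>();
        Map<String, Object> jobContext = new HashMap<>();

        // Two chunks of two items, as with chunk(2) over four items
        writeChunk(stepContext, Arrays.asList(1, 2));
        writeChunk(stepContext, Arrays.asList(3, 4));

        // Promoting a name under which nothing was stored copies nothing
        promote(stepContext, jobContext, "someValues");
        System.out.println("after promoting \"someValues\": " + jobContext);

        // Promoting the name the data was stored under makes it visible
        promote(stepContext, jobContext, "count");
        System.out.println("after promoting \"count\": " + jobContext);
    }
}
```

In this model the job-level map stays empty after the first call and only receives the count after the second, which mirrors the mismatch in the question: the data was stored under "keys" while the listener was configured with "someValues".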
Hope this helps.
Upvotes: 6