Reputation: 938
I'm using spring batch with custom reader and writer.
I have a control table with customerId's.
I need to run the same Step multiple times, once for each customer in my control table.
The customerId should be passed in as a parameter, since I need it in both the reader and the writer.
How can this best be achieved?
@Bean
public Step shipmentFactsStep() {
    return stepBuilderFactory.get("shipmentFactsStep")
            .<Shipmentfacts, Shipmentfacts>chunk(10000)
            .reader(shipmentfactsItemReader())
            .processor(shipmentFactProcessor())
            .writer(shipmentFactsWriter())
            .build();
}
Upvotes: 1
Views: 2660
Reputation: 10142
One way of achieving this is partitioning. This approach works well if you want to keep track of which customerIds have been completed, since there will be a separate slave step for each customer id.
1. First, create your partitioner class by implementing the org.springframework.batch.core.partition.support.Partitioner interface and populating a Map<String, ExecutionContext> entry for each customer id. Since you are partitioning by customer id, the gridSize method parameter will not be used in your case. The code will look something like this, where allCustomers is the list you prepared from the database.
public class CustomerPartitioner implements Partitioner {

    // customer ids loaded from your control table, e.g. via JdbcTemplate
    private List<String> allCustomers;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>();
        int partitionNumber = 0;
        for (String customer : allCustomers) {
            ExecutionContext value = new ExecutionContext();
            value.putString("customerId", customer);
            result.put("Customer Id [" + customer + "] : THREAD " + partitionNumber, value);
            partitionNumber++;
        }
        return result;
    }
}
2. Modify your step definition into a master step and a slave step (see the online partitioning tutorials for background). Sample code will be similar to this:
@Bean
public Step customerPartitionerStep() throws Exception {
    return stepBuilderFactory.get("customerPartitionerStep")
            .partitioner("shipmentFactsStep", customerPartitioner())
            .step(shipmentFactsStep())
            .gridSize(partitionerGridSize)
            .taskExecutor(taskExecutor())
            .build();
}
@Bean
public Step shipmentFactsStep() {
    return stepBuilderFactory.get("shipmentFactsStep")
            .<Shipmentfacts, Shipmentfacts>chunk(10000)
            .reader(shipmentfactsItemReader())
            .processor(shipmentFactProcessor())
            .writer(shipmentFactsWriter())
            .build();
}
@Bean
public Partitioner customerPartitioner() {
    return new CustomerPartitioner();
}
@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor simpleTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleTaskExecutor.setConcurrencyLimit(concurrencyLimit);
    return simpleTaskExecutor;
}
You can set partitionerGridSize to any value, since it is not used in your partitioner implementation. You could use it later to partition on the basis of the total number of records instead of only the customer id, as in the sketch below.
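If you do want to use gridSize at some point, a hedged sketch of an alternative partition() implementation for the same CustomerPartitioner could group the customer ids into gridSize buckets, so each slave step handles a sub-list rather than a single customer. The customerIds key and the partition naming below are just illustrative assumptions:

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    Map<String, ExecutionContext> result = new HashMap<>();
    // split the customer ids into roughly equal buckets, one per partition
    int bucketSize = (int) Math.ceil((double) allCustomers.size() / gridSize);
    for (int i = 0; i < gridSize; i++) {
        int from = i * bucketSize;
        if (from >= allCustomers.size()) {
            break;
        }
        int to = Math.min(from + bucketSize, allCustomers.size());
        ExecutionContext value = new ExecutionContext();
        // the slave step for this partition would then process every id in its bucket
        value.put("customerIds", new ArrayList<>(allCustomers.subList(from, to)));
        result.put("partition" + i, value);
    }
    return result;
}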
3. In the code at step #2 above, it is important to set concurrencyLimit=1 if you want only one customer to run at a time; the master step will still run the slave step for every customer you put into the map at step #1. You can run as many customers as you want in parallel by raising this value.
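Note that concurrencyLimit and partitionerGridSize in the sample above are fields you have to supply yourself; one way, assuming hypothetical property names, is to inject them from your application properties:

// hypothetical property names; pick whatever fits your application.properties
@Value("${shipmentfacts.partition.grid-size:10}")
private int partitionerGridSize;

@Value("${shipmentfacts.partition.concurrency-limit:1}")
private int concurrencyLimit;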
4. The customerId set by the partitioner at step #1 can be accessed in the reader, processor, etc. like this:
@Bean
@StepScope
public ItemReader<ReadBean> shipmentfactsItemReader(
        @Value("#{stepExecutionContext['customerId']}") String customerId) {
    ...
}
Note the @StepScope annotation; it is mandatory for this late binding of values. Also, where the reader is referenced in your step definition, you need to pass null, like this: .reader(shipmentfactsItemReader(null))
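Inside the step-scoped reader you can then use the injected customerId however you need. As a hedged sketch, a JdbcCursorItemReader that only reads the rows for this partition's customer might look like this (the dataSource field, table and column names are assumptions, not from the question):

@Bean
@StepScope
public JdbcCursorItemReader<Shipmentfacts> shipmentfactsItemReader(
        @Value("#{stepExecutionContext['customerId']}") String customerId) {
    JdbcCursorItemReader<Shipmentfacts> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    // assumed table/column names; only this partition's customer is read
    reader.setSql("SELECT * FROM shipment_facts WHERE customer_id = ?");
    reader.setPreparedStatementSetter(ps -> ps.setString(1, customerId));
    reader.setRowMapper(new BeanPropertyRowMapper<>(Shipmentfacts.class));
    return reader;
}

The same @StepScope plus @Value binding works for the processor and writer if they also need the customerId.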
In your Spring Batch metadata, you will have as many slave step executions as there are customers, plus one master step. The master step will end when all slave steps have finished.
The advantage here is that you can process many customers in parallel if need be, and each customer's slave step will run in its own thread.
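To actually run it, the master step (customerPartitionerStep) is the one you reference from your Job. A minimal sketch, assuming a jobBuilderFactory field from @EnableBatchProcessing and an example job name, would be:

@Bean
public Job shipmentFactsJob() throws Exception {
    return jobBuilderFactory.get("shipmentFactsJob")
            // allows re-running the job with a fresh run.id parameter
            .incrementer(new RunIdIncrementer())
            .start(customerPartitionerStep())
            .build();
}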
Hope it helps !!
Upvotes: 2