Reputation: 303
For a Spring Batch job, we have two different queries on the same table. The requirement is to have a reader that executes both queries to read data from that table.
One way could be :
<!-- First chunk-oriented step: reads with firstReader, writes with firstWriter, then continues to secondStep. -->
<batch:step id="firstStep" next="secondStep">
    <batch:tasklet>
        <!-- commit-interval is a single attribute name (no whitespace inside it). -->
        <batch:chunk reader="firstReader" writer="firstWriter" commit-interval="2">
        </batch:chunk>
    </batch:tasklet>
</batch:step>
<!-- Chunk-oriented step wired to secondReader/secondWriter; continues to thirdStep when done. -->
<batch:step next="thirdStep" id="secondStep">
    <batch:tasklet>
        <batch:chunk commit-interval="2" reader="secondReader" writer="secondWriter"/>
    </batch:tasklet>
</batch:step>
But this requires defining an entirely separate step that is a copy of the first. Is there any other way to achieve the same result? I am looking for something like MultiResourceItemReader, but for DB-based readers, that aggregates the data together.
Upvotes: 4
Views: 20326
Reputation: 2749
You can create a view in the database that covers the different queries and read from it as you would with a JdbcPagingItemReader. If that's not an option, there are other ways; one approach I have used is given below. Spring has other options as well, but from a developer's standpoint the following is definitely an option.
Create two item readers; the first one is below.
<!-- Use org.springframework.batch.item.database.JdbcCursorItemReader for simple queries. -->
<!-- NOTE(review): JdbcPagingItemReader is normally configured through a "queryProvider" rather
     than a "sql" property; confirm the property below against the reader class actually used. -->
<bean id="itemReader1"
      class="org.springframework.batch.item.database.JdbcPagingItemReader">
    <property name="sql"
              value=" FROM table1" />
    <!-- ....... (remaining properties elided in the original) -->
    <property name="rowMapper">
        <bean class="com.sjena.AccountApplicationMapper" />
    </property>
</bean>
then another reader from table 2
<!-- Second delegate reader: a cursor-based JDBC reader over table2, mapped with the same row mapper class. -->
<bean id="itemReader2"
      class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="sql"
              value="FROM table2" />
    <!-- ....... (remaining properties elided in the original) -->
    <property name="rowMapper">
        <bean class="com.sjena.AccountApplicationMapper" />
    </property>
</bean>
then delegate to your custom reader
<!-- Step-scoped custom reader that receives both JDBC readers plus a page size. -->
<bean scope="step" id="customItemReader"
      class="com.sjena.spring.reader.MyCustomReader">
    <property name="itemReader1" ref="itemReader1"/>
    <property name="itemReader2" ref="itemReader2"/>
    <property name="pageSize" value="5"/>
</bean>
And eventually use this custom reader
<!-- Single-step job whose chunk reads through the custom composite reader bean. -->
<job id="testJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1">
        <tasklet>
            <!-- reader must reference the bean id of the custom reader ("customItemReader"). -->
            <chunk reader="customItemReader" writer="itemWriter"
                   commit-interval="1" />
        </tasklet>
    </step>
</job>
Then your class is as given below
/**
 * Item reader that pulls one record from each of two delegate readers per call and combines
 * them into a single {@link AccountApplicationSummary}.
 *
 * <p>The delegates and the page size are injected through the setters below, which is what a
 * {@code <property name="..."/>} element in the XML bean definition binds to.
 *
 * <p>NOTE(review): if the delegates are stream-based readers (e.g. JDBC readers) they presumably
 * still need to be opened/closed by the step, for example by registering them as streams;
 * confirm against the step configuration.
 */
public class MyCustomReader implements ItemReader<AccountApplicationSummary> {

    /** Page size for the delegates; different delegates may need different page sizes. */
    int pagesize;
    ItemReader<AccountApplication> itemReader1;
    ItemReader<AccountApplication> itemReader2;

    /**
     * @param pageSize page size supplied by the bean definition
     */
    public void setPageSize(int pageSize) {
        this.pagesize = pageSize;
    }

    /**
     * @param itemReader1 delegate reader for the first query
     */
    public void setItemReader1(ItemReader<AccountApplication> itemReader1) {
        this.itemReader1 = itemReader1;
    }

    /**
     * @param itemReader2 delegate reader for the second query
     */
    public void setItemReader2(ItemReader<AccountApplication> itemReader2) {
        this.itemReader2 = itemReader2;
    }

    /**
     * Reads the next record from each delegate and builds a summary from them.
     *
     * @return the combined summary, or {@code null} once both delegates are exhausted so that
     *         the step can finish instead of looping forever
     * @throws Exception if either delegate fails to read
     */
    @Override
    public AccountApplicationSummary read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // Delegate-specific settings such as itemReader1.setPageSize(pageSize) only apply when the
        // delegate is a JdbcPagingItemReader, and are better done once in an init method
        // (implement InitializingBean and use afterPropertiesSet) than on every read.
        // Like pageSize, any other property the delegates need can be set the same way.
        AccountApplication application1 = itemReader1.read();
        AccountApplication application2 = itemReader2.read();

        // A null return is the end-of-input signal for an ItemReader; without it the step never ends.
        if (application1 == null && application2 == null) {
            return null;
        }

        // Results from both tables are available here; populate the summary from them as needed.
        // Either one may be null if its delegate ran out before the other.
        AccountApplicationSummary summary = new AccountApplicationSummary();
        return summary;
    }
}
Upvotes: 3
Reputation: 1119
This answer is an adaption of the answer provided by Hansjoerg on my similar question with regard to executing a step multiple times: Spring Batch - Looping a reader/processor/writer step
package hello;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.datasource.SimpleDriverDataSource;
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
List<String> queries = Arrays.asList("some query1, "some query2");
@Bean
public Job multiQueryJob() {
List<Step> steps = queries.stream().map(query -> createStep(query)).collect(Collectors.toList());
return jobBuilderFactory.get("multiQueryJob")
.start(createParallelFlow(steps))
.end()
.build();
}
private Step createStep(String query) {
return stepBuilderFactory.get("convertStepFor" + query)
.chunk(10)
.reader(createQueryReader(query))
.writer(dummyWriter())
.build();
}
private Flow createParallelFlow(List<Step> steps) {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(1); // force sequential execution
List<Flow> flows = steps.stream()
.map(step -> new FlowBuilder<Flow>("flow_" + step.getName())
.start(step)
.build())
.collect(Collectors.toList());
return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
.split(taskExecutor)
.add(flows.toArray(new Flow[flows.size()])).build();
}
public JdbcCursorItemReader<Actor> createQueryReader(String query) {
JdbcCursorItemReader<Actor> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource());
reader.setSql(query);
reader.setRowMapper(mapper());
return reader;
}
public BeanPropertyRowMapper<Actor> mapper(){
BeanPropertyRowMapper<Actor> mapper = new BeanPropertyRowMapper<>();
mapper.setMappedClass(Actor.class);
return mapper;
}
public DummyItemWriter dummyWriter() {
return new DummyItemWriter();
}
public DataSource dataSource() {
final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
try {
dataSource.setDriver(new com.mysql.jdbc.Driver());
} catch (SQLException e) {
e.printStackTrace();
}
dataSource.setUrl("jdbc:mysql://localhost:3306/SAKILA");
dataSource.setUsername("sa");
dataSource.setPassword("password");
return dataSource;
}
}
I provided two dummy queries in the list of queries; you have to provide the actual queries. The job is constructed based on the number of queries, and in this example I used Spring Batch's JdbcCursorItemReader to read data from a database.
You can recreate this configuration by using the example provided by Spring at https://spring.io/guides/gs/batch-processing/ and adding an Actor POJO. Last but not least, remove the classes you don't need (you will only need the BatchConfiguration and the Application classes).
Upvotes: 0