skpraveen

Reputation: 303

How to implement reader for multiple queries but same output item?

For a Spring Batch job, we have 2 different queries on the same table. The requirement is to have a single reader that executes both queries to read data from that table.

One way could be :

<batch:step id="firstStep" next="secondStep">
    <batch:tasklet>
        <batch:chunk reader="firstReader" writer="firstWriter" commit-interval="2" />
    </batch:tasklet>
</batch:step>
<batch:step id="secondStep" next="thirdStep">
    <batch:tasklet>
        <batch:chunk reader="secondReader" writer="secondWriter" commit-interval="2" />
    </batch:tasklet>
</batch:step>

But this demands defining a whole second step that is a copy of the first. Is there any other way to achieve the same? I am looking for something like MultiResourceItemReader, but for DB-based readers, that aggregates the data together.
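There is no built-in MultiResourceItemReader equivalent for JDBC readers, but the same effect can be had with a delegating reader that drains one query's reader before moving on to the next. A minimal self-contained sketch of that pattern (Spring Batch's ItemReader contract is modeled here by a small stand-in interface so the example runs on its own; all names are hypothetical):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ChainedReaderSketch {

    // Stand-in for org.springframework.batch.item.ItemReader:
    // read() returns the next item, or null when the source is exhausted.
    interface Reader<T> {
        T read() throws Exception;
    }

    // Wraps a list so it behaves like a cursor-based reader.
    static <T> Reader<T> listReader(List<T> items) {
        Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Delegating reader: drains each delegate in order, returning null
    // only after the last delegate is exhausted -- the same pattern a
    // custom DB reader can use to aggregate two queries in one step.
    static <T> Reader<T> chain(List<Reader<T>> delegates) {
        Iterator<Reader<T>> it = delegates.iterator();
        return new Reader<T>() {
            Reader<T> current = it.hasNext() ? it.next() : null;

            @Override
            public T read() throws Exception {
                while (current != null) {
                    T item = current.read();
                    if (item != null) {
                        return item;
                    }
                    current = it.hasNext() ? it.next() : null; // next query's reader
                }
                return null; // signals end of input to the step
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Reader<String> combined = chain(Arrays.asList(
                listReader(Arrays.asList("a1", "a2")),   // rows from query 1
                listReader(Arrays.asList("b1"))));       // rows from query 2

        StringBuilder seen = new StringBuilder();
        for (String s; (s = combined.read()) != null; ) {
            seen.append(s).append(' ');
        }
        System.out.println(seen.toString().trim()); // a1 a2 b1
    }
}
```

Registered as a step-scoped bean with the two JDBC readers injected as delegates, such a reader lets one chunk step consume both queries back to back.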

Upvotes: 4

Views: 20326

Answers (2)

surya

Reputation: 2749

You can create a view in the database that combines the different queries and read it with a JdbcPagingItemReader. If that's not an option, there are several alternatives; one way I have used is given below. Spring has other options as well, but from a developer's standpoint the following definitely works.

Create two item readers. The first one is below:

<!-- use org.springframework.batch.item.database.JdbcCursorItemReader for simple queries -->
<bean id="itemReader1"
    class="org.springframework.batch.item.database.JdbcPagingItemReader">
    <property name="sql" value="SELECT * FROM table1" />
    .......
    <property name="rowMapper">
        <bean class="com.sjena.AccountApplicationMapper" />
    </property>
</bean>

Then another reader for table 2:

<bean id="itemReader2"
    class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="sql" value="SELECT * FROM table2" />
    .......
    <property name="rowMapper">
        <bean class="com.sjena.AccountApplicationMapper" />
    </property>
</bean>

Then delegate to your custom reader:

<bean id="customItemReader" class="com.sjena.spring.reader.MyCustomReader"
    scope="step">
    <property name="itemReader1" ref="itemReader1" />
    <property name="itemReader2" ref="itemReader2" />
    <property name="pageSize" value="5" />
</bean>

And eventually use this custom reader in the step:

<job id="testJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1">
        <tasklet>
            <chunk reader="customItemReader" writer="itemWriter"
                commit-interval="1" />
        </tasklet>
    </step>
</job>

Then your class is as given below:

public class MyCustomReader implements ItemReader<AccountApplicationSummary> {

    private int pageSize; // you may have a different pageSize per item reader
    private ItemReader<AccountApplication> itemReader1;
    private ItemReader<AccountApplication> itemReader2;

    @Override
    public AccountApplicationSummary read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        // itemReader1.setPageSize(pageSize) -- be sure itemReader1 is of
        // JdbcPagingItemReader type, and it is better to do such initialization
        // in an init method (implement InitializingBean and set the properties
        // in afterPropertiesSet()). Like pageSize, you can set any property you need.

        AccountApplication application1 = itemReader1.read();
        AccountApplication application2 = itemReader2.read();
        // You now have results from both tables and can combine them as needed.

        // Returning null tells Spring Batch the input is exhausted;
        // without this check the step would never finish.
        if (application1 == null && application2 == null) {
            return null;
        }

        AccountApplicationSummary summary = new AccountApplicationSummary();
        return summary;
    }

    // setters for pageSize, itemReader1 and itemReader2 omitted;
    // they are required for the property injection above
}
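One caveat with a paired read() like this: Spring Batch treats a null return as end of input, so the reader must return null once both delegates are exhausted or the step loops forever. The termination logic can be illustrated without Spring (queues stand in for the two per-query readers; all names are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class PairedReadSketch {

    // Pairs one item from each source per call; returns null only when
    // both sources are drained, which is what ends the chunk step.
    static String[] readPair(Queue<String> source1, Queue<String> source2) {
        String first = source1.poll();   // null once query 1 is exhausted
        String second = source2.poll();  // null once query 2 is exhausted
        if (first == null && second == null) {
            return null; // end-of-input signal
        }
        return new String[] { first, second };
    }

    public static void main(String[] args) {
        Queue<String> q1 = new ArrayDeque<>(Arrays.asList("t1-row1", "t1-row2"));
        Queue<String> q2 = new ArrayDeque<>(Arrays.asList("t2-row1"));

        int pairs = 0;
        while (readPair(q1, q2) != null) {
            pairs++; // second pair has a null slot for the shorter source
        }
        System.out.println(pairs); // 2
    }
}
```

Note that when the two queries return different row counts, the later pairs contain nulls from the shorter source, which the combining logic has to tolerate.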

Upvotes: 3

Sander_M

Reputation: 1119

This answer is an adaptation of the answer provided by Hansjoerg on my similar question about executing a step multiple times: Spring Batch - Looping a reader/processor/writer step

package hello;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.datasource.SimpleDriverDataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    List<String> queries = Arrays.asList("some query1", "some query2");

    @Bean
    public Job multiQueryJob() {

        List<Step> steps = queries.stream().map(query -> createStep(query)).collect(Collectors.toList());       

        return jobBuilderFactory.get("multiQueryJob")
                .start(createParallelFlow(steps))
                .end()
                .build();
    }

    private Step createStep(String query) {

        return stepBuilderFactory.get("convertStepFor" + query)
                .chunk(10)
                .reader(createQueryReader(query))
                .writer(dummyWriter())
                .build();
    }

    private Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(1); // force sequential execution

        List<Flow> flows = steps.stream()

                .map(step -> new FlowBuilder<Flow>("flow_" + step.getName())
                        .start(step) 
                        .build()) 
                .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
                .split(taskExecutor)
                .add(flows.toArray(new Flow[flows.size()])).build();
    }

    public JdbcCursorItemReader<Actor> createQueryReader(String query) {
        JdbcCursorItemReader<Actor> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource());
        reader.setSql(query);
        reader.setRowMapper(mapper());
        return reader;
    }

    public BeanPropertyRowMapper<Actor> mapper(){
        BeanPropertyRowMapper<Actor> mapper = new BeanPropertyRowMapper<>();
        mapper.setMappedClass(Actor.class);
        return mapper;
    }

    public DummyItemWriter dummyWriter() {
        return new DummyItemWriter();
    }

    public DataSource dataSource() {
        final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
        try {
            dataSource.setDriver(new com.mysql.jdbc.Driver());
        } catch (SQLException e) {
            e.printStackTrace();
        }
        dataSource.setUrl("jdbc:mysql://localhost:3306/SAKILA");
        dataSource.setUsername("sa");
        dataSource.setPassword("password");
        return dataSource;
    }

}

I provided two dummy queries in the list of queries; you have to supply the actual ones. The job is constructed based on the number of queries, and in this example I used Spring Batch's JdbcCursorItemReader to read the data from the database.

You can recreate this configuration by starting from the example provided by Spring (https://spring.io/guides/gs/batch-processing/), adding an Actor POJO, and, last but not least, removing the classes you don't need (you will only need the BatchConfiguration and Application classes).
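BeanPropertyRowMapper maps result-set columns to same-named bean properties, so the Actor POJO only needs fields matching the query's column names plus getters and setters. A minimal sketch (the field names here are assumptions based on the Sakila actor table; they must match your actual columns):

```java
public class Actor {

    // Field names must match the result-set column names; underscored
    // columns map to camelCase properties (e.g. first_name -> firstName).
    private Long actorId;
    private String firstName;
    private String lastName;

    public Long getActorId() { return actorId; }
    public void setActorId(Long actorId) { this.actorId = actorId; }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
}
```

BeanPropertyRowMapper requires a public no-arg constructor and standard setters, both of which this sketch provides by default.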

Upvotes: 0
