sage
sage

Reputation: 657

Spring batch JdbcPagingItemReader not initializing properly because it is not closing properly

I have a requirement to scan some entities in the database and update an expiration ("isexpired") flag every n mumber of minutes. After this I would post the whole entity as json to a url specified as an attribute of this same entity.

This code runs the batch job and updates the database but it keeps showing the error below and when debugging in Springsource STS, it appears that the program keeps looping continuously for a number of times before updating the database:

INFO: Overriding bean definition for bean 'expireAndPostApiEntityJob': replacing [Generic bean: class [org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Generic bean: class [org.springframework.batch.core.configuration.xml.JobParserJobFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]
Jan 19, 2015 4:46:56 PM org.springframework.core.io.support.PropertiesLoaderSupport loadProperties
INFO: Loading properties file from class path resource [application.properties]
Jan 19, 2015 4:46:56 PM org.springframework.beans.factory.support.DefaultListableBeanFactory registerBeanDefinition
INFO: Overriding bean definition for bean 'itemReader': replacing [Generic bean: class [org.springframework.batch.item.database.JdbcCursorItemReader]; scope=step; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=false; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in class path resource [spring/batch/jobs/job-extract-users.xml]] with [Root bean: class [org.springframework.aop.scope.ScopedProxyFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in BeanDefinition defined in class path resource [spring/batch/jobs/job-extract-users.xml]]
Jan 19, 2015 4:46:56 PM org.springframework.beans.factory.support.DefaultListableBeanFactory registerBeanDefinition
INFO: Overriding bean definition for bean 'pagingItemReader': replacing [Generic bean: class [org.springframework.batch.item.database.JdbcPagingItemReader]; scope=step; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=false; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in class path resource [spring/batch/jobs/job-extract-users.xml]] with [Root bean: class [org.springframework.aop.scope.ScopedProxyFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in BeanDefinition defined in class path resource [spring/batch/jobs/job-extract-users.xml]]
Jan 19, 2015 4:46:56 PM org.springframework.beans.factory.support.DefaultListableBeanFactory registerBeanDefinition
INFO: Overriding bean definition for bean 'apiItemProcessor': replacing [Generic bean: class [com.x.apimanagerbatchjob.ApiItemProcessor]; scope=step; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=false; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in class path resource [spring/batch/jobs/job-extract-users.xml]] with [Root bean: class [org.springframework.aop.scope.ScopedProxyFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in BeanDefinition defined in class path resource [spring/batch/jobs/job-extract-users.xml]]
Jan 19, 2015 4:46:56 PM org.springframework.beans.factory.support.DefaultListableBeanFactory preInstantiateSingletons
INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@7faf889f: defining beans [jobRepository,transactionManager,jobLauncher,databaseProperties,dataSource,itemReader,pagingItemReader,apiItemProcessor,oracleItemWriter,org.springframework.batch.core.scope.internalStepScope,org.springframework.beans.factory.config.CustomEditorConfigurer,org.springframework.batch.core.configuration.xml.CoreNamespacePostProcessor,expireAndPostApiEntityStep,expireAndPostApiEntityJob,runScheduler,org.springframework.scheduling.support.ScheduledMethodRunnable#0,org.springframework.scheduling.config.IntervalTask#0,org.springframework.scheduling.config.ScheduledTaskRegistrar#0,apiDAO,scopedTarget.itemReader,scopedTarget.pagingItemReader,scopedTarget.apiItemProcessor]; root of factory hierarchy
Jan 19, 2015 4:46:56 PM org.springframework.batch.core.launch.support.SimpleJobLauncher afterPropertiesSet
INFO: No TaskExecutor has been set, defaulting to synchronous executor.
Jan 19, 2015 4:46:56 PM org.springframework.jdbc.datasource.DriverManagerDataSource setDriverClassName
INFO: Loaded JDBC driver: oracle.jdbc.driver.OracleDriver
Jan 19, 2015 4:46:56 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=expireAndPostApiEntityJob]] launched with the following parameters: [{isexpired=0}]
Jan 19, 2015 4:46:56 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [expireAndPostApiEntityStep]
Jan 19, 2015 4:46:56 PM org.springframework.batch.core.step.AbstractStep execute
SEVERE: Encountered an error executing the step
org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:142)
    at org.springframework.batch.item.database.JdbcPagingItemReader.open(JdbcPagingItemReader.java:249)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:131)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
    at com.sun.proxy.$Proxy4.open(Unknown Source)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:306)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:192)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:137)
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64)
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:152)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:131)
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:301)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:134)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:127)
    at com.x.apimanagerbatchjob.scheduler.RunScheduler.run(RunScheduler.java:33)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:64)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Cannot open an already opened ItemReader, call close first
    at org.springframework.util.Assert.state(Assert.java:385)
    at org.springframework.batch.item.database.AbstractPagingItemReader.doOpen(AbstractPagingItemReader.java:133)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:139)
    ... 40 more

I will inline some parts of my context configuration and spring code here. App.java - The entry point of the application.

public class App {

    public static void main(String[] args) {

        String[] springConfig = { "spring/batch/jobs/job-extract-users.xml" };

        @SuppressWarnings({ "resource", "unused" })
        ApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
    }

}

RunScheduler.java - The class that initiates the job

Component
public class RunScheduler {


  public void run() {

      String[] springConfig = { "spring/batch/jobs/job-extract-users.xml" };

        @SuppressWarnings({ "resource" })
        ApplicationContext context = new ClassPathXmlApplicationContext(springConfig);

        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("expireAndPostApiEntityJob");

        try {

            JobParameters param = new JobParametersBuilder().addString("isexpired", "0").toJobParameters();

            JobExecution execution = jobLauncher.run(job, param);
            System.out.println("Exit Status : " + execution.getStatus());
            System.out.println("Exit Status : " + execution.getAllFailureExceptions());

        } catch (Exception e) {
            e.printStackTrace();

        }

        System.out.println("Done");

  }
}

The context xml that configures the beans for the JdbcPagingItemReader

<import resource="classpath:spring/**/context.xml" />
    <import resource="classpath:spring/**/database.xml" />

    <bean id="itemReader"
        class="org.springframework.batch.item.database.JdbcCursorItemReader"
        scope="step">
        <property name="dataSource" ref="dataSource" />
        <property name="sql"
            value="select id, apikey, apitoken, url, isexpired, createddate, modifieddate, posterror from apis where isexpired = #{jobParameters['isexpired']}" />
        <property name="rowMapper">
            <bean class="com.x.apimanagerbatchjob.ApiRowMapper" />
        </property>
    </bean>

    <bean id="pagingItemReader"
        class="org.springframework.batch.item.database.JdbcPagingItemReader"
        scope="step">
        <property name="dataSource" ref="dataSource" />
        <property name="queryProvider">
            <bean
                class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
                <property name="dataSource" ref="dataSource" />
                <property name="selectClause" value="select id, apikey, apitoken, url, isexpired, createddate, modifieddate, posterror" />
                <property name="fromClause" value="from apis" />
                <property name="whereClause" value="where isexpired=:isexpired" />
                <property name="sortKey" value="id" />
            </bean>
        </property>
        <property name="parameterValues">
            <map>
                <entry key="isexpired" value="#{jobParameters['isexpired']}" />
            </map>
        </property>
        <!-- use property place holder configure -->
        <property name="pageSize" value="${pagingItemReader.pageSize}" />
        <property name="rowMapper">
            <bean class="com.x.apimanagerbatchjob.ApiRowMapper" />
        </property>
    </bean>

    <bean id="apiItemProcessor" class="com.x.apimanagerbatchjob.ApiItemProcessor" scope="step"/>

    <bean id="oracleItemWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource" />
        <property name="sql">
          <value>
                <![CDATA[        
                    update apis
                    set isexpired = 1,
                    modifieddate = CURRENT_DATE
                    where id = :id 
                ]]>
          </value>
        </property>
        <!-- It will take care matching between object property and sql name parameter -->
        <property name="itemSqlParameterSourceProvider">
            <bean
            class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
        </property>
      </bean>

    <job id="expireAndPostApiEntityJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="expireAndPostApiEntityStep">
            <tasklet>
                <chunk reader="pagingItemReader" processor="apiItemProcessor" writer="oracleItemWriter"
                    commit-interval="1" />
            </tasklet>
        </step>
    </job>

    <bean id="runScheduler" class="com.x.apimanagerbatchjob.scheduler.RunScheduler" />

  <!-- Run every 900 seconds (15 mins) -->
  <task:scheduled-tasks>

    <task:scheduled ref="runScheduler" method="run" fixed-delay="${scheduler.interval}" /> 

    <!-- 
    <task:scheduled ref="runScheduler" method="run" cron="*/900 * * * * *" />
    -->
  </task:scheduled-tasks>

  <bean id="apiDAO" class="com.x.apimanagerbatchjob.ApiDAOJDBCTemplateImpl">
        <property name="dataSource" ref="dataSource" />
    </bean>

I will appreciate help with this. Thank you very much.

Upvotes: 4

Views: 5104

Answers (2)

Swarit Agarwal
Swarit Agarwal

Reputation: 2648

Try to bind your reader bean with @StepScope. It resolved issue for me

Upvotes: 5

Michael Minella
Michael Minella

Reputation: 21453

I think the issue is due to the fact that you have a bit of recursion going on in your app. Your App class is bootstrapping the same context as the RunScheduler class. I'd recommend having two contexts. One that is bootstrapped by the App class that handles just starting the scheduler and then the other context that is bootstrapped by the RunScheduler class to bootstrap the job. You can even move common components to the one the App class bootstraps and set that as the parent context when the RunScheduler bootstraps it's context.

Upvotes: 0

Related Questions