Reputation: 15824
I am getting illegal state exception. I tried to resolve it by setting following parameters
etlReader.setSaveState(false);
etlReader.setUseSharedExtendedConnection(false);
Do anyone has clue what is going on?
My configuration in application.properties as follows:
spring.datasource.url=jdbc:postgresql://localhost:5432/
spring.datasource.username=user
spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true
spring.jpa.show-sql=true spring.batch.initialize-schema=always
@Scheduled param
etl.scheduler.frequency=3600000
This is the important part of ItemReader implementation. Please let me know if any additional information is required.
@SuppressWarnings("rawtypes")
@Bean
public StoredProcedureItemReader generateEvents(DataSource dataSource) {
StoredProcedureItemReader<Event> etlReader = new StoredProcedureItemReader<Event>();
etlReader.setDataSource(dataSource);
etlReader.setProcedureName("myStoredProcedure");
etlReader.setFunction(false);
etlReader.setRowMapper(etlNoOpRowMapper());
return etlReader;
}
@Bean
public NoOpEventETLWriter etljobWriter() {
return new NoOpMyDataETLWriter();
}
@Bean(name = "executeETLStep")
public Step executeETLStep(ItemReader<Event> generateEvents) {
return stepBuilderFactory.get("executeETLStep").<MyData, MyData>chunk(1).reader(generateEvents)
.writer(etljobWriter()).build();
}
@Bean(name = "etlJob")
public Job etlJob(CardinalBatchCompletionListener listener, @Qualifier("executeETLStep") Step executeETLStep) {
return jobBuilderFactory.get("etlJob").incrementer(new RunIdIncrementer()).listener(listener)
.flow(executeETLStep).end().build();
}
Stacktrace:
org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:152) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:136) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at com.sun.proxy.$Proxy73.run(Unknown Source) ~[na:na] at com.domain.name.MyJobScheduler.runBatch(JobScheduler.java:40) ~[classes!/:0.0.1-SNAPSHOT]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na] at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.IllegalStateException: Stream is already initialized. Close before re-opening.
at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:424) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:149) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
... 37 common frames omitted
EDIT
All my beans are in configuration file, so they are initialized as soon as I start. And running the job using a scheduler. Why is job execution again trying to reinitialize the resources?
Upvotes: 0
Views: 513
Reputation: 31710
If your scheduler runs two job instances simultaneously, then the reader will be shared, which is the cause of your issue: One job opens the reader, then before closing it, another job tries to open it as well.
You need to make your reader job scoped (step scoped would also work).
Upvotes: 1