Reputation: 1108
I'm trying to run multiple steps in parallel, but I get errors (collisions) in the backend SQL Server database when the different flows try to update the JOB_EXECUTION stats. I am using an async task executor, but beyond that I am not sure how to get these flows to run successfully.
<batch:split id="createContextMaps" task-executor="asyncTaskExecutor" next="processBWReport">
    <batch:flow>
        <batch:step id="createTabcPermitsMap">
            <batch:tasklet>
                <batch:chunk reader="tabcPermitReader" writer="tabcPermitContextWriter" commit-interval="5000" />
            </batch:tasklet>
        </batch:step>
    </batch:flow>
    <batch:flow>
        <batch:step id="createCustPermitsMap">
            <batch:tasklet>
                <batch:chunk reader="custPermitReader" writer="custPermitContextWriter" commit-interval="5000" />
            </batch:tasklet>
        </batch:step>
    </batch:flow>
</batch:split>
<bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
I get a java.util.ConcurrentModificationException. The full stack trace:
org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:143) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) [spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:149) [spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.3.18.jar:5.3.18]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.3.5.jar:4.3.5]
at com.xxx.batch.domain.comptroller.Launcher$1.run(Launcher.java:116) [classes/:?]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_171]
Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: Ended flow=createContextMaps.0 at state=createContextMaps.0.createTabcPermitsMap with exception
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:178) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
... 1 more
Caused by: java.lang.IllegalArgumentException: Could not serialize the execution context
at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:306) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:146) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:223) ~[spring-batch-core-4.3.5.jar:4.3.5]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_171]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.18.jar:5.3.18]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.18.jar:5.3.18]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.18.jar:5.3.18]
at com.sun.proxy.$Proxy21.updateExecutionContext(Unknown Source) ~[?:?]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:163) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
... 1 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: (was java.util.ConcurrentModificationException) (through reference chain: java.util.HashMap["CUST_PERMITS"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:397) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:356) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:316) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:943) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:696) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:681) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:650) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:33) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4409) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3621) ~[jackson-databind-2.11.4.jar:2.11.4]
at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:141) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:104) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:302) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:146) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:223) ~[spring-batch-core-4.3.5.jar:4.3.5]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_171]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.18.jar:5.3.18]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.18.jar:5.3.18]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.18.jar:5.3.18]
at com.sun.proxy.$Proxy21.updateExecutionContext(Unknown Source) ~[?:?]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:163) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
... 1 more
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(Unknown Source) ~[?:1.8.0_171]
at java.util.HashMap$EntryIterator.next(Unknown Source) ~[?:1.8.0_171]
at java.util.HashMap$EntryIterator.next(Unknown Source) ~[?:1.8.0_171]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:904) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:696) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:681) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:650) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:33) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:941) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:696) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:681) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:650) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:33) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4409) ~[jackson-databind-2.11.4.jar:2.11.4]
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3621) ~[jackson-databind-2.11.4.jar:2.11.4]
at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:141) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:104) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:302) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:146) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:223) ~[spring-batch-core-4.3.5.jar:4.3.5]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_171]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.18.jar:5.3.18]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.18.jar:5.3.18]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.18.jar:5.3.18]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.18.jar:5.3.18]
at com.sun.proxy.$Proxy21.updateExecutionContext(Unknown Source) ~[?:?]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:163) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
... 1 more
Upvotes: 0
Views: 618
Reputation: 31730
The error occurs while serializing the execution context: java.lang.IllegalArgumentException: Could not serialize the execution context, caused by a java.util.ConcurrentModificationException.
Since your steps run in parallel on different threads, you need to make sure that every data structure you store in the execution context is thread-safe and serializable. For example, you should use a ConcurrentHashMap instead of a HashMap. Moreover, since the execution context is serialized and persisted in the job repository, all values should be Serializable.
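To illustrate the difference, here is a minimal sketch that is not taken from the question's code. The class and method names (ContextMapDemo, failsWhenModifiedDuringIteration) and the map keys are hypothetical. In a single thread, it mimics what presumably happens when one flow's writer adds entries to a map while another flow's step completes and the map is iterated for serialization.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, not part of Spring Batch or the question's code.
class ContextMapDemo {

    // Iterates over the map (as a serializer would) while adding a new key
    // mid-iteration (as a writer in a parallel flow might).
    // Returns true if the iteration fails with ConcurrentModificationException.
    static boolean failsWhenModifiedDuringIteration(Map<String, String> permits) {
        permits.put("permit-1", "A");
        permits.put("permit-2", "B");
        try {
            for (Map.Entry<String, String> entry : permits.entrySet()) {
                // Structural modification while an iterator is active.
                permits.put("permit-3", "C");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // HashMap iterators are fail-fast: they detect the modification.
        System.out.println("HashMap fails: "
                + failsWhenModifiedDuringIteration(new HashMap<>()));
        // ConcurrentHashMap iterators are weakly consistent: they tolerate it.
        System.out.println("ConcurrentHashMap fails: "
                + failsWhenModifiedDuringIteration(new ConcurrentHashMap<>()));
    }
}
```

ConcurrentHashMap also implements Serializable, so it satisfies both requirements as long as its keys and values are serializable too. Note that a weakly consistent iterator may or may not see entries added during serialization, so the persisted snapshot can lag behind the in-memory map.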
Upvotes: 1