Davidson
Davidson

Reputation: 1108

Not able to run Spring Batch Steps in parallel

I'm trying to run multiple steps in parallel but I get errors (collision) in the backend SQL Server database when the different flows are trying to update the JOB_EXECUTION stats. I am using the async task executor, but beyond that, I am not sure how to get these flows to run successfully.

    <batch:split id="createContextMaps" task-executor="asyncTaskExecutor" next="processBWReport">
        <batch:flow>
            <batch:step id="createTabcPermitsMap">
                <batch:tasklet>
                    <batch:chunk reader="tabcPermitReader" writer="tabcPermitContextWriter" commit-interval="5000" />
                </batch:tasklet>
            </batch:step>
        </batch:flow>
        <batch:flow>
            <batch:step id="createCustPermitsMap">
                <batch:tasklet>
                    <batch:chunk reader="custPermitReader" writer="custPermitContextWriter" commit-interval="5000" />
                </batch:tasklet>
            </batch:step>
        </batch:flow>
    </batch:split>

<bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>

I get Caused by: java.util.ConcurrentModificationException

    org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
        at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:143) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) [spring-batch-core-4.3.5.jar:4.3.5]
        at 
org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:149) [spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.3.18.jar:5.3.18]
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.3.5.jar:4.3.5]
        at com.xxx.batch.domain.comptroller.Launcher$1.run(Launcher.java:116) [classes/:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_171]
    Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: Ended flow=createContextMaps.0 at state=createContextMaps.0.createTabcPermitsMap with exception
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:178) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
        ... 1 more
    Caused by: java.lang.IllegalArgumentException: Could not serialize the execution context
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:306) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:146) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:223) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_171]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.18.jar:5.3.18]
        at com.sun.proxy.$Proxy21.updateExecutionContext(Unknown Source) ~[?:?]
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:163) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
        ... 1 more
    Caused by: com.fasterxml.jackson.databind.JsonMappingException: (was java.util.ConcurrentModificationException) (through reference chain: java.util.HashMap["CUST_PERMITS"])
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:397) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:356) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:316) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:943) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:696) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:681) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:650) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:33) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4409) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3621) ~[jackson-databind-2.11.4.jar:2.11.4]
        at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:141) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:104) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:302) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:146) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:223) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_171]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.18.jar:5.3.18]
        at com.sun.proxy.$Proxy21.updateExecutionContext(Unknown Source) ~[?:?]
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:163) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
        ... 1 more
    Caused by: java.util.ConcurrentModificationException
        at java.util.HashMap$HashIterator.nextNode(Unknown Source) ~[?:1.8.0_171]
        at java.util.HashMap$EntryIterator.next(Unknown Source) ~[?:1.8.0_171]
        at java.util.HashMap$EntryIterator.next(Unknown Source) ~[?:1.8.0_171]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:904) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:696) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:681) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:650) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:33) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:941) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:696) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:681) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:650) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:33) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4409) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3621) ~[jackson-databind-2.11.4.jar:2.11.4]
        at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:141) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:104) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:302) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:146) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:223) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_171]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.18.jar:5.3.18]
        at com.sun.proxy.$Proxy21.updateExecutionContext(Unknown Source) ~[?:?]
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:163) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
        ... 1 more

Upvotes: 0

Views: 618

Answers (1)

Mahmoud Ben Hassine
Mahmoud Ben Hassine

Reputation: 31730

There seems to be an error while serializing the excution context java.lang.IllegalArgumentException: Could not serialize the execution context due to a java.util.ConcurrentModificationException.

Since your steps are running in parallel in different concurrent threads, you need to make sure all data structures you store in the execution context are thread-safe and serializable. For example, you should use a ConcurrentHashMap instead of a HashMap. Moreover, since the execution context is serialized and persisted in the job repository, all values should be Serializable.

Upvotes: 1

Related Questions