David Geary
David Geary

Reputation: 2284

Multi-threaded acces to Job Scope beans in Spring Batch 3.0

In Spring Batch 3.0 I'm trying to use the new Job Scope functionality for beans in both partitioned and multi-threaded steps (configured with an task:executor bean), and in both cases I'm getting the exception

  Caused by: java.lang.IllegalStateException: No context holder available for job scope
        at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:153)
        at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:338)

but if i make the beans step scope it works OK.

I noticed the comment on JobSynchronizationManager which says

N.B. it is the responsibility of every {@link Job} implementation to ensure that a {@link JobContext} is available on every thread that might be involved in a job execution, including worker threads from a pool.

so I'm wondering if I need to do something to set this up or if its a bug in the the job scope implementation that it doesn't set up the worker threads correctly?

StepSynchronizationManager has a similar comment - but in that case something is obviously setting up the threads correctly within the step.

Sample code to reproduce issue:

TestItemReader

package test;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.InitializingBean;

public class TestItemReader implements ItemReader<Integer>, InitializingBean {

    private List<Integer> items;

    @Override
    public synchronized Integer read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException {

        if (items.size() > 0) {
            return items.remove(0);
        }

        return null;
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        System.out.println("Initialising reader");

        items = new ArrayList<Integer>();   
        for (int i=0;i<100;i++) items.add(i);       
    }
}

TestItemWriter

package test;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class TestItemWriter implements ItemWriter<Integer> {

    @Override
    public void write(List<? extends Integer> items) throws Exception {

        for (int i : items) {
            System.out.println(Thread.currentThread().getName() + " Writing " + i);
        }       
    }
}

test-job-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
                        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
                        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                         http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <job id="job" restartable="true" xmlns="http://www.springframework.org/schema/batch">
        <step id="index">   
            <tasklet task-executor="executor">
                <chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
            </tasklet>
        </step>
    </job>

    <bean id="itemReader" class="test.TestItemReader" scope="job"/>

    <bean id="itemWriter" class="test.TestItemWriter"/>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <bean class="org.springframework.batch.test.JobLauncherTestUtils">
        <property name="job" ref="job"/>
    </bean>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <batch:job-repository id="jobRepository"/>

    <jdbc:embedded-database id="dataSource" type="HSQL">
        <jdbc:script location="classpath:/org/springframework/batch/core/schema-hsqldb.sql"/>
    </jdbc:embedded-database>

    <task:executor id="executor" queue-capacity="0" pool-size="5"/>

</beans>

JobTest

package test;

import java.util.Collection;

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.ContextConfiguration;

@ContextConfiguration(locations={"test-job-context.xml"})
public class JobTest extends AbstractJUnit4SpringContextTests {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @BeforeClass
    public static void beforeClassSetup() {    

        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.WARN);
        Logger.getLogger("org.springframework.batch.core.scope.JobScope").setLevel(Level.DEBUG);
        Logger.getLogger("org.springframework.batch.core.scope.StepScope").setLevel(Level.DEBUG);
    }

    @Test
    public void testJobLaunch() throws Exception {

        JobExecution execution = jobLauncherTestUtils.launchJob();

        System.out.println("After execution "  + execution);

        Collection<StepExecution> stepExecutions = execution.getStepExecutions();
        for (StepExecution stepExecution : stepExecutions) {
            System.out.println("StepExecution " + stepExecution);
        }
    }   
}

Running the above JUnit test will reproduce the issue. If you change the scope on the reader to step, or remove the scope the test completes normally.

Upvotes: 9

Views: 9540

Answers (1)

Michael Minella
Michael Minella

Reputation: 21483

This is due to the fact that the current execution (JobExecution in this case) is stored in a ThreadLocal (see org.springframework.batch.core.scope.context.SynchronizationManagerSupport). That being said, adding multithreaded support for this doesn't seem unreasonable. Feel free to create a Jira issue for it (and a Pull Request if you so incline).

Upvotes: 3

Related Questions